0 people like it.
Like the snippet!
An IAsyncEnumerable computation expression (complete)
(Please ignore previous entry of the same name)
1:
2:
3:
4:
5:
6:
7:
|
open System.Threading.Tasks
open System.Collections.Generic
// This is a simple implementation, written with brevity in mind.
// No consideration has been given to disposability, cancellation or performance.
/// Shorthand alias for `IAsyncEnumerable<'a>`.
type SeqAsync<'a> = IAsyncEnumerable<'a>
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
|
/// Building blocks that work directly on `IAsyncEnumerator` values.
module AsyncEnumerator =

    /// An `IAsyncEnumerator` assembled from two closures: one that advances the
    /// enumeration and one that reads the current element.
    type AsyncEnumerator<'a> (moveNextAsync: unit -> Task<bool>, getCurrent: unit -> 'a) =
        interface IAsyncEnumerator<'a> with
            member _.MoveNextAsync () = ValueTask<bool> (moveNextAsync ())
            member _.Current = getCurrent ()
        interface System.IAsyncDisposable with
            // Disposal is a no-op; enumerators captured by the closures are not disposed.
            member _.DisposeAsync () = ValueTask.CompletedTask

    /// An enumerator that yields nothing. Reading `Current` always raises
    /// `InvalidOperationException`, with a message that depends on whether
    /// `MoveNextAsync` has been called yet.
    let empty<'a> =
        let mutable started = false
        let moveNext () =
            started <- true
            Task.FromResult false
        let current () : 'a =
            (if not started then "Sequence not started" else "Sequence exhausted") |> invalidOp
        AsyncEnumerator (moveNext, current)

    /// An enumerator that yields `x` exactly once.
    let singleton x =
        let mutable started = false
        let moveNext () =
            // Only the very first call reports an element.
            let isFirst = not started
            started <- true
            Task.FromResult isFirst
        AsyncEnumerator (moveNext, fun () -> x)

    /// Enumerate `itor1` until it is exhausted, then continue with `itor2`.
    let append (itor1: IAsyncEnumerator<'a>) (itor2: IAsyncEnumerator<'a>) : IAsyncEnumerator<'a> =
        let mutable itering1 = true
        AsyncEnumerator (
            moveNextAsync = (fun () ->
                task {
                    if itering1 then
                        let! next = itor1.MoveNextAsync ()
                        if next then
                            return true
                        else
                            // First source is done: switch over for good.
                            itering1 <- false
                            return! itor2.MoveNextAsync ()
                    else
                        // Never poll the first enumerator again once it has reported exhaustion.
                        return! itor2.MoveNextAsync ()
                }
            ),
            getCurrent = fun () -> if itering1 then itor1.Current else itor2.Current
        ) :> IAsyncEnumerator<'a>

    /// Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once,
    /// on the first call to `MoveNextAsync`; its result is passed to `getEnumerator`,
    /// and the enumerator it returns supplies every element.
    /// Reading `Current` before the first `MoveNextAsync` raises `InvalidOperationException`.
    let bind (getDelay: unit -> Task<'a>) (getEnumerator: 'a -> IAsyncEnumerator<'b>) =
        // Stays null until the first MoveNextAsync has obtained the inner enumerator.
        let mutable itor : IAsyncEnumerator<'b> = null
        AsyncEnumerator (
            moveNextAsync = (fun () ->
                task {
                    if isNull itor then
                        let! x = getDelay ()
                        itor <- getEnumerator x
                    return! itor.MoveNextAsync ()
                }
            ),
            getCurrent = fun () ->
                if isNull itor then invalidOp "Sequence not started" else itor.Current
        )
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
|
module SeqAsync =
/// Wrap an enumerator factory as a `SeqAsync`: every `GetAsyncEnumerator` call
/// invokes `getAsyncEnumerator`. The cancellation token argument is ignored.
let ofEnumerator (getAsyncEnumerator: unit -> IAsyncEnumerator<'a>) : SeqAsync<'a> =
    { new IAsyncEnumerable<'a> with
        member _.GetAsyncEnumerator _cancellationToken = getAsyncEnumerator () }
// From here on it is the usual computation-expression plumbing.
/// Sequence produced by awaiting the task returned by `expr` and enumerating `func`
/// applied to its result. The work is delegated to `AsyncEnumerator.bind` each
/// time the resulting sequence is enumerated.
let bind (expr: unit -> Task<'a>) (func: 'a -> SeqAsync<'b>) : SeqAsync<'b> =
    let enumerate (value: 'a) = (func value).GetAsyncEnumerator ()
    ofEnumerator (fun () -> AsyncEnumerator.bind expr enumerate)
/// Defer building a sequence: `delayed` is handed to `bind` behind an
/// already-completed unit task.
let delay (delayed: unit -> SeqAsync<'a>) =
    delayed |> bind (fun () -> Task.FromResult ())
/// The sequence with no elements; each enumeration gets a fresh `AsyncEnumerator.empty`.
let empty<'a> =
    (fun () -> AsyncEnumerator.empty<'a> :> IAsyncEnumerator<'a>) |> ofEnumerator
/// The sequence containing only `x`; each enumeration gets a fresh `AsyncEnumerator.singleton`.
let singleton x =
    (fun () -> AsyncEnumerator.singleton x :> IAsyncEnumerator<_>) |> ofEnumerator
/// Concatenate two sequences. Both enumerators are obtained up front, in order,
/// whenever the result is enumerated.
let append (source1: SeqAsync<'a>) (source2: SeqAsync<'a>) =
    ofEnumerator (fun () ->
        let first = source1.GetAsyncEnumerator ()
        let second = source2.GetAsyncEnumerator ()
        AsyncEnumerator.append first second)
/// Repeat `body` for as long as `condition ()` holds. The condition is checked
/// once right away; each later check is wrapped in `delay`.
let rec whileLoop (condition: unit -> bool) (body: SeqAsync<'a>) =
    match condition () with
    | true -> append body (delay (fun () -> whileLoop condition body))
    | false -> empty
/// Iterate over a synchronous `seq`, concatenating the `SeqAsync` that `body`
/// produces for each element.
///
/// The underlying enumerator is created when the resulting sequence is
/// enumerated, so the result can be enumerated more than once, and it is
/// disposed as soon as it reports exhaustion. An enumeration that is abandoned
/// early does not dispose it.
let forLoop (sequence: seq<'a>) (body: 'a -> SeqAsync<'b>) =
    delay (fun () ->
        // No `use` binding here: the loop below is lazy and outlives this scope, so a
        // scope-bound dispose would run before the enumerator has been consumed.
        let itor = sequence.GetEnumerator ()
        let moveNext () =
            let hasNext = itor.MoveNext ()
            if not hasNext then itor.Dispose ()
            hasNext
        whileLoop moveNext (delay (fun () -> body itor.Current)))
/// Computation-expression builder for `SeqAsync`; every member forwards to the
/// corresponding module-level function.
type SeqAsyncBuilder () =
    // Empty and element-producing forms.
    member _.Zero () = empty
    member _.Yield x = singleton x
    member _.YieldFrom xs = xs
    // Sequencing and laziness.
    member _.Delay tail = delay tail
    member _.Combine (xs, ys) = append xs ys
    // Awaiting tasks (`let!` / `do!`), with and without a result.
    member _.Bind (expr: Task<'a>, body) = bind (fun () -> expr) body
    member _.Bind (expr: Task, body) = bind (fun () -> task { do! expr }) body
    // Loops.
    member _.While (condition, body) = whileLoop condition body
    member _.For (sequence: seq<'a>, body) : SeqAsync<'b> = forLoop sequence body

/// The builder instance behind `seqAsync { ... }` expressions.
let seqAsync = new SeqAsyncBuilder ()
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
|
/// Returns a `Task` retrieving the head and the tail of `source`, or `None` when
/// `source` has no element. The tail re-exposes the very enumerator used to read
/// the head.
let tryUnconsAsync (source: SeqAsync<'a>) : Task<('a * SeqAsync<'a>) option> =
    task {
        let itor = source.GetAsyncEnumerator ()
        let! hasHead = itor.MoveNextAsync ()
        if hasHead then
            let head = itor.Current
            return Some (head, ofEnumerator (fun () -> itor))
        else
            return None
    }
/// Counterpart of `Seq.collect` / `List.collect`: applies `mapping` to each element
/// of `source` and concatenates the resulting sequences in order.
let rec collect (mapping: 'a -> SeqAsync<'b>) (source: SeqAsync<'a>) =
    seqAsync {
        let! uncons = tryUnconsAsync source
        match uncons with
        | Some (head, rest) ->
            yield! mapping head
            yield! collect mapping rest
        | None -> ()
    }
type SeqAsyncBuilder with
    /// Lets `for x in someSeqAsync do ...` appear inside `seqAsync { ... }` by
    /// running `collect` with `body` over `sequence`.
    member _.For (sequence: SeqAsync<'a>, body) : SeqAsync<'b> = sequence |> collect body
/// Take elements from several sources and re-emit each one as soon as it arrives.
let merge (sources: SeqAsync<'a> list) =
    delay (fun () ->
        // `awaitedCons` holds one pending head/tail request per source that is still live.
        let rec loop (awaitedCons: Task<('a * SeqAsync<'a>) option> list) =
            seqAsync {
                match awaitedCons with
                | [] -> ()
                | _ ->
                    let! arrived = Task.WhenAny (awaitedCons)
                    // Every request except the one that just completed.
                    let stillPending = awaitedCons |> List.filter (fun pending -> arrived <> pending)
                    let! outcome = arrived
                    match outcome with
                    | Some (head, tail) ->
                        yield head
                        // Ask the same source for its next element only after `head` was emitted.
                        yield! loop (tryUnconsAsync tail :: stillPending)
                    | None ->
                        // That source is exhausted; carry on with the others.
                        yield! loop stillPending
            }
        sources |> List.map tryUnconsAsync |> loop
    )
/// Synchronously enumerate `source`, applying `body` to each element in order.
/// Blocks the calling thread until the sequence is exhausted; the enumerator is
/// disposed afterwards, also when `body` or the enumeration raises.
let iter (body: 'a -> unit) (source: SeqAsync<'a>) =
    let itor = source.GetAsyncEnumerator ()
    // A ValueTask must not be blocked on directly while it is still incomplete;
    // converting it to a Task first makes the blocking wait well-defined.
    let moveNext () = itor.MoveNextAsync().AsTask().GetAwaiter().GetResult()
    try
        while moveNext () do
            body itor.Current
    finally
        itor.DisposeAsync().AsTask().GetAwaiter().GetResult()
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
|
// Bring the builder into scope so that `seqAsync { ... }` can be written unqualified.
let seqAsync : SeqAsync.SeqAsyncBuilder = SeqAsync.seqAsync
/// FizzBuzz demo: four independent filtered views over the numbers `1..n`
/// (one element, then a pause of `msDelay` milliseconds), merged into one sequence.
let fizzBuzz (msDelay: int) n =
    // A new ticking source per call, so each filtered view below enumerates its own.
    let stream () =
        seqAsync {
            for i in 1..n do
                yield i
                do! Task.Delay msDelay
        }
    let by3 i = i % 3 = 0
    let by5 i = i % 5 = 0
    // Keep the numbers accepted by `keep` and turn them into text with `render`.
    let select (keep: int -> bool) (render: int -> string) =
        seqAsync { for i in stream () do if keep i then yield render i }
    [
        select (fun i -> not (by3 i) && not (by5 i)) (fun i -> string i)
        select (fun i -> by3 i && not (by5 i)) (fun _ -> "Fizz")
        select (fun i -> not (by3 i) && by5 i) (fun _ -> "Buzz")
        select (fun i -> by3 i && by5 i) (fun _ -> "FizzBuzz")
    ] |> SeqAsync.merge
// Run the demo: 15 numbers, 200 ms apart, each result printed on its own line.
let demo = fizzBuzz 200 15
SeqAsync.iter (printfn "%s") demo
|
namespace System
namespace System.Threading
namespace System.Threading.Tasks
namespace System.Collections
namespace System.Collections.Generic
type SeqAsync<'a> = IAsyncEnumerable<'a>
type IAsyncEnumerable<'T> =
member GetAsyncEnumerator : ?cancellationToken:CancellationToken -> IAsyncEnumerator<'T>
Multiple items
type AsyncEnumerator<'a> =
interface IAsyncDisposable
interface IAsyncEnumerator<'a>
new : moveNextAsync:(unit -> Task<bool>) * getCurrent:(unit -> 'a) -> AsyncEnumerator<'a>
--------------------
new : moveNextAsync:(unit -> Task<bool>) * getCurrent:(unit -> 'a) -> AsyncEnumerator<'a>
val moveNextAsync : (unit -> Task<bool>)
type unit = Unit
Multiple items
type Task =
new : action:Action -> Task + 7 overloads
member AsyncState : obj
member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredTaskAwaitable
member ContinueWith : continuationAction:Action<Task> -> Task + 19 overloads
member CreationOptions : TaskCreationOptions
member Dispose : unit -> unit
member Exception : AggregateException
member GetAwaiter : unit -> TaskAwaiter
member Id : int
member IsCanceled : bool
...
--------------------
type Task<'TResult> =
inherit Task
new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredTaskAwaitable<'TResult>
member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 19 overloads
member GetAwaiter : unit -> TaskAwaiter<'TResult>
member Result : 'TResult
static member Factory : TaskFactory<'TResult>
--------------------
Task(action: System.Action) : Task
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken) : Task
Task(action: System.Action, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj) : Task
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken) : Task
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task
--------------------
Task(function: System.Func<'TResult>) : Task<'TResult>
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken) : Task<'TResult>
Task(function: System.Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
type bool = System.Boolean
val getCurrent : (unit -> 'a)
type IAsyncEnumerator<'T> =
inherit IAsyncDisposable
member Current : 'T
member MoveNextAsync : unit -> ValueTask<bool>
Multiple items
type ValueTask =
struct
new : task:Task -> ValueTask + 1 overload
member AsTask : unit -> Task
member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredValueTaskAwaitable
member Equals : obj:obj -> bool + 1 overload
member GetAwaiter : unit -> ValueTaskAwaiter
member GetHashCode : unit -> int
member IsCanceled : bool
member IsCompleted : bool
member IsCompletedSuccessfully : bool
member IsFaulted : bool
...
end
--------------------
type ValueTask<'TResult> =
struct
new : result:'TResult -> ValueTask<'TResult> + 2 overloads
member AsTask : unit -> Task<'TResult>
member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredValueTaskAwaitable<'TResult>
member Equals : obj:obj -> bool + 1 overload
member GetAwaiter : unit -> ValueTaskAwaiter<'TResult>
member GetHashCode : unit -> int
member IsCanceled : bool
member IsCompleted : bool
member IsCompletedSuccessfully : bool
member IsFaulted : bool
...
end
--------------------
ValueTask ()
ValueTask(task: Task) : ValueTask
ValueTask(source: Sources.IValueTaskSource, token: int16) : ValueTask
--------------------
ValueTask ()
ValueTask(result: 'TResult) : ValueTask<'TResult>
ValueTask(task: Task<'TResult>) : ValueTask<'TResult>
ValueTask(source: Sources.IValueTaskSource<'TResult>, token: int16) : ValueTask<'TResult>
type IAsyncDisposable =
member DisposeAsync : unit -> ValueTask
val empty<'a> : AsyncEnumerator<'a>
val mutable started : bool
val moveNext : (unit -> Task<bool>)
Task.FromResult<'TResult>(result: 'TResult) : Task<'TResult>
val current : (unit -> 'a)
val not : value:bool -> bool
val invalidOp : message:string -> 'T
val singleton : x:'a -> AsyncEnumerator<'a>
val x : 'a
val append : itor1:IAsyncEnumerator<'a> -> itor2:IAsyncEnumerator<'a> -> IAsyncEnumerator<'a>
val itor1 : IAsyncEnumerator<'a>
val itor2 : IAsyncEnumerator<'a>
val mutable itering1 : bool
IAsyncEnumerator.MoveNextAsync() : ValueTask<bool>
property IAsyncEnumerator.Current: 'a with get
val bind : getDelay:(unit -> Task<'a>) -> getEnumerator:('a -> IAsyncEnumerator<'b>) -> AsyncEnumerator<'a0>
Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
Its result is applied by the `getEnumerator`argument.
val getDelay : (unit -> Task<'a>)
val getEnumerator : ('a -> IAsyncEnumerator<'b>)
val mutable itor : obj
val ofEnumerator : getAsyncEnumerator:(unit -> IAsyncEnumerator<'a>) -> SeqAsync<'a>
Build an `SeqAsync` from a fuction returning an `IAsyncEnumerator`.
val getAsyncEnumerator : (unit -> IAsyncEnumerator<'a>)
val bind : expr:(unit -> Task<'a>) -> func:('a -> SeqAsync<'b>) -> SeqAsync<'b>
val expr : (unit -> Task<'a>)
val func : ('a -> SeqAsync<'b>)
module AsyncEnumerator
from Script
val bind : getDelay:(unit -> Task<'a>) -> getEnumerator:('a -> IAsyncEnumerator<'b>) -> AsyncEnumerator.AsyncEnumerator<'a0>
Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
Its result is applied by the `getEnumerator`argument.
val delay : delayed:(unit -> SeqAsync<'a>) -> SeqAsync<'a>
val delayed : (unit -> SeqAsync<'a>)
val empty<'a> : SeqAsync<int>
val empty<'a> : AsyncEnumerator.AsyncEnumerator<'a>
val singleton : x:'a -> SeqAsync<'b>
val singleton : x:'a -> AsyncEnumerator.AsyncEnumerator<'a>
val append : source1:SeqAsync<'a> -> source2:SeqAsync<'a> -> SeqAsync<'a>
val source1 : SeqAsync<'a>
val source2 : SeqAsync<'a>
IAsyncEnumerable.GetAsyncEnumerator(?cancellationToken: System.Threading.CancellationToken) : IAsyncEnumerator<'a>
val whileLoop : condition:(unit -> bool) -> body:SeqAsync<int> -> SeqAsync<int>
val condition : (unit -> bool)
val body : SeqAsync<int>
val forLoop : sequence:seq<'a> -> body:('a -> SeqAsync<int>) -> SeqAsync<int>
val sequence : seq<'a>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
--------------------
type seq<'T> = IEnumerable<'T>
val body : ('a -> SeqAsync<int>)
val itor : IEnumerator<'a>
IEnumerable.GetEnumerator() : IEnumerator<'a>
System.Collections.IEnumerator.MoveNext() : bool
property IEnumerator.Current: 'a with get
Multiple items
type SeqAsyncBuilder =
new : unit -> SeqAsyncBuilder
member Bind : expr:Task<'a> * body:('a -> SeqAsync<'c>) -> SeqAsync<'c>
member Bind : expr:Task * body:('a -> SeqAsync<'b>) -> SeqAsync<'b>
member Combine : xs:SeqAsync<'e> * ys:SeqAsync<'e> -> SeqAsync<'e>
member Delay : tail:(unit -> SeqAsync<'h>) -> SeqAsync<'h>
member For : sequence:seq<'a> * body:('a -> SeqAsync<int>) -> SeqAsync<int>
member For : sequence:SeqAsync<'a> * body:('a -> SeqAsync<int>) -> SeqAsync<int>
member While : condition:(unit -> bool) * body:SeqAsync<int> -> SeqAsync<int>
member Yield : x:'f -> SeqAsync<'g>
member YieldFrom : xs:'d -> 'd
...
--------------------
new : unit -> SeqAsyncBuilder
val tail : (unit -> SeqAsync<'h>)
val x : 'f
val xs : SeqAsync<'e>
val ys : SeqAsync<'e>
val xs : 'd
val expr : Task<'a>
val body : ('a -> SeqAsync<'c>)
val expr : Task
val body : ('a -> SeqAsync<'b>)
val seqAsync : SeqAsyncBuilder
val tryUnconsAsync : source:SeqAsync<'a> -> Task<('a * SeqAsync<'a>) option>
Returns a `Task` retreiving the head and tail (as an `option`) from the source
val source : SeqAsync<'a>
type 'T option = Option<'T>
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val collect : mapping:('a -> SeqAsync<int>) -> source:SeqAsync<'a> -> SeqAsync<int>
Mirrors `collect` function from `Seq` and `List` modules.
val mapping : ('a -> SeqAsync<int>)
val xs : SeqAsync<'a>
val sequence : SeqAsync<'a>
val merge : sources:SeqAsync<'a> list -> SeqAsync<int>
Take elements from multiple sources, re-emitting them as they come.
val sources : SeqAsync<'a> list
type 'T list = List<'T>
val loop : (Task<('a * SeqAsync<'a>) option> list -> SeqAsync<int>)
val awaitedCons : Task<('a * SeqAsync<'a>) option> list
property List.IsEmpty: bool with get
val arrived : Task<('a * SeqAsync<'a>) option>
Task.WhenAny<'TResult>(tasks: IEnumerable<Task<'TResult>>) : Task<Task<'TResult>>
Task.WhenAny<'TResult>([<System.ParamArray>] tasks: Task<'TResult> []) : Task<Task<'TResult>>
Task.WhenAny(tasks: IEnumerable<Task>) : Task<Task>
Task.WhenAny([<System.ParamArray>] tasks: Task []) : Task<Task>
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
--------------------
List() : List<'T>
List(capacity: int) : List<'T>
List(collection: IEnumerable<'T>) : List<'T>
val filter : predicate:('T -> bool) -> list:'T list -> 'T list
val head : 'a
val tail : SeqAsync<'a>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val iter : body:('a -> unit) -> source:SeqAsync<'a> -> unit
val body : ('a -> unit)
val itor : IAsyncEnumerator<'a>
Multiple items
module Result
from Microsoft.FSharp.Core
--------------------
[<Struct>]
type Result<'T,'TError> =
| Ok of ResultValue: 'T
| Error of ErrorValue: 'TError
val seqAsync : SeqAsync.SeqAsyncBuilder
Multiple items
module SeqAsync
from Script
--------------------
type SeqAsync<'a> = IAsyncEnumerable<'a>
val fizzBuzz : msDelay:int -> n:int -> SeqAsync<int>
val msDelay : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
val n : int
val stream : (unit -> SeqAsync<int>)
val i : int
Task.Delay(millisecondsDelay: int) : Task
Task.Delay(delay: System.TimeSpan) : Task
Task.Delay(millisecondsDelay: int, cancellationToken: System.Threading.CancellationToken) : Task
Task.Delay(delay: System.TimeSpan, cancellationToken: System.Threading.CancellationToken) : Task
Multiple items
val string : value:'T -> string
--------------------
type string = System.String
val demo : SeqAsync<int>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
More information