Home
Insert
Update snippet 'An IAsyncEnumerable computation expression'
Title
Passcode
Description
A basic implementation.
Source code
open System.Threading.Tasks
open System.Collections.Generic

// This is a simple implementation, with brevity in mind.
// No consideration has been given to disposability, cancellation and performance.

/// Shorthand alias for `IAsyncEnumerable`.
type SeqAsync<'a> = IAsyncEnumerable<'a>

// [snippet:First, an `IAsyncEnumerator' implementation and a few primitive functions]
module AsyncEnumerator =

    /// An `IAsyncEnumerator` assembled from two closures:
    /// `moveNextAsync` advances the enumeration and `getCurrent` reads the current element.
    /// Disposal is a no-op.
    type AsyncEnumerator<'a> (moveNextAsync: unit -> Task<bool>, getCurrent: unit -> 'a) =
        interface IAsyncEnumerator<'a> with
            member _.MoveNextAsync () = moveNextAsync () |> ValueTask<bool>
            member _.Current = getCurrent ()
        interface System.IAsyncDisposable with
            member _.DisposeAsync () = ValueTask.CompletedTask

    /// An enumerator holding no element. Reading `Current` always raises
    /// `InvalidOperationException`, with a message telling whether the enumeration was started.
    let empty<'a> =
        let mutable started = false
        let moveNext () =
            started <- true
            Task.FromResult false
        let current () : 'a =
            (if not started then "Sequence not started" else "Sequence exhausted")
            |> invalidOp
        AsyncEnumerator (moveNext, current)

    /// An enumerator holding exactly one element, `x`.
    let singleton x =
        let mutable started = false
        // The first call answers `true`; every later call answers `false`.
        let moveNext () =
            try Task.FromResult (not started)
            finally started <- true
        AsyncEnumerator (moveNext, fun () -> x)

    /// Enumerate `itor1` until it is exhausted, then `itor2`.
    let append (itor1: IAsyncEnumerator<'a>) (itor2: IAsyncEnumerator<'a>) : IAsyncEnumerator<'a> =
        let mutable itering1 = true
        AsyncEnumerator (
            moveNextAsync = (fun () -> task {
                if itering1 then
                    let! next = itor1.MoveNextAsync ()
                    if next then
                        return true
                    else
                        itering1 <- false
                        return! itor2.MoveNextAsync ()
                else
                    // `itor1` already reported exhaustion: do not poll it again.
                    return! itor2.MoveNextAsync ()
            } ),
            getCurrent = fun () -> if itering1 then itor1.Current else itor2.Current
        )

    /// Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
    /// Its result is applied to the `getEnumerator` argument, which provides the enumerator actually iterated.
    /// Reading `Current` before the first `MoveNextAsync` fails, as no inner enumerator exists yet.
    let bind (getDelay: unit -> Task<'a>) (getEnumerator: 'a -> IAsyncEnumerator<'b>) =
        let mutable itor : IAsyncEnumerator<'b> = null
        AsyncEnumerator (
            moveNextAsync = (fun () -> task {
                if isNull itor then
                    let! x = getDelay ()
                    itor <- getEnumerator x
                return! itor.MoveNextAsync ()
            } ),
            getCurrent = fun () -> itor.Current
        )
// [/snippet]

// [snippet:Definition of the computation expression]
module SeqAsync =

    /// Build a `SeqAsync` from a function returning an `IAsyncEnumerator`.
    /// The function is called on each `GetAsyncEnumerator` call; the cancellation token is ignored.
    let ofEnumerator (getAsyncEnumerator: unit -> IAsyncEnumerator<'a>) : SeqAsync<'a> =
        { new IAsyncEnumerable<'a> with
            member _.GetAsyncEnumerator _ = getAsyncEnumerator () }

    // The rest is usual boiler plate.

    /// Await the task provided by `expr`, then enumerate the sequence `func` builds from its result.
    let bind (expr: unit -> Task<'a>) (func: 'a -> SeqAsync<'b>) : SeqAsync<'b> =
        ofEnumerator (fun () -> AsyncEnumerator.bind expr (fun x -> (func x).GetAsyncEnumerator ()))

    /// Defer the construction of a sequence until its first enumeration step.
    let delay (delayed: unit -> SeqAsync<'a>) = bind Task.FromResult delayed

    /// The sequence without any element.
    let empty<'a> = ofEnumerator (fun () -> AsyncEnumerator.empty<'a>)

    /// The sequence made of the single element `x`.
    let singleton x = ofEnumerator (fun () -> AsyncEnumerator.singleton x)

    /// The elements of `source1` followed by the elements of `source2`.
    let append (source1: SeqAsync<'a>) (source2: SeqAsync<'a>) =
        ofEnumerator (fun () -> AsyncEnumerator.append <|| (source1.GetAsyncEnumerator (), source2.GetAsyncEnumerator ()))

    /// Repeat `body` as long as `condition` holds.
    /// The first check happens immediately; the following ones are deferred until enumeration reaches them.
    let rec whileLoop (condition: unit -> bool) (body: SeqAsync<'a>) =
        if condition () then append body (delay (fun () -> whileLoop condition body))
        else empty

    /// Enumerate `body x` for each `x` of the synchronous `sequence`.
    let forLoop (sequence: seq<'a>) (body: 'a -> SeqAsync<'b>) =
        // Deliberately `let`, not `use`: the returned sequence is lazy and keeps calling
        // `MoveNext`/`Current` after this function has returned, so the enumerator
        // must not be disposed here.
        let itor = sequence.GetEnumerator ()
        whileLoop itor.MoveNext (delay (fun () -> body itor.Current))

    /// Builder of the `seqAsync { ... }` computation expression.
    type SeqAsyncBuilder () =
        member _.Delay tail = delay tail
        member _.Yield x = singleton x
        member _.Combine (xs, ys) = append xs ys
        member _.YieldFrom xs = xs
        member _.Bind (expr: Task<'a>, body) = body |> bind (fun () -> expr)
        member _.Bind (expr: Task, body) = body |> bind (fun () -> task { do! expr })
        member _.Zero () = empty
        member _.While (condition, body) = whileLoop condition body
        member _.For (sequence: seq<'a>, body) : SeqAsync<'b> = forLoop sequence body

    let seqAsync = SeqAsyncBuilder ()
// [/snippet]

// [snippet:Some utilities]
    /// Returns a `Task` retrieving the head and tail (as an `option`) from the source.
    /// The tail wraps the very enumerator that produced the head, hence it can be enumerated only once.
    let tryUnconsAsync (source: SeqAsync<'a>) : Task<('a * SeqAsync<'a>) option> = task {
        let itor = source.GetAsyncEnumerator ()
        match! itor.MoveNextAsync() with
        | false -> return None
        | true -> return Some (itor.Current, ofEnumerator (fun () -> itor))
    }

    /// Mirrors `collect` function from `Seq` and `List` modules.
    let rec collect (mapping: 'a -> SeqAsync<'b>) (source: SeqAsync<'a>) = seqAsync {
        match! tryUnconsAsync source with
        | None -> ()
        | Some (x, xs) ->
            yield! mapping x
            yield! collect mapping xs
    }

    type SeqAsyncBuilder with
        /// Add iteration over `SeqAsync` with `for` in the computation expression.
        member _.For (sequence: SeqAsync<'a>, body) : SeqAsync<'b> = collect body sequence

    /// Take elements from multiple sources, re-emitting them as they come.
    let merge (sources: SeqAsync<'a> list) =
        delay (fun () ->
            // `awaitedCons` holds one pending "next element" task per source still running.
            let rec loop (awaitedCons: Task<('a * SeqAsync<'a>) option> list) = seqAsync {
                if not awaitedCons.IsEmpty then
                    let! arrived = Task.WhenAny (awaitedCons)
                    match! arrived with
                    // That source is exhausted: just forget its task.
                    | None -> yield! awaitedCons |> List.filter ((<>) arrived) |> loop
                    // Emit the element, and replace the completed task by the request for the next one.
                    | Some (head, tail) ->
                        yield head
                        yield! (tryUnconsAsync tail) :: awaitedCons |> List.filter ((<>) arrived) |> loop
            }
            sources |> List.map tryUnconsAsync |> loop
        )

    /// Synchronously run `body` on each element of `source`, blocking the calling thread between elements.
    let iter (body: 'a -> unit) (source: SeqAsync<'a>) =
        let itor = source.GetAsyncEnumerator ()
        // A `ValueTask` must not be blocked on before completion: go through a real `Task` first.
        while itor.MoveNextAsync().AsTask().GetAwaiter().GetResult() do
            body itor.Current
// [/snippet]

// [snippet:Demonstration]
let seqAsync = SeqAsync.seqAsync

/// Four concurrent streams counting from 1 to `n` (one step every `msDelay` milliseconds),
/// each one emitting only its own kind of FizzBuzz answer, merged into a single sequence.
let fizzBuzz (msDelay: int) n =
    let stream () = seqAsync {
        for i in 1..n do
            yield i
            do! Task.Delay msDelay
    }
    [
        seqAsync { for i in stream () do if i % 3 <> 0 && i % 5 <> 0 then yield string i }
        seqAsync { for i in stream () do if i % 3 = 0 && i % 5 <> 0 then yield "Fizz" }
        seqAsync { for i in stream () do if i % 3 <> 0 && i % 5 = 0 then yield "Buzz" }
        seqAsync { for i in stream () do if i % 3 = 0 && i % 5 = 0 then yield "FizzBuzz" }
    ]
    |> SeqAsync.merge

let demo = fizzBuzz 200 15
demo |> SeqAsync.iter (printfn "%s")
// [/snippet]
Tags
computation expression
iasyncenumerable
computation expression
iasyncenumerable
Author
Link
Reference NuGet packages
If your snippet has external dependencies, enter the names of NuGet packages to reference, separated by a comma (
#r
directives are not required).
Update