0 people like it.

An IAsyncEnumerable computation expression (complete)

(Please ignore the previous entry of the same name.)

First, an `IAsyncEnumerator` implementation and a few primitive functions

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
module AsyncEnumerator =
    /// Adapter that builds an `IAsyncEnumerator` from two closures:
    /// `moveNextAsync` is invoked on each `MoveNextAsync` call and `getCurrent` on each read of `Current`.
    type AsyncEnumerator<'a> (moveNextAsync: unit -> Task<bool>, getCurrent: unit -> 'a) =
        interface IAsyncEnumerator<'a> with
            member _.Current = getCurrent ()
            // The closure yields a `Task<bool>`; wrap it in the `ValueTask<bool>` the interface expects.
            member _.MoveNextAsync () = ValueTask<bool> (moveNextAsync ())
        interface System.IAsyncDisposable with
            // Nothing is released by this adapter: disposal completes immediately.
            member _.DisposeAsync () = ValueTask.CompletedTask

    /// An enumerator with no elements: `MoveNextAsync` always yields `false`
    /// and reading `Current` always raises an `InvalidOperationException`.
    let empty<'a> =
        let mutable started = false
        let moveNext () =
            started <- true
            Task.FromResult false
        let current () : 'a =
            // The message tells a premature read apart from a read past the end.
            let message = if started then "Sequence exhausted" else "Sequence not started"
            invalidOp message
        AsyncEnumerator (moveNext, current)

    /// An enumerator over exactly one element: the first `MoveNextAsync` yields `true`,
    /// every later call yields `false`. `Current` returns `x` regardless of position.
    let singleton x =
        let mutable started = false
        let moveNext () =
            // Capture the answer before flipping the flag so only the first call reports an element.
            let isFirstCall = not started
            started <- true
            Task.FromResult isFirstCall
        AsyncEnumerator (moveNext, fun () -> x)

    /// Concatenate two enumerators: elements of `itor1` are produced first, then those of `itor2`.
    /// Once `itor1` has reported exhaustion it is never polled again; every later move goes
    /// straight to `itor2`.
    let append (itor1: IAsyncEnumerator<'a>) (itor2: IAsyncEnumerator<'a>) : IAsyncEnumerator<'a> =
        // True while elements are still being taken from the first enumerator.
        let mutable itering1 = true
        AsyncEnumerator (
            moveNextAsync = (fun () ->
                task {
                    if itering1 then
                        let! next = itor1.MoveNextAsync ()
                        // Switch over permanently the first time `itor1` runs dry.
                        if not next then itering1 <- false
                    if itering1 then return true
                    else return! itor2.MoveNextAsync ()
                }
            ),
            getCurrent = (fun () -> if itering1 then itor1.Current else itor2.Current)
        )

    /// Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
    /// Its result is passed to `getEnumerator`, and the enumerator it returns serves all subsequent calls.
    /// Reading `Current` before the first `MoveNextAsync` raises an `InvalidOperationException`.
    let bind (getDelay: unit -> Task<'a>) (getEnumerator:  'a -> IAsyncEnumerator<'b>) =
        // Stays null until the first successful `MoveNextAsync` has obtained the inner enumerator.
        let mutable itor : IAsyncEnumerator<'b> = null
        AsyncEnumerator (
            moveNextAsync = (fun () ->
                task {
                    if isNull itor then
                        let! x = getDelay ()
                        itor <- getEnumerator x
                    return! itor.MoveNextAsync ()
                }
            ),
            getCurrent = (fun () ->
                // Same failure mode as `empty` instead of a null dereference.
                if isNull itor then invalidOp "Sequence not started"
                else itor.Current)
        )

Definition of the computation expression

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
module SeqAsync =
    /// Wrap a factory of `IAsyncEnumerator` into a `SeqAsync`.
    /// The factory runs on every `GetAsyncEnumerator` call; the cancellation token argument is ignored.
    let ofEnumerator (getAsyncEnumerator: unit -> IAsyncEnumerator<'a>) : SeqAsync<'a> =
        { new IAsyncEnumerable<'a> with
            member _.GetAsyncEnumerator _cancellationToken = getAsyncEnumerator () }

    // The rest is the usual boilerplate.

    /// Sequence-level bind: on each enumeration, `expr` is run and awaited at the first move,
    /// and its result is fed to `func` to obtain the sequence that is then enumerated.
    let bind (expr: unit -> Task<'a>) (func:  'a -> SeqAsync<'b>) : SeqAsync<'b> =
        let enumerate value = (func value).GetAsyncEnumerator ()
        ofEnumerator (fun () -> AsyncEnumerator.bind expr enumerate)

    /// Defer building a sequence: `delayed` is invoked when an enumerator of the result is first advanced.
    let delay (delayed: unit -> SeqAsync<'a>) =
        bind (fun () -> Task.FromResult ()) delayed

    /// The sequence with no elements; each enumeration gets its own `AsyncEnumerator.empty`.
    let empty<'a> =
        ofEnumerator (fun () -> AsyncEnumerator.empty<'a> :> IAsyncEnumerator<'a>)
 
    /// The sequence holding the single element `x`; each enumeration gets its own `AsyncEnumerator.singleton`.
    let singleton x =
        ofEnumerator (fun () -> AsyncEnumerator.singleton x :> IAsyncEnumerator<_>)

    /// Concatenate two sequences. On each enumeration an enumerator is requested from
    /// `source1` and then from `source2`, and the pair is chained with `AsyncEnumerator.append`.
    let append (source1: SeqAsync<'a>) (source2: SeqAsync<'a>) =
        ofEnumerator (fun () ->
            let first = source1.GetAsyncEnumerator ()
            let second = source2.GetAsyncEnumerator ()
            AsyncEnumerator.append first second)

    /// Repeat `body` for as long as `condition` holds.
    /// `condition` is evaluated once right away; each further evaluation is deferred
    /// until the enumeration moves past the preceding copy of `body`.
    let rec whileLoop (condition: unit -> bool) (body: SeqAsync<'a>) =
        match condition () with
        | false -> empty
        | true ->
            let remaining = delay (fun () -> whileLoop condition body)
            append body remaining

    /// Iterate a synchronous sequence, concatenating the `SeqAsync` produced by `body` for each element.
    /// The source enumerator is disposed when it reports exhaustion; it is not disposed if
    /// the resulting sequence is abandoned before reaching its end.
    let forLoop (sequence: seq<'a>) (body: 'a -> SeqAsync<'b>) =
        // Deliberately not a `use` binding: the result is enumerated lazily, after this function
        // has returned, so scoped disposal would release the enumerator while it is still in use.
        let itor = sequence.GetEnumerator ()
        let moveNext () =
            let hasNext = itor.MoveNext ()
            // `whileLoop` stops calling the condition after the first `false`, so disposing here is final.
            if not hasNext then itor.Dispose ()
            hasNext
        whileLoop moveNext (delay (fun () -> body itor.Current))

    /// Computation-expression builder for `SeqAsync`; every member delegates to a function of this module.
    type SeqAsyncBuilder () =
        // Producing elements.
        member _.Zero () = empty
        member _.Yield x = singleton x
        member _.YieldFrom xs = xs

        // Sequencing and laziness.
        member _.Delay tail = delay tail
        member _.Combine (xs, ys) = append xs ys

        // Awaiting tasks (`let!` / `do!`). The untyped `Task` overload is awaited for its completion only.
        member _.Bind (expr: Task<'a>, body) = bind (fun () -> expr) body
        member _.Bind (expr: Task, body) = bind (fun () -> task { do! expr }) body

        // Loops.
        member _.While (condition, body) = whileLoop condition body
        member _.For (sequence: seq<'a>, body) : SeqAsync<'b> = forLoop sequence body

    /// The builder instance backing `seqAsync { ... }` expressions.
    let seqAsync = SeqAsyncBuilder ()

Some utilities

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
    /// Returns a `Task` retrieving the head and tail (as an `option`) from the source.
    /// The tail wraps the very enumerator the head was read from, so it resumes after the head
    /// and always hands out that same enumerator. When the source has no element the result is
    /// `None` and the enumerator is disposed, since nothing else holds a reference to it.
    let tryUnconsAsync (source: SeqAsync<'a>) : Task<('a * SeqAsync<'a>) option> =
        task {
            let itor = source.GetAsyncEnumerator ()
            let! hasHead = itor.MoveNextAsync ()
            if hasHead then
                return Some (itor.Current, ofEnumerator (fun () -> itor))
            else
                do! itor.DisposeAsync ()
                return None
        }

    /// Counterpart of `Seq.collect` / `List.collect`: apply `mapping` to each element of `source`
    /// and concatenate the resulting sequences in order.
    let rec collect (mapping: 'a -> SeqAsync<'b>) (source: SeqAsync<'a>) =
        seqAsync {
            let! uncons = tryUnconsAsync source
            match uncons with
            | Some (head, rest) ->
                yield! mapping head
                yield! collect mapping rest
            | None -> ()
        }
    
    type SeqAsyncBuilder with
        /// Lets `for x in someSeqAsync do ...` be used inside `seqAsync { }` by delegating to `collect`.
        member _.For (sequence: SeqAsync<'a>, body) : SeqAsync<'b> =
            sequence |> collect body

    /// Interleave several sources, emitting each element as soon as any source delivers one.
    /// One pending head/tail request is kept per live source; a source is dropped once it reports no element.
    let merge (sources: SeqAsync<'a> list) =
        delay (fun () ->
            let rec loop (awaitedCons: Task<('a * SeqAsync<'a>) option> list) = 
                seqAsync {
                    if not awaitedCons.IsEmpty then
                        let! arrived = Task.WhenAny (awaitedCons)
                        // Every request except the one that has just completed.
                        let pending = awaitedCons |> List.filter (fun request -> request <> arrived)
                        match! arrived with
                        | None -> yield! loop pending
                        | Some (head, tail) ->
                            yield head
                            // Queue the next request for the source that just delivered.
                            yield! loop (tryUnconsAsync tail :: pending)
                }
            sources |> List.map tryUnconsAsync |> loop
        )

    /// Synchronously consume `source`, applying `body` to every element in order.
    /// The calling thread blocks while each element is awaited. The enumerator is disposed
    /// when the loop ends, whether it finishes normally or an exception escapes.
    let iter (body: 'a -> unit) (source: SeqAsync<'a>) =
        let itor = source.GetAsyncEnumerator ()
        try
            // Convert to a `Task` before waiting: a `ValueTask` result must not be read before completion.
            // `GetAwaiter().GetResult()` rethrows a failure without wrapping it in `AggregateException`.
            while itor.MoveNextAsync().AsTask().GetAwaiter().GetResult() do
                body itor.Current
        finally
            itor.DisposeAsync().AsTask().GetAwaiter().GetResult()

Demonstration

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
// Bring the builder instance into scope so expressions can be written as `seqAsync { ... }`.
let seqAsync = SeqAsync.seqAsync

/// FizzBuzz over 1..n built from four independently enumerated number streams, one per output
/// category, merged back into a single sequence. Each stream awaits `msDelay` milliseconds after each number.
let fizzBuzz (msDelay: int) n =
    let stream () =
        seqAsync {
            for i in 1..n do
                yield i
                do! Task.Delay msDelay
        }
    // Keep the numbers accepted by `keep` and render them with `label`.
    let labelled (keep: int -> bool) (label: int -> string) =
        seqAsync { for i in stream () do if keep i then yield label i }
    [
        labelled (fun i -> i % 3 <> 0 && i % 5 <> 0) string
        labelled (fun i -> i % 3 = 0  && i % 5 <> 0) (fun _ -> "Fizz")
        labelled (fun i -> i % 3 <> 0 && i % 5 = 0)  (fun _ -> "Buzz")
        labelled (fun i -> i % 3 = 0  && i % 5 = 0)  (fun _ -> "FizzBuzz")
     ] |> SeqAsync.merge

// Merged FizzBuzz stream for 1..15, with a 200 ms pause after each number.
let demo = fizzBuzz 200 15

// Consume the stream, printing one item per line.
SeqAsync.iter (printfn "%s") demo
Multiple items
type AsyncEnumerator<'a> =
  interface IAsyncDisposable
  interface IAsyncEnumerator<'a>
  new : moveNextAsync:(unit -> Task<bool>) * getCurrent:(unit -> 'a) -> AsyncEnumerator<'a>

--------------------
new : moveNextAsync:(unit -> Task<bool>) * getCurrent:(unit -> 'a) -> AsyncEnumerator<'a>
val moveNextAsync : (unit -> Task<bool>)
type unit = Unit
Multiple items
type Task =
  new : action:Action -> Task + 7 overloads
  member AsyncState : obj
  member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredTaskAwaitable
  member ContinueWith : continuationAction:Action<Task> -> Task + 19 overloads
  member CreationOptions : TaskCreationOptions
  member Dispose : unit -> unit
  member Exception : AggregateException
  member GetAwaiter : unit -> TaskAwaiter
  member Id : int
  member IsCanceled : bool
  ...

--------------------
type Task<'TResult> =
  inherit Task
  new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
  member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredTaskAwaitable<'TResult>
  member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 19 overloads
  member GetAwaiter : unit -> TaskAwaiter<'TResult>
  member Result : 'TResult
  static member Factory : TaskFactory<'TResult>

--------------------
Task(action: System.Action) : Task
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken) : Task
Task(action: System.Action, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj) : Task
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken) : Task
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task

--------------------
Task(function: System.Func<'TResult>) : Task<'TResult>
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken) : Task<'TResult>
Task(function: System.Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
type bool = System.Boolean
val getCurrent : (unit -> 'a)
type IAsyncEnumerator<'T> =
  inherit IAsyncDisposable
  member Current : 'T
  member MoveNextAsync : unit -> ValueTask<bool>
Multiple items
type ValueTask =
  struct
    new : task:Task -> ValueTask + 1 overload
    member AsTask : unit -> Task
    member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredValueTaskAwaitable
    member Equals : obj:obj -> bool + 1 overload
    member GetAwaiter : unit -> ValueTaskAwaiter
    member GetHashCode : unit -> int
    member IsCanceled : bool
    member IsCompleted : bool
    member IsCompletedSuccessfully : bool
    member IsFaulted : bool
    ...
  end

--------------------
type ValueTask<'TResult> =
  struct
    new : result:'TResult -> ValueTask<'TResult> + 2 overloads
    member AsTask : unit -> Task<'TResult>
    member ConfigureAwait : continueOnCapturedContext:bool -> ConfiguredValueTaskAwaitable<'TResult>
    member Equals : obj:obj -> bool + 1 overload
    member GetAwaiter : unit -> ValueTaskAwaiter<'TResult>
    member GetHashCode : unit -> int
    member IsCanceled : bool
    member IsCompleted : bool
    member IsCompletedSuccessfully : bool
    member IsFaulted : bool
    ...
  end

--------------------
ValueTask ()
ValueTask(task: Task) : ValueTask
ValueTask(source: Sources.IValueTaskSource, token: int16) : ValueTask

--------------------
ValueTask ()
ValueTask(result: 'TResult) : ValueTask<'TResult>
ValueTask(task: Task<'TResult>) : ValueTask<'TResult>
ValueTask(source: Sources.IValueTaskSource<'TResult>, token: int16) : ValueTask<'TResult>
namespace System
type IAsyncDisposable =
  member DisposeAsync : unit -> ValueTask
val empty<'a> : AsyncEnumerator<'a>
val mutable started : bool
val moveNext : (unit -> Task<bool>)
Task.FromResult<'TResult>(result: 'TResult) : Task<'TResult>
val current : (unit -> 'a)
val not : value:bool -> bool
val invalidOp : message:string -> 'T
val singleton : x:'a -> AsyncEnumerator<'a>
val x : 'a
val append : itor1:IAsyncEnumerator<'a> -> itor2:IAsyncEnumerator<'a> -> IAsyncEnumerator<'a>
val itor1 : IAsyncEnumerator<'a>
val itor2 : IAsyncEnumerator<'a>
val mutable itering1 : bool
IAsyncEnumerator.MoveNextAsync() : ValueTask<bool>
property IAsyncEnumerator.Current: 'a with get
val bind : getDelay:(unit -> Task<'a>) -> getEnumerator:('a -> IAsyncEnumerator<'b>) -> AsyncEnumerator<'a0>


 Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
 Its result is applied by the `getEnumerator`argument.
val getDelay : (unit -> Task<'a>)
val getEnumerator : ('a -> IAsyncEnumerator<'b>)
val mutable itor : obj
type SeqAsync<'a> = IAsyncEnumerable<'a>
val ofEnumerator : getAsyncEnumerator:(unit -> IAsyncEnumerator<'a>) -> SeqAsync<'a>


 Build an `SeqAsync` from a fuction returning an `IAsyncEnumerator`.
val getAsyncEnumerator : (unit -> IAsyncEnumerator<'a>)
type IAsyncEnumerable<'T> =
  member GetAsyncEnumerator : ?cancellationToken:CancellationToken -> IAsyncEnumerator<'T>
val bind : expr:(unit -> Task<'a>) -> func:('a -> SeqAsync<'b>) -> SeqAsync<'b>
val expr : (unit -> Task<'a>)
val func : ('a -> SeqAsync<'b>)
module AsyncEnumerator

from Script
val bind : getDelay:(unit -> Task<'a>) -> getEnumerator:('a -> IAsyncEnumerator<'b>) -> AsyncEnumerator.AsyncEnumerator<'a0>


 Bind a `Task` to an `IAsyncEnumerator`. The task is started and awaited only once, at first enumeration.
 Its result is applied by the `getEnumerator`argument.
val delay : delayed:(unit -> SeqAsync<'a>) -> SeqAsync<'a>
val delayed : (unit -> SeqAsync<'a>)
val empty<'a> : SeqAsync<int>
val empty<'a> : AsyncEnumerator.AsyncEnumerator<'a>
val singleton : x:'a -> SeqAsync<'b>
val singleton : x:'a -> AsyncEnumerator.AsyncEnumerator<'a>
val append : source1:SeqAsync<'a> -> source2:SeqAsync<'a> -> SeqAsync<'a>
val source1 : SeqAsync<'a>
val source2 : SeqAsync<'a>
IAsyncEnumerable.GetAsyncEnumerator(?cancellationToken: System.Threading.CancellationToken) : IAsyncEnumerator<'a>
val whileLoop : condition:(unit -> bool) -> body:SeqAsync<int> -> SeqAsync<int>
val condition : (unit -> bool)
val body : SeqAsync<int>
val forLoop : sequence:seq<'a> -> body:('a -> SeqAsync<int>) -> SeqAsync<int>
val sequence : seq<'a>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

--------------------
type seq<'T> = IEnumerable<'T>
val body : ('a -> SeqAsync<int>)
val itor : IEnumerator<'a>
IEnumerable.GetEnumerator() : IEnumerator<'a>
System.Collections.IEnumerator.MoveNext() : bool
property IEnumerator.Current: 'a with get
Multiple items
type SeqAsyncBuilder =
  new : unit -> SeqAsyncBuilder
  member Bind : expr:Task<'a> * body:('a -> SeqAsync<'c>) -> SeqAsync<'c>
  member Bind : expr:Task * body:('a -> SeqAsync<'b>) -> SeqAsync<'b>
  member Combine : xs:SeqAsync<'e> * ys:SeqAsync<'e> -> SeqAsync<'e>
  member Delay : tail:(unit -> SeqAsync<'h>) -> SeqAsync<'h>
  member For : sequence:seq<'a> * body:('a -> SeqAsync<int>) -> SeqAsync<int>
  member For : sequence:SeqAsync<'a> * body:('a -> SeqAsync<int>) -> SeqAsync<int>
  member While : condition:(unit -> bool) * body:SeqAsync<int> -> SeqAsync<int>
  member Yield : x:'f -> SeqAsync<'g>
  member YieldFrom : xs:'d -> 'd
  ...

--------------------
new : unit -> SeqAsyncBuilder
val tail : (unit -> SeqAsync<'h>)
val x : 'f
val xs : SeqAsync<'e>
val ys : SeqAsync<'e>
val xs : 'd
val expr : Task<'a>
val body : ('a -> SeqAsync<'c>)
val expr : Task
val body : ('a -> SeqAsync<'b>)
val seqAsync : SeqAsyncBuilder
val tryUnconsAsync : source:SeqAsync<'a> -> Task<('a * SeqAsync<'a>) option>


 Returns a `Task` retreiving the head and tail (as an `option`) from the source
val source : SeqAsync<'a>
type 'T option = Option<'T>
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val collect : mapping:('a -> SeqAsync<int>) -> source:SeqAsync<'a> -> SeqAsync<int>


 Mirrors `collect` function from `Seq` and `List` modules.
val mapping : ('a -> SeqAsync<int>)
val xs : SeqAsync<'a>
val sequence : SeqAsync<'a>
val merge : sources:SeqAsync<'a> list -> SeqAsync<int>


 Take elements from multiple sources, re-emitting them as they come.
val sources : SeqAsync<'a> list
type 'T list = List<'T>
val loop : (Task<('a * SeqAsync<'a>) option> list -> SeqAsync<int>)
val awaitedCons : Task<('a * SeqAsync<'a>) option> list
property List.IsEmpty: bool with get
val arrived : Task<('a * SeqAsync<'a>) option>
Task.WhenAny<'TResult>(tasks: IEnumerable<Task<'TResult>>) : Task<Task<'TResult>>
Task.WhenAny<'TResult>([<System.ParamArray>] tasks: Task<'TResult> []) : Task<Task<'TResult>>
Task.WhenAny(tasks: IEnumerable<Task>) : Task<Task>
Task.WhenAny([<System.ParamArray>] tasks: Task []) : Task<Task>
Multiple items
type List<'T> =
  new : unit -> List<'T> + 2 overloads
  member Add : item:'T -> unit
  member AddRange : collection:IEnumerable<'T> -> unit
  member AsReadOnly : unit -> ReadOnlyCollection<'T>
  member BinarySearch : item:'T -> int + 2 overloads
  member Capacity : int with get, set
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
  member CopyTo : array:'T[] -> unit + 2 overloads
  ...
  nested type Enumerator

--------------------
List() : List<'T>
List(capacity: int) : List<'T>
List(collection: IEnumerable<'T>) : List<'T>
val filter : predicate:('T -> bool) -> list:'T list -> 'T list
val head : 'a
val tail : SeqAsync<'a>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val iter : body:('a -> unit) -> source:SeqAsync<'a> -> unit
val body : ('a -> unit)
val itor : IAsyncEnumerator<'a>
Multiple items
module Result

from Microsoft.FSharp.Core

--------------------
[<Struct>]
type Result<'T,'TError> =
  | Ok of ResultValue: 'T
  | Error of ErrorValue: 'TError
val seqAsync : SeqAsync.SeqAsyncBuilder
Multiple items
module SeqAsync

from Script

--------------------
type SeqAsync<'a> = IAsyncEnumerable<'a>
val fizzBuzz : msDelay:int -> n:int -> SeqAsync<int>
val msDelay : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
val n : int
val stream : (unit -> SeqAsync<int>)
val i : int
Task.Delay(millisecondsDelay: int) : Task
Task.Delay(delay: System.TimeSpan) : Task
Task.Delay(millisecondsDelay: int, cancellationToken: System.Threading.CancellationToken) : Task
Task.Delay(delay: System.TimeSpan, cancellationToken: System.Threading.CancellationToken) : Task
Multiple items
val string : value:'T -> string

--------------------
type string = System.String
val demo : SeqAsync<int>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/869
Posted:2 years ago
Author:Julien Di Lenarda
Tags: computation expression , iasyncenumerable