3 people like it.

Fork an AsyncSeq into n async sequences where the consumption rate is throttled

Replicate an AsyncSeq n-ways. The consumption rate of the resulting consumers are kept in check against each other so that a single consumer does not race too far ahead

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
#r "nuget: FSharp.Control.AsyncSeq, Version=2.0.24"
open System
open FSharp.Control
module AsyncSeq =
    let forkThrottled<'t> n (capacity:int) (xs:AsyncSeq<'t>) =
        let pcs = [for _ in 1 .. n -> new System.Collections.Concurrent.BlockingCollection<'t>(capacity)]
        let aseqs = 
            pcs 
            |> List.map(fun buf -> 
                asyncSeq {
                for x in buf.GetConsumingEnumerable() do
                    yield x
                })
        let runner = 
            async {
                do!
                    xs
                    |> AsyncSeq.iter (fun x->
                        for p in pcs do
                            p.Add(x))
                do pcs |> List.iter (fun p -> p.CompleteAdding())
            }
        Async.Start runner
        aseqs
(*
let inputSeq = [for i in 1 .. 20 -> i] |> AsyncSeq.ofSeq
let forkedSeqs = AsyncSeq.forkThrottled 2 10 inputSeq
let consumer1 = forkedSeqs.[0]
let consumer2 = forkedSeqs.[1]

//start consumer1 first it will run for a while and then stop
//as consumer2 is not running
consumer1 
|> AsyncSeq.iterAsync (fun i -> 
    async{
        do! Async.Sleep 10; 
        do printfn $"C1 {i}"
    })
|> Async.Start

//start consumer2; rate of consumer 1 will be gated by the slower rate of consumer 2
consumer2 
|> AsyncSeq.iterAsync (fun i -> 
    async{
        do! Async.Sleep 100; 
        do printfn $"C2 {i}"
    })
|> Async.Start
*)
namespace System
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
module AsyncSeq

from Script
val forkThrottled<'t> : n:int -> capacity:int -> xs:obj -> obj list
val n : int
val capacity : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
val xs : obj
val pcs : Collections.Concurrent.BlockingCollection<'t> list
namespace System.Collections
namespace System.Collections.Concurrent
Multiple items
type BlockingCollection<'T> =
  new : unit -> BlockingCollection<'T> + 3 overloads
  member Add : item:'T -> unit + 1 overload
  member BoundedCapacity : int
  member CompleteAdding : unit -> unit
  member CopyTo : array:'T[] * index:int -> unit
  member Count : int
  member Dispose : unit -> unit
  member GetConsumingEnumerable : unit -> IEnumerable<'T> + 1 overload
  member IsAddingCompleted : bool
  member IsCompleted : bool
  ...

--------------------
Collections.Concurrent.BlockingCollection() : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(boundedCapacity: int) : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(collection: Collections.Concurrent.IProducerConsumerCollection<'T>) : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(collection: Collections.Concurrent.IProducerConsumerCollection<'T>, boundedCapacity: int) : Collections.Concurrent.BlockingCollection<'T>
val aseqs : obj list
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
    interface IReadOnlyList<'T>
    interface IReadOnlyCollection<'T>
    interface IEnumerable
    interface IEnumerable<'T>
    member GetReverseIndex : rank:int * offset:int -> int
    member GetSlice : startIndex:int option * endIndex:int option -> 'T list
    member Head : 'T
    member IsEmpty : bool
    member Item : index:int -> 'T with get
    member Length : int
    ...
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val buf : Collections.Concurrent.BlockingCollection<'t>
val runner : Async<unit>
val async : AsyncBuilder
val iter : action:('T -> unit) -> list:'T list -> unit
val p : Collections.Concurrent.BlockingCollection<'t>
Multiple items
type Async =
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Task -> Async<unit>
  static member AwaitTask : task:Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  ...

--------------------
type Async<'T> =
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit

More information

Link:http://fssnip.net/80N
Posted:4 years ago
Author:Faisal Waris
Tags: #asyncseq