3 people like it.
Like the snippet!
Fork an AsyncSeq into n async sequences where the consumption rate is throttled
Replicate an AsyncSeq n ways. The consumption rates of the resulting consumers are kept in check against each other so that no single consumer races too far ahead of the others.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
|
#r "nuget: FSharp.Control.AsyncSeq, Version=2.0.24"
open System
open FSharp.Control
module AsyncSeq =
    /// Forks `xs` into `n` async sequences that each receive every item of `xs`.
    ///
    /// n        - number of output sequences (one bounded buffer is created per output).
    /// capacity - bounded capacity passed to each buffer's BlockingCollection constructor.
    /// xs       - the source sequence; it is enumerated once by a background runner.
    ///
    /// Returns a list of `n` async sequences, in the same order as the internal buffers.
    ///
    /// The background runner is started immediately (via Async.Start) when this
    /// function is called, not when the first consumer starts reading.
    /// Throttling relies on BlockingCollection's bounded-capacity behaviour: the
    /// runner's Add call waits while a buffer is full, so no consumer can get more
    /// than roughly `capacity` items ahead of the slowest one.
    let forkThrottled<'t> n (capacity:int) (xs:AsyncSeq<'t>) =
        // One bounded buffer per output sequence.
        let pcs = [for _ in 1 .. n -> new System.Collections.Concurrent.BlockingCollection<'t>(capacity)]
        // Each output simply drains its own buffer until the buffer is marked complete.
        let aseqs =
            pcs
            |> List.map (fun buf ->
                asyncSeq {
                    for x in buf.GetConsumingEnumerable() do
                        yield x
                })
        let runner =
            async {
                try
                    // Push every source item into every buffer, in buffer order.
                    do!
                        xs
                        |> AsyncSeq.iter (fun x ->
                            for p in pcs do
                                p.Add(x))
                finally
                    // Always mark the buffers complete, even when enumerating `xs`
                    // fails; otherwise the consumers would wait forever on buffers
                    // that will never receive another item. The exception itself is
                    // not handled here and still propagates out of the runner.
                    pcs |> List.iter (fun p -> p.CompleteAdding())
            }
        Async.Start runner
        aseqs
(*
let inputSeq = [for i in 1 .. 20 -> i] |> AsyncSeq.ofSeq
let forkedSeqs = AsyncSeq.forkThrottled 2 10 inputSeq
let consumer1 = forkedSeqs.[0]
let consumer2 = forkedSeqs.[1]
//start consumer1 first; it will run for a while and then stall,
//because consumer2 is not running yet
consumer1
|> AsyncSeq.iterAsync (fun i ->
async{
do! Async.Sleep 10;
do printfn $"C1 {i}"
})
|> Async.Start
//start consumer2; rate of consumer 1 will be gated by the slower rate of consumer 2
consumer2
|> AsyncSeq.iterAsync (fun i ->
async{
do! Async.Sleep 100;
do printfn $"C2 {i}"
})
|> Async.Start
*)
|
namespace System
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
module AsyncSeq
from Script
val forkThrottled<'t> : n:int -> capacity:int -> xs:obj -> obj list
val n : int
val capacity : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
val xs : obj
val pcs : Collections.Concurrent.BlockingCollection<'t> list
namespace System.Collections
namespace System.Collections.Concurrent
Multiple items
type BlockingCollection<'T> =
new : unit -> BlockingCollection<'T> + 3 overloads
member Add : item:'T -> unit + 1 overload
member BoundedCapacity : int
member CompleteAdding : unit -> unit
member CopyTo : array:'T[] * index:int -> unit
member Count : int
member Dispose : unit -> unit
member GetConsumingEnumerable : unit -> IEnumerable<'T> + 1 overload
member IsAddingCompleted : bool
member IsCompleted : bool
...
--------------------
Collections.Concurrent.BlockingCollection() : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(boundedCapacity: int) : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(collection: Collections.Concurrent.IProducerConsumerCollection<'T>) : Collections.Concurrent.BlockingCollection<'T>
Collections.Concurrent.BlockingCollection(collection: Collections.Concurrent.IProducerConsumerCollection<'T>, boundedCapacity: int) : Collections.Concurrent.BlockingCollection<'T>
val aseqs : obj list
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IReadOnlyList<'T>
interface IReadOnlyCollection<'T>
interface IEnumerable
interface IEnumerable<'T>
member GetReverseIndex : rank:int * offset:int -> int
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
...
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val buf : Collections.Concurrent.BlockingCollection<'t>
val runner : Async<unit>
val async : AsyncBuilder
val iter : action:('T -> unit) -> list:'T list -> unit
val p : Collections.Concurrent.BlockingCollection<'t>
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
...
--------------------
type Async<'T> =
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
More information