#r "nuget: FSharp.Control.AsyncSeq, Version=2.0.24"

open System
open FSharp.Control

module AsyncSeq =

    /// Forks one source sequence into `n` output sequences that each receive
    /// every element of `xs`, in source order.
    ///
    /// Each output is backed by its own bounded `BlockingCollection` of size
    /// `capacity`. A single background pump (started immediately via
    /// `Async.Start`) reads `xs` and adds each element to every buffer in turn,
    /// so once any one buffer is full the pump waits on it and all outputs are
    /// throttled to the pace of the slowest consumer.
    ///
    /// Parameters:
    ///   n        - number of output sequences to create; a value below 1
    ///              yields an empty list (the source is still drained).
    ///   capacity - bounded capacity passed to each `BlockingCollection`.
    ///   xs       - the source sequence to fan out.
    ///
    /// Returns: a list of `n` `AsyncSeq<'t>` values, one per buffer.
    ///
    /// NOTE(review): both `Add` and `GetConsumingEnumerable` are synchronous
    /// blocking calls, so pump and consumers presumably hold a thread while
    /// they wait - confirm this is acceptable for the intended workload.
    let forkThrottled<'t> n (capacity:int) (xs:AsyncSeq<'t>) =
        // One bounded buffer per requested output.
        let pcs =
            [ for _ in 1 .. n ->
                  new System.Collections.Concurrent.BlockingCollection<'t>(capacity) ]

        // Each output sequence drains its own buffer until adding is completed.
        let aseqs =
            pcs
            |> List.map (fun buf ->
                asyncSeq {
                    for x in buf.GetConsumingEnumerable() do
                        yield x
                })

        let runner =
            async {
                try
                    do! xs |> AsyncSeq.iter (fun x -> for p in pcs do p.Add(x))
                finally
                    // Always mark the buffers complete - even when the source
                    // sequence fails - otherwise every consumer would wait
                    // forever for items that will never arrive.
                    for p in pcs do
                        p.CompleteAdding()
            }

        Async.Start runner
        aseqs

(*
Usage example:

let inputSeq = [for i in 1 .. 20 -> i] |> AsyncSeq.ofSeq
let forkedSeqs = AsyncSeq.forkThrottled 2 10 inputSeq
let consumer1 = forkedSeqs.[0]
let consumer2 = forkedSeqs.[1]

// Start consumer1 first: it will run for a while and then stop,
// because consumer2 is not running yet.
consumer1 |> AsyncSeq.iterAsync (fun i -> async{ do! Async.Sleep 10; do printfn $"C1 {i}" }) |> Async.Start

// Start consumer2: the rate of consumer1 will now be gated by the
// slower rate of consumer2.
consumer2 |> AsyncSeq.iterAsync (fun i -> async{ do! Async.Sleep 100; do printfn $"C2 {i}" }) |> Async.Start
*)