5 people like it.
Like the snippet!
Async channel and parallel async groups
Extension for Async module: channel for transmitting data between subsystems, launching parallel groups of asyncs sequentially.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
|
//Some useful Async module extensions:
// 1. CreateAsyncChannel - returns a channel for transmitting data between
// subsystems. The result is a pair of functions - a data receiver and a data sender.
// 2. ParallelGrouped - async for launching a sequence of asyncs in groups
// of the specified size.
// 3. ParallelGroupedWithResults - async for launching a sequence of asyncs
// in groups of the specified size and returning the results as an array.
// Item 2 can be useful instead of (Async.Parallel >> Async.Ignore >> Async.Start)
// for preventing thread pool starvation (especially on system startup).
module Async =
    // Messages handled by the mailbox agent behind a channel.
    type private msg<'a> =
        // Final value: forwarded to the receiver with the flag set to true, then the agent stops.
        | Result of 'a
        // Non-final value: forwarded to the receiver with the flag set to false.
        | IntermediateResult of 'a
        // Request from the receiving side; carries the channel the next value is replied on.
        | GetResult of AsyncReplyChannel<'a*bool>

    // Creates a one-way channel backed by a MailboxProcessor.
    // Returns the pair (resultWaiter, postResult):
    //   resultWaiter timeout        - async that asks for the next value; yields
    //                                 Some (value, isLast), or None when no reply
    //                                 arrived within `timeout` milliseconds.
    //   postResult closeChannel v   - posts `v`; closeChannel = true marks it as the
    //                                 last value, after which the agent finishes.
    // NOTE(review): a value that arrives after a receiver timed out but before it
    // asks again is replied to the old reply channel and looks to be lost - confirm.
    let CreateAsyncChannel<'a> () =
        let inbox = new MailboxProcessor<_>(fun inbox ->
            // A receiver is waiting on `repl`: deliver the next posted value to it.
            let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) =
                inbox.Scan <| function
                    | GetResult repl ->
                        // A newer request replaces the reply channel we answer on.
                        Some <| waitResponse repl
                    | IntermediateResult res ->
                        repl.Reply (res, false)
                        Some <| waitRepl ()
                    | Result res ->
                        repl.Reply (res, true)
                        // Closing value delivered: end the agent loop.
                        Some <| async.Return ()
            // Nobody is waiting: only a GetResult request is picked up here, any
            // other message is skipped by the scanner (returns None).
            and waitRepl () =
                inbox.Scan <| function
                    | GetResult repl -> Some <| waitResponse repl
                    | _ -> None
            waitRepl ())
        inbox.Start()
        let resultWaiter timeout =
            inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout)
        let postResult closeChannel =
            // Pick the message constructor first, then post whatever it wraps.
            let wrap = if closeChannel then Result else IntermediateResult
            wrap >> inbox.Post
        (resultWaiter, postResult)

    // Runs `asyncs` in consecutive groups of `n`; every group is run with
    // Async.Parallel and the results are discarded.
    //   delay - pause in milliseconds after each full group (values below 100 are raised to 100).
    //   n     - group size; because a group is closed as soon as its length is >= n,
    //           values below 1 give groups of one element.
    // Groups are started in input order: a group starts only after the previous
    // group has finished and the delay has elapsed. A trailing partial group runs last.
    let ParallelGrouped (delay:int) (n:int) (asyncs: seq<Async<_>>) = async {
        let delay = max 100 delay
        let asyncAcc, lastGroup =
            asyncs
            |> Seq.fold (fun (asyncAcc, group) op ->
                let group = op :: group
                if group.Length >= n then
                    let runUpToThisGroup = async {
                        // Earlier groups first, so execution follows input order.
                        do! asyncAcc
                        // `group` was built by consing, so restore input order.
                        do! group |> List.rev |> Async.Parallel |> Async.Ignore
                        do! Async.Sleep delay }
                    (runUpToThisGroup, [])
                else
                    (asyncAcc, group)) (async.Return (), [])
        do! asyncAcc
        do! lastGroup |> List.rev |> Async.Parallel |> Async.Ignore
    }

    // Same grouping and pacing as ParallelGrouped, but collects the results.
    // Returns one array holding the result of every async, in the same order as
    // the elements of `asyncs`.
    let ParallelGroupedWithResults (delay:int) (n:int) (asyncs: seq<Async<_>>) = async {
        let delay = max 100 delay
        let asyncAcc, lastGroup =
            asyncs
            |> Seq.fold (fun (asyncAcc, group) op ->
                let group = op :: group
                if group.Length >= n then
                    let runUpToThisGroup = async {
                        // Results of all earlier groups, already in input order.
                        let! earlier = asyncAcc
                        // Reverse the consed group so its results line up with the input.
                        let! current = group |> List.rev |> Async.Parallel
                        do! Async.Sleep delay
                        return [|yield! earlier; yield! current|] }
                    (runUpToThisGroup, [])
                else
                    (asyncAcc, group)) (async.Return Array.empty, [])
        let! res1 = asyncAcc
        let! res2 = lastGroup |> List.rev |> Async.Parallel
        return [|yield! res1; yield! res2|]
    }
//usage
//dataReciever and dataSender can be handed to different, independent parts of the program to form a channel
let dataReciever, dataSender = Async.CreateAsyncChannel<int>()
let timeout = 5000
// Receiving side of the demo: asks the channel for the next value, prints it,
// and repeats until the value flagged as last has been printed.
let rec printer() = async {
    let! reply = dataReciever timeout
    match reply with
    | Some (value, false) -> //intermediate result - channel still open
        printfn "Retrieved message: %A" value
        return! printer()
    | Some (value, true) -> //channel is closed by this message
        printfn "Last message: %A" value
        return ()
    | None -> //timeout
        printfn "So much free time - maybe other side has some problems"
        return! printer() //but continue
}
// Token source used to stop the printer loop from the outside.
let cancelation = new System.Threading.CancellationTokenSource()
Async.Start(printer(), cancelation.Token)
// Sending side of the demo: three intermediate values and one closing value.
async {
    dataSender false 1
    do! Async.Sleep 1000
    dataSender false 2
    // Longer than the 5000 ms `timeout` the printer waits with, so it reports a timeout.
    do! Async.Sleep 6000
    dataSender false 3
    do! Async.Sleep 2000
    dataSender true 4
    // Cancel only after everything was sent: cancelling right after start-up
    // would stop the printer before it could print the messages.
    do! Async.Sleep 1000
    cancelation.Cancel()
} |> Async.Start
//example of ParallelGrouped
// One hundred asyncs, each printing its own number (1..100), started through
// ParallelGrouped with 10000 as the delay argument and 10 as the group size.
Seq.init 100 (fun idx -> async { printfn "%d" (idx + 1) })
|> Async.ParallelGrouped 10000 10
|> Async.Start
|
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
type private msg<'a> =
| Result of 'a
| IntermediateResult of 'a
| GetResult of AsyncReplyChannel<'a * bool>
Full name: Script.Async.msg<_>
union case msg.Result: 'a -> msg<'a>
union case msg.IntermediateResult: 'a -> msg<'a>
union case msg.GetResult: AsyncReplyChannel<'a * bool> -> msg<'a>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type bool = System.Boolean
Full name: Microsoft.FSharp.Core.bool
val CreateAsyncChannel : unit -> (int -> Async<('a * bool) option>) * (bool -> 'a -> unit)
Full name: Script.Async.CreateAsyncChannel
val inbox : MailboxProcessor<msg<'a>>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val waitResponse : (AsyncReplyChannel<'a * bool> -> Async<unit>)
val repl : AsyncReplyChannel<'a * bool>
member MailboxProcessor.Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
union case Option.Some: Value: 'T -> Option<'T>
val res : 'a
member AsyncReplyChannel.Reply : value:'Reply -> unit
val waitRepl : (unit -> Async<unit>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member AsyncBuilder.Return : value:'T -> Async<'T>
union case Option.None: Option<'T>
member MailboxProcessor.Start : unit -> unit
val resultWaiter : (int -> Async<('a * bool) option>)
val timeout : int
member MailboxProcessor.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
val replChannel : AsyncReplyChannel<'a * bool>
val postResult : (bool -> 'a -> unit)
val closeChannel : bool
member MailboxProcessor.Post : message:'Msg -> unit
val ParallelGrouped : delay:int -> n:int -> asyncs:seq<Async<'a>> -> Async<unit>
Full name: Script.Async.ParallelGrouped
val delay : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val n : int
val asyncs : seq<Async<'a>>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val max : e1:'T -> e2:'T -> 'T (requires comparison)
Full name: Microsoft.FSharp.Core.Operators.max
val asyncAcc : Async<unit>
val lastGroup : Async<'a> list
module Seq
from Microsoft.FSharp.Collections
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:seq<'T> -> 'State
Full name: Microsoft.FSharp.Collections.Seq.fold
val group : Async<'a> list
val op : Async<'a>
property List.Length: int
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member Async.Ignore : computation:Async<'T> -> Async<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val ParallelGroupedWithResults : delay:int -> n:int -> asyncs:seq<Async<'a>> -> Async<'a []>
Full name: Script.Async.ParallelGroupedWithResults
val asyncAcc : Async<'a []>
val res1 : 'a []
val res2 : 'a []
module Array
from Microsoft.FSharp.Collections
val empty<'T> : 'T []
Full name: Microsoft.FSharp.Collections.Array.empty
val dataReciever : (int -> Async<(int * bool) option>)
Full name: Script.dataReciever
val dataSender : (bool -> int -> unit)
Full name: Script.dataSender
Multiple items
module Async
from Script
--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
val timeout : int
Full name: Script.timeout
val printer : unit -> Async<unit>
Full name: Script.printer
val resultOpt : (int * bool) option
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val result : int
val cancelation : System.Threading.CancellationTokenSource
Full name: Script.cancelation
namespace System
namespace System.Threading
Multiple items
type CancellationTokenSource =
new : unit -> CancellationTokenSource
member Cancel : unit -> unit + 1 overload
member Dispose : unit -> unit
member IsCancellationRequested : bool
member Token : CancellationToken
static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload
Full name: System.Threading.CancellationTokenSource
--------------------
System.Threading.CancellationTokenSource() : unit
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
property System.Threading.CancellationTokenSource.Token: System.Threading.CancellationToken
System.Threading.CancellationTokenSource.Cancel() : unit
System.Threading.CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val i : int
More information