5 people like it.

Async channel and parallel async groups

Extensions for the Async module: a channel for transmitting data between subsystems, and helpers that launch a sequence of asyncs in parallel groups, one group after another.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
//Some useful Async module extensions: 
//      1. CreateAsyncChannel - returns a channel for transmitting data between 
//         subsystems. The result is a pair of functions: a data receiver and a data sender.
//      2. ParallelGrouped - an async that launches a sequence of asyncs in groups 
//         of the specified size.
//      3. ParallelGroupedWithResults - an async that launches a sequence of asyncs 
//         in groups of the specified size and returns the results as an array.
// ParallelGrouped (2) can be useful instead of (Async.Parallel >> Async.Ignore >> Async.Start) 
// to prevent thread pool starvation (especially on system startup).

module Async =
    // Messages understood by the mailbox agent created in CreateAsyncChannel.
    type private msg<'a> = 
        // Closing value: replied with the flag set to true, after which the agent body ends.
        | Result of 'a 
        // Regular value: replied with the flag set to false; the agent keeps serving requests.
        | IntermediateResult of 'a 
        // Receive request carrying the reply channel the next value is sent to.
        | GetResult of AsyncReplyChannel<'a*bool>

    // Creates a channel for passing values of type 'a between two parts of a program.
    // Returns the pair (receive, send):
    //   receive timeoutMs - async that yields None when no reply arrived within
    //                       timeoutMs, otherwise Some (value, isLast).
    //   send isLast value - posts value; isLast = true marks it as the closing value.
    // Once the closing value has been delivered the agent body ends, so later
    // receive calls are never answered and can only time out.
    // NOTE(review): if a receive call timed out and a value is posted before the
    // next receive call, that value appears to be replied to the stale reply
    // channel and is lost - confirm whether callers need to care.
    let CreateAsyncChannel<'a> () =
        let inbox = new MailboxProcessor<_>( fun inbox ->
            // A receiver is waiting on `repl`: hand it the next posted value.
            let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) = 
                inbox.Scan <| function
                    // A newer request replaces the pending one; the older reply channel is not answered.
                    | GetResult repl -> 
                        Some <| waitResponse repl
                    | IntermediateResult res ->
                        repl.Reply (res, false)
                        Some <| waitRepl ()
                    // Closing value: answer the receiver and let the agent body finish.
                    | Result res -> 
                        repl.Reply (res, true)
                        Some <| async.Return ()
            // No receiver is waiting: only a request is consumed; posted values are
            // skipped by the scanner and therefore stay queued until a request arrives.
            and waitRepl () =
                inbox.Scan <| function
                    | GetResult repl -> Some <| waitResponse repl
                    | _ -> None
            waitRepl ()
        )
        inbox.Start()
        // Receiving end: asks the agent for the next value, giving up after `timeout` ms.
        let resultWaiter timeout =             
            inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout)
        // Sending end: wraps the value in the matching message case and posts it.
        let postResult closeChannel value = 
            inbox.Post (if closeChannel then Result value else IntermediateResult value)
        (resultWaiter, postResult)

    // Splits `items` into consecutive groups of `size` elements (the last group may
    // be shorter), keeping the input order. A size below 1 yields groups of one.
    let private splitIntoGroups (size:int) (items: seq<'T>) : 'T list list =
        let finished, pending, _ =
            items |> Seq.fold (fun (finished, pending, count) item ->
                // The running count avoids re-measuring the pending list for every element.
                if count + 1 >= size then ((List.rev (item :: pending)) :: finished, [], 0)
                else (finished, item :: pending, count + 1)) ([], [], 0)
        let finished =
            match pending with
            | [] -> finished
            | _ -> (List.rev pending) :: finished
        List.rev finished

    // Runs the asyncs group by group in input order: every group is executed with
    // Async.Parallel, and `delay` ms (at least 100) are slept between two
    // consecutive groups. The results are returned in the order of the input.
    let private runGroupsInOrder (delay:int) (n:int) (asyncs: seq<Async<'T>>) : Async<'T []> = async {
        let delay = max 100 delay
        let rec loop collected groups = async {
            match groups with
            | [] -> 
                // `collected` holds the per-group arrays newest first.
                return collected |> List.rev |> Array.concat
            | group :: rest ->
                let! groupResults = group |> Async.Parallel
                // Pause only when another group follows.
                if not (List.isEmpty rest) then
                    do! Async.Sleep delay
                return! loop (groupResults :: collected) rest
        }
        // The input sequence is enumerated here, i.e. each time the async is run.
        return! loop [] (splitIntoGroups n asyncs)
    }

    // Launches `asyncs` in groups of `n`: the members of a group run in parallel,
    // the groups run one after another in input order with a pause of `delay` ms
    // (values below 100 are raised to 100) between them. Results are discarded.
    // An exception raised by any async propagates to the caller of the returned async.
    let ParallelGrouped (delay:int) (n:int) (asyncs: seq<Async<_>>) =
        // Each result is dropped up front so that nothing is accumulated across groups.
        asyncs |> Seq.map Async.Ignore |> runGroupsInOrder delay n |> Async.Ignore

    // Same scheduling as ParallelGrouped, but returns the results of all asyncs as
    // one array whose order matches the order of `asyncs`.
    let ParallelGroupedWithResults (delay:int) (n:int) (asyncs: seq<Async<_>>) =
        runGroupsInOrder delay n asyncs

//usage
//dataReciever and dataSender can be passed to different, independent parts of a program to form a channel
// Receiving end and sending end of a channel that carries int values.
let dataReciever, dataSender = Async.CreateAsyncChannel<int>()

// Milliseconds a single receive attempt waits before it reports a timeout.
let timeout = 5000

// Receive loop: prints every value taken from the channel and keeps asking for
// the next one until the value flagged as last has been printed. A timed-out
// attempt is reported and the loop simply tries again.
let rec printer() = async {
    let! received = dataReciever timeout
    match received with
    | Some (value, true) ->
        // The sender marked this value as the closing one: stop looping.
        printfn "Last message: %A" value
        return ()
    | Some (value, false) ->
        // Channel is still open: print and wait for more.
        printfn "Retrieved message: %A" value
        return! printer()
    | None ->
        // Nothing arrived within the timeout: report it, but keep listening.
        printfn "So much free time - maybe other side has some problems"
        return! printer()
    }

// Token source used to stop the printer loop from the outside.
let cancelation = new System.Threading.CancellationTokenSource()
Async.Start(printer(), cancelation.Token)

// Sender side of the demo. Each step is (pause before sending in ms, is this the
// closing value, value). The 6000 ms pause is longer than the 5000 ms receive
// timeout used by the printer.
async {
    let steps = [ (0, false, 1); (1000, false, 2); (6000, false, 3); (2000, true, 4) ]
    for (pauseMs, isLast, value) in steps do
        if pauseMs > 0 then
            do! Async.Sleep pauseMs
        dataSender isLast value
} |> Async.Start

// Stops the printer loop.
// NOTE(review): when the script is executed top to bottom this runs right after
// the sender has been started, so the printer is probably cancelled before it
// prints anything - presumably this line is meant to be evaluated by hand later.
cancelation.Cancel()

//example of ParallelGrouped
// 100 asyncs, each printing its own number (1..100), started in groups of 10
// with a requested pause of 10000 ms between groups.
Seq.init 100 (fun index -> async { printfn "%d" (index + 1) })
|> Async.ParallelGrouped 10000 10
|> Async.Start
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type private msg<'a> =
  | Result of 'a
  | IntermediateResult of 'a
  | GetResult of AsyncReplyChannel<'a * bool>

Full name: Script.Async.msg<_>
union case msg.Result: 'a -> msg<'a>
union case msg.IntermediateResult: 'a -> msg<'a>
union case msg.GetResult: AsyncReplyChannel<'a * bool> -> msg<'a>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type bool = System.Boolean

Full name: Microsoft.FSharp.Core.bool
val CreateAsyncChannel : unit -> (int -> Async<('a * bool) option>) * (bool -> 'a -> unit)

Full name: Script.Async.CreateAsyncChannel
val inbox : MailboxProcessor<msg<'a>>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val waitResponse : (AsyncReplyChannel<'a * bool> -> Async<unit>)
val repl : AsyncReplyChannel<'a * bool>
member MailboxProcessor.Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
union case Option.Some: Value: 'T -> Option<'T>
val res : 'a
member AsyncReplyChannel.Reply : value:'Reply -> unit
val waitRepl : (unit -> Async<unit>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member AsyncBuilder.Return : value:'T -> Async<'T>
union case Option.None: Option<'T>
member MailboxProcessor.Start : unit -> unit
val resultWaiter : (int -> Async<('a * bool) option>)
val timeout : int
member MailboxProcessor.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
val replChannel : AsyncReplyChannel<'a * bool>
val postResult : (bool -> 'a -> unit)
val closeChannel : bool
member MailboxProcessor.Post : message:'Msg -> unit
val ParallelGrouped : delay:int -> n:int -> asyncs:seq<Async<'a>> -> Async<unit>

Full name: Script.Async.ParallelGrouped
val delay : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val n : int
val asyncs : seq<Async<'a>>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val max : e1:'T -> e2:'T -> 'T (requires comparison)

Full name: Microsoft.FSharp.Core.Operators.max
val asyncAcc : Async<unit>
val lastGroup : Async<'a> list
module Seq

from Microsoft.FSharp.Collections
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:seq<'T> -> 'State

Full name: Microsoft.FSharp.Collections.Seq.fold
val group : Async<'a> list
val op : Async<'a>
property List.Length: int
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member Async.Ignore : computation:Async<'T> -> Async<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val ParallelGroupedWithResults : delay:int -> n:int -> asyncs:seq<Async<'a>> -> Async<'a []>

Full name: Script.Async.ParallelGroupedWithResults
val asyncAcc : Async<'a []>
val res1 : 'a []
val res2 : 'a []
module Array

from Microsoft.FSharp.Collections
val empty<'T> : 'T []

Full name: Microsoft.FSharp.Collections.Array.empty
val dataReciever : (int -> Async<(int * bool) option>)

Full name: Script.dataReciever
val dataSender : (bool -> int -> unit)

Full name: Script.dataSender
Multiple items
module Async

from Script

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
val timeout : int

Full name: Script.timeout
val printer : unit -> Async<unit>

Full name: Script.printer
val resultOpt : (int * bool) option
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val result : int
val cancelation : System.Threading.CancellationTokenSource

Full name: Script.cancelation
namespace System
namespace System.Threading
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
System.Threading.CancellationTokenSource() : unit
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
property System.Threading.CancellationTokenSource.Token: System.Threading.CancellationToken
System.Threading.CancellationTokenSource.Cancel() : unit
System.Threading.CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val i : int
Raw view Test code New version

More information

Link:http://fssnip.net/nJ
Posted:10 years ago
Author:dvitel
Tags: async , parallelism