11 people like it.
Like the snippet!
Cancellable agent
The snippet implements a wrapper for the standard F# agent that can be cancelled using the IDisposable interface. This makes it possible to use the agent locally (e.g. inside an asynchronous workflow). When the agent is no longer needed, its body is cancelled.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
|
/// Short alias for the standard F# agent type.
type Agent<'T> = MailboxProcessor<'T>

/// Wrapper for the standard F# agent (MailboxProcessor) that
/// supports stopping of the agent's body using the IDisposable
/// interface. The wrapper owns the cancellation token source whose
/// token the agent was started with; instances can only be obtained
/// through `Start` because the constructor is private.
type DisposableAgent<'T> private (mbox: Agent<'T>, cts: CancellationTokenSource) =

    /// Start a new disposable agent using the specified body function.
    /// A fresh cancellation token source is created for the agent and
    /// its token is passed to `Agent.Start`, so cancelling that source
    /// (done by `Dispose`) cancels the body.
    static member Start(f) =
        let cts = new CancellationTokenSource()
        new DisposableAgent<'T>(Agent<'T>.Start(f, cancellationToken = cts.Token), cts)

    // (Boilerplate code that wraps standard methods of Agent)

    /// Cancels the agent's body and then disposes the wrapped agent.
    interface IDisposable with
        member x.Dispose() =
            // Request cancellation first, so the body is asked to stop
            // before the mailbox it may currently be waiting on is torn down.
            cts.Cancel()
            (mbox :> IDisposable).Dispose()
            // NOTE(review): the token source itself is not disposed here; doing so
            // would presumably make a repeated Dispose() fail on Cancel() - confirm
            // whether explicit disposal of `cts` is wanted.
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
|
let op = async {
    // The agent is local to this workflow: the 'use' binding disposes it
    // (which cancels its body) as soon as the workflow completes
    use agent = DisposableAgent.Start(fun inbox ->
        // Receive loop: handle one message (or one timeout), then repeat forever
        let rec loop () = async {
            // Receive with a timeout rather than waiting indefinitely,
            // to allow cancellation (when the operation completes)
            let! msg = inbox.TryReceive(1000)
            match msg with
            | Some(n, reply: AsyncReplyChannel<unit>) ->
                // Print the number, then unblock the sender
                printfn "%d" n
                reply.Reply(())
            | None -> ()
            return! loop () }
        async {
            try
                do! loop ()
            finally
                // Runs when the agent is disposed
                printfn "agent completed" })

    // Do some processing using the agent: send 0 .. 10, awaiting each reply
    for i in 0 .. 10 do
        do! agent.PostAndAsyncReply(fun r -> i, r)
    printfn "workflow completed" }

// Run the workflow in the background
Async.Start(op)
|
type Agent<'T> = MailboxProcessor<'T>
Full name: Script.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
Multiple items
type DisposableAgent<'T> =
interface IDisposable
private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
member Post : m:'T -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
member Receive : ?timeout:int -> Async<'T>
member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
member TryReceive : ?timeout:int -> Async<'T option>
...
Full name: Script.DisposableAgent<_>
Wrapper for the standard F# agent (MailboxProcessor) that
supports stopping of the agent's body using the IDisposable
interface (the type automatically creates a cancellation token)
--------------------
private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
val mbox : Agent<'T>
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
new : unit -> CancellationTokenSource
member Cancel : unit -> unit + 1 overload
member Dispose : unit -> unit
member IsCancellationRequested : bool
member Token : CancellationToken
static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload
Full name: System.Threading.CancellationTokenSource
--------------------
CancellationTokenSource() : unit
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>
Full name: Script.DisposableAgent`1.Start
Start a new disposable agent using the specified body function
(the method creates a new cancellation token for the agent)
val f : (MailboxProcessor<'T> -> Async<unit>)
property CancellationTokenSource.Token: CancellationToken
/// Number of unprocessed messages in the wrapped agent's message queue.
member this.CurrentQueueLength =
    mbox.CurrentQueueLength

/// Occurs when the execution of the wrapped agent results in an exception.
[<CLIEvent>]
member this.Error =
    mbox.Error

/// Waits for a message, consuming the first message in arrival order.
/// The optional timeout is forwarded to the wrapped agent unchanged.
member this.Receive(?timeout) =
    mbox.Receive(?timeout = timeout)

/// Looks through messages in arrival order until `scanner` returns a
/// Some value; other messages remain in the queue.
member this.Scan(scanner, ?timeout) =
    mbox.Scan(scanner, ?timeout = timeout)

/// Like PostAndReply, but returns None if no reply arrives within the
/// timeout period.
member this.TryPostAndReply(buildMessage, ?timeout) = mbox.TryPostAndReply(buildMessage, ?timeout = timeout)

/// Waits for a message, consuming the first message in arrival order;
/// the result is an option (see the wrapped agent's TryReceive).
member this.TryReceive(?timeout) = mbox.TryReceive(?timeout = timeout)

/// Looks through messages in arrival order until `scanner` returns a
/// Some value; other messages remain in the queue. Forwards to the
/// wrapped agent's TryScan.
member this.TryScan(scanner, ?timeout) = mbox.TryScan(scanner, ?timeout = timeout)

/// Posts a message to the message queue of the wrapped agent, asynchronously.
member this.Post(m) =
    mbox.Post(m)

/// Posts a message to the agent and awaits a reply on the channel, synchronously.
member this.PostAndReply(buildMessage, ?timeout) = mbox.PostAndReply(buildMessage, ?timeout = timeout)

/// Like PostAndAsyncReply, but returns None if no reply arrives within
/// the timeout period.
member this.PostAndTryAsyncReply(buildMessage, ?timeout) = mbox.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)

/// Posts a message to the agent and awaits a reply on the channel, asynchronously.
member this.PostAndAsyncReply(buildMessage, ?timeout) = mbox.PostAndAsyncReply(buildMessage, ?timeout = timeout)
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val x : DisposableAgent<'T>
override DisposableAgent.Dispose : unit -> unit
Full name: Script.DisposableAgent`1.Dispose
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val op : Async<unit>
Full name: Script.op
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val agent : DisposableAgent<int * AsyncReplyChannel<unit>>
type DisposableAgent<'T> =
interface IDisposable
private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
member Post : m:'T -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
member Receive : ?timeout:int -> Async<'T>
member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
member TryReceive : ?timeout:int -> Async<'T option>
...
Full name: Script.DisposableAgent<_>
Wrapper for the standard F# agent (MailboxProcessor) that
supports stopping of the agent's body using the IDisposable
interface (the type automatically creates a cancellation token)
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>
Start a new disposable agent using the specified body function
(the method creates a new cancellation token for the agent)
val agent : MailboxProcessor<int * AsyncReplyChannel<unit>>
val msg : (int * AsyncReplyChannel<unit>) option
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
union case Option.Some: Value: 'T -> Option<'T>
val n : int
val reply : AsyncReplyChannel<unit>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
member AsyncReplyChannel.Reply : value:'Reply -> unit
val i : int
member DisposableAgent.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
Posts a message to an agent and awaits a reply on the channel, asynchronously.
val r : AsyncReplyChannel<unit>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
More information