11 people like it.

Cancellable agent

The snippet implements a wrapper for the standard F# agent (MailboxProcessor) that can be cancelled through the IDisposable interface. This makes it possible to use an agent locally (e.g. inside an asynchronous workflow): when the agent is no longer needed, disposing it cancels the agent's body.

Implementation

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
/// Short alias for the standard F# agent type, MailboxProcessor.
type Agent<'T> = MailboxProcessor<'T>

/// A thin wrapper around the standard F# agent (MailboxProcessor) whose
/// body can be stopped through the IDisposable interface. Every instance
/// is paired with its own cancellation token source.
type DisposableAgent<'T> private (mbox:Agent<'T>, cts:CancellationTokenSource) = 

  /// Creates a fresh cancellation token source, starts an agent running
  /// the body function 'f' under that source's token, and wraps the agent
  /// together with the token source in a new disposable agent.
  static member Start(f) = 
    let tokenSource = new CancellationTokenSource()
    let inner = Agent<'T>.Start(f, cancellationToken = tokenSource.Token)
    new DisposableAgent<'T>(inner, tokenSource)

  (Boilerplate code that wraps standard methods of Agent)

  // Stops the agent: requests cancellation of the body first and then
  // releases the underlying mailbox. Cancellation is requested before the
  // mailbox is torn down, and the mailbox is disposed in a 'finally'
  // block, so neither step is skipped when the other one throws.
  interface IDisposable with
    member x.Dispose() = 
      try
        cts.Cancel()
      finally
        (mbox :> IDisposable).Dispose()

Sample usage

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
/// Sample workflow: starts a local agent, sends it the numbers 0 .. 10
/// (waiting for each acknowledgement) and lets the 'use' binding dispose
/// the agent when the workflow finishes.
let op = 
  // Body of the local agent: prints every number it receives and
  // acknowledges it on the supplied reply channel.
  let printer (inbox:MailboxProcessor<int * AsyncReplyChannel<unit>>) = async { 
    try 
      while true do
        // Receive with a timeout rather than waiting indefinitely, so the
        // loop regularly gets a chance to observe cancellation
        let! received = inbox.TryReceive(1000)
        match received with 
        | None -> ()
        | Some(number, channel) ->
            // Show the number and acknowledge it to the sender
            printfn "%d" number
            channel.Reply(())
    finally 
      // Runs when the agent's body stops (e.g. once it has been disposed)
      printfn "agent completed" }

  async {
    // The agent lives only for the duration of this workflow
    use agent = DisposableAgent.Start(printer)

    // Do some processing using the agent...
    for i in 0 .. 10 do 
      do! agent.PostAndAsyncReply(fun channel -> i, channel) 
    printfn "workflow completed" }

Async.Start(op)
type Agent<'T> = MailboxProcessor<'T>

Full name: Script.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
Multiple items
type DisposableAgent<'T> =
  interface IDisposable
  private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
  member Post : m:'T -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
  member Receive : ?timeout:int -> Async<'T>
  member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
  member TryReceive : ?timeout:int -> Async<'T option>
  ...

Full name: Script.DisposableAgent<_>


 Wrapper for the standard F# agent (MailboxProcessor) that
 supports stopping of the agent's body using the IDisposable
 interface (the type automatically creates a cancellation token)


--------------------
private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
val mbox : Agent<'T>
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>

Full name: Script.DisposableAgent`1.Start


 Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)
val f : (MailboxProcessor<'T> -> Async<unit>)
property CancellationTokenSource.Token: CancellationToken
/// Number of messages currently waiting in the wrapped agent's queue.
  member this.CurrentQueueLength = mbox.CurrentQueueLength
  /// Forwards the wrapped agent's Error event (raised when the execution
  /// of the agent results in an exception).
  [<CLIEvent>]
  member this.Error = mbox.Error
  /// Forwards to Receive on the wrapped agent: waits for a message,
  /// consuming the first one in arrival order.
  member this.Receive(?timeout) = mbox.Receive(?timeout = timeout)
  /// Forwards to Scan on the wrapped agent: looks through messages in
  /// arrival order until 'scanner' returns a Some value; other messages
  /// remain in the queue.
  member this.Scan(scanner, ?timeout) = mbox.Scan(scanner, ?timeout = timeout)
  /// Forwards to TryPostAndReply on the wrapped agent: like PostAndReply,
  /// but yields None if no reply arrives within the timeout period.
  member this.TryPostAndReply(buildMessage, ?timeout) =
    mbox.TryPostAndReply(buildMessage, ?timeout = timeout)
  /// Forwards to TryReceive on the wrapped agent: waits for a message,
  /// consuming the first one in arrival order; the result is an option.
  member this.TryReceive(?timeout) =
    mbox.TryReceive(?timeout = timeout)
  /// Forwards to TryScan on the wrapped agent: looks through messages in
  /// arrival order until 'scanner' returns a Some value; other messages
  /// remain in the queue.
  member this.TryScan(scanner, ?timeout) =
    mbox.TryScan(scanner, ?timeout = timeout)
  /// Forwards to Post on the wrapped agent: adds 'm' to its message queue
  /// without waiting for it to be processed.
  member this.Post(m) = mbox.Post(m)
  /// Forwards to PostAndReply on the wrapped agent: posts a message and
  /// waits synchronously for the reply on the channel.
  member this.PostAndReply(buildMessage, ?timeout) =
    mbox.PostAndReply(buildMessage, ?timeout = timeout)
  /// Forwards to PostAndTryAsyncReply on the wrapped agent: like
  /// PostAndAsyncReply, but yields None if no reply arrives within the
  /// timeout period.
  member this.PostAndTryAsyncReply(buildMessage, ?timeout) =
    mbox.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)
  /// Forwards to PostAndAsyncReply on the wrapped agent: posts a message
  /// and waits asynchronously for the reply on the channel.
  member this.PostAndAsyncReply(buildMessage, ?timeout) =
    mbox.PostAndAsyncReply(buildMessage, ?timeout = timeout)
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val x : DisposableAgent<'T>
override DisposableAgent.Dispose : unit -> unit

Full name: Script.DisposableAgent`1.Dispose
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val op : Async<unit>

Full name: Script.op
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val agent : DisposableAgent<int * AsyncReplyChannel<unit>>
type DisposableAgent<'T> =
  interface IDisposable
  private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
  member Post : m:'T -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
  member Receive : ?timeout:int -> Async<'T>
  member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
  member TryReceive : ?timeout:int -> Async<'T option>
  ...

Full name: Script.DisposableAgent<_>


 Wrapper for the standard F# agent (MailboxProcessor) that
 supports stopping of the agent's body using the IDisposable
 interface (the type automatically creates a cancellation token)
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>


 Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)
val agent : MailboxProcessor<int * AsyncReplyChannel<unit>>
val msg : (int * AsyncReplyChannel<unit>) option
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
union case Option.Some: Value: 'T -> Option<'T>
val n : int
val reply : AsyncReplyChannel<unit>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
member AsyncReplyChannel.Reply : value:'Reply -> unit
val i : int
member DisposableAgent.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>


 Posts a message to an agent and await a reply on the channel, asynchronously.
val r : AsyncReplyChannel<unit>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit

More information

Link:http://fssnip.net/64
Posted:12 years ago
Author:Tomas Petricek
Tags: agent , message passing , mailboxprocessor , cancellation