11 people like it.

Cancellable agent

The snippet implements a wrapper for the standard F# agent that can be cancelled using the IDisposable interface. This makes it possible to use the agent locally (e.g. inside an asynchronous workflow). When the agent is no longer needed, disposing it cancels the agent's body.

Implementation

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
/// Short alias for the standard F# agent type (MailboxProcessor)
type Agent<'T> = MailboxProcessor<'T>

/// Wrapper for the standard F# agent (MailboxProcessor) that supports
/// stopping of the agent's body using the IDisposable interface. The
/// type creates and owns the cancellation token source used to start
/// the agent; disposing the wrapper cancels that token and then releases
/// both the underlying agent and the token source.
type DisposableAgent<'T> private (mbox:Agent<'T>, cts:CancellationTokenSource) = 

  // Set once Dispose has run, so that repeated calls are harmless
  // (calling Cancel on an already disposed token source would throw)
  let mutable disposed = false

  /// Start a new disposable agent using the specified body function
  /// (the method creates a new cancellation token for the agent)
  static member Start(f) = 
    let cts = new CancellationTokenSource()
    new DisposableAgent<'T>(Agent<'T>.Start(f, cancellationToken = cts.Token), cts)
  
  /// Returns the number of unprocessed messages in the message queue of the agent.
  member x.CurrentQueueLength = mbox.CurrentQueueLength
  /// Occurs when the execution of the agent results in an exception.
  [<CLIEvent>]
  member x.Error = mbox.Error
  /// Waits for a message. This will consume the first message in arrival order.
  member x.Receive(?timeout) = mbox.Receive(?timeout = timeout)
  /// Scans for a message by looking through messages in arrival order until scanner 
  /// returns a Some value. Other messages remain in the queue.
  member x.Scan(scanner, ?timeout) = mbox.Scan(scanner, ?timeout = timeout)
  /// Like PostAndReply, but returns None if no reply within the timeout period.
  member x.TryPostAndReply(buildMessage, ?timeout) = 
    mbox.TryPostAndReply(buildMessage, ?timeout = timeout)
  /// Like Receive, but returns None if no message arrives within the timeout period.
  member x.TryReceive(?timeout) = 
    mbox.TryReceive(?timeout = timeout)
  /// Like Scan, but returns None if the scanner accepts no message within
  /// the timeout period.
  member x.TryScan(scanner, ?timeout) = 
    mbox.TryScan(scanner, ?timeout = timeout)
  /// Posts a message to the message queue of the MailboxProcessor, asynchronously.
  member x.Post(m) = mbox.Post(m)
  /// Posts a message to an agent and await a reply on the channel, synchronously.
  member x.PostAndReply(buildMessage, ?timeout) = 
    mbox.PostAndReply(buildMessage, ?timeout = timeout)
  /// Like PostAndAsyncReply, but returns None if no reply within the timeout period.
  member x.PostAndTryAsyncReply(buildMessage, ?timeout) = 
    mbox.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)
  /// Posts a message to an agent and await a reply on the channel, asynchronously.
  member x.PostAndAsyncReply(buildMessage, ?timeout) = 
    mbox.PostAndAsyncReply(buildMessage, ?timeout=timeout)

  interface IDisposable with
    /// Cancels the agent's body and releases the wrapped agent together
    /// with the cancellation token source. Calling it again is a no-op.
    member x.Dispose() = 
      if not disposed then
        disposed <- true
        try
          // Request cancellation first, so the body is asked to stop
          // before the mailbox it may be waiting on is released
          cts.Cancel()
        finally
          // Release both resources even when a cancellation callback throws
          (mbox :> IDisposable).Dispose()
          cts.Dispose()

Sample usage

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
let op = async {
  // The agent is local to this workflow: binding it with 'use'
  // disposes it as soon as the workflow completes
  use agent = DisposableAgent.Start(fun inbox -> async { 
    try 
      while true do
        // Receive with a timeout rather than waiting indefinitely,
        // to allow cancellation (when the operation completes)
        let! received = inbox.TryReceive(1000)
        match received with 
        | None -> ()
        | Some(number, channel:AsyncReplyChannel<unit>) ->
            // Show the number, then acknowledge it to the sender
            printfn "%d" number
            channel.Reply(())
    finally 
      // Runs when the agent is disposed
      printfn "agent completed" })
  
  // Send the numbers 0 to 10, awaiting the agent's reply after each one
  for i = 0 to 10 do 
    do! agent.PostAndAsyncReply(fun r -> i, r) 
  printfn "workflow completed" }

op |> Async.Start
type Agent<'T> = MailboxProcessor<'T>

Full name: Script.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
Multiple items
type DisposableAgent<'T> =
  interface IDisposable
  private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
  member Post : m:'T -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
  member Receive : ?timeout:int -> Async<'T>
  member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
  member TryReceive : ?timeout:int -> Async<'T option>
  ...

Full name: Script.DisposableAgent<_>


 Wrapper for the standard F# agent (MailboxProcessor) that
 supports stopping of the agent's body using the IDisposable
 interface (the type automatically creates a cancellation token)


--------------------
private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
val mbox : Agent<'T>
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>

Full name: Script.DisposableAgent`1.Start


 Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)
val f : (MailboxProcessor<'T> -> Async<unit>)
property CancellationTokenSource.Token: CancellationToken
val x : DisposableAgent<'T>
member DisposableAgent.CurrentQueueLength : int

Full name: Script.DisposableAgent`1.CurrentQueueLength


 Returns the number of unprocessed messages in the message queue of the agent.
property MailboxProcessor.CurrentQueueLength: int
Multiple items
type CLIEventAttribute =
  inherit Attribute
  new : unit -> CLIEventAttribute

Full name: Microsoft.FSharp.Core.CLIEventAttribute

--------------------
new : unit -> CLIEventAttribute
member DisposableAgent.Error : IEvent<Handler<Exception>,Exception>

Full name: Script.DisposableAgent`1.Error


 Occurs when the execution of the agent results in an exception.
event MailboxProcessor.Error: IEvent<Handler<Exception>,Exception>
member DisposableAgent.Receive : ?timeout:int -> Async<'T>

Full name: Script.DisposableAgent`1.Receive


 Waits for a message. This will consume the first message in arrival order.
val timeout : int option
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
member DisposableAgent.Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>

Full name: Script.DisposableAgent`1.Scan


 Scans for a message by looking through messages in arrival order until scanner
 returns a Some value. Other messages remain in the queue.
val scanner : ('T -> Async<'a> option)
member MailboxProcessor.Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member DisposableAgent.TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option

Full name: Script.DisposableAgent`1.TryPostAndReply


 Like PostAndReply, but returns None if no reply within the timeout period.
val buildMessage : (AsyncReplyChannel<'a> -> 'T)
member MailboxProcessor.TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member DisposableAgent.TryReceive : ?timeout:int -> Async<'T option>

Full name: Script.DisposableAgent`1.TryReceive


 Waits for a message. This will consume the first message in arrival order.
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
member DisposableAgent.TryScan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a option>

Full name: Script.DisposableAgent`1.TryScan


 Scans for a message by looking through messages in arrival order until scanner
 returns a Some value. Other messages remain in the queue.
member MailboxProcessor.TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
member DisposableAgent.Post : m:'T -> unit

Full name: Script.DisposableAgent`1.Post


 Posts a message to the message queue of the MailboxProcessor, asynchronously.
val m : 'T
member MailboxProcessor.Post : message:'Msg -> unit
member DisposableAgent.PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a

Full name: Script.DisposableAgent`1.PostAndReply


 Posts a message to an agent and await a reply on the channel, synchronously.
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member DisposableAgent.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>

Full name: Script.DisposableAgent`1.PostAndTryAsyncReply


 Like PostAndAsyncReply, but returns None if no reply within the timeout period.
member MailboxProcessor.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member DisposableAgent.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>

Full name: Script.DisposableAgent`1.PostAndAsyncReply


 Posts a message to an agent and await a reply on the channel, asynchronously.
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
override DisposableAgent.Dispose : unit -> unit

Full name: Script.DisposableAgent`1.Dispose
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val op : Async<unit>

Full name: Script.op
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val agent : DisposableAgent<int * AsyncReplyChannel<unit>>
type DisposableAgent<'T> =
  interface IDisposable
  private new : mbox:Agent<'T> * cts:CancellationTokenSource -> DisposableAgent<'T>
  member Post : m:'T -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
  member Receive : ?timeout:int -> Async<'T>
  member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
  member TryReceive : ?timeout:int -> Async<'T option>
  ...

Full name: Script.DisposableAgent<_>


 Wrapper for the standard F# agent (MailboxProcessor) that
 supports stopping of the agent's body using the IDisposable
 interface (the type automatically creates a cancellation token)
static member DisposableAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> DisposableAgent<'T>


 Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)
val agent : MailboxProcessor<int * AsyncReplyChannel<unit>>
val msg : (int * AsyncReplyChannel<unit>) option
union case Option.Some: Value: 'T -> Option<'T>
val n : int
val reply : AsyncReplyChannel<unit>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
member AsyncReplyChannel.Reply : value:'Reply -> unit
val i : int
member DisposableAgent.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>


 Posts a message to an agent and await a reply on the channel, asynchronously.
val r : AsyncReplyChannel<unit>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/64
Posted:13 years ago
Author:Tomas Petricek
Tags: agent , message passing , mailboxprocessor , cancellation