Cancellable agent

The snippet implements a wrapper for the standard F# agent that can be cancelled using the IDisposable interface. This makes it possible to use the agent locally (e.g. inside an asynchronous workflow). When the agent is no longer needed, its body is cancelled.

Copy Source
Copy Link
Tools:

Implementation

 1: /// Short alias for the standard F# agent type
 2: type Agent<'T> = MailboxProcessor<'T>
 3: 
 4: /// Wrapper for the standard F# agent (MailboxProcessor) that
 5: /// supports stopping of the agent's body using the IDisposable
 6: /// interface. Every instance owns the CancellationTokenSource
 7: /// that 'Start' creates for it.
 8: type AutoCancelAgent<'T> private (mbox:Agent<'T>, cts:CancellationTokenSource) = 
 9: 
10:   /// Start a new disposable agent using the specified body function
11:   /// (the method creates a new cancellation token for the agent)
12:   static member Start(f) = 
13:     let cts = new CancellationTokenSource()
14:     new AutoCancelAgent<'T>(Agent<'T>.Start(f, cancellationToken = cts.Token), cts)
15: 
16:   (Boilerplate code that wraps standard methods of Agent)
17: 
18:   // Cancels the body of the agent and then releases the
19:   // resources owned by the wrapper
20:   interface IDisposable with
21:     member x.Dispose() = 
22:       // Request cancellation first, so that the body is asked to
23:       // stop before the mailbox it reads from is torn down
24:       try cts.Cancel()
25:       finally
26:         (mbox :> IDisposable).Dispose()
27:         // The token source is IDisposable as well and is owned by
28:         // this wrapper, so release it too (it was never disposed before)
29:         cts.Dispose()

Sample usage

 1: let op = async {
 2:   // Body of the local agent: print every number that arrives
 3:   // and acknowledge it on the sender's reply channel
 4:   let body (agent:Agent<int * AsyncReplyChannel<unit>>) = async {
 5:     try
 6:       while true do
 7:         // Receive with a timeout instead of waiting indefinitely,
 8:         // to allow cancellation (when the operation completes)
 9:         let! msg = agent.TryReceive(timeout = 1000)
10:         match msg with
11:         | None -> ()
12:         | Some(n, reply) ->
13:             printfn "%d" n
14:             reply.Reply(())
15:     finally
16:       // Called when the agent is disposed
17:       printfn "agent completed" }
18: 
19:   // Create the local agent; the 'use' construct disposes it
20:   // when the workflow completes
21:   use agent = AutoCancelAgent.Start(body)
22: 
23:   // Do some processing using the agent...
24:   for i in 0 .. 10 do
25:     do! agent.PostAndAsyncReply(fun r -> i, r)
26:   printfn "workflow completed" }
27: 
28: Async.Start(op)
type Agent<'T> = MailboxProcessor<'T>

Full name: Snippet.Agent<_>

  type: Agent<'T>
  implements: IDisposable
type MailboxProcessor<'Msg> =
  class
    interface IDisposable
    new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
    member Post : message:'Msg -> unit
    member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
    member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
    member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
    member Receive : ?timeout:int -> Async<'Msg>
    member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
    member Start : unit -> unit
    member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
    member TryReceive : ?timeout:int -> Async<'Msg option>
    member TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
    member add_Error : Handler<Exception> -> unit
    member CurrentQueueLength : int
    member DefaultTimeout : int
    member Error : IEvent<Exception>
    member remove_Error : Handler<Exception> -> unit
    member DefaultTimeout : int with set
    static member Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  end

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

  type: MailboxProcessor<'Msg>
  implements: IDisposable
type AutoCancelAgent<'T> =
  class
    interface IDisposable
    private new : mbox:Agent<'T> * cts:CancellationTokenSource -> AutoCancelAgent<'T>
    member Post : m:'T -> unit
    member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>
    member PostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a
    member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a option>
    member Receive : ?timeout:int -> Async<'T>
    member Scan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a>
    member TryPostAndReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> 'a option
    member TryReceive : ?timeout:int -> Async<'T option>
    member TryScan : scanner:('T -> Async<'a> option) * ?timeout:int -> Async<'a option>
    member add_Error : Handler<Exception> -> unit
    member CurrentQueueLength : int
    member Error : IEvent<Handler<Exception>,Exception>
    member remove_Error : Handler<Exception> -> unit
    static member Start : f:(MailboxProcessor<'T> -> Async<unit>) -> AutoCancelAgent<'T>
  end

Full name: Snippet.AutoCancelAgent<_>

  type: AutoCancelAgent<'T>
  implements: IDisposable


Wrapper for the standard F# agent (MailboxProcessor) that
 supports stopping of the agent's body using the IDisposable
 interface (the type automatically creates a cancellation token)

val mbox : Agent<'T>

  type: Agent<'T>
  implements: IDisposable
val cts : CancellationTokenSource

  type: CancellationTokenSource
  implements: IDisposable
type CancellationTokenSource =
  class
    new : unit -> System.Threading.CancellationTokenSource
    member Cancel : unit -> unit
    member Cancel : bool -> unit
    member Dispose : unit -> unit
    member IsCancellationRequested : bool
    member Token : System.Threading.CancellationToken
    static member CreateLinkedTokenSource : System.Threading.CancellationToken [] -> System.Threading.CancellationTokenSource
    static member CreateLinkedTokenSource : System.Threading.CancellationToken * System.Threading.CancellationToken -> System.Threading.CancellationTokenSource
  end

Full name: System.Threading.CancellationTokenSource

  type: CancellationTokenSource
  implements: IDisposable
static member AutoCancelAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> AutoCancelAgent<'T>

Full name: Snippet.AutoCancelAgent`1.Start

Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)

val f : (MailboxProcessor<'T> -> Async<unit>)
property CancellationTokenSource.Token: CancellationToken
/// Number of messages in the wrapped agent's queue that have not been processed yet.
member this.CurrentQueueLength =
  mbox.CurrentQueueLength
/// Event of the wrapped agent that occurs when executing the agent results in an exception.
[<CLIEvent>]
member this.Error =
  mbox.Error
/// Waits for a message, consuming the first one in arrival order.
/// The optional timeout is passed through to the wrapped agent.
member this.Receive(?timeout) =
  mbox.Receive(?timeout = timeout)
/// Looks through the messages in arrival order until 'scanner' returns
/// a Some value; the other messages stay in the queue.
member this.Scan(scanner, ?timeout) =
  mbox.Scan(scanner, ?timeout = timeout)
/// Same as PostAndReply, except that None is returned when no reply
/// arrives within the timeout period.
member this.TryPostAndReply(buildMessage, ?timeout) =
  mbox.TryPostAndReply(buildMessage, ?timeout = timeout)
/// Waits for a message, consuming the first one in arrival order;
/// the result is an option (forwards to TryReceive of the wrapped agent).
member this.TryReceive(?timeout) =
  mbox.TryReceive(?timeout = timeout)
/// Looks through the messages in arrival order until 'scanner' returns
/// a Some value; the other messages stay in the queue. The result is an
/// option (forwards to TryScan of the wrapped agent).
member this.TryScan(scanner, ?timeout) =
  mbox.TryScan(scanner, ?timeout = timeout)
/// Posts a message to the message queue of the wrapped agent, asynchronously.
member this.Post(m) =
  mbox.Post m
/// Posts a message to the agent and awaits a reply on the channel, synchronously.
member this.PostAndReply(buildMessage, ?timeout) =
  mbox.PostAndReply(buildMessage, ?timeout = timeout)
/// Same as PostAndAsyncReply, except that None is returned when no reply
/// arrives within the timeout period.
member this.PostAndTryAsyncReply(buildMessage, ?timeout) =
  mbox.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)
/// Posts a message to the agent and awaits a reply on the channel, asynchronously.
member this.PostAndAsyncReply(buildMessage, ?timeout) =
  mbox.PostAndAsyncReply(buildMessage, ?timeout = timeout)
Multiple items
type IDisposable =
  interface
    member Dispose : unit -> unit
  end

Full name: System.IDisposable

--------------------

IDisposable
val x : AutoCancelAgent<'T>

  type: AutoCancelAgent<'T>
  implements: IDisposable
override AutoCancelAgent.Dispose : unit -> unit

Full name: Snippet.AutoCancelAgent`1.Dispose
Multiple overloads
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val op : Async<unit>

Full name: Snippet.op
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val agent : AutoCancelAgent<int * AsyncReplyChannel<unit>>

  type: AutoCancelAgent<int * AsyncReplyChannel<unit>>
  implements: IDisposable
static member AutoCancelAgent.Start : f:(MailboxProcessor<'T> -> Async<unit>) -> AutoCancelAgent<'T>

Start a new disposable agent using the specified body function
 (the method creates a new cancellation token for the agent)

val agent : MailboxProcessor<int * AsyncReplyChannel<unit>>

  type: MailboxProcessor<int * AsyncReplyChannel<unit>>
  implements: IDisposable
val msg : (int * AsyncReplyChannel<unit>) option

  type: (int * AsyncReplyChannel<unit>) option
  implements: Collections.IStructuralEquatable
  implements: IComparable<Option<int * AsyncReplyChannel<unit>>>
  implements: IComparable
  implements: Collections.IStructuralComparable
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
union case Option.Some: 'T -> Option<'T>
val n : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
val reply : AsyncReplyChannel<unit>
type AsyncReplyChannel<'Reply>
with
  member Reply : value:'Reply -> unit
end

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit

  type: unit
  implements: IComparable
val printfn : Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
member AsyncReplyChannel.Reply : value:'Reply -> unit
val i : int

  type: int
  implements: IComparable
  implements: IFormattable
  implements: IConvertible
  implements: IComparable<int>
  implements: IEquatable<int>
  inherits: ValueType
member AutoCancelAgent.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'a> -> 'T) * ?timeout:int -> Async<'a>

Posts a message to an agent and awaits a reply on the channel, asynchronously.
val r : AsyncReplyChannel<unit>
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------

type Async
with
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Tasks.Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
  static member Ignore : computation:Async<'T> -> Async<unit>
  static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
  static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
  static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
  static member Sleep : millisecondsDueTime:int -> Async<unit>
  static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions * ?cancellationToken:CancellationToken -> Tasks.Task<'T>
  static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
  static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions -> Async<Tasks.Task<'T>>
  static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
  static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
  static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
  static member SwitchToNewThread : unit -> Async<unit>
  static member SwitchToThreadPool : unit -> Async<unit>
  static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
  static member CancellationToken : Async<CancellationToken>
  static member DefaultCancellationToken : CancellationToken
end

Full name: Microsoft.FSharp.Control.Async
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit

More information

Link: http://fssnip.net/64
Posted: 3 years ago
Author: Tomas Petricek (website)
Tags: agent, message passing, mailboxprocessor, cancellation