5 people like it.

MailboxProcessor with exception handling

An extension of MailboxProcessor that catches all unhandled exceptions (in the body of the mailbox processor) and reports them using an event. Otherwise, the public interface is the same as for MailboxProcessor.

Definition of HandlingMailbox

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
/// A wrapper for MailboxProcessor that catches all unhandled exceptions
/// and reports them via the 'OnError' event. Otherwise, the API
/// is the same as the API of 'MailboxProcessor'
type HandlingMailbox<'T>(f:HandlingMailbox<'T> -> Async<unit>) as self =
  // Create an event for reporting errors
  let errorEvent = Event<_>()
  // Start standard MailboxProcessor 
  let inbox = new MailboxProcessor<_>(fun inbox -> async {
    // Run the user-provided function & handle exceptions
    try return! f self
    with e -> errorEvent.Trigger(e) })
  /// Triggered when an unhandled exception occurs
  member x.OnError = errorEvent.Publish
  /// Starts the mailbox processor
  member x.Start() = inbox.Start()
  /// Receive a message from the mailbox processor
  member x.Receive() = inbox.Receive()
  /// Post a message to the mailbox processor
  member x.Post(v:'T) = inbox.Post(v)
  /// Start the mailbox processor
  static member Start(f) =
    let mbox = new HandlingMailbox<_>(f)
    mbox.Start()
    mbox

Example of use

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
// The usage is the same as with standard MailboxProcessor
let counter = HandlingMailbox<_>.Start(fun inbox -> async {
  while true do 
    printfn "waiting for data..." 
    let! data = inbox.Receive()
    // Simulate an exception 
    failwith "fail!" })

// Specify callback for unhandled errors & send message to mailbox
counter.OnError.Add(printfn "Exception: %A")
counter.Post(42) 
Multiple items
type HandlingMailbox<'T> =
  new : f:(HandlingMailbox<'T> -> Async<unit>) -> HandlingMailbox<'T>
  member Post : v:'T -> unit
  member Receive : unit -> Async<'T>
  member Start : unit -> unit
  member OnError : IEvent<exn>
  static member Start : f:(HandlingMailbox<'a> -> Async<unit>) -> HandlingMailbox<'a>

Full name: Script.HandlingMailbox<_>


 A wrapper for MailboxProcessor that catches all unhandled exceptions
 and reports them via the 'OnError' event. Otherwise, the API
 is the same as the API of 'MailboxProcessor'


--------------------
new : f:(HandlingMailbox<'T> -> Async<unit>) -> HandlingMailbox<'T>
val f : (HandlingMailbox<'T> -> Async<unit>)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val self : HandlingMailbox<'T>
val errorEvent : Event<exn>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
val inbox : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val e : exn
member Event.Trigger : arg:'T -> unit
val x : HandlingMailbox<'T>
member HandlingMailbox.OnError : IEvent<exn>

Full name: Script.HandlingMailbox`1.OnError


 Triggered when an unhandled exception occurs
property Event.Publish: IEvent<exn>
member HandlingMailbox.Start : unit -> unit

Full name: Script.HandlingMailbox`1.Start


 Starts the mailbox processor
member MailboxProcessor.Start : unit -> unit
member HandlingMailbox.Receive : unit -> Async<'T>

Full name: Script.HandlingMailbox`1.Receive


 Receive a message from the mailbox processor
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
member HandlingMailbox.Post : v:'T -> unit

Full name: Script.HandlingMailbox`1.Post


 Post a message to the mailbox processor
val v : 'T
member MailboxProcessor.Post : message:'Msg -> unit
static member HandlingMailbox.Start : f:(HandlingMailbox<'a> -> Async<unit>) -> HandlingMailbox<'a>

Full name: Script.HandlingMailbox`1.Start


 Start the mailbox processor
val f : (HandlingMailbox<'a> -> Async<unit>)
val mbox : HandlingMailbox<'a>
member HandlingMailbox.Start : unit -> unit


 Starts the mailbox processor
val counter : HandlingMailbox<int>

Full name: Script.counter
val inbox : HandlingMailbox<int>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val data : int
member HandlingMailbox.Receive : unit -> Async<'T>


 Receive a message from the mailbox processor
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
property HandlingMailbox.OnError: IEvent<exn>


 Triggered when an unhandled exception occurs
member System.IObservable.Add : callback:('T -> unit) -> unit
member HandlingMailbox.Post : v:'T -> unit


 Post a message to the mailbox processor
Raw view Test code New version

More information

Link:http://fssnip.net/cj
Posted:12 years ago
Author:Tomas Petricek
Tags: mailboxprocessor , agent , async , exception