Mailbox processor that won't die on errors

An unhandled exception in a MailboxProcessor's body stops its message loop, and the state it leaves behind is unclear. Sometimes you don't care about the existing queue and only want a processor that is always listening.

// Creates and starts a fresh processor with an empty queue.
let internal createMailboxProcessor() = 
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            //...
            return! loop ()
        }
        loop ())

// Post through this binding so that messages reach the processor that is currently alive.
let mutable internal backgroundProcessor = createMailboxProcessor()

// An unhandled exception in the loop raises the Error event. The handler replaces
// the dead processor and attaches itself to the replacement.
let rec setErrorHandler (p:MailboxProcessor<_>) =
    p.Error.Add(fun e ->
        printfn "Mailbox died. Spawning a new one. Error: %O" e
        backgroundProcessor <- createMailboxProcessor()
        setErrorHandler backgroundProcessor
    )
setErrorHandler backgroundProcessor

// Note: this does not handle the messages that were still queued in the dead
// processor (see CurrentQueueLength); they are lost with it.
// You may want to retry some of them, with logic to skip the ones that caused the error.
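
If losing the queued messages is not acceptable, one alternative is to catch the exception per message inside the loop, so the processor never dies and the queue stays intact. The following is only a minimal sketch of that idea, not part of the original snippet: createResilientProcessor and handle are hypothetical names, and handle stands in for whatever work replaces the //... placeholder above.

```fsharp
// Hypothetical helper (assumption, not from the original snippet):
// `handle` is the per-message work that may throw.
let createResilientProcessor (handle: 'Msg -> unit) =
    MailboxProcessor<'Msg>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            try
                handle msg
            with e ->
                // Only the failing message is dropped; queued messages are kept.
                eprintfn "Skipping message %A. Error: %O" msg e
            return! loop ()
        }
        loop ())

// Usage: the handler throws on 2, but 3 is still processed afterwards.
let processed = System.Collections.Concurrent.ConcurrentQueue<int>()
let agent =
    createResilientProcessor (fun n ->
        if n = 2 then failwith "bad message"
        processed.Enqueue n)
[ 1; 2; 3 ] |> List.iter agent.Post
```

This trades the "fresh start" of the respawning approach for continuity: any state held in the loop survives a failing message, so it only fits cases where a failed handler cannot leave that state corrupted.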

More information

Link: http://fssnip.net/7Vt
Posted: 5 years ago
Author: Tuomas Hietanen
Tags: error handling, mailboxprocessor