1 people like it.
Like the snippet!
Mailbox processor that won't die on errors
Sometimes a MailboxProcessor can end up in a corrupted state after an error, and it is not clear what state it is in.
Sometimes you don't care about the existing queue, but you always want to have a processor listening.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
|
/// Creates and immediately starts a new MailboxProcessor.
///
/// The agent body is an endless receive loop: it awaits the next message,
/// leaves room for message handling (the `//...` placeholder), and then
/// recurses to wait for the following message.
let internal createMailboxProcessor() =
    // Body run by the agent; `inbox` is the processor being started.
    let body (inbox: MailboxProcessor<_>) =
        let rec receiveNext () =
            async {
                // Asynchronously wait until a message is available.
                let! msg = inbox.Receive()
                //...
                // Tail call inside the async workflow: keep listening.
                return! receiveNext ()
            }
        receiveNext ()
    MailboxProcessor.Start(body)
/// The processor currently in use. It is mutable because the error handler
/// registered by setErrorHandler replaces it with a freshly created processor.
// NOTE(review): no message type is specified here, so the message type is
// inferred as obj; annotate the type if a concrete message type is intended.
let mutable internal backgroundProcessor = createMailboxProcessor()
/// Subscribes to the Error event of `p`. When the event fires, the failure is
/// printed, `backgroundProcessor` is replaced with a newly created processor,
/// and the same handler is installed on that replacement (hence `rec`).
let rec setErrorHandler (p:MailboxProcessor<_>) =
    // Invoked with the exception reported through p.Error.
    let respawn (failure: exn) =
        printfn "Mailbox died. Spawning a new one. Error: %O" failure
        backgroundProcessor <- createMailboxProcessor()
        // Keep the chain going: the replacement must also respawn on failure.
        setErrorHandler backgroundProcessor
    p.Error |> Event.add respawn
// Subscribe the error handler on the initial processor.
setErrorHandler backgroundProcessor
// Note: This does not ensure that the existing queue gets handled:
// backgroundProcessor.CurrentQueueLength
// You may want to retry some messages, with some logic to skip the ones causing the error.
|
val internal createMailboxProcessor : unit -> MailboxProcessor<'a>
Full name: Script.createMailboxProcessor
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<'a>
val loop : (unit -> Async<'b>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : 'a
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val mutable internal backgroundProcessor : MailboxProcessor<obj>
Full name: Script.backgroundProcessor
val setErrorHandler : p:MailboxProcessor<obj> -> unit
Full name: Script.setErrorHandler
val p : MailboxProcessor<obj>
event MailboxProcessor.Error: IEvent<Handler<System.Exception>,System.Exception>
member System.IObservable.Add : callback:('T -> unit) -> unit
val e : System.Exception
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
More information