2 people like it.
Like the snippet!
Posting async requests to an unreliable agent with error handling
This example shows an implementation of a MailboxProcessor<'T> extension method for posting messages and awaiting an asynchronous reply from a mailbox processor agent which may fail. The error continuation is called if a failure event is triggered on the agent after the message is posted. The agent then goes into a failed state in which every future message posted with this mechanism results in failure.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
|
open System
/// Short alias for MailboxProcessor<'T>, used throughout this file.
type Agent<'T> = MailboxProcessor<'T>
type MailboxProcessor<'T> with
    /// Posts the message produced by `buildMessage` to the agent and asynchronously awaits the reply,
    /// while also watching `failureEvent` and the ambient cancellation token.
    ///
    /// The resulting workflow completes in exactly one of three ways, whichever is observed first:
    ///   * with the reply, when the agent answers on the reply channel;
    ///   * with the error continuation, when `failureEvent` fires or posting the message throws;
    ///   * with the cancellation continuation, when the captured cancellation token is cancelled.
    ///
    /// failureEvent: observable that signals that the agent has failed.
    /// buildMessage: builds the message to post from the reply channel.
    member agent.PostAndAsyncReplyFailable (failureEvent : IObservable<exn>) buildMessage = async {
        let! token = Async.CancellationToken // capture the current cancellation token
        return! Async.FromContinuations(fun (cont, econt, ccont) ->
            // The continuator agent receives a single outcome message and calls the matching
            // continuation; any later outcome messages are simply left unread in its mailbox.
            Agent.Start(fun (mailbox : Agent<Choice<'Reply, exn, OperationCanceledException>>) ->
                async {
                    // Install the cancellation and failure handlers BEFORE the request is posted, so a
                    // failure raised while the request is being processed cannot be missed.
                    use _cancellationRegistration =
                        token.Register(fun () ->
                            mailbox.Post(Choice3Of3 (OperationCanceledException("The operation was cancelled.", token))))
                    use _failureSubscription =
                        failureEvent.Subscribe(fun error -> mailbox.Post(Choice2Of3 error))
                    // Post the request in a separate workflow and forward its reply to this agent.
                    // Exceptions raised while building/posting the message go to the error continuation
                    // instead of escaping as an unhandled exception of the fire-and-forget workflow.
                    Async.Start(async {
                        try
                            let! reply = agent.PostAndAsyncReply buildMessage
                            mailbox.Post(Choice1Of3 reply)
                        with error ->
                            mailbox.Post(Choice2Of3 error) }, token)
                    // wait for a single message and call the appropriate continuation
                    let! message = mailbox.Receive()
                    match message with
                    | Choice1Of3 reply -> cont reply
                    | Choice2Of3 error -> econt error
                    | Choice3Of3 cancellation -> ccont cancellation })
            |> ignore) }
// example: randomly failing agent
/// Example agent that replies to each request after `responseDelay` milliseconds, but with
/// probability `failureProbability` per request it fails instead and stays failed from then on.
type RandomlyFailingAgent(failureProbability, responseDelay) =
    // fired every time the agent fails or receives a message while already failed
    let agentFailure = Event<exn>()

    let agent =
        Agent.Start(fun (inbox : Agent<AsyncReplyChannel<unit>>) ->
            let rng = Random()
            // true when the current request should fail
            let shouldFail () = rng.NextDouble() < failureProbability

            // normal state: answer requests until a random failure occurs
            let rec serve () =
                async {
                    let! replyChannel = inbox.Receive()
                    do! Async.Sleep responseDelay
                    if shouldFail () then
                        return! broken (Exception("Agent died unexpectedly."))
                    else
                        replyChannel.Reply()
                        return! serve ()
                }

            // failed state: signal the failure, swallow the next message, and signal again
            and broken error =
                async {
                    agentFailure.Trigger error
                    let! _ = inbox.Receive()
                    return! broken error
                }

            serve ())

    /// Posts a request to the agent; the returned workflow fails if the agent signals a failure.
    member __.MakeRequestAsync() =
        id |> agent.PostAndAsyncReplyFailable (agentFailure.Publish)
/// Demo entry point: keeps posting requests to an unreliable agent until one fails or the user
/// cancels by pressing enter, then makes one follow-up request to show the agent's final state.
[<EntryPoint>]
let main _ =
    // `use` disposes the token source when main returns
    use cancellationCapability = new System.Threading.CancellationTokenSource()
    let unreliable = new RandomlyFailingAgent(0.01, 10) // create an unreliable agent
    // define an asynchronous workflow which will keep posting messages until one fails;
    // `return!` makes the recursive call a tail call so the unbounded loop does not
    // accumulate continuations
    let rec loop n : Async<unit> = async {
        do! unreliable.MakeRequestAsync()
        printfn "Successfully completed %d requests." n
        return! loop (n + 1) }
    Async.StartWithContinuations((loop 1),
                                 (ignore), // computation will run until failure or cancellation
                                 (fun exn -> printfn "Failed due to error: %A." exn.Message),
                                 (fun exn -> printfn "Canceled: %A." exn.Message),
                                 cancellationCapability.Token)
    // press enter to cancel the running workflow
    Console.ReadLine() |> ignore
    cancellationCapability.Cancel()
    // make one more request to see if the agent is still alive
    Async.StartWithContinuations(unreliable.MakeRequestAsync(),
                                 (fun () -> printfn "Successfully made a follow-up request. Agent is still alive and kicking."),
                                 (fun exn -> printfn "Failed: %s" exn.Message),
                                 ignore)
    Console.ReadLine() |> ignore
    0 // return an integer exit code
|
namespace System
type Agent<'T> = MailboxProcessor<'T>
Full name: Script.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val agent : MailboxProcessor<'Msg>
member MailboxProcessor.PostAndAsyncReplyFailable : failureEvent:IObservable<exn> -> buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>
Full name: Script.PostAndAsyncReplyFailable
val failureEvent : IObservable<exn>
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
val buildMessage : (AsyncReplyChannel<'Reply> -> 'Msg)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val token : Threading.CancellationToken
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
property Async.CancellationToken: Async<Threading.CancellationToken>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val cont : ('Reply -> unit)
val econt : (exn -> unit)
val ccont : (OperationCanceledException -> unit)
val continuator : MailboxProcessor<Choice<'Reply,exn,OperationCanceledException>>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val mailbox : MailboxProcessor<Choice<'Reply,exn,OperationCanceledException>>
Multiple items
type Choice<'T1,'T2> =
| Choice1Of2 of 'T1
| Choice2Of2 of 'T2
Full name: Microsoft.FSharp.Core.Choice<_,_>
--------------------
type Choice<'T1,'T2,'T3> =
| Choice1Of3 of 'T1
| Choice2Of3 of 'T2
| Choice3Of3 of 'T3
Full name: Microsoft.FSharp.Core.Choice<_,_,_>
--------------------
type Choice<'T1,'T2,'T3,'T4> =
| Choice1Of4 of 'T1
| Choice2Of4 of 'T2
| Choice3Of4 of 'T3
| Choice4Of4 of 'T4
Full name: Microsoft.FSharp.Core.Choice<_,_,_,_>
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5> =
| Choice1Of5 of 'T1
| Choice2Of5 of 'T2
| Choice3Of5 of 'T3
| Choice4Of5 of 'T4
| Choice5Of5 of 'T5
Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_>
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6> =
| Choice1Of6 of 'T1
| Choice2Of6 of 'T2
| Choice3Of6 of 'T3
| Choice4Of6 of 'T4
| Choice5Of6 of 'T5
| Choice6Of6 of 'T6
Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_>
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6,'T7> =
| Choice1Of7 of 'T1
| Choice2Of7 of 'T2
| Choice3Of7 of 'T3
| Choice4Of7 of 'T4
| Choice5Of7 of 'T5
| Choice6Of7 of 'T6
| Choice7Of7 of 'T7
Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_,_>
Multiple items
type OperationCanceledException =
inherit SystemException
new : unit -> OperationCanceledException + 5 overloads
member CancellationToken : CancellationToken with get, set
Full name: System.OperationCanceledException
--------------------
OperationCanceledException() : unit
OperationCanceledException(message: string) : unit
OperationCanceledException(token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn) : unit
OperationCanceledException(message: string, token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn, token: Threading.CancellationToken) : unit
val __ : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
val result : Choice<'Reply,exn,OperationCanceledException>
union case Choice.Choice3Of3: 'T3 -> Choice<'T1,'T2,'T3>
member MailboxProcessor.Post : message:'Msg -> unit
val __ : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<exn>) : IDisposable
Multiple items
val exn : exn
--------------------
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
union case Choice.Choice2Of3: 'T2 -> Choice<'T1,'T2,'T3>
val message : Choice<'Reply,exn,OperationCanceledException>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
union case Choice.Choice1Of3: 'T1 -> Choice<'T1,'T2,'T3>
val reply : 'Reply
Multiple items
val exn : OperationCanceledException
--------------------
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
Multiple items
type RandomlyFailingAgent =
new : failureProbability:float * responseDelay:int -> RandomlyFailingAgent
member MakeRequestAsync : unit -> Async<unit>
Full name: Script.RandomlyFailingAgent
--------------------
new : failureProbability:float * responseDelay:int -> RandomlyFailingAgent
val failureProbability : float
val responseDelay : int
val agentFailure : Event<exn>
Multiple items
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'T> =
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
Full name: Microsoft.FSharp.Control.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'T>
--------------------
new : unit -> Event<'Delegate,'Args>
val agent : MailboxProcessor<AsyncReplyChannel<unit>>
val mailbox : MailboxProcessor<AsyncReplyChannel<unit>>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val failRandomly : (unit -> bool)
val gen : Random
Multiple items
type Random =
new : unit -> Random + 1 overload
member Next : unit -> int + 2 overloads
member NextBytes : buffer:byte[] -> unit
member NextDouble : unit -> float
Full name: System.Random
--------------------
Random() : unit
Random(Seed: int) : unit
Random.NextDouble() : float
val loop : (unit -> Async<'a>)
val message : AsyncReplyChannel<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val failed : (Exception -> Async<'a>)
Multiple items
type Exception =
new : unit -> Exception + 2 overloads
member Data : IDictionary
member GetBaseException : unit -> Exception
member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
member GetType : unit -> Type
member HelpLink : string with get, set
member InnerException : Exception
member Message : string
member Source : string with get, set
member StackTrace : string
...
Full name: System.Exception
--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
member AsyncReplyChannel.Reply : value:'Reply -> unit
Multiple items
val exn : Exception
--------------------
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
member Event.Trigger : arg:'T -> unit
val __ : AsyncReplyChannel<unit>
member RandomlyFailingAgent.MakeRequestAsync : unit -> Async<unit>
Full name: Script.RandomlyFailingAgent.MakeRequestAsync
val replyChannel : AsyncReplyChannel<unit>
member MailboxProcessor.PostAndAsyncReplyFailable : failureEvent:IObservable<exn> -> buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>
property Event.Publish: IEvent<exn>
Multiple items
type EntryPointAttribute =
inherit Attribute
new : unit -> EntryPointAttribute
Full name: Microsoft.FSharp.Core.EntryPointAttribute
--------------------
new : unit -> EntryPointAttribute
val main : string [] -> int
Full name: Script.main
val cancellationCapability : Threading.CancellationTokenSource
namespace System.Threading
Multiple items
type CancellationTokenSource =
new : unit -> CancellationTokenSource
member Cancel : unit -> unit + 1 overload
member Dispose : unit -> unit
member IsCancellationRequested : bool
member Token : CancellationToken
static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload
Full name: System.Threading.CancellationTokenSource
--------------------
Threading.CancellationTokenSource() : unit
val unreliable : RandomlyFailingAgent
val loop : (int -> Async<unit>)
val n : int
member RandomlyFailingAgent.MakeRequestAsync : unit -> Async<unit>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:Threading.CancellationToken -> unit
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
property Exception.Message: string
property Threading.CancellationTokenSource.Token: Threading.CancellationToken
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.ReadLine() : string
Threading.CancellationTokenSource.Cancel() : unit
Threading.CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
More information