2 people like it.

Posting async requests to an unreliable agent with error handling

This example shows an implementation of a MailboxProcessor<'T> extension method for posting messages and awaiting an asynchronous reply from a mailbox processor agent which may fail. The error continuation is called if a failure event is triggered on the agent after the message is posted. The agent goes into a failed state where the result of all future messages posted with this mechanism is failure.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
85: 
86: 
87: 
88: 
89: 
90: 
91: 
92: 
93: 
94: 
95: 
open System

/// Shorthand alias for MailboxProcessor<'T>.
type Agent<'T> = MailboxProcessor<'T>

type MailboxProcessor<'T> with
    /// Posts the message produced by `buildMessage` to the agent and asynchronously awaits the reply,
    /// while also watching `failureEvent` and the ambient cancellation token.
    ///
    /// Exactly one outcome is reported, whichever arrives first:
    ///   - the reply, via the success continuation;
    ///   - the exception published on `failureEvent`, via the error continuation;
    ///   - an OperationCanceledException, via the cancellation continuation, if the token is cancelled.
    member agent.PostAndAsyncReplyFailable (failureEvent : IObservable<exn>) buildMessage = async {
        let! token = Async.CancellationToken // capture the current cancellation token
        return! Async.FromContinuations(fun (cont, econt, ccont) ->
            // start a helper agent which waits for a single message indicating which continuation to call
            Agent.Start((fun (mailbox : Agent<Choice<'Reply, exn, OperationCanceledException>>) ->
                async {
                    // if the cancellation token is canceled or the agent fails, post the appropriate message
                    use _cancellationRegistration = token.Register((fun _ ->
                        let result = Choice3Of3 (new OperationCanceledException("The operation was cancelled."))
                        mailbox.Post result))
                    use _failureSubscription = failureEvent.Subscribe(fun exn -> mailbox.Post(Choice2Of3 exn))

                    // Only now post the request: both handlers above are already in place, so a failure
                    // raised while the target agent processes this message cannot slip past unobserved.
                    // The workflow forwards the reply (if one ever arrives) to this helper agent.
                    Async.Start( async {
                        let! reply = agent.PostAndAsyncReply buildMessage
                        mailbox.Post(Choice1Of3 reply) }, token)

                    // wait for a single message and call the appropriate continuation
                    let! message = mailbox.Receive()
                    match message with
                    | Choice1Of3 reply -> cont reply
                    | Choice2Of3 exn -> econt exn
                    | Choice3Of3 exn -> ccont exn }))
            |> ignore) }

/// Example agent which answers each request after a delay but may break at random.
/// `failureProbability` is compared against Random.NextDouble() for every request;
/// `responseDelay` is passed to Async.Sleep before each request is handled.
type RandomlyFailingAgent(failureProbability, responseDelay) =
    // triggered when the agent breaks, and again for every message received afterwards
    let agentFailure = Event<exn>()

    let agent = Agent.Start(fun (inbox : Agent<AsyncReplyChannel<unit>>) ->
        let rng = Random()

        // true when the request being handled should break the agent
        let shouldFail () = rng.NextDouble() < failureProbability

        // working state: take a request, wait, then either reply or break
        let rec healthy () = async {
            let! replyChannel = inbox.Receive()
            do! Async.Sleep responseDelay

            match shouldFail () with
            | true ->
                return! broken (Exception("Agent died unexpectedly."))
            | false ->
                replyChannel.Reply()
                return! healthy () }

        // broken state: never replies; it announces the failure, swallows the next message and repeats
        and broken error = async {
            agentFailure.Trigger error
            let! _ = inbox.Receive()
            return! broken error }

        healthy ())

    /// Sends a request whose message is simply the reply channel itself; the resulting
    /// workflow fails if the agent's failure event fires before a reply is received.
    member __.MakeRequestAsync() =
        agent.PostAndAsyncReplyFailable (agentFailure.Publish) (fun replyChannel -> replyChannel)

/// Demo entry point: keeps sending requests to an unreliable agent until one fails or the
/// user presses enter to cancel, then sends one follow-up request to show the agent's state.
[<EntryPoint>]
let main _ = 
    // disposed when main returns, i.e. after it has been cancelled below
    use cancellationCapability = new System.Threading.CancellationTokenSource()
    let unreliable = new RandomlyFailingAgent(0.01, 10) // create an unreliable agent
    
    // define an asynchronous workflow which will keep posting messages until one fails
    let rec loop n : Async<unit> = async {
        do! unreliable.MakeRequestAsync()
        printfn "Successfully completed %d requests." n
        // return! (not do!) makes the recursive call a tail call, so the unbounded loop
        // does not pile up one pending continuation per iteration
        return! loop (n + 1) }
    
    Async.StartWithContinuations((loop 1),
        (ignore), // computation will run until failure or cancellation
        (fun exn -> printfn "Failed due to error: %A." exn.Message), 
        (fun exn -> printfn "Canceled: %A." exn.Message),
        cancellationCapability.Token)

    // press enter to cancel the request loop
    Console.ReadLine() |> ignore
    cancellationCapability.Cancel()
    
    // make one more request to see if the agent is still alive
    Async.StartWithContinuations(unreliable.MakeRequestAsync(),
        (fun () -> printfn "Successfully made a follow-up request. Agent is still alive and kicking."),
        (fun exn -> printfn "Failed: %s" exn.Message),
        ignore)

    // keep the process alive until the follow-up result has been printed and enter is pressed
    Console.ReadLine() |> ignore
    0 // return an integer exit code
namespace System
type Agent<'T> = MailboxProcessor<'T>

Full name: Script.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val agent : MailboxProcessor<'Msg>
member MailboxProcessor.PostAndAsyncReplyFailable : failureEvent:IObservable<exn> -> buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>

Full name: Script.PostAndAsyncReplyFailable
val failureEvent : IObservable<exn>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
val buildMessage : (AsyncReplyChannel<'Reply> -> 'Msg)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val token : Threading.CancellationToken
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
property Async.CancellationToken: Async<Threading.CancellationToken>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val cont : ('Reply -> unit)
val econt : (exn -> unit)
val ccont : (OperationCanceledException -> unit)
val continuator : MailboxProcessor<Choice<'Reply,exn,OperationCanceledException>>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val mailbox : MailboxProcessor<Choice<'Reply,exn,OperationCanceledException>>
Multiple items
type Choice<'T1,'T2> =
  | Choice1Of2 of 'T1
  | Choice2Of2 of 'T2

Full name: Microsoft.FSharp.Core.Choice<_,_>

--------------------
type Choice<'T1,'T2,'T3> =
  | Choice1Of3 of 'T1
  | Choice2Of3 of 'T2
  | Choice3Of3 of 'T3

Full name: Microsoft.FSharp.Core.Choice<_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4> =
  | Choice1Of4 of 'T1
  | Choice2Of4 of 'T2
  | Choice3Of4 of 'T3
  | Choice4Of4 of 'T4

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5> =
  | Choice1Of5 of 'T1
  | Choice2Of5 of 'T2
  | Choice3Of5 of 'T3
  | Choice4Of5 of 'T4
  | Choice5Of5 of 'T5

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6> =
  | Choice1Of6 of 'T1
  | Choice2Of6 of 'T2
  | Choice3Of6 of 'T3
  | Choice4Of6 of 'T4
  | Choice5Of6 of 'T5
  | Choice6Of6 of 'T6

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6,'T7> =
  | Choice1Of7 of 'T1
  | Choice2Of7 of 'T2
  | Choice3Of7 of 'T3
  | Choice4Of7 of 'T4
  | Choice5Of7 of 'T5
  | Choice6Of7 of 'T6
  | Choice7Of7 of 'T7

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_,_>
Multiple items
type OperationCanceledException =
  inherit SystemException
  new : unit -> OperationCanceledException + 5 overloads
  member CancellationToken : CancellationToken with get, set

Full name: System.OperationCanceledException

--------------------
OperationCanceledException() : unit
OperationCanceledException(message: string) : unit
OperationCanceledException(token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn) : unit
OperationCanceledException(message: string, token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn, token: Threading.CancellationToken) : unit
val __ : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
val result : Choice<'Reply,exn,OperationCanceledException>
union case Choice.Choice3Of3: 'T3 -> Choice<'T1,'T2,'T3>
member MailboxProcessor.Post : message:'Msg -> unit
val __ : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<exn>) : IDisposable
Multiple items
val exn : exn

--------------------
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
union case Choice.Choice2Of3: 'T2 -> Choice<'T1,'T2,'T3>
val message : Choice<'Reply,exn,OperationCanceledException>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
union case Choice.Choice1Of3: 'T1 -> Choice<'T1,'T2,'T3>
val reply : 'Reply
Multiple items
val exn : OperationCanceledException

--------------------
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
Multiple items
type RandomlyFailingAgent =
  new : failureProbability:float * responseDelay:int -> RandomlyFailingAgent
  member MakeRequestAsync : unit -> Async<unit>

Full name: Script.RandomlyFailingAgent

--------------------
new : failureProbability:float * responseDelay:int -> RandomlyFailingAgent
val failureProbability : float
val responseDelay : int
val agentFailure : Event<exn>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
val agent : MailboxProcessor<AsyncReplyChannel<unit>>
val mailbox : MailboxProcessor<AsyncReplyChannel<unit>>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val failRandomly : (unit -> bool)
val gen : Random
Multiple items
type Random =
  new : unit -> Random + 1 overload
  member Next : unit -> int + 2 overloads
  member NextBytes : buffer:byte[] -> unit
  member NextDouble : unit -> float

Full name: System.Random

--------------------
Random() : unit
Random(Seed: int) : unit
Random.NextDouble() : float
val loop : (unit -> Async<'a>)
val message : AsyncReplyChannel<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val failed : (Exception -> Async<'a>)
Multiple items
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
member AsyncReplyChannel.Reply : value:'Reply -> unit
Multiple items
val exn : Exception

--------------------
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
member Event.Trigger : arg:'T -> unit
val __ : AsyncReplyChannel<unit>
member RandomlyFailingAgent.MakeRequestAsync : unit -> Async<unit>

Full name: Script.RandomlyFailingAgent.MakeRequestAsync
val replyChannel : AsyncReplyChannel<unit>
member MailboxProcessor.PostAndAsyncReplyFailable : failureEvent:IObservable<exn> -> buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>
property Event.Publish: IEvent<exn>
Multiple items
type EntryPointAttribute =
  inherit Attribute
  new : unit -> EntryPointAttribute

Full name: Microsoft.FSharp.Core.EntryPointAttribute

--------------------
new : unit -> EntryPointAttribute
val main : string [] -> int

Full name: Script.main
val cancellationCapability : Threading.CancellationTokenSource
namespace System.Threading
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
Threading.CancellationTokenSource() : unit
val unreliable : RandomlyFailingAgent
val loop : (int -> Async<unit>)
val n : int
member RandomlyFailingAgent.MakeRequestAsync : unit -> Async<unit>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:Threading.CancellationToken -> unit
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
property Exception.Message: string
property Threading.CancellationTokenSource.Token: Threading.CancellationToken
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.ReadLine() : string
Threading.CancellationTokenSource.Cancel() : unit
Threading.CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
Raw view Test code New version

More information

Link:http://fssnip.net/pD
Posted:9 years ago
Author:Anton Tcholakov
Tags: asynchronous programming