#r @"..\packages\ExtCore.0.8.45\lib\net45\ExtCore.dll"
#r @"..\packages\log4net.2.0.3\lib\net40-full\log4net.dll"

open System
open ExtCore.Control
open log4net

// NOTE(review): the attributes on the three modules below are assumed to be AutoOpen /
// AutoOpen / RequireQualifiedAccess - confirm against the upstream library.
[<AutoOpen>]
module CommandRequestAgent =

    /// Shorthand for the F# mailbox processor type.
    type Agent<'T> = MailboxProcessor<'T>

    [<AutoOpen>]
    module Model =

        /// Agent message, defining a command or request. Each command or request may fail, so the
        /// return type of the contained closures is of Choice<'Result, 'Error> where the error
        /// message is a string. Commands do not return a value in the success case, so the result
        /// type is unit. Requests can either return a value type or an object type. Command and
        /// request messages also include a description which is used for logging. Finally, the
        /// close message notifies the agent that it should stop processing messages.
        type internal CommandRequestMessage =
            | Command of
                description : string *
                commandFunc : (unit -> Choice<unit, string>) *
                replyChannel : AsyncReplyChannel<Choice<unit, string>>
            | ValueRequest of
                description : string *
                requestFunc : (unit -> Choice<ValueType, string>) *
                replyChannel : AsyncReplyChannel<Choice<ValueType, string>>
            | ObjectRequest of
                description : string *
                requestFunc : (unit -> Choice<obj, string>) *
                replyChannel : AsyncReplyChannel<Choice<obj, string>>
            | Close of
                closeFunc : (unit -> Choice<unit, string>) *
                replyChannel : AsyncReplyChannel<Choice<unit, string>>

        /// A command/request agent processes messages of CommandRequestMessage type. The details
        /// of the implementation are hidden from the user of the library.
        type CommandRequestAgent = internal CommandRequestAgent of Agent<CommandRequestMessage>

    /// Generic command/request agent with error handling used to serialise posted commands and
    /// requests which are defined by closures. Useful in serialising communications to an
    /// instrument with a C API.
    [<RequireQualifiedAccess>]
    module CommandRequestAgent =

        /// Returns a string describing the message.
        let private messageDescription = function
            | Command (description, _, _)
            | ValueRequest (description, _, _)
            | ObjectRequest (description, _, _) -> description
            | Close _ -> "Close"

        /// Starts a new command/request agent with the given name (used for logging purposes) and
        /// start-up function. The start-up function may fail, in which case the agent goes into a
        /// failed state, returning failure results to future commands and requests. Furthermore,
        /// the processing of any given message may fail, in which case the agent will immediately
        /// enter the failed state and return failure to all future messages if it is set to persist
        /// failure.
        let create (name : string) persistFailure startupFunc =
            let agent =
                Agent.Start(fun (mailbox : Agent<CommandRequestMessage>) ->
                    let log = LogManager.GetLogger name // create a logger

                    /// Logs a successful response at debug level and a failed one at error level.
                    let logResponse description = function
                        | Choice1Of2 s ->
                            sprintf "Successfully responded to message \"%s\" with %A." description s
                            |> log.Debug
                        | Choice2Of2 f ->
                            sprintf "Failed to respond to message \"%s\" due to error: %A." description f
                            |> log.Error

                    /// Sends the response to the waiting caller and then logs it. Shared by all
                    /// command and request branches of both processing loops.
                    let replyAndLog description (replyChannel : AsyncReplyChannel<_>) response =
                        replyChannel.Reply response
                        logResponse description response

                    /// Workflow performed when shutting down the agent.
                    let closeAgent closeFunc (replyChannel : AsyncReplyChannel<Choice<unit, string>>) = async {
                        "Closing agent." |> log.Info

                        // perform the closing function and respond accordingly
                        let response =
                            match closeFunc () with
                            | Choice2Of2 f ->
                                Choice2Of2 <| sprintf "Failed to close agent due to error: %s" f
                            | Choice1Of2 () ->
                                // read the queue length once so that the reported count is the
                                // same value that was tested
                                let remaining = mailbox.CurrentQueueLength
                                if remaining <> 0 then
                                    Choice2Of2
                                    <| sprintf "Closing agent with %d messages remaining in mailbox queue." remaining
                                else
                                    Choice1Of2 ()

                        response |> replyChannel.Reply
                        logResponse "Close" response }

                    /// Message-processing loop which replies to future messages with failure after an
                    /// error has occurred. The provided error description is used in the description of
                    /// the failure when performing commands and requests and for logging.
                    let rec failed error = async {
                        let! message = mailbox.Receive()
                        let description = messageDescription message
                        let errorMessage =
                            sprintf "Received message \"%s\" after agent failed due to error: %s" description error

                        match message with
                        | Command (_, _, replyChannel) ->
                            replyAndLog description replyChannel (Choice2Of2 errorMessage)
                            return! failed error
                        | ValueRequest (_, _, replyChannel) ->
                            replyAndLog description replyChannel (Choice2Of2 errorMessage)
                            return! failed error
                        | ObjectRequest (_, _, replyChannel) ->
                            replyAndLog description replyChannel (Choice2Of2 errorMessage)
                            return! failed error
                        | Close (closeFunc, replyChannel) ->
                            // shutting down the agent is the only valid message in this state
                            return! closeAgent closeFunc replyChannel }

                    /// Default message-processing loop.
                    let rec loop () = async {
                        let! message = mailbox.Receive()
                        let description = messageDescription message
                        sprintf "Received message: %s." description |> log.Debug

                        // continues the workflow after a message is processed, either remaining in the
                        // default loop or entering the failed loop depending on the result
                        let continueAfter response = async {
                            match response with
                            | Choice2Of2 f when persistFailure -> return! failed f
                            | _ -> return! loop () }

                        match message with
                        | Command (_, commandFunc, replyChannel) ->
                            let response = commandFunc ()
                            replyAndLog description replyChannel response
                            return! continueAfter response
                        | ValueRequest (_, requestFunc, replyChannel) ->
                            let response = requestFunc ()
                            replyAndLog description replyChannel response
                            return! continueAfter response
                        | ObjectRequest (_, requestFunc, replyChannel) ->
                            let response = requestFunc ()
                            replyAndLog description replyChannel response
                            return! continueAfter response
                        | Close (closeFunc, replyChannel) ->
                            // stop looping once the close message is received
                            return! closeAgent closeFunc replyChannel }

                    async {
                        // perform start-up
                        match startupFunc () with
                        | Choice1Of2 () ->
                            // enter the default loop if start-up is successful
                            return! loop ()
                        | Choice2Of2 f ->
                            // enter the failure loop otherwise
                            let error = sprintf "Failed to start agent due to error: %s" f
                            return! failed error })

            CommandRequestAgent agent

        /// Posts a command to the message queue which will be executed by calling the provided
        /// function. The command may succeed or fail, which will be reflected in the asynchronous
        /// reply. All future messages will automatically result in failure if the command fails
        /// and the agent is set to persist failure. The supplied description is used for logging.
        let performCommand description commandFunc (CommandRequestAgent agent) =
            agent.PostAndAsyncReply (fun replyChannel -> Command(description, commandFunc, replyChannel))

        /// Posts a request to the message queue which will be executed by calling the provided
        /// function. The request may succeed or fail, which will be reflected in the asynchronous
        /// reply. If the request is successful, it returns an object type. All future messages will
        /// automatically result in failure if the request fails and the agent is set to persist
        /// failure. The supplied description is used for logging.
        let performObjectRequest<'Result when 'Result :> obj>
                description
                (requestFunc : unit -> Choice<'Result, string>)
                (CommandRequestAgent agent) =
            async {
                // the message type carries obj, so upcast on the way in and downcast on the way out
                let castRequestFunc = requestFunc >> Choice.map (fun s -> s :> obj)
                let! response =
                    agent.PostAndAsyncReply (fun replyChannel ->
                        ObjectRequest(description, castRequestFunc, replyChannel))
                return response |> Choice.map (fun s -> s :?> 'Result) }

        /// Posts a request to the message queue which will be executed by calling the provided
        /// function. The request may succeed or fail, which will be reflected in the asynchronous
        /// reply. If the request is successful, it returns a value type. All future messages will
        /// automatically result in failure if the request fails and the agent is set to persist
        /// failure. The supplied description is used for logging.
        let performValueRequest<'Result when 'Result :> ValueType>
                description
                (requestFunc : unit -> Choice<'Result, string>)
                (CommandRequestAgent agent) =
            async {
                // the message type carries ValueType, so upcast on the way in and downcast on the way out
                let castRequestFunc = requestFunc >> Choice.map (fun s -> s :> ValueType)
                let! response =
                    agent.PostAndAsyncReply (fun replyChannel ->
                        ValueRequest(description, castRequestFunc, replyChannel))
                return response |> Choice.map (fun s -> s :?> 'Result) }

        /// Shuts down the message-processing agent after calling the supplied closing function. The
        /// function may succeed or fail, which will determine the asynchronously-returned result.
        /// Furthermore, the result will be failure if there are any remaining messages in the queue
        /// after this point.
        let close closeFunc (CommandRequestAgent agent) =
            agent.PostAndAsyncReply (fun replyChannel -> Close(closeFunc, replyChannel))

open ExtCore.Control
open log4net.Config

BasicConfigurator.Configure()

/// Small example agent built on top of the command/request agent, exposing one value request,
/// one object request and one command.
module DemoAgent =

    type DemoAgent = private DemoAgent of agent : CommandRequestAgent

    /// Integer division which fails instead of throwing when the divisor is zero.
    let private divideImpl x y () =
        if y = 0
        then Choice2Of2 "Division by zero."
        else Choice1Of2 (x / y)

    /// Returns the first character of the string as a string, failing on the empty string.
    let private firstElementImpl str () =
        if String.length str = 0
        then Choice2Of2 "Empty string."
        else Choice1Of2 (String.sub str 0 1 |> String.ofSubstring)

    /// Prints the number if it is strictly positive and fails otherwise (zero included).
    let private printPositiveNumberImp x () =
        if x > 0 then
            printfn "%d" x
            Choice1Of2 ()
        else
            Choice2Of2 "Number is not positive."

    /// Starts a demo agent which persists failure: after the first failed message every
    /// subsequent command or request is answered with failure.
    let create () =
        DemoAgent
        <| CommandRequestAgent.create "Demo" true (fun () ->
            printfn "Starting demo agent..."
            Choice1Of2 ())

    let divide x y (DemoAgent agent) =
        CommandRequestAgent.performValueRequest (sprintf "Divide %d by %d" x y) (divideImpl x y) agent

    let firstElement str (DemoAgent agent) =
        CommandRequestAgent.performObjectRequest
            (sprintf "Get first element of \"%s\"" str)
            (firstElementImpl str)
            agent

    let printPositiveNumber x (DemoAgent agent) =
        CommandRequestAgent.performCommand
            (sprintf "Printing positive number: %d" x)
            (printPositiveNumberImp x)
            agent

    let close (DemoAgent agent) =
        CommandRequestAgent.close
            (fun () ->
                printfn "Shutting down demo agent..."
                Choice1Of2 ())
            agent

let demo = DemoAgent.create ()

let successfulValueRequest = DemoAgent.divide 4 2 demo |> Async.RunSynchronously
let successfulObjectRequest = DemoAgent.firstElement "43154" demo |> Async.RunSynchronously
let successfulCommand = DemoAgent.printPositiveNumber 53 demo |> Async.RunSynchronously

// the division by zero fails and, because the demo agent persists failure, moves it into
// the failed state
let failedValueRequest = DemoAgent.divide 5 0 demo |> Async.RunSynchronously

// "ABC" is a valid input on its own; this request fails only because the agent is already
// in the failed state after the division by zero above
let failedObjectRequest = DemoAgent.firstElement "ABC" demo |> Async.RunSynchronously

let successfulShutdown = DemoAgent.close demo |> Async.RunSynchronously