open System open System.Threading open System.Threading.Tasks /// /// Represents an asynchronous agent that processes messages of type 'T. /// This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency. /// type Agent<'T>(body: Agent<'T> -> Async) as self = let cts = new CancellationTokenSource() let errEvent = Event<_>() // The main difference with the original MailboxProcessor is that we handle errors in the body function // by triggering an error event instead of throwing exceptions directly. // This allows the agent to continue processing messages even after an error occurs. // The error event can be subscribed to by the user to handle errors gracefully. // The agent will keep running and processing messages, but will notify subscribers of any unhandled exceptions. let mbox = new MailboxProcessor<'T>((fun _ -> let rec loop () = async { try return! body self with | e -> errEvent.Trigger(e) return! loop () } loop () ), true, cts.Token) /// /// Event that is triggered when an unhandled exception occurs in the agent's processing loop. /// member _.OnError = errEvent.Publish /// /// Gets the error event for the underlying MailboxProcessor. /// member _.Error = mbox.Error /// /// Posts a message to the agent asynchronously. /// member _.Post(message) = mbox.Post(message) /// /// Posts a message to the agent and synchronously waits for a reply. /// member _.PostAndReply(messageBuilder) = mbox.PostAndReply(messageBuilder) /// /// Posts a message to the agent and synchronously waits for a reply, with a timeout. /// member _.TryPostAndReply(messageBuilder, timeout) = mbox.TryPostAndReply(messageBuilder, timeout) /// /// Posts a message to the agent and asynchronously waits for a reply. /// member _.PostAndAsyncReply(messageBuilder) = mbox.PostAndAsyncReply(messageBuilder) /// /// Posts a message to the agent and asynchronously waits for a reply, with a timeout. /// member _.PostAndTryAsyncReply(messageBuilder, timeout) = mbox.PostAndTryAsyncReply(messageBuilder, timeout) /// /// Starts the agent's processing loop. /// member _.Start() = mbox.Start() /// /// Starts the agent's processing loop immediately on the current thread. /// member _.StartImmediate () = mbox.StartImmediate() /// /// Receives the next message from the agent's queue asynchronously. /// member _.Receive() = mbox.Receive() /// /// Receives the next message from the agent's queue asynchronously, with a timeout. /// member _.Receive(timeOut) = mbox.Receive(timeOut) /// /// Tries to receive a message from the agent's queue within the specified timeout. /// member _.TryReceive(timeout) = mbox.TryReceive(timeout) /// /// Scans the agent's queue for a message matching the given scanner function. /// member _.Scan(scanner) = mbox.Scan(scanner) /// /// Scans the agent's queue for a message matching the given scanner function, with a timeout. /// member _.TryScan(scanner, timeout) = mbox.TryScan(scanner, timeout) /// /// Gets the current number of messages in the agent's queue. /// member _.CurrentQueueLength = mbox.CurrentQueueLength /// /// Gets or sets the default timeout for reply operations. /// member _.DefaultTimeout with get() = mbox.DefaultTimeout and set value = mbox.DefaultTimeout <- value /// /// Gets the cancellation token associated with the agent. /// member _.CancellationToken = cts.Token /// /// Gets a value indicating whether cancellation has been requested for the agent. /// member _.IsCancellationRequested = cts.Token.IsCancellationRequested /// /// Creates and starts a new agent with the specified body. /// static member Start(body) = let mbox = new Agent<'T>(body) mbox.Start () mbox /// /// Creates and starts a new agent immediately on the current thread. /// static member StartImmediate(body) = let mbox = new Agent<'T>(body) mbox.StartImmediate () mbox interface IDisposable with /// /// Disposes the agent and cancels its processing. /// member _.Dispose() = cts.Cancel() cts.Dispose() (mbox :> IDisposable).Dispose() // Convenience module for creating and using agents with common patterns module Agent = /// /// Creates and starts a simple agent that processes messages of type 'T in a loop. /// The processor function is called for each message received. /// /// A function to process each message. /// An Agent instance. let createSimple<'T> (processor: 'T -> unit) = Agent<'T>.Start(fun inbox -> let rec loop () = async { let! message = inbox.Receive() processor message return! loop () } loop () ) /// /// Creates and starts a stateful agent that maintains state of type 'State. /// The processor function updates the state for each message received. /// /// The initial state value. /// A function to process each message and update the state. /// An Agent instance. let createStateful<'T, 'State> (initialState: 'State, processor: 'State -> 'T -> 'State) = Agent<'T>.Start(fun inbox -> let rec loop state = async { let! message = inbox.Receive() let newState = processor state message return! loop newState } loop initialState ) /// /// Creates and starts an agent that supports request-reply messaging. /// The processor function computes a reply for each request. /// /// A function to process each request and produce a reply. /// An Agent instance. let createReply<'Request, 'Reply>(processor: 'Request -> 'Reply) = Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox -> let rec loop () = async { let! request, replyChannel = inbox.Receive() let reply = processor request replyChannel.Reply(reply) return! loop () } loop () ) /// /// Creates and starts a stateful agent that supports request-reply messaging. /// The processor function computes a reply and updates the state for each request. /// /// The initial state value. /// A function to process each request and update the state. /// An Agent instance. let createStatefulReply<'Request, 'Reply, 'State> (initialState: 'State, processor: 'State -> 'Request -> 'Reply * 'State) = Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox -> let rec loop state = async { let! request, replyChannel = inbox.Receive() let reply, newState = processor state request replyChannel.Reply(reply) return! loop newState } loop initialState ) /// /// Posts a request to the agent and tries to synchronously receive a reply within the specified timeout. /// /// Timeout in milliseconds. /// The request message. /// The agent instance. /// Some(reply) if successful, None if timed out. let tryPostAndReply timeout msg (agent: Agent<_>) = agent.TryPostAndReply((fun replyChannel -> (msg, replyChannel) ), timeout) /// /// Posts a request to the agent and synchronously waits for a reply. /// Throws if the reply times out. /// /// The request message. /// The agent instance. /// The reply value. let postAndReply msg (agent: Agent<_>) = if agent.DefaultTimeout = Timeout.Infinite then agent |> tryPostAndReply 1000 msg |> function | Some v -> v | None -> failwith "Timed out waiting for reply" else agent.PostAndReply(fun replyChannel -> (msg, replyChannel) ) /// /// Posts a request to the agent and asynchronously waits for a reply. /// /// The request message. /// The agent instance. /// An Async computation returning the reply. let postAndAsyncReply msg (agent: Agent<_>) = agent.PostAndAsyncReply(fun replyChannel -> (msg, replyChannel) ) /// /// Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout. /// /// Timeout in milliseconds. /// The request message. /// The agent instance. /// An Async computation returning Some(reply) or None if timed out. let postAndTryAsyncReply timeout msg (agent: Agent<_>) = agent.PostAndTryAsyncReply((fun replyChannel -> (msg, replyChannel) ), timeout) /// /// Posts a request to the agent and returns a Task that completes with the reply. /// /// The request message. /// The agent instance. /// A Task returning the reply. let postAndAsyncReplyTask msg (agent: Agent<_>) = agent.PostAndAsyncReply(fun replyChannel -> (msg, replyChannel) ) |> Task.FromResult /// /// Gets the default timeout for reply operations on the agent. /// /// The agent instance. /// The default timeout in milliseconds. let getDefaultTimeout (agent: Agent<_>) = agent.DefaultTimeout /// /// Sets the default timeout for reply operations on the agent. /// /// Timeout in milliseconds. /// The agent instance. let setDefaultTimeout timeout (agent: Agent<_>) = agent.DefaultTimeout <- timeout /// /// Gets the current number of messages in the agent's queue. /// /// The agent instance. /// The number of queued messages. let getCurrentQueueLength (agent: Agent<_>)= agent.CurrentQueueLength /// /// Disposes the agent and cancels its processing. /// /// The agent instance. let dispose (agent: Agent<_>) = (agent :> IDisposable).Dispose() // Example usage demonstrating the unified agent capabilities module Examples = let createLogger () = let logger = Agent.createSimple <| fun msg -> if msg = "ERROR" then failwith "Simulated error in logger" else printfn "[%s] %s" (DateTime.Now.ToString("HH:mm:ss")) msg // Handle errors logger.OnError.Add(fun ex -> printfn "Logger error: %s" ex.Message) logger // Example 2: Stateful counter agent let createCounter () = Agent.createStateful(0, fun count message -> match message with | "increment" -> let newCount = count + 1 printfn $"Count: %d{newCount}" newCount | "decrement" -> let newCount = count - 1 printfn $"Count: %d{newCount}" newCount | _ -> printfn $"Unknown command, count remains: %d{count}" count ) // Example 3: Request-reply calculator type CalculatorMessage = | Add of int * int | Subtract of int * int | Multiply of int * int | Divide of int * int let createUnsafeCalculator () = Agent.createReply(fun message -> match message with | Add (x, y) -> x + y | Subtract (x, y) -> x - y | Multiply (x, y) -> x * y | Divide (x, y) -> x / y ) let createSafeCalculator () = Agent.createReply(fun message -> match message with | Add (x, y) -> (+), x, y | Subtract (x, y) -> (-), x, y | Multiply (x, y) -> (*), x, y | Divide (x, y) -> (/), x, y |> fun (op, x, y) -> try x |> op <| y |> Ok with | :? DivideByZeroException as ex -> Error ex ) open Examples let logger = createLogger () logger.Post("Log this message") logger.Post("ERROR") logger.Post("Log this message") logger |> Agent.dispose // will throw an exception because the logger has been disposed logger.Post("Log this message") let counter = createCounter () counter.Post("increment") counter.Post("increment") counter.Post("decrement") let unsafeCalculator = createUnsafeCalculator () unsafeCalculator.OnError.Add(fun ex -> printfn "Calculator error: %s" ex.Message ) unsafeCalculator |> Agent.postAndReply (Add(3, 5)) |> printfn "3 + 5 = %d" // will fall back to Agent.tryPostAndReply unsafeCalculator |> Agent.postAndReply (Divide(10, 0)) |> printfn "10 / 0 = %d" // same as above, but using tryPostAndReply directly unsafeCalculator |> Agent.tryPostAndReply 100 (Divide(10, 0)) |> function | Some value -> printfn $"10 / 0 = %d{value}" | None -> printfn "Timed out waiting for reply" // will not do anything because no reply is expected async { let! result = unsafeCalculator |> Agent.postAndAsyncReply (Divide(10, 0)) printfn $"10 / 0 = %d{result}" } |> Async.Start // will time out because no reply is expected async { let! result = unsafeCalculator |> Agent.postAndTryAsyncReply 100 (Divide(10, 0)) match result with | Some value -> printfn $"10 / 0 = %d{value}" | None -> printfn "Timed out waiting for reply" } |> Async.Start let safeCalculator = createSafeCalculator () // return Ok(8) for 3 + 5 safeCalculator |> Agent.postAndReply (Add(3, 5)) |> printfn "3 + 5 = %A" // return Error(DivideByZeroException) for 10 / 0 safeCalculator |> Agent.postAndReply (Divide(10, 0)) |> printfn "10 / 0 = %A"