open System
open System.Threading
open System.Threading.Tasks
///
/// Represents an asynchronous agent that processes messages of type 'T.
/// This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
///
type Agent<'T>(body: Agent<'T> -> Async) as self =
let cts = new CancellationTokenSource()
let errEvent = Event<_>()
// The main difference with the original MailboxProcessor is that we handle errors in the body function
// by triggering an error event instead of throwing exceptions directly.
// This allows the agent to continue processing messages even after an error occurs.
// The error event can be subscribed to by the user to handle errors gracefully.
// The agent will keep running and processing messages, but will notify subscribers of any unhandled exceptions.
let mbox = new MailboxProcessor<'T>((fun _ ->
let rec loop () =
async {
try return! body self
with
| e ->
errEvent.Trigger(e)
return! loop ()
}
loop ()
), true, cts.Token)
///
/// Event that is triggered when an unhandled exception occurs in the agent's processing loop.
///
member _.OnError = errEvent.Publish
///
/// Gets the error event for the underlying MailboxProcessor.
///
member _.Error = mbox.Error
///
/// Posts a message to the agent asynchronously.
///
member _.Post(message) = mbox.Post(message)
///
/// Posts a message to the agent and synchronously waits for a reply.
///
member _.PostAndReply(messageBuilder) = mbox.PostAndReply(messageBuilder)
///
/// Posts a message to the agent and synchronously waits for a reply, with a timeout.
///
member _.TryPostAndReply(messageBuilder, timeout) = mbox.TryPostAndReply(messageBuilder, timeout)
///
/// Posts a message to the agent and asynchronously waits for a reply.
///
member _.PostAndAsyncReply(messageBuilder) = mbox.PostAndAsyncReply(messageBuilder)
///
/// Posts a message to the agent and asynchronously waits for a reply, with a timeout.
///
member _.PostAndTryAsyncReply(messageBuilder, timeout) = mbox.PostAndTryAsyncReply(messageBuilder, timeout)
///
/// Starts the agent's processing loop.
///
member _.Start() = mbox.Start()
///
/// Starts the agent's processing loop immediately on the current thread.
///
member _.StartImmediate () = mbox.StartImmediate()
///
/// Receives the next message from the agent's queue asynchronously.
///
member _.Receive() = mbox.Receive()
///
/// Receives the next message from the agent's queue asynchronously, with a timeout.
///
member _.Receive(timeOut) = mbox.Receive(timeOut)
///
/// Tries to receive a message from the agent's queue within the specified timeout.
///
member _.TryReceive(timeout) = mbox.TryReceive(timeout)
///
/// Scans the agent's queue for a message matching the given scanner function.
///
member _.Scan(scanner) = mbox.Scan(scanner)
///
/// Scans the agent's queue for a message matching the given scanner function, with a timeout.
///
member _.TryScan(scanner, timeout) = mbox.TryScan(scanner, timeout)
///
/// Gets the current number of messages in the agent's queue.
///
member _.CurrentQueueLength = mbox.CurrentQueueLength
///
/// Gets or sets the default timeout for reply operations.
///
member _.DefaultTimeout
with get() = mbox.DefaultTimeout
and set value = mbox.DefaultTimeout <- value
///
/// Gets the cancellation token associated with the agent.
///
member _.CancellationToken = cts.Token
///
/// Gets a value indicating whether cancellation has been requested for the agent.
///
member _.IsCancellationRequested = cts.Token.IsCancellationRequested
///
/// Creates and starts a new agent with the specified body.
///
static member Start(body) =
let mbox = new Agent<'T>(body)
mbox.Start ()
mbox
///
/// Creates and starts a new agent immediately on the current thread.
///
static member StartImmediate(body) =
let mbox = new Agent<'T>(body)
mbox.StartImmediate ()
mbox
interface IDisposable with
///
/// Disposes the agent and cancels its processing.
///
member _.Dispose() =
cts.Cancel()
cts.Dispose()
(mbox :> IDisposable).Dispose()
// Convenience module for creating and using agents with common patterns
module Agent =
///
/// Creates and starts a simple agent that processes messages of type 'T in a loop.
/// The processor function is called for each message received.
///
/// A function to process each message.
/// An Agent instance.
let createSimple<'T> (processor: 'T -> unit) =
Agent<'T>.Start(fun inbox ->
let rec loop () = async {
let! message = inbox.Receive()
processor message
return! loop ()
}
loop ()
)
///
/// Creates and starts a stateful agent that maintains state of type 'State.
/// The processor function updates the state for each message received.
///
/// The initial state value.
/// A function to process each message and update the state.
/// An Agent instance.
let createStateful<'T, 'State> (initialState: 'State, processor: 'State -> 'T -> 'State) =
Agent<'T>.Start(fun inbox ->
let rec loop state = async {
let! message = inbox.Receive()
let newState = processor state message
return! loop newState
}
loop initialState
)
///
/// Creates and starts an agent that supports request-reply messaging.
/// The processor function computes a reply for each request.
///
/// A function to process each request and produce a reply.
/// An Agent instance.
let createReply<'Request, 'Reply>(processor: 'Request -> 'Reply) =
Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
let rec loop () = async {
let! request, replyChannel = inbox.Receive()
let reply = processor request
replyChannel.Reply(reply)
return! loop ()
}
loop ()
)
///
/// Creates and starts a stateful agent that supports request-reply messaging.
/// The processor function computes a reply and updates the state for each request.
///
/// The initial state value.
/// A function to process each request and update the state.
/// An Agent instance.
let createStatefulReply<'Request, 'Reply, 'State> (initialState: 'State, processor: 'State -> 'Request -> 'Reply * 'State) =
Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
let rec loop state = async {
let! request, replyChannel = inbox.Receive()
let reply, newState = processor state request
replyChannel.Reply(reply)
return! loop newState
}
loop initialState
)
///
/// Posts a request to the agent and tries to synchronously receive a reply within the specified timeout.
///
/// Timeout in milliseconds.
/// The request message.
/// The agent instance.
/// Some(reply) if successful, None if timed out.
let tryPostAndReply timeout msg (agent: Agent<_>) =
agent.TryPostAndReply((fun replyChannel ->
(msg, replyChannel)
), timeout)
///
/// Posts a request to the agent and synchronously waits for a reply.
/// Throws if the reply times out.
///
/// The request message.
/// The agent instance.
/// The reply value.
let postAndReply msg (agent: Agent<_>) =
if agent.DefaultTimeout = Timeout.Infinite then
agent
|> tryPostAndReply 1000 msg
|> function
| Some v -> v
| None -> failwith "Timed out waiting for reply"
else
agent.PostAndReply(fun replyChannel ->
(msg, replyChannel)
)
///
/// Posts a request to the agent and asynchronously waits for a reply.
///
/// The request message.
/// The agent instance.
/// An Async computation returning the reply.
let postAndAsyncReply msg (agent: Agent<_>) =
agent.PostAndAsyncReply(fun replyChannel ->
(msg, replyChannel)
)
///
/// Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout.
///
/// Timeout in milliseconds.
/// The request message.
/// The agent instance.
/// An Async computation returning Some(reply) or None if timed out.
let postAndTryAsyncReply timeout msg (agent: Agent<_>) =
agent.PostAndTryAsyncReply((fun replyChannel ->
(msg, replyChannel)
), timeout)
///
/// Posts a request to the agent and returns a Task that completes with the reply.
///
/// The request message.
/// The agent instance.
/// A Task returning the reply.
let postAndAsyncReplyTask msg (agent: Agent<_>) =
agent.PostAndAsyncReply(fun replyChannel ->
(msg, replyChannel)
)
|> Task.FromResult
///
/// Gets the default timeout for reply operations on the agent.
///
/// The agent instance.
/// The default timeout in milliseconds.
let getDefaultTimeout (agent: Agent<_>) =
agent.DefaultTimeout
///
/// Sets the default timeout for reply operations on the agent.
///
/// Timeout in milliseconds.
/// The agent instance.
let setDefaultTimeout timeout (agent: Agent<_>) =
agent.DefaultTimeout <- timeout
///
/// Gets the current number of messages in the agent's queue.
///
/// The agent instance.
/// The number of queued messages.
let getCurrentQueueLength (agent: Agent<_>)=
agent.CurrentQueueLength
///
/// Disposes the agent and cancels its processing.
///
/// The agent instance.
let dispose (agent: Agent<_>) =
(agent :> IDisposable).Dispose()
// Example usage demonstrating the unified agent capabilities
module Examples =
let createLogger () =
let logger =
Agent.createSimple <| fun msg ->
if msg = "ERROR" then
failwith "Simulated error in logger"
else
printfn "[%s] %s" (DateTime.Now.ToString("HH:mm:ss")) msg
// Handle errors
logger.OnError.Add(fun ex ->
printfn "Logger error: %s" ex.Message)
logger
// Example 2: Stateful counter agent
let createCounter () =
Agent.createStateful(0, fun count message ->
match message with
| "increment" ->
let newCount = count + 1
printfn $"Count: %d{newCount}"
newCount
| "decrement" ->
let newCount = count - 1
printfn $"Count: %d{newCount}"
newCount
| _ ->
printfn $"Unknown command, count remains: %d{count}"
count
)
// Example 3: Request-reply calculator
type CalculatorMessage =
| Add of int * int
| Subtract of int * int
| Multiply of int * int
| Divide of int * int
let createUnsafeCalculator () =
Agent.createReply(fun message ->
match message with
| Add (x, y) -> x + y
| Subtract (x, y) -> x - y
| Multiply (x, y) -> x * y
| Divide (x, y) -> x / y
)
let createSafeCalculator () =
Agent.createReply(fun message ->
match message with
| Add (x, y) -> (+), x, y
| Subtract (x, y) -> (-), x, y
| Multiply (x, y) -> (*), x, y
| Divide (x, y) -> (/), x, y
|> fun (op, x, y) ->
try
x |> op <| y
|> Ok
with
| :? DivideByZeroException as ex ->
Error ex
)
open Examples
let logger = createLogger ()
logger.Post("Log this message")
logger.Post("ERROR")
logger.Post("Log this message")
logger |> Agent.dispose
// will throw an exception because the logger has been disposed
logger.Post("Log this message")
let counter = createCounter ()
counter.Post("increment")
counter.Post("increment")
counter.Post("decrement")
let unsafeCalculator = createUnsafeCalculator ()
unsafeCalculator.OnError.Add(fun ex ->
printfn "Calculator error: %s" ex.Message
)
unsafeCalculator
|> Agent.postAndReply (Add(3, 5))
|> printfn "3 + 5 = %d"
// will fall back to Agent.tryPostAndReply
unsafeCalculator
|> Agent.postAndReply (Divide(10, 0))
|> printfn "10 / 0 = %d"
// same as above, but using tryPostAndReply directly
unsafeCalculator
|> Agent.tryPostAndReply 100 (Divide(10, 0))
|> function
| Some value -> printfn $"10 / 0 = %d{value}"
| None -> printfn "Timed out waiting for reply"
// will not do anything because no reply is expected
async {
let! result =
unsafeCalculator
|> Agent.postAndAsyncReply (Divide(10, 0))
printfn $"10 / 0 = %d{result}"
}
|> Async.Start
// will time out because no reply is expected
async {
let! result =
unsafeCalculator
|> Agent.postAndTryAsyncReply 100 (Divide(10, 0))
match result with
| Some value -> printfn $"10 / 0 = %d{value}"
| None -> printfn "Timed out waiting for reply"
}
|> Async.Start
let safeCalculator = createSafeCalculator ()
// return Ok(8) for 3 + 5
safeCalculator
|> Agent.postAndReply (Add(3, 5))
|> printfn "3 + 5 = %A"
// return Error(DivideByZeroException) for 10 / 0
safeCalculator
|> Agent.postAndReply (Divide(10, 0))
|> printfn "10 / 0 = %A"