0 people like it.

Functional Agent

A functional wrapper around MailboxProcessor that simplifies agent-based concurrency, adds error handling via events, and provides convenience functions to avoid common pitfalls when working with asynchronous message processing in F#.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
153: 
154: 
155: 
156: 
157: 
158: 
159: 
160: 
161: 
162: 
163: 
164: 
165: 
166: 
167: 
168: 
169: 
170: 
171: 
172: 
173: 
174: 
175: 
176: 
177: 
178: 
179: 
180: 
181: 
182: 
183: 
184: 
185: 
186: 
187: 
188: 
189: 
190: 
191: 
192: 
193: 
194: 
195: 
196: 
197: 
198: 
199: 
200: 
201: 
202: 
203: 
204: 
205: 
206: 
207: 
208: 
209: 
210: 
211: 
212: 
213: 
214: 
215: 
216: 
217: 
218: 
219: 
220: 
221: 
222: 
223: 
224: 
225: 
226: 
227: 
228: 
229: 
230: 
231: 
232: 
233: 
234: 
235: 
236: 
237: 
238: 
239: 
240: 
241: 
242: 
243: 
244: 
245: 
246: 
247: 
248: 
249: 
250: 
251: 
252: 
253: 
254: 
255: 
256: 
257: 
258: 
259: 
260: 
261: 
262: 
263: 
264: 
265: 
266: 
267: 
268: 
269: 
270: 
271: 
272: 
273: 
274: 
275: 
276: 
277: 
278: 
279: 
280: 
281: 
282: 
283: 
284: 
285: 
286: 
287: 
288: 
289: 
290: 
291: 
292: 
293: 
294: 
295: 
296: 
297: 
298: 
299: 
300: 
301: 
302: 
303: 
304: 
305: 
306: 
307: 
308: 
309: 
310: 
311: 
312: 
313: 
314: 
315: 
316: 
317: 
318: 
319: 
320: 
321: 
322: 
323: 
324: 
325: 
326: 
327: 
328: 
329: 
330: 
331: 
332: 
333: 
334: 
335: 
336: 
337: 
338: 
339: 
340: 
341: 
342: 
343: 
344: 
345: 
346: 
347: 
348: 
349: 
350: 
351: 
352: 
353: 
354: 
355: 
356: 
357: 
358: 
359: 
360: 
361: 
362: 
363: 
364: 
365: 
366: 
367: 
368: 
369: 
370: 
371: 
372: 
373: 
374: 
375: 
376: 
377: 
378: 
379: 
380: 
381: 
382: 
383: 
384: 
385: 
386: 
387: 
388: 
389: 
390: 
391: 
392: 
393: 
394: 
395: 
396: 
397: 
398: 
399: 
400: 
401: 
402: 
403: 
404: 
405: 
406: 
407: 
408: 
409: 
410: 
411: 
412: 
413: 
414: 
415: 
416: 
417: 
418: 
419: 
420: 
421: 
422: 
423: 
424: 
425: 
426: 
427: 
428: 
429: 
430: 
431: 
432: 
433: 
434: 
435: 
436: 
437: 
438: 
439: 
440: 
441: 
442: 
443: 
444: 
445: 
446: 
447: 
448: 
449: 
450: 
451: 
452: 
453: 
454: 
455: 
456: 
457: 
458: 
459: 
open System
open System.Threading
open System.Threading.Tasks

/// <summary>
/// Represents an asynchronous agent that processes messages of type 'T.
/// This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
/// </summary>
type Agent<'T>(body: Agent<'T> -> Async<unit>) as self =
    let cts = new CancellationTokenSource()
    let errEvent = Event<_>()

    // The main difference with the original MailboxProcessor is that we handle errors in the body function
    // by triggering an error event instead of throwing exceptions directly.
    // This allows the agent to continue processing messages even after an error occurs.
    // The error event can be subscribed to by the user to handle errors gracefully.
    // The agent will keep running and processing messages, but will notify subscribers of any unhandled exceptions.
    let mbox = new MailboxProcessor<'T>((fun _ ->
        let rec loop () =
            async {
                try return! body self
                with
                | e ->
                    errEvent.Trigger(e)
                    return! loop ()
            }
        loop ()
    ), true, cts.Token)

    /// <summary>
    /// Event that is triggered when an unhandled exception occurs in the agent's processing loop.
    /// </summary>
    member _.OnError = errEvent.Publish

    /// <summary>
    /// Gets the error event for the underlying MailboxProcessor.
    /// </summary>
    member _.Error = mbox.Error

    /// <summary>
    /// Posts a message to the agent asynchronously.
    /// </summary>
    member _.Post(message) = mbox.Post(message)

    /// <summary>
    /// Posts a message to the agent and synchronously waits for a reply.
    /// </summary>
    member _.PostAndReply(messageBuilder) = mbox.PostAndReply(messageBuilder)

    /// <summary>
    /// Posts a message to the agent and synchronously waits for a reply, with a timeout.
    /// </summary>
    member _.TryPostAndReply(messageBuilder, timeout) = mbox.TryPostAndReply(messageBuilder, timeout)

    /// <summary>
    /// Posts a message to the agent and asynchronously waits for a reply.
    /// </summary>
    member _.PostAndAsyncReply(messageBuilder) = mbox.PostAndAsyncReply(messageBuilder)

    /// <summary>
    /// Posts a message to the agent and asynchronously waits for a reply, with a timeout.
    /// </summary>
    member _.PostAndTryAsyncReply(messageBuilder, timeout) = mbox.PostAndTryAsyncReply(messageBuilder, timeout)

    /// <summary>
    /// Starts the agent's processing loop.
    /// </summary>
    member _.Start() = mbox.Start()

    /// <summary>
    /// Starts the agent's processing loop immediately on the current thread.
    /// </summary>
    member _.StartImmediate () = mbox.StartImmediate()

    /// <summary>
    /// Receives the next message from the agent's queue asynchronously.
    /// </summary>
    member _.Receive() = mbox.Receive()

    /// <summary>
    /// Receives the next message from the agent's queue asynchronously, with a timeout.
    /// </summary>
    member _.Receive(timeOut) = mbox.Receive(timeOut)

    /// <summary>
    /// Tries to receive a message from the agent's queue within the specified timeout.
    /// </summary>
    member _.TryReceive(timeout) = mbox.TryReceive(timeout)

    /// <summary>
    /// Scans the agent's queue for a message matching the given scanner function.
    /// </summary>
    member _.Scan(scanner) = mbox.Scan(scanner)

    /// <summary>
    /// Scans the agent's queue for a message matching the given scanner function, with a timeout.
    /// </summary>
    member _.TryScan(scanner, timeout) = mbox.TryScan(scanner, timeout)

    /// <summary>
    /// Gets the current number of messages in the agent's queue.
    /// </summary>
    member _.CurrentQueueLength = mbox.CurrentQueueLength

    /// <summary>
    /// Gets or sets the default timeout for reply operations.
    /// </summary>
    member _.DefaultTimeout
        with get() = mbox.DefaultTimeout
        and set value = mbox.DefaultTimeout <- value

    /// <summary>
    /// Gets the cancellation token associated with the agent.
    /// </summary>
    member _.CancellationToken = cts.Token

    /// <summary>
    /// Gets a value indicating whether cancellation has been requested for the agent.
    /// </summary>
    member _.IsCancellationRequested = cts.Token.IsCancellationRequested

    /// <summary>
    /// Creates and starts a new agent with the specified body.
    /// </summary>
    static member Start(body) =
        let mbox = new Agent<'T>(body)
        mbox.Start ()
        mbox

    /// <summary>
    /// Creates and starts a new agent immediately on the current thread.
    /// </summary>
    static member StartImmediate(body) =
        let mbox = new Agent<'T>(body)
        mbox.StartImmediate ()
        mbox

    interface IDisposable with
        /// <summary>
        /// Disposes the agent and cancels its processing.
        /// </summary>
        member _.Dispose() =
            cts.Cancel()
            cts.Dispose()
            (mbox :> IDisposable).Dispose()
            
            
// Convenience module for creating and using agents with common patterns
module Agent =

    /// <summary>
    /// Creates and starts a simple agent that processes messages of type 'T in a loop.
    /// The processor function is called for each message received.
    /// </summary>
    /// <param name="processor">A function to process each message.</param>
    /// <returns>An Agent instance.</returns>
    let createSimple<'T> (processor: 'T -> unit) =
        Agent<'T>.Start(fun inbox ->
            let rec loop () = async {
                let! message = inbox.Receive()
                processor message
                return! loop ()
            }
            loop ()
        )

    /// <summary>
    /// Creates and starts a stateful agent that maintains state of type 'State.
    /// The processor function updates the state for each message received.
    /// </summary>
    /// <param name="initialState">The initial state value.</param>
    /// <param name="processor">A function to process each message and update the state.</param>
    /// <returns>An Agent instance.</returns>
    let createStateful<'T, 'State> (initialState: 'State, processor: 'State -> 'T -> 'State) =
        Agent<'T>.Start(fun inbox ->
            let rec loop state = async {
                let! message = inbox.Receive()
                let newState = processor state message
                return! loop newState
            }
            loop initialState
        )

    /// <summary>
    /// Creates and starts an agent that supports request-reply messaging.
    /// The processor function computes a reply for each request.
    /// </summary>
    /// <param name="processor">A function to process each request and produce a reply.</param>
    /// <returns>An Agent instance.</returns>
    let createReply<'Request, 'Reply>(processor: 'Request -> 'Reply) =
        Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
            let rec loop () = async {
                let! request, replyChannel = inbox.Receive()
                let reply = processor request
                replyChannel.Reply(reply)
                return! loop ()
            }
            loop ()
        )

    /// <summary>
    /// Creates and starts a stateful agent that supports request-reply messaging.
    /// The processor function computes a reply and updates the state for each request.
    /// </summary>
    /// <param name="initialState">The initial state value.</param>
    /// <param name="processor">A function to process each request and update the state.</param>
    /// <returns>An Agent instance.</returns>
    let createStatefulReply<'Request, 'Reply, 'State> (initialState: 'State, processor: 'State -> 'Request -> 'Reply * 'State) =
        Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
            let rec loop state = async {
                let! request, replyChannel = inbox.Receive()
                let reply, newState = processor state request
                replyChannel.Reply(reply)
                return! loop newState
            }
            loop initialState
        )

    /// <summary>
    /// Posts a request to the agent and tries to synchronously receive a reply within the specified timeout.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>Some(reply) if successful, None if timed out.</returns>
    let tryPostAndReply timeout msg (agent: Agent<_>) =
        agent.TryPostAndReply((fun replyChannel ->
            (msg, replyChannel)
        ), timeout)

    /// <summary>
    /// Posts a request to the agent and synchronously waits for a reply.
    /// Throws if the reply times out.
    /// </summary>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The reply value.</returns>
    let postAndReply msg (agent: Agent<_>) =
        if agent.DefaultTimeout = Timeout.Infinite then
            agent
            |> tryPostAndReply 1000 msg
            |> function
                | Some v -> v
                | None -> failwith "Timed out waiting for reply"
        else
            agent.PostAndReply(fun replyChannel ->
                (msg, replyChannel)
            )

    /// <summary>
    /// Posts a request to the agent and asynchronously waits for a reply.
    /// </summary>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>An Async computation returning the reply.</returns>
    let postAndAsyncReply msg (agent: Agent<_>) =
        agent.PostAndAsyncReply(fun replyChannel ->
            (msg, replyChannel)
        )

    /// <summary>
    /// Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>An Async computation returning Some(reply) or None if timed out.</returns>
    let postAndTryAsyncReply timeout msg (agent: Agent<_>) =
        agent.PostAndTryAsyncReply((fun replyChannel ->
            (msg, replyChannel)
        ), timeout)

    /// <summary>
    /// Posts a request to the agent and returns a Task that completes with the reply.
    /// </summary>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>A Task returning the reply.</returns>
    let postAndAsyncReplyTask msg (agent: Agent<_>) =
        agent.PostAndAsyncReply(fun replyChannel ->
            (msg, replyChannel)
        )
        |> Task.FromResult

    /// <summary>
    /// Gets the default timeout for reply operations on the agent.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The default timeout in milliseconds.</returns>
    let getDefaultTimeout (agent: Agent<_>) =
        agent.DefaultTimeout

    /// <summary>
    /// Sets the default timeout for reply operations on the agent.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="agent">The agent instance.</param>
    let setDefaultTimeout timeout (agent: Agent<_>) =
        agent.DefaultTimeout <- timeout

    /// <summary>
    /// Gets the current number of messages in the agent's queue.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The number of queued messages.</returns>
    let getCurrentQueueLength (agent: Agent<_>)=
        agent.CurrentQueueLength

    
    /// <summary>
    /// Disposes the agent and cancels its processing.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    let dispose (agent: Agent<_>) =
        (agent :> IDisposable).Dispose()

// Example usage demonstrating the unified agent capabilities
module Examples =
    
    let createLogger () =
        let logger =
            Agent.createSimple <| fun msg ->
                if msg = "ERROR" then
                    failwith "Simulated error in logger"
                else 
                    printfn "[%s] %s" (DateTime.Now.ToString("HH:mm:ss")) msg

        // Handle errors
        logger.OnError.Add(fun ex ->
            printfn "Logger error: %s" ex.Message)

        logger

    // Example 2: Stateful counter agent
    let createCounter () =
        Agent.createStateful(0, fun count message ->
            match message with
            | "increment" -> 
                let newCount = count + 1
                printfn $"Count: %d{newCount}"
                newCount
            | "decrement" -> 
                let newCount = count - 1
                printfn $"Count: %d{newCount}"
                newCount
            | _ -> 
                printfn $"Unknown command, count remains: %d{count}"
                count
        )
    
    // Example 3: Request-reply calculator
    type CalculatorMessage =
        | Add of int * int
        | Subtract of int * int
        | Multiply of int * int
        | Divide of int * int

    let createUnsafeCalculator () =
        Agent.createReply(fun message ->
            match message with
            | Add (x, y) -> x + y
            | Subtract (x, y) -> x - y
            | Multiply (x, y) -> x * y
            | Divide (x, y) ->  x / y
        )


    let createSafeCalculator () =
        Agent.createReply(fun message ->
            match message with
            | Add (x, y) -> (+), x, y
            | Subtract (x, y) -> (-),  x, y
            | Multiply (x, y) -> (*), x, y
            | Divide (x, y) ->  (/), x, y
            |> fun (op, x, y) ->
                try
                    x |> op <| y
                    |> Ok
                with
                | :? DivideByZeroException as ex ->
                    Error ex
        )


open Examples


let logger = createLogger ()
logger.Post("Log this message")
logger.Post("ERROR")
logger.Post("Log this message")


logger |> Agent.dispose
// will throw an exception because the logger has been disposed
logger.Post("Log this message")


let counter = createCounter ()
counter.Post("increment")
counter.Post("increment")
counter.Post("decrement")


let unsafeCalculator = createUnsafeCalculator ()
unsafeCalculator.OnError.Add(fun ex ->
    printfn "Calculator error: %s" ex.Message
)

unsafeCalculator
|> Agent.postAndReply (Add(3, 5))
|> printfn "3 + 5 = %d"

// will fall back to Agent.tryPostAndReply
unsafeCalculator
|> Agent.postAndReply (Divide(10, 0))
|> printfn "10 / 0 = %d"


// same as above, but using tryPostAndReply directly
unsafeCalculator
|> Agent.tryPostAndReply 100 (Divide(10, 0)) 
|> function
    | Some value -> printfn $"10 / 0 = %d{value}"
    | None -> printfn "Timed out waiting for reply"

// will not do anything because no reply is expected
async {
    let! result =
        unsafeCalculator
        |> Agent.postAndAsyncReply (Divide(10, 0))
    printfn $"10 / 0 = %d{result}"
}
|> Async.Start

// will time out because no reply is expected
async {
    let! result =
        unsafeCalculator
        |> Agent.postAndTryAsyncReply 100 (Divide(10, 0)) 
    
    match result with
    | Some value -> printfn $"10 / 0 = %d{value}"
    | None -> printfn "Timed out waiting for reply"
}
|> Async.Start


let safeCalculator = createSafeCalculator ()

// return Ok(8) for 3 + 5
safeCalculator
|> Agent.postAndReply (Add(3, 5))
|> printfn "3 + 5 = %A"

// return Error(DivideByZeroException) for 10 / 0
safeCalculator
|> Agent.postAndReply (Divide(10, 0))
|> printfn "10 / 0 = %A"
namespace System
namespace System.Threading
namespace System.Threading.Tasks
Multiple items
type Agent<'T> =
  interface IDisposable
  new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
  member Post : message:'T -> unit
  member PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
  member PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
  member PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
  member Receive : timeOut:int -> Async<'T> + 1 overload
  member Scan : scanner:('T -> Async<'a> option) -> Async<'a>
  member TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
  member TryReceive : timeout:int -> Async<'T option>
  ...
 <summary>
 Represents an asynchronous agent that processes messages of type 'T.
 This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
 </summary>


--------------------
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
val body : (Agent<'T> -> Async<unit>)
Multiple items
type Async =
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Task<'T> -> Async<'T> + 1 overload
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> + 3 overloads
  static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
  ...

--------------------
type Async<'T> =
type unit = Unit
val self : Agent<'T>
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  interface IDisposable
  new : unit -> unit + 2 overloads
  member Cancel : unit -> unit + 1 overload
  member CancelAfter : delay: TimeSpan -> unit + 1 overload
  member Dispose : unit -> unit + 1 overload
  member ExecuteCallbackHandlers : throwOnFirstException: bool -> unit
  member InitializeWithTimer : millisecondsDelay: int -> unit
  member InternalRegister : callback: Action<obj> * stateForCallback: obj * syncContext: SynchronizationContext * executionContext: ExecutionContext -> CancellationTokenRegistration
  member NotifyCancellation : throwOnFirstException: bool -> unit
  member ThrowIfDisposed : unit -> unit
  ...

--------------------
CancellationTokenSource() : CancellationTokenSource
CancellationTokenSource(delay: TimeSpan) : CancellationTokenSource
CancellationTokenSource(millisecondsDelay: int) : CancellationTokenSource
val errEvent : Event<exn>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
val mbox : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  member TryReceive : ?timeout:int -> Async<'Msg option>
  ...

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
val loop : (unit -> Async<unit>)
val async : AsyncBuilder
val e : exn
member Event.Trigger : arg:'T -> unit
property CancellationTokenSource.Token: CancellationToken with get
property Event.Publish: IEvent<exn> with get
event MailboxProcessor.Error: IEvent<Handler<Exception>,Exception>
val message : 'T
member MailboxProcessor.Post : message:'Msg -> unit
val messageBuilder : (AsyncReplyChannel<'a> -> 'T)
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val timeout : int
member MailboxProcessor.TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member MailboxProcessor.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member MailboxProcessor.Start : unit -> unit
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val timeOut : int
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
val scanner : ('T -> Async<'a> option)
member MailboxProcessor.Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member MailboxProcessor.TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
property MailboxProcessor.CurrentQueueLength: int with get
property MailboxProcessor.DefaultTimeout: int with get, set
val set : elements:seq<'T> -> Set<'T> (requires comparison)
val value : int
Multiple items
[<Struct>]
type CancellationToken =
  new : source: CancellationTokenSource -> unit + 1 overload
  member Equals : other: CancellationToken -> bool + 1 overload
  member GetHashCode : unit -> int
  member Register : callback: Action -> CancellationTokenRegistration + 4 overloads
  member ThrowIfCancellationRequested : unit -> unit
  member ThrowOperationCanceledException : unit -> unit
  member UnsafeRegister : callback: Action<obj> * state: obj -> CancellationTokenRegistration
  static member op_Equality : left: CancellationToken * right: CancellationToken -> bool
  static member op_Inequality : left: CancellationToken * right: CancellationToken -> bool
  static val s_actionToActionObjShunt : Action<obj>
  ...

--------------------
CancellationToken ()
CancellationToken(canceled: bool) : CancellationToken
property CancellationToken.IsCancellationRequested: bool with get
val mbox : Agent<'T>
member Agent.Start : unit -> unit
 <summary>
 Starts the agent's processing loop.
 </summary>
member Agent.StartImmediate : unit -> 'a
 <summary>
 Starts the agent's processing loop immediately on the current thread.
 </summary>
type IDisposable =
  member Dispose : unit -> unit
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
CancellationTokenSource.Dispose() : unit
val createSimple : processor:('T -> unit) -> Agent<'T>
 <summary>
 Creates and starts a simple agent that processes messages of type 'T in a loop.
 The processor function is called for each message received.
 </summary>
 <param name="processor">A function to process each message.</param>
 <returns>An Agent instance.</returns>
val processor : ('T -> unit)
val inbox : Agent<'T>
val loop : (unit -> Async<'a>)
member Agent.Receive : unit -> Async<'T>
 <summary>
 Receives the next message from the agent's queue asynchronously.
 </summary>

member Agent.Receive : timeOut:int -> Async<'T>
 <summary>
 Receives the next message from the agent's queue asynchronously, with a timeout.
 </summary>
val createStateful : initialState:'State * processor:('State -> 'T -> 'State) -> Agent<'T>
 <summary>
 Creates and starts a stateful agent that maintains state of type 'State.
 The processor function updates the state for each message received.
 </summary>
 <param name="initialState">The initial state value.</param>
 <param name="processor">A function to process each message and update the state.</param>
 <returns>An Agent instance.</returns>
val initialState : 'State
val processor : ('State -> 'T -> 'State)
val loop : ('State -> Async<'a>)
val state : 'State
val newState : 'State
val createReply : processor:('Request -> 'Reply) -> Agent<'Request * AsyncReplyChannel<'Reply>>
 <summary>
 Creates and starts an agent that supports request-reply messaging.
 The processor function computes a reply for each request.
 </summary>
 <param name="processor">A function to process each request and produce a reply.</param>
 <returns>An Agent instance.</returns>
val processor : ('Request -> 'Reply)
type AsyncReplyChannel<'Reply> =
  member Reply : value:'Reply -> unit
val inbox : Agent<'Request * AsyncReplyChannel<'Reply>>
val request : 'Request
val replyChannel : AsyncReplyChannel<'Reply>
val reply : 'Reply
member AsyncReplyChannel.Reply : value:'Reply -> unit
val createStatefulReply : initialState:'State * processor:('State -> 'Request -> 'Reply * 'State) -> Agent<'Request * AsyncReplyChannel<'Reply>>
 <summary>
 Creates and starts a stateful agent that supports request-reply messaging.
 The processor function computes a reply and updates the state for each request.
 </summary>
 <param name="initialState">The initial state value.</param>
 <param name="processor">A function to process each request and update the state.</param>
 <returns>An Agent instance.</returns>
val processor : ('State -> 'Request -> 'Reply * 'State)
val tryPostAndReply : timeout:int -> msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> 'b option
 <summary>
 Posts a request to the agent and tries to synchronously receive a reply within the specified timeout.
 </summary>
 <param name="timeout">Timeout in milliseconds.</param>
 <param name="msg">The request message.</param>
 <param name="agent">The agent instance.</param>
 <returns>Some(reply) if successful, None if timed out.</returns>
val msg : 'a
val agent : Agent<'a * AsyncReplyChannel<'b>>
member Agent.TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
 <summary>
 Posts a message to the agent and synchronously waits for a reply, with a timeout.
 </summary>
val replyChannel : AsyncReplyChannel<'b>
val postAndReply : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> 'b
 <summary>
 Posts a request to the agent and synchronously waits for a reply.
 Throws if the reply times out.
 </summary>
 <param name="msg">The request message.</param>
 <param name="agent">The agent instance.</param>
 <returns>The reply value.</returns>
property Agent.DefaultTimeout: int with get, set
 <summary>
 Gets or sets the default timeout for reply operations.
 </summary>
type Timeout =
  static val Infinite : int
  static val InfiniteTimeSpan : TimeSpan
field Timeout.Infinite: int = -1
union case Option.Some: Value: 'T -> Option<'T>
val v : 'b
union case Option.None: Option<'T>
val failwith : message:string -> 'T
member Agent.PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
 <summary>
 Posts a message to the agent and synchronously waits for a reply.
 </summary>
val postAndAsyncReply : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Async<'b>
 <summary>
 Posts a request to the agent and asynchronously waits for a reply.
 </summary>
 <param name="msg">The request message.</param>
 <param name="agent">The agent instance.</param>
 <returns>An Async computation returning the reply.</returns>
member Agent.PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
 <summary>
 Posts a message to the agent and asynchronously waits for a reply.
 </summary>
val postAndTryAsyncReply : timeout:int -> msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Async<'b option>
 <summary>
 Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout.
 </summary>
 <param name="timeout">Timeout in milliseconds.</param>
 <param name="msg">The request message.</param>
 <param name="agent">The agent instance.</param>
 <returns>An Async computation returning Some(reply) or None if timed out.</returns>
member Agent.PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
 <summary>
 Posts a message to the agent and asynchronously waits for a reply, with a timeout.
 </summary>
val postAndAsyncReplyTask : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Task<Async<'b>>
 <summary>
 Posts a request to the agent and returns a Task that completes with the reply.
 </summary>
 <param name="msg">The request message.</param>
 <param name="agent">The agent instance.</param>
 <returns>A Task returning the reply.</returns>
Multiple items
type Task =
  interface IAsyncResult
  interface IDisposable
  new : canceled: bool * creationOptions: TaskCreationOptions * ct: CancellationToken -> unit + 11 overloads
  member AddCompletionAction : action: ITaskCompletionAction -> unit + 1 overload
  member AddException : exceptionObject: obj -> unit + 1 overload
  member AddExceptionsFromChildren : props: ContingentProperties -> unit
  member AddNewChild : unit -> unit
  member AddTaskContinuation : tc: obj * addBeforeOthers: bool -> bool
  member AddTaskContinuationComplex : tc: obj * addBeforeOthers: bool -> bool
  member AssignCancellationToken : cancellationToken: CancellationToken * antecedent: Task * continuation: TaskContinuation -> unit
  ...

--------------------
type Task<'TResult> =
  inherit Task
  new : unit -> unit + 13 overloads
  member ConfigureAwait : continueOnCapturedContext: bool -> ConfiguredTaskAwaitable<'TResult>
  member ContinueWith : continuationAction: Action<Task<'TResult>> -> Task + 23 overloads
  member DangerousSetResult : result: 'TResult -> unit
  member GetAwaiter : unit -> TaskAwaiter<'TResult>
  member GetResultCore : waitCompletionNotification: bool -> 'TResult
  member InnerInvoke : unit -> unit
  member TrySetResult : result: 'TResult -> bool
  static member StartNew : parent: Task * function: Func<'TResult> * cancellationToken: CancellationToken * creationOptions: TaskCreationOptions * internalOptions: InternalTaskOptions * scheduler: TaskScheduler -> Task<'TResult> + 1 overload
  ...

--------------------
Task(action: Action) : Task
Task(action: Action, cancellationToken: CancellationToken) : Task
Task(action: Action, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj) : Task
Task(action: Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken) : Task
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task

--------------------
Task(function: Func<'TResult>) : Task<'TResult>
Task(function: Func<'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(function: Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task.FromResult<'TResult>(result: 'TResult) : Task<'TResult>
val getDefaultTimeout : agent:Agent<'a> -> int
 <summary>
 Gets the default timeout for reply operations on the agent.
 </summary>
 <param name="agent">The agent instance.</param>
 <returns>The default timeout in milliseconds.</returns>
val agent : Agent<'a>
val setDefaultTimeout : timeout:int -> agent:Agent<'a> -> unit
 <summary>
 Sets the default timeout for reply operations on the agent.
 </summary>
 <param name="timeout">Timeout in milliseconds.</param>
 <param name="agent">The agent instance.</param>
val getCurrentQueueLength : agent:Agent<'a> -> int
 <summary>
 Gets the current number of messages in the agent's queue.
 </summary>
 <param name="agent">The agent instance.</param>
 <returns>The number of queued messages.</returns>
property Agent.CurrentQueueLength: int with get
 <summary>
 Gets the current number of messages in the agent's queue.
 </summary>
val dispose : agent:Agent<'a> -> unit
 <summary>
 Disposes the agent and cancels its processing.
 </summary>
 <param name="agent">The agent instance.</param>
val createLogger : unit -> Agent<string>
val logger : Agent<string>
Multiple items
module Agent

from Script

--------------------
type Agent<'T> =
  interface IDisposable
  new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
  member Post : message:'T -> unit
  member PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
  member PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
  member PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
  member Receive : timeOut:int -> Async<'T> + 1 overload
  member Scan : scanner:('T -> Async<'a> option) -> Async<'a>
  member TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
  member TryReceive : timeout:int -> Async<'T option>
  ...
 <summary>
 Represents an asynchronous agent that processes messages of type 'T.
 This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
 </summary>


--------------------
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
val msg : string
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Multiple items
[<Struct>]
type DateTime =
  new : ticks: int64 -> unit + 13 overloads
  member Add : value: TimeSpan -> DateTime + 1 overload
  member AddDays : value: float -> DateTime
  member AddHours : value: float -> DateTime
  member AddMilliseconds : value: float -> DateTime
  member AddMinutes : value: float -> DateTime
  member AddMonths : months: int -> DateTime
  member AddSeconds : value: float -> DateTime
  member AddTicks : value: int64 -> DateTime
  member AddYears : value: int -> DateTime
  ...

--------------------
DateTime ()
   (+0 other overloads)
DateTime(ticks: int64) : DateTime
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : DateTime
   (+0 other overloads)
property DateTime.Now: DateTime with get
DateTime.ToString() : string
DateTime.ToString(provider: IFormatProvider) : string
DateTime.ToString(format: string) : string
DateTime.ToString(format: string, provider: IFormatProvider) : string
property Agent.OnError: IEvent<exn> with get
 <summary>
 Event that is triggered when an unhandled exception occurs in the agent's processing loop.
 </summary>
member IObservable.Add : callback:('T -> unit) -> unit
val ex : exn
property Exception.Message: string with get
val createCounter : unit -> Agent<string>
val count : int
val message : string
val newCount : int
type CalculatorMessage =
  | Add of int * int
  | Subtract of int * int
  | Multiply of int * int
  | Divide of int * int
union case CalculatorMessage.Add: int * int -> CalculatorMessage
Multiple items
val int : value:'T -> int (requires member op_Explicit)

--------------------
[<Struct>]
type int = int32

--------------------
type int<'Measure> =
  int
union case CalculatorMessage.Subtract: int * int -> CalculatorMessage
union case CalculatorMessage.Multiply: int * int -> CalculatorMessage
union case CalculatorMessage.Divide: int * int -> CalculatorMessage
val createUnsafeCalculator : unit -> Agent<CalculatorMessage * AsyncReplyChannel<int>>
val message : CalculatorMessage
val x : int
val y : int
val createSafeCalculator : unit -> Agent<CalculatorMessage * AsyncReplyChannel<Result<int,DivideByZeroException>>>
val op : (int -> int -> int)
union case Result.Ok: ResultValue: 'T -> Result<'T,'TError>
Multiple items
type DivideByZeroException =
  inherit ArithmeticException
  new : unit -> unit + 3 overloads
  static val InnerExceptionPrefix : string
  static val s_DispatchStateLock : obj

--------------------
DivideByZeroException() : DivideByZeroException
DivideByZeroException(message: string) : DivideByZeroException
DivideByZeroException(message: string, innerException: exn) : DivideByZeroException
val ex : DivideByZeroException
union case Result.Error: ErrorValue: 'TError -> Result<'T,'TError>
module Examples

from Script
member Agent.Post : message:'T -> unit
 <summary>
 Posts a message to the agent asynchronously.
 </summary>
val counter : Agent<string>
val unsafeCalculator : Agent<CalculatorMessage * AsyncReplyChannel<int>>
val result : int
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
val result : int option
val safeCalculator : Agent<CalculatorMessage * AsyncReplyChannel<Result<int,DivideByZeroException>>>
Raw view Test code New version

More information

Link:http://fssnip.net/8aK
Posted:22 days ago
Author:halcwb
Tags: #mailboxprocessor , #agents