0 people like it.
Like the snippet!
Functional Agent
A functional wrapper around MailboxProcessor that simplifies agent-based concurrency, adds error handling via events, and provides convenience functions to avoid common pitfalls when working with asynchronous message processing in F#.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
153:
154:
155:
156:
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
168:
169:
170:
171:
172:
173:
174:
175:
176:
177:
178:
179:
180:
181:
182:
183:
184:
185:
186:
187:
188:
189:
190:
191:
192:
193:
194:
195:
196:
197:
198:
199:
200:
201:
202:
203:
204:
205:
206:
207:
208:
209:
210:
211:
212:
213:
214:
215:
216:
217:
218:
219:
220:
221:
222:
223:
224:
225:
226:
227:
228:
229:
230:
231:
232:
233:
234:
235:
236:
237:
238:
239:
240:
241:
242:
243:
244:
245:
246:
247:
248:
249:
250:
251:
252:
253:
254:
255:
256:
257:
258:
259:
260:
261:
262:
263:
264:
265:
266:
267:
268:
269:
270:
271:
272:
273:
274:
275:
276:
277:
278:
279:
280:
281:
282:
283:
284:
285:
286:
287:
288:
289:
290:
291:
292:
293:
294:
295:
296:
297:
298:
299:
300:
301:
302:
303:
304:
305:
306:
307:
308:
309:
310:
311:
312:
313:
314:
315:
316:
317:
318:
319:
320:
321:
322:
323:
324:
325:
326:
327:
328:
329:
330:
331:
332:
333:
334:
335:
336:
337:
338:
339:
340:
341:
342:
343:
344:
345:
346:
347:
348:
349:
350:
351:
352:
353:
354:
355:
356:
357:
358:
359:
360:
361:
362:
363:
364:
365:
366:
367:
368:
369:
370:
371:
372:
373:
374:
375:
376:
377:
378:
379:
380:
381:
382:
383:
384:
385:
386:
387:
388:
389:
390:
391:
392:
393:
394:
395:
396:
397:
398:
399:
400:
401:
402:
403:
404:
405:
406:
407:
408:
409:
410:
411:
412:
413:
414:
415:
416:
417:
418:
419:
420:
421:
422:
423:
424:
425:
426:
427:
428:
429:
430:
431:
432:
433:
434:
435:
436:
437:
438:
439:
440:
441:
442:
443:
444:
445:
446:
447:
448:
449:
450:
451:
452:
453:
454:
455:
456:
457:
458:
459:
|
open System
open System.Threading
open System.Threading.Tasks
/// <summary>
/// Represents an asynchronous agent that processes messages of type 'T.
/// This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
/// </summary>
/// <remarks>
/// Exceptions escaping <c>body</c> are published through <c>OnError</c> and <c>body</c> is then
/// invoked again, so the agent keeps consuming messages after a failure.
/// </remarks>
type Agent<'T>(body: Agent<'T> -> Async<unit>) as self =
    let cts = new CancellationTokenSource()
    // Captured once: CancellationTokenSource.Token throws ObjectDisposedException after the
    // source has been disposed, while a token obtained beforehand stays usable.
    let token = cts.Token
    let errEvent = Event<exn>()
    // Set by the first Dispose call so that repeated Dispose calls are harmless no-ops.
    let mutable disposed = false

    // The main difference with the original MailboxProcessor is that errors raised by the body
    // are reported through the error event instead of terminating the processing loop.
    // After notifying subscribers the body is started again from scratch.
    let mbox =
        new MailboxProcessor<'T>(
            (fun _ ->
                let rec loop () =
                    async {
                        try
                            return! body self
                        with e ->
                            errEvent.Trigger(e)
                            return! loop ()
                    }
                loop ()),
            // Second constructor argument is isThrowExceptionAfterDisposed in the FSharp.Core
            // overload this resolves to, so posting after Dispose raises.
            true,
            token)

    /// <summary>
    /// Event that is triggered when an unhandled exception occurs in the agent's processing loop.
    /// </summary>
    member _.OnError = errEvent.Publish

    /// <summary>
    /// Gets the error event for the underlying MailboxProcessor.
    /// </summary>
    member _.Error = mbox.Error

    /// <summary>
    /// Posts a message to the agent asynchronously.
    /// </summary>
    member _.Post(message) = mbox.Post(message)

    /// <summary>
    /// Posts a message to the agent and synchronously waits for a reply.
    /// </summary>
    member _.PostAndReply(messageBuilder) = mbox.PostAndReply(messageBuilder)

    /// <summary>
    /// Posts a message to the agent and synchronously waits for a reply, with a timeout.
    /// </summary>
    member _.TryPostAndReply(messageBuilder, timeout) = mbox.TryPostAndReply(messageBuilder, timeout)

    /// <summary>
    /// Posts a message to the agent and asynchronously waits for a reply.
    /// </summary>
    member _.PostAndAsyncReply(messageBuilder) = mbox.PostAndAsyncReply(messageBuilder)

    /// <summary>
    /// Posts a message to the agent and asynchronously waits for a reply, with a timeout.
    /// </summary>
    member _.PostAndTryAsyncReply(messageBuilder, timeout) = mbox.PostAndTryAsyncReply(messageBuilder, timeout)

    /// <summary>
    /// Starts the agent's processing loop.
    /// </summary>
    member _.Start() = mbox.Start()

    /// <summary>
    /// Starts the agent's processing loop immediately on the current thread.
    /// </summary>
    member _.StartImmediate () = mbox.StartImmediate()

    /// <summary>
    /// Receives the next message from the agent's queue asynchronously.
    /// </summary>
    member _.Receive() = mbox.Receive()

    /// <summary>
    /// Receives the next message from the agent's queue asynchronously, with a timeout.
    /// </summary>
    member _.Receive(timeOut) = mbox.Receive(timeOut)

    /// <summary>
    /// Tries to receive a message from the agent's queue within the specified timeout.
    /// </summary>
    member _.TryReceive(timeout) = mbox.TryReceive(timeout)

    /// <summary>
    /// Scans the agent's queue for a message matching the given scanner function.
    /// </summary>
    member _.Scan(scanner) = mbox.Scan(scanner)

    /// <summary>
    /// Scans the agent's queue for a message matching the given scanner function, with a timeout.
    /// </summary>
    member _.TryScan(scanner, timeout) = mbox.TryScan(scanner, timeout)

    /// <summary>
    /// Gets the current number of messages in the agent's queue.
    /// </summary>
    member _.CurrentQueueLength = mbox.CurrentQueueLength

    /// <summary>
    /// Gets or sets the default timeout for reply operations.
    /// </summary>
    member _.DefaultTimeout
        with get () = mbox.DefaultTimeout
        and set value = mbox.DefaultTimeout <- value

    /// <summary>
    /// Gets the cancellation token associated with the agent.
    /// Safe to read after the agent has been disposed.
    /// </summary>
    member _.CancellationToken = token

    /// <summary>
    /// Gets a value indicating whether cancellation has been requested for the agent.
    /// Safe to read after the agent has been disposed.
    /// </summary>
    member _.IsCancellationRequested = token.IsCancellationRequested

    /// <summary>
    /// Creates and starts a new agent with the specified body.
    /// </summary>
    static member Start(body) =
        let agent = new Agent<'T>(body)
        agent.Start ()
        agent

    /// <summary>
    /// Creates and starts a new agent immediately on the current thread.
    /// </summary>
    static member StartImmediate(body) =
        let agent = new Agent<'T>(body)
        agent.StartImmediate ()
        agent

    interface IDisposable with
        /// <summary>
        /// Disposes the agent and cancels its processing.
        /// Calling it more than once has no further effect.
        /// </summary>
        member _.Dispose() =
            if not disposed then
                disposed <- true
                try
                    cts.Cancel()
                finally
                    // Release both resources even if a cancellation callback throws.
                    cts.Dispose()
                    (mbox :> IDisposable).Dispose()
// Convenience module for creating and using agents with common patterns
module Agent =

    /// <summary>
    /// Creates and starts a simple agent that processes messages of type 'T in a loop.
    /// The processor function is called for each message received.
    /// </summary>
    /// <param name="processor">A function to process each message.</param>
    /// <returns>An Agent instance.</returns>
    let createSimple<'T> (processor: 'T -> unit) =
        Agent<'T>.Start(fun inbox ->
            let rec loop () = async {
                let! message = inbox.Receive()
                processor message
                return! loop ()
            }
            loop ()
        )

    /// <summary>
    /// Creates and starts a stateful agent that maintains state of type 'State.
    /// The processor function updates the state for each message received.
    /// </summary>
    /// <param name="initialState">The initial state value.</param>
    /// <param name="processor">A function to process each message and update the state.</param>
    /// <returns>An Agent instance.</returns>
    let createStateful<'T, 'State> (initialState: 'State, processor: 'State -> 'T -> 'State) =
        Agent<'T>.Start(fun inbox ->
            let rec loop state = async {
                let! message = inbox.Receive()
                let newState = processor state message
                return! loop newState
            }
            loop initialState
        )

    /// <summary>
    /// Creates and starts an agent that supports request-reply messaging.
    /// The processor function computes a reply for each request.
    /// </summary>
    /// <remarks>
    /// The reply is sent only after <c>processor</c> returns; if it throws, no reply is sent
    /// for that request.
    /// </remarks>
    /// <param name="processor">A function to process each request and produce a reply.</param>
    /// <returns>An Agent instance.</returns>
    let createReply<'Request, 'Reply>(processor: 'Request -> 'Reply) =
        Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
            let rec loop () = async {
                let! request, replyChannel = inbox.Receive()
                let reply = processor request
                replyChannel.Reply(reply)
                return! loop ()
            }
            loop ()
        )

    /// <summary>
    /// Creates and starts a stateful agent that supports request-reply messaging.
    /// The processor function computes a reply and updates the state for each request.
    /// </summary>
    /// <param name="initialState">The initial state value.</param>
    /// <param name="processor">A function to process each request and update the state.</param>
    /// <returns>An Agent instance.</returns>
    let createStatefulReply<'Request, 'Reply, 'State> (initialState: 'State, processor: 'State -> 'Request -> 'Reply * 'State) =
        Agent<'Request * AsyncReplyChannel<'Reply>>.Start(fun inbox ->
            let rec loop state = async {
                let! request, replyChannel = inbox.Receive()
                let reply, newState = processor state request
                replyChannel.Reply(reply)
                return! loop newState
            }
            loop initialState
        )

    /// <summary>
    /// Posts a request to the agent and tries to synchronously receive a reply within the specified timeout.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>Some(reply) if successful, None if timed out.</returns>
    let tryPostAndReply timeout msg (agent: Agent<_>) =
        agent.TryPostAndReply((fun replyChannel ->
            (msg, replyChannel)
        ), timeout)

    /// <summary>
    /// Posts a request to the agent and synchronously waits for a reply.
    /// Throws if the reply times out.
    /// </summary>
    /// <remarks>
    /// When the agent's DefaultTimeout is infinite, a 1000 ms limit is applied instead so the
    /// caller cannot block forever; on expiry an exception with the message
    /// "Timed out waiting for reply" is raised. Otherwise the agent's own DefaultTimeout applies.
    /// </remarks>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The reply value.</returns>
    let postAndReply msg (agent: Agent<_>) =
        if agent.DefaultTimeout = Timeout.Infinite then
            agent
            |> tryPostAndReply 1000 msg
            |> function
                | Some v -> v
                | None -> failwith "Timed out waiting for reply"
        else
            agent.PostAndReply(fun replyChannel ->
                (msg, replyChannel)
            )

    /// <summary>
    /// Posts a request to the agent and asynchronously waits for a reply.
    /// </summary>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>An Async computation returning the reply.</returns>
    let postAndAsyncReply msg (agent: Agent<_>) =
        agent.PostAndAsyncReply(fun replyChannel ->
            (msg, replyChannel)
        )

    /// <summary>
    /// Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>An Async computation returning Some(reply) or None if timed out.</returns>
    let postAndTryAsyncReply timeout msg (agent: Agent<_>) =
        agent.PostAndTryAsyncReply((fun replyChannel ->
            (msg, replyChannel)
        ), timeout)

    /// <summary>
    /// Posts a request to the agent and returns a Task that completes with the reply.
    /// </summary>
    /// <param name="msg">The request message.</param>
    /// <param name="agent">The agent instance.</param>
    /// <returns>A Task returning the reply.</returns>
    let postAndAsyncReplyTask msg (agent: Agent<_>) =
        agent.PostAndAsyncReply(fun replyChannel ->
            (msg, replyChannel)
        )
        // Start the computation so the request is actually posted and the task carries the
        // reply itself; wrapping the unstarted Async in a completed task would never post.
        |> Async.StartAsTask

    /// <summary>
    /// Gets the default timeout for reply operations on the agent.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The default timeout in milliseconds.</returns>
    let getDefaultTimeout (agent: Agent<_>) =
        agent.DefaultTimeout

    /// <summary>
    /// Sets the default timeout for reply operations on the agent.
    /// </summary>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="agent">The agent instance.</param>
    let setDefaultTimeout timeout (agent: Agent<_>) =
        agent.DefaultTimeout <- timeout

    /// <summary>
    /// Gets the current number of messages in the agent's queue.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    /// <returns>The number of queued messages.</returns>
    let getCurrentQueueLength (agent: Agent<_>) =
        agent.CurrentQueueLength

    /// <summary>
    /// Disposes the agent and cancels its processing.
    /// </summary>
    /// <param name="agent">The agent instance.</param>
    let dispose (agent: Agent<_>) =
        (agent :> IDisposable).Dispose()
// Example usage demonstrating the unified agent capabilities
module Examples =

    // Example 1: Fire-and-forget logger agent
    /// Builds a logger agent that prints every message prefixed with the current time.
    /// The literal message "ERROR" makes the processor throw, which is reported through
    /// the OnError handler registered here.
    let createLogger () =
        let timestamp () = DateTime.Now.ToString("HH:mm:ss")
        let logger =
            Agent.createSimple (fun msg ->
                match msg with
                | "ERROR" -> failwith "Simulated error in logger"
                | text -> printfn "[%s] %s" (timestamp ()) text)
        // Handle errors
        logger.OnError.Add(fun ex -> printfn "Logger error: %s" ex.Message)
        logger

    // Example 2: Stateful counter agent
    /// Builds a counter agent starting at 0 that understands the commands
    /// "increment" and "decrement"; any other command leaves the count unchanged.
    let createCounter () =
        // Prints the updated count and hands it back as the next state.
        let report updated =
            printfn $"Count: %d{updated}"
            updated
        Agent.createStateful(0, (fun count message ->
            match message with
            | "increment" -> report (count + 1)
            | "decrement" -> report (count - 1)
            | _ ->
                printfn $"Unknown command, count remains: %d{count}"
                count))

    // Example 3: Request-reply calculator
    /// Requests understood by the calculator agents; each case carries both operands.
    type CalculatorMessage =
        | Add of int * int
        | Subtract of int * int
        | Multiply of int * int
        | Divide of int * int

    /// Builds a calculator agent that replies with the plain integer result.
    /// Integer division by zero throws inside the processor, so no reply is sent for it.
    let createUnsafeCalculator () =
        let evaluate = function
            | Add (x, y) -> x + y
            | Subtract (x, y) -> x - y
            | Multiply (x, y) -> x * y
            | Divide (x, y) -> x / y
        Agent.createReply evaluate

    /// Builds a calculator agent that always replies: Ok with the result, or Error carrying
    /// the DivideByZeroException when the division fails.
    let createSafeCalculator () =
        Agent.createReply (fun message ->
            // Pick the operator and operands first, then apply them under a try.
            let op, x, y =
                match message with
                | Add (x, y) -> (+), x, y
                | Subtract (x, y) -> (-), x, y
                | Multiply (x, y) -> ( * ), x, y
                | Divide (x, y) -> (/), x, y
            try
                Ok (op x y)
            with
            | :? DivideByZeroException as ex -> Error ex)
open Examples

// NOTE(review): several statements below throw on purpose; presumably this script is meant
// to be evaluated statement by statement in F# Interactive rather than run top to bottom.

// --- Logger -------------------------------------------------------------------------------
let logger = createLogger ()
// The second message makes the processor throw; the logger's OnError handler reports it.
for text in [ "Log this message"; "ERROR"; "Log this message" ] do
    logger.Post(text)
Agent.dispose logger
// will throw an exception because the logger has been disposed
logger.Post("Log this message")

// --- Counter ------------------------------------------------------------------------------
let counter = createCounter ()
[ "increment"; "increment"; "decrement" ] |> List.iter counter.Post

// --- Unsafe calculator --------------------------------------------------------------------
let unsafeCalculator = createUnsafeCalculator ()
unsafeCalculator.OnError.Add(fun ex -> printfn "Calculator error: %s" ex.Message)

printfn "3 + 5 = %d" (Agent.postAndReply (Add(3, 5)) unsafeCalculator)

// The processor throws on division by zero before replying, so no reply ever arrives.
// Unless DefaultTimeout was changed from infinite, postAndReply goes through
// tryPostAndReply with a 1000 ms limit and then fails with "Timed out waiting for reply".
printfn "10 / 0 = %d" (Agent.postAndReply (Divide(10, 0)) unsafeCalculator)

// same as above, but using tryPostAndReply directly: yields None after 100 ms
match Agent.tryPostAndReply 100 (Divide(10, 0)) unsafeCalculator with
| Some value -> printfn $"10 / 0 = %d{value}"
| None -> printfn "Timed out waiting for reply"

// No timeout is given here and the agent never replies, so this computation never reaches
// its printfn.
Async.Start (
    async {
        let! result = Agent.postAndAsyncReply (Divide(10, 0)) unsafeCalculator
        printfn $"10 / 0 = %d{result}"
    })

// Same request with a 100 ms limit: resolves to None because the agent never replies.
Async.Start (
    async {
        let! result = Agent.postAndTryAsyncReply 100 (Divide(10, 0)) unsafeCalculator
        match result with
        | Some value -> printfn $"10 / 0 = %d{value}"
        | None -> printfn "Timed out waiting for reply"
    })

// --- Safe calculator ----------------------------------------------------------------------
let safeCalculator = createSafeCalculator ()
// return Ok(8) for 3 + 5
printfn "3 + 5 = %A" (Agent.postAndReply (Add(3, 5)) safeCalculator)
// return Error(DivideByZeroException) for 10 / 0
printfn "10 / 0 = %A" (Agent.postAndReply (Divide(10, 0)) safeCalculator)
|
namespace System
namespace System.Threading
namespace System.Threading.Tasks
Multiple items
type Agent<'T> =
interface IDisposable
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
member Post : message:'T -> unit
member PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
member PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
member PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
member Receive : timeOut:int -> Async<'T> + 1 overload
member Scan : scanner:('T -> Async<'a> option) -> Async<'a>
member TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
member TryReceive : timeout:int -> Async<'T option>
...
<summary>
Represents an asynchronous agent that processes messages of type 'T.
This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
</summary>
--------------------
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
val body : (Agent<'T> -> Async<unit>)
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T> + 1 overload
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> + 3 overloads
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
...
--------------------
type Async<'T> =
type unit = Unit
val self : Agent<'T>
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
interface IDisposable
new : unit -> unit + 2 overloads
member Cancel : unit -> unit + 1 overload
member CancelAfter : delay: TimeSpan -> unit + 1 overload
member Dispose : unit -> unit + 1 overload
member ExecuteCallbackHandlers : throwOnFirstException: bool -> unit
member InitializeWithTimer : millisecondsDelay: int -> unit
member InternalRegister : callback: Action<obj> * stateForCallback: obj * syncContext: SynchronizationContext * executionContext: ExecutionContext -> CancellationTokenRegistration
member NotifyCancellation : throwOnFirstException: bool -> unit
member ThrowIfDisposed : unit -> unit
...
--------------------
CancellationTokenSource() : CancellationTokenSource
CancellationTokenSource(delay: TimeSpan) : CancellationTokenSource
CancellationTokenSource(millisecondsDelay: int) : CancellationTokenSource
val errEvent : Event<exn>
Multiple items
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'T> =
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
--------------------
new : unit -> Event<'T>
--------------------
new : unit -> Event<'Delegate,'Args>
val mbox : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member TryReceive : ?timeout:int -> Async<'Msg option>
...
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
val loop : (unit -> Async<unit>)
val async : AsyncBuilder
val e : exn
member Event.Trigger : arg:'T -> unit
property CancellationTokenSource.Token: CancellationToken with get
property Event.Publish: IEvent<exn> with get
event MailboxProcessor.Error: IEvent<Handler<Exception>,Exception>
val message : 'T
member MailboxProcessor.Post : message:'Msg -> unit
val messageBuilder : (AsyncReplyChannel<'a> -> 'T)
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val timeout : int
member MailboxProcessor.TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member MailboxProcessor.PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member MailboxProcessor.Start : unit -> unit
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val timeOut : int
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
val scanner : ('T -> Async<'a> option)
member MailboxProcessor.Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member MailboxProcessor.TryScan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T option>
property MailboxProcessor.CurrentQueueLength: int with get
property MailboxProcessor.DefaultTimeout: int with get, set
val set : elements:seq<'T> -> Set<'T> (requires comparison)
val value : int
Multiple items
[<Struct>]
type CancellationToken =
new : source: CancellationTokenSource -> unit + 1 overload
member Equals : other: CancellationToken -> bool + 1 overload
member GetHashCode : unit -> int
member Register : callback: Action -> CancellationTokenRegistration + 4 overloads
member ThrowIfCancellationRequested : unit -> unit
member ThrowOperationCanceledException : unit -> unit
member UnsafeRegister : callback: Action<obj> * state: obj -> CancellationTokenRegistration
static member op_Equality : left: CancellationToken * right: CancellationToken -> bool
static member op_Inequality : left: CancellationToken * right: CancellationToken -> bool
static val s_actionToActionObjShunt : Action<obj>
...
--------------------
CancellationToken ()
CancellationToken(canceled: bool) : CancellationToken
property CancellationToken.IsCancellationRequested: bool with get
val mbox : Agent<'T>
member Agent.Start : unit -> unit
<summary>
Starts the agent's processing loop.
</summary>
member Agent.StartImmediate : unit -> 'a
<summary>
Starts the agent's processing loop immediately on the current thread.
</summary>
type IDisposable =
member Dispose : unit -> unit
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
CancellationTokenSource.Dispose() : unit
val createSimple : processor:('T -> unit) -> Agent<'T>
<summary>
Creates and starts a simple agent that processes messages of type 'T in a loop.
The processor function is called for each message received.
</summary>
<param name="processor">A function to process each message.</param>
<returns>An Agent instance.</returns>
val processor : ('T -> unit)
val inbox : Agent<'T>
val loop : (unit -> Async<'a>)
member Agent.Receive : unit -> Async<'T>
<summary>
Receives the next message from the agent's queue asynchronously.
</summary>
member Agent.Receive : timeOut:int -> Async<'T>
<summary>
Receives the next message from the agent's queue asynchronously, with a timeout.
</summary>
val createStateful : initialState:'State * processor:('State -> 'T -> 'State) -> Agent<'T>
<summary>
Creates and starts a stateful agent that maintains state of type 'State.
The processor function updates the state for each message received.
</summary>
<param name="initialState">The initial state value.</param>
<param name="processor">A function to process each message and update the state.</param>
<returns>An Agent instance.</returns>
val initialState : 'State
val processor : ('State -> 'T -> 'State)
val loop : ('State -> Async<'a>)
val state : 'State
val newState : 'State
val createReply : processor:('Request -> 'Reply) -> Agent<'Request * AsyncReplyChannel<'Reply>>
<summary>
Creates and starts an agent that supports request-reply messaging.
The processor function computes a reply for each request.
</summary>
<param name="processor">A function to process each request and produce a reply.</param>
<returns>An Agent instance.</returns>
val processor : ('Request -> 'Reply)
type AsyncReplyChannel<'Reply> =
member Reply : value:'Reply -> unit
val inbox : Agent<'Request * AsyncReplyChannel<'Reply>>
val request : 'Request
val replyChannel : AsyncReplyChannel<'Reply>
val reply : 'Reply
member AsyncReplyChannel.Reply : value:'Reply -> unit
val createStatefulReply : initialState:'State * processor:('State -> 'Request -> 'Reply * 'State) -> Agent<'Request * AsyncReplyChannel<'Reply>>
<summary>
Creates and starts a stateful agent that supports request-reply messaging.
The processor function computes a reply and updates the state for each request.
</summary>
<param name="initialState">The initial state value.</param>
<param name="processor">A function to process each request and update the state.</param>
<returns>An Agent instance.</returns>
val processor : ('State -> 'Request -> 'Reply * 'State)
val tryPostAndReply : timeout:int -> msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> 'b option
<summary>
Posts a request to the agent and tries to synchronously receive a reply within the specified timeout.
</summary>
<param name="timeout">Timeout in milliseconds.</param>
<param name="msg">The request message.</param>
<param name="agent">The agent instance.</param>
<returns>Some(reply) if successful, None if timed out.</returns>
val msg : 'a
val agent : Agent<'a * AsyncReplyChannel<'b>>
member Agent.TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
<summary>
Posts a message to the agent and synchronously waits for a reply, with a timeout.
</summary>
val replyChannel : AsyncReplyChannel<'b>
val postAndReply : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> 'b
<summary>
Posts a request to the agent and synchronously waits for a reply.
Throws if the reply times out.
</summary>
<param name="msg">The request message.</param>
<param name="agent">The agent instance.</param>
<returns>The reply value.</returns>
property Agent.DefaultTimeout: int with get, set
<summary>
Gets or sets the default timeout for reply operations.
</summary>
type Timeout =
static val Infinite : int
static val InfiniteTimeSpan : TimeSpan
field Timeout.Infinite: int = -1
union case Option.Some: Value: 'T -> Option<'T>
val v : 'b
union case Option.None: Option<'T>
val failwith : message:string -> 'T
member Agent.PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
<summary>
Posts a message to the agent and synchronously waits for a reply.
</summary>
val postAndAsyncReply : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Async<'b>
<summary>
Posts a request to the agent and asynchronously waits for a reply.
</summary>
<param name="msg">The request message.</param>
<param name="agent">The agent instance.</param>
<returns>An Async computation returning the reply.</returns>
member Agent.PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
<summary>
Posts a message to the agent and asynchronously waits for a reply.
</summary>
val postAndTryAsyncReply : timeout:int -> msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Async<'b option>
<summary>
Posts a request to the agent and asynchronously tries to receive a reply within the specified timeout.
</summary>
<param name="timeout">Timeout in milliseconds.</param>
<param name="msg">The request message.</param>
<param name="agent">The agent instance.</param>
<returns>An Async computation returning Some(reply) or None if timed out.</returns>
member Agent.PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
<summary>
Posts a message to the agent and asynchronously waits for a reply, with a timeout.
</summary>
val postAndAsyncReplyTask : msg:'a -> agent:Agent<'a * AsyncReplyChannel<'b>> -> Task<Async<'b>>
<summary>
Posts a request to the agent and returns a Task that completes with the reply.
</summary>
<param name="msg">The request message.</param>
<param name="agent">The agent instance.</param>
<returns>A Task returning the reply.</returns>
Multiple items
type Task =
interface IAsyncResult
interface IDisposable
new : canceled: bool * creationOptions: TaskCreationOptions * ct: CancellationToken -> unit + 11 overloads
member AddCompletionAction : action: ITaskCompletionAction -> unit + 1 overload
member AddException : exceptionObject: obj -> unit + 1 overload
member AddExceptionsFromChildren : props: ContingentProperties -> unit
member AddNewChild : unit -> unit
member AddTaskContinuation : tc: obj * addBeforeOthers: bool -> bool
member AddTaskContinuationComplex : tc: obj * addBeforeOthers: bool -> bool
member AssignCancellationToken : cancellationToken: CancellationToken * antecedent: Task * continuation: TaskContinuation -> unit
...
--------------------
type Task<'TResult> =
inherit Task
new : unit -> unit + 13 overloads
member ConfigureAwait : continueOnCapturedContext: bool -> ConfiguredTaskAwaitable<'TResult>
member ContinueWith : continuationAction: Action<Task<'TResult>> -> Task + 23 overloads
member DangerousSetResult : result: 'TResult -> unit
member GetAwaiter : unit -> TaskAwaiter<'TResult>
member GetResultCore : waitCompletionNotification: bool -> 'TResult
member InnerInvoke : unit -> unit
member TrySetResult : result: 'TResult -> bool
static member StartNew : parent: Task * function: Func<'TResult> * cancellationToken: CancellationToken * creationOptions: TaskCreationOptions * internalOptions: InternalTaskOptions * scheduler: TaskScheduler -> Task<'TResult> + 1 overload
...
--------------------
Task(action: Action) : Task
Task(action: Action, cancellationToken: CancellationToken) : Task
Task(action: Action, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj) : Task
Task(action: Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken) : Task
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
--------------------
Task(function: Func<'TResult>) : Task<'TResult>
Task(function: Func<'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(function: Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task.FromResult<'TResult>(result: 'TResult) : Task<'TResult>
val getDefaultTimeout : agent:Agent<'a> -> int
<summary>
Gets the default timeout for reply operations on the agent.
</summary>
<param name="agent">The agent instance.</param>
<returns>The default timeout in milliseconds.</returns>
val agent : Agent<'a>
val setDefaultTimeout : timeout:int -> agent:Agent<'a> -> unit
<summary>
Sets the default timeout for reply operations on the agent.
</summary>
<param name="timeout">Timeout in milliseconds.</param>
<param name="agent">The agent instance.</param>
val getCurrentQueueLength : agent:Agent<'a> -> int
<summary>
Gets the current number of messages in the agent's queue.
</summary>
<param name="agent">The agent instance.</param>
<returns>The number of queued messages.</returns>
property Agent.CurrentQueueLength: int with get
<summary>
Gets the current number of messages in the agent's queue.
</summary>
val dispose : agent:Agent<'a> -> unit
<summary>
Disposes the agent and cancels its processing.
</summary>
<param name="agent">The agent instance.</param>
val createLogger : unit -> Agent<string>
val logger : Agent<string>
Multiple items
module Agent
from Script
--------------------
type Agent<'T> =
interface IDisposable
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
member Post : message:'T -> unit
member PostAndAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> Async<'a>
member PostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) -> 'a
member PostAndTryAsyncReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> Async<'a option>
member Receive : timeOut:int -> Async<'T> + 1 overload
member Scan : scanner:('T -> Async<'a> option) -> Async<'a>
member TryPostAndReply : messageBuilder:(AsyncReplyChannel<'a> -> 'T) * timeout:int -> 'a option
member TryReceive : timeout:int -> Async<'T option>
...
<summary>
Represents an asynchronous agent that processes messages of type 'T.
This is a wrapper around FSharp's MailboxProcessor, providing a unified API for agent-based concurrency.
</summary>
--------------------
new : body:(Agent<'T> -> Async<unit>) -> Agent<'T>
val msg : string
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Multiple items
[<Struct>]
type DateTime =
new : ticks: int64 -> unit + 13 overloads
member Add : value: TimeSpan -> DateTime + 1 overload
member AddDays : value: float -> DateTime
member AddHours : value: float -> DateTime
member AddMilliseconds : value: float -> DateTime
member AddMinutes : value: float -> DateTime
member AddMonths : months: int -> DateTime
member AddSeconds : value: float -> DateTime
member AddTicks : value: int64 -> DateTime
member AddYears : value: int -> DateTime
...
--------------------
DateTime ()
(+0 other overloads)
DateTime(ticks: int64) : DateTime
(+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : DateTime
(+0 other overloads)
property DateTime.Now: DateTime with get
DateTime.ToString() : string
DateTime.ToString(provider: IFormatProvider) : string
DateTime.ToString(format: string) : string
DateTime.ToString(format: string, provider: IFormatProvider) : string
property Agent.OnError: IEvent<exn> with get
<summary>
Event that is triggered when an unhandled exception occurs in the agent's processing loop.
</summary>
member IObservable.Add : callback:('T -> unit) -> unit
val ex : exn
property Exception.Message: string with get
val createCounter : unit -> Agent<string>
val count : int
val message : string
val newCount : int
type CalculatorMessage =
| Add of int * int
| Subtract of int * int
| Multiply of int * int
| Divide of int * int
union case CalculatorMessage.Add: int * int -> CalculatorMessage
Multiple items
val int : value:'T -> int (requires member op_Explicit)
--------------------
[<Struct>]
type int = int32
--------------------
type int<'Measure> =
int
union case CalculatorMessage.Subtract: int * int -> CalculatorMessage
union case CalculatorMessage.Multiply: int * int -> CalculatorMessage
union case CalculatorMessage.Divide: int * int -> CalculatorMessage
val createUnsafeCalculator : unit -> Agent<CalculatorMessage * AsyncReplyChannel<int>>
val message : CalculatorMessage
val x : int
val y : int
val createSafeCalculator : unit -> Agent<CalculatorMessage * AsyncReplyChannel<Result<int,DivideByZeroException>>>
val op : (int -> int -> int)
union case Result.Ok: ResultValue: 'T -> Result<'T,'TError>
Multiple items
type DivideByZeroException =
inherit ArithmeticException
new : unit -> unit + 3 overloads
static val InnerExceptionPrefix : string
static val s_DispatchStateLock : obj
--------------------
DivideByZeroException() : DivideByZeroException
DivideByZeroException(message: string) : DivideByZeroException
DivideByZeroException(message: string, innerException: exn) : DivideByZeroException
val ex : DivideByZeroException
union case Result.Error: ErrorValue: 'TError -> Result<'T,'TError>
module Examples
from Script
member Agent.Post : message:'T -> unit
<summary>
Posts a message to the agent asynchronously.
</summary>
val counter : Agent<string>
val unsafeCalculator : Agent<CalculatorMessage * AsyncReplyChannel<int>>
val result : int
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
val result : int option
val safeCalculator : Agent<CalculatorMessage * AsyncReplyChannel<Result<int,DivideByZeroException>>>
More information