92 people like it.
Like the snippet!
Implementing active objects with a MailboxProcessor
Mailbox processors can easily be used to implement active objects. This example shows how to do that with a reusable wrapper type and minimal boilerplate code in the actual class definitions. Supports both asynchronous calls and synchronous calls. For the latter case, exceptions are automatically propagated back to the caller.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
|
open System
// Synchronous calls may either return a value or propagate an exception.
type SyncReply =
| Value of obj
| Exception of Exception
// Two types of messages are used by the mailbox processor:
// Asynchronous messages take a unary procedure and an argument.
// Synchronous messages take a unary function, an argument and a reply channel for the result.
type Message =
| AsyncCall of (obj->unit) * obj
| SyncCall of (obj->obj) * obj * AsyncReplyChannel<SyncReply>
/// Wraps a mailbox processor for easier implementation of active objects.
type Agent() =
let agent = MailboxProcessor.Start( fun inbox ->
async {
while true do
let! msg = inbox.Receive()
match msg with
| AsyncCall(f, args) ->
try
f args
with
| ex -> printfn "Warning: exception in asynchronous call (%A)" ex
| SyncCall(f, args, replyChannel) ->
try
f args |> Value |> replyChannel.Reply
with
| ex -> ex |> Exception |> replyChannel.Reply
})
member x.Async (f:'T->unit) (args:'T) =
let f' (o:obj) = f (o :?> 'T)
agent.Post( AsyncCall(f', args) )
member x.Sync (f:'T->'U) (args:'T) : 'U =
let f' (o:obj) = f (o :?> 'T) :> obj
let reply = agent.PostAndReply( fun replyChannel -> SyncCall (f', args, replyChannel) )
match reply with
| Exception ex -> raise ex
| Value v -> v :?> 'U
// Example: a simple Logger (supports two log levels, writes to stdout)
type LogLevel = Debug=1 | Error=2
type Logger(?logLevel) =
let mutable logLevel = defaultArg logLevel LogLevel.Error
let mutable lastMessage = None
// implement functionality as private let-bound functions
// - use tuples if more than one argument is needed
// - only synchronously used functions should throw exceptions
let log(level, line:string) =
if level >= logLevel then
lastMessage <- Some line
printfn "%s" line
let getLastMessage() =
match lastMessage with
| None -> failwith "no last message"
| Some m -> m
// expose asynchronous and synchronous methods using an agent
let agent = new Agent()
member x.LogError line = agent.Async log (LogLevel.Error, line)
member x.LogDebug line = agent.Async log (LogLevel.Debug, line)
member x.LastMessage = agent.Sync getLastMessage ()
// Example use of Logger
do
let logger = new Logger()
logger.LogDebug "this will not be logged because of the log level"
try
printfn "%s" logger.LastMessage // throws (in calling thread)
with
| ex -> printfn "%s" ex.Message
logger.LogError "this will be logged"
printfn "Press enter to end program"
Console.ReadLine() |> ignore
|
namespace System
type SyncReply =
| Value of obj
| Exception of Exception
Full name: Script.SyncReply
union case SyncReply.Value: obj -> SyncReply
type obj = Object
Full name: Microsoft.FSharp.Core.obj
Multiple items
union case SyncReply.Exception: Exception -> SyncReply
--------------------
type Exception =
new : unit -> Exception + 2 overloads
member Data : IDictionary
member GetBaseException : unit -> Exception
member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
member GetType : unit -> Type
member HelpLink : string with get, set
member InnerException : Exception
member Message : string
member Source : string with get, set
member StackTrace : string
...
Full name: System.Exception
--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
type Message =
| AsyncCall of (obj -> unit) * obj
| SyncCall of (obj -> obj) * obj * AsyncReplyChannel<SyncReply>
Full name: Script.Message
union case Message.AsyncCall: (obj -> unit) * obj -> Message
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
union case Message.SyncCall: (obj -> obj) * obj * AsyncReplyChannel<SyncReply> -> Message
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
Multiple items
type Agent =
new : unit -> Agent
member Async : f:('T -> unit) -> args:'T -> unit
member Sync : f:('T -> 'U) -> args:'T -> 'U
Full name: Script.Agent
Wraps a mailbox processor for easier implementation of active objects.
--------------------
new : unit -> Agent
val agent : MailboxProcessor<Message>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<Message>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : Message
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val f : (obj -> unit)
val args : obj
val ex : exn
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val f : (obj -> obj)
val replyChannel : AsyncReplyChannel<SyncReply>
member AsyncReplyChannel.Reply : value:'Reply -> unit
val x : Agent
Multiple items
member Agent.Async : f:('T -> unit) -> args:'T -> unit
Full name: Script.Agent.Async
--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
val f : ('T -> unit)
val args : 'T
val f' : (obj -> unit)
val o : obj
member MailboxProcessor.Post : message:'Msg -> unit
member Agent.Sync : f:('T -> 'U) -> args:'T -> 'U
Full name: Script.Agent.Sync
val f : ('T -> 'U)
val f' : (obj -> obj)
val reply : SyncReply
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val ex : Exception
val raise : exn:Exception -> 'T
Full name: Microsoft.FSharp.Core.Operators.raise
val v : obj
type LogLevel =
| Debug = 1
| Error = 2
Full name: Script.LogLevel
LogLevel.Debug: LogLevel = 1
LogLevel.Error: LogLevel = 2
Multiple items
type Logger =
new : ?logLevel:LogLevel -> Logger
member LogDebug : line:string -> unit
member LogError : line:string -> unit
member LastMessage : string
Full name: Script.Logger
--------------------
new : ?logLevel:LogLevel -> Logger
val logLevel : LogLevel option
val mutable logLevel : LogLevel
val defaultArg : arg:'T option -> defaultValue:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.defaultArg
val mutable lastMessage : string option
union case Option.None: Option<'T>
val log : (LogLevel * string -> unit)
val level : LogLevel
val line : string
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
union case Option.Some: Value: 'T -> Option<'T>
val getLastMessage : (unit -> string)
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
val m : string
val agent : Agent
val x : Logger
member Logger.LogError : line:string -> unit
Full name: Script.Logger.LogError
member Agent.Async : f:('T -> unit) -> args:'T -> unit
member Logger.LogDebug : line:string -> unit
Full name: Script.Logger.LogDebug
member Logger.LastMessage : string
Full name: Script.Logger.LastMessage
member Agent.Sync : f:('T -> 'U) -> args:'T -> 'U
val logger : Logger
member Logger.LogDebug : line:string -> unit
property Logger.LastMessage: string
property Exception.Message: string
member Logger.LogError : line:string -> unit
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.ReadLine() : string
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
More information