92 people like it.

Implementing active objects with a MailboxProcessor

Mailbox processors can easily be used to implement active objects. This example shows how to do that with a reusable wrapper type and minimal boilerplate code in the actual class definitions. Supports both asynchronous calls and synchronous calls. For the latter case, exceptions are automatically propagated back to the caller.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
85: 
86: 
87: 
88: 
89: 
90: 
91: 
open System

// Synchronous calls may either return a value or propagate an exception.
type SyncReply =
  | Value of obj
  | Exception of Exception

// Two types of messages are used by the mailbox processor:
//  Asynchronous messages take a unary procedure and an argument.
//  Synchronous messages take a unary function, an argument and a reply channel for the result.
type Message =
  | AsyncCall of (obj->unit) * obj
  | SyncCall of (obj->obj) * obj * AsyncReplyChannel<SyncReply>

/// Wraps a mailbox processor for easier implementation of active objects.
type Agent() =
   let agent = MailboxProcessor.Start( fun inbox ->
      async {
         while true do
            let! msg = inbox.Receive()
            match msg with
            | AsyncCall(f, args) ->
                try
                    f args
                with
                    | ex -> printfn "Warning: exception in asynchronous call (%A)" ex
            | SyncCall(f, args, replyChannel) ->
                try
                    f args |> Value |> replyChannel.Reply
                with
                    | ex -> ex |> Exception |> replyChannel.Reply     
      })
   
   member x.Async (f:'T->unit) (args:'T) =
      let f' (o:obj) = f (o :?> 'T)
      agent.Post( AsyncCall(f', args) )

   member x.Sync (f:'T->'U) (args:'T) : 'U =
      let f' (o:obj) = f (o :?> 'T) :> obj
      let reply = agent.PostAndReply( fun replyChannel -> SyncCall (f', args, replyChannel) )
      match reply with
      | Exception ex -> raise ex
      | Value v -> v :?> 'U
  

// Example: a simple Logger (supports two log levels, writes to stdout)

type LogLevel = Debug=1 | Error=2

type Logger(?logLevel) =
   let mutable logLevel = defaultArg logLevel LogLevel.Error
   let mutable lastMessage = None

   // implement functionality as private let-bound functions
   //  - use tuples if more than one argument is needed
   //  - only synchronously used functions should throw exceptions

   let log(level, line:string) =
      if level >= logLevel then
         lastMessage <- Some line
         printfn "%s" line

   let getLastMessage() =
      match lastMessage with
      | None -> failwith "no last message"
      | Some m -> m

   // expose asynchronous and synchronous methods using an agent

   let agent = new Agent()

   member x.LogError line = agent.Async log (LogLevel.Error, line)
   member x.LogDebug line = agent.Async log (LogLevel.Debug, line)

   member x.LastMessage = agent.Sync getLastMessage ()


// Example use of Logger
do
   let logger = new Logger()
   logger.LogDebug "this will not be logged because of the log level"

   try
      printfn "%s" logger.LastMessage  // throws (in calling thread)
   with
      | ex -> printfn "%s" ex.Message

   logger.LogError "this will be logged"

   printfn "Press enter to end program"
   Console.ReadLine() |> ignore
namespace System
type SyncReply =
  | Value of obj
  | Exception of Exception

Full name: Script.SyncReply
union case SyncReply.Value: obj -> SyncReply
type obj = Object

Full name: Microsoft.FSharp.Core.obj
Multiple items
union case SyncReply.Exception: Exception -> SyncReply

--------------------
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
type Message =
  | AsyncCall of (obj -> unit) * obj
  | SyncCall of (obj -> obj) * obj * AsyncReplyChannel<SyncReply>

Full name: Script.Message
union case Message.AsyncCall: (obj -> unit) * obj -> Message
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
union case Message.SyncCall: (obj -> obj) * obj * AsyncReplyChannel<SyncReply> -> Message
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
Multiple items
type Agent =
  new : unit -> Agent
  member Async : f:('T -> unit) -> args:'T -> unit
  member Sync : f:('T -> 'U) -> args:'T -> 'U

Full name: Script.Agent


 Wraps a mailbox processor for easier implementation of active objects.


--------------------
new : unit -> Agent
val agent : MailboxProcessor<Message>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<Message>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : Message
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val f : (obj -> unit)
val args : obj
val ex : exn
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val f : (obj -> obj)
val replyChannel : AsyncReplyChannel<SyncReply>
member AsyncReplyChannel.Reply : value:'Reply -> unit
val x : Agent
Multiple items
member Agent.Async : f:('T -> unit) -> args:'T -> unit

Full name: Script.Agent.Async

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
val f : ('T -> unit)
val args : 'T
val f' : (obj -> unit)
val o : obj
member MailboxProcessor.Post : message:'Msg -> unit
member Agent.Sync : f:('T -> 'U) -> args:'T -> 'U

Full name: Script.Agent.Sync
val f : ('T -> 'U)
val f' : (obj -> obj)
val reply : SyncReply
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val ex : Exception
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
val v : obj
type LogLevel =
  | Debug = 1
  | Error = 2

Full name: Script.LogLevel
LogLevel.Debug: LogLevel = 1
LogLevel.Error: LogLevel = 2
Multiple items
type Logger =
  new : ?logLevel:LogLevel -> Logger
  member LogDebug : line:string -> unit
  member LogError : line:string -> unit
  member LastMessage : string

Full name: Script.Logger

--------------------
new : ?logLevel:LogLevel -> Logger
val logLevel : LogLevel option
val mutable logLevel : LogLevel
val defaultArg : arg:'T option -> defaultValue:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.defaultArg
val mutable lastMessage : string option
union case Option.None: Option<'T>
val log : (LogLevel * string -> unit)
val level : LogLevel
val line : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
union case Option.Some: Value: 'T -> Option<'T>
val getLastMessage : (unit -> string)
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val m : string
val agent : Agent
val x : Logger
member Logger.LogError : line:string -> unit

Full name: Script.Logger.LogError
member Agent.Async : f:('T -> unit) -> args:'T -> unit
member Logger.LogDebug : line:string -> unit

Full name: Script.Logger.LogDebug
member Logger.LastMessage : string

Full name: Script.Logger.LastMessage
member Agent.Sync : f:('T -> 'U) -> args:'T -> 'U
val logger : Logger
member Logger.LogDebug : line:string -> unit
property Logger.LastMessage: string
property Exception.Message: string
member Logger.LogError : line:string -> unit
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.ReadLine() : string
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
Raw view Test code New version

More information

Link:http://fssnip.net/3l
Posted:13 years ago
Author:Wolfgang Meyer
Tags: asynchronous programming , active objects , concurrency