2 people like it.

Reporting events from agents

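This snippet shows three options for reporting events from an F# agent: triggering the event directly on the thread where the agent is running, triggering it in the thread pool, or posting it to a specified synchronization context (for example, the one belonging to the user-interface thread).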

Triggering events directly

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
    /// Shorthand so that the F# 'MailboxProcessor' type can be written as 'Agent'
    type Agent<'Msg> = MailboxProcessor<'Msg>
    
    /// Agent that implements batch processing: posted values are collected into
    /// groups of 'count' items and every completed group is reported through the
    /// 'BatchProduced' event. Handlers are invoked directly from the agent body.
    ///
    /// count - number of values per batch; it must be positive, otherwise the
    ///         constructor raises an ArgumentException
    type BatchProcessor<'T>(count) =
      // With a non-positive count the inner 'for' loop below would be empty, so
      // the agent would spin forever reporting empty batches without ever
      // waiting for a message - reject such values up front
      do if count <= 0 then invalidArg "count" "Batch size must be positive."

      // Event used to report aggregated batches to the user
      let batchEvent = new Event<'T[]>()
      // Trigger event on the thread where the agent is running
      let reportBatch batch =
        try
          // A throwing handler must not escape into the agent body, because
          // that would terminate the batching loop
          batchEvent.Trigger(batch)
        with e ->
          printfn "Event handler failed: %A" e
    
      // Start an agent that implements the batching
      let agent = Agent<'T>.Start(fun inbox -> async {
        while true do
          // Repeatedly allocate a new queue, so that an array that was
          // already reported is never touched again
          let queue = new ResizeArray<_>()
          // Add specified number of messages to the queue
          for _ in 1 .. count do
            let! msg = inbox.Receive()
            queue.Add(msg)
          // Report the batch as an array
          reportBatch (queue.ToArray()) })
    
      /// Event that is triggered when a batch is collected
      member x.BatchProduced = batchEvent.Publish
      /// The method adds one object to the agent
      member x.Post(value) = agent.Post(value)

Triggering events in a thread pool

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
    /// Agent that implements batch processing: posted values are collected into
    /// groups of 'count' items and every completed group is reported through the
    /// 'BatchProduced' event. Each report is started as a separate background
    /// workflow, so the agent does not wait for the handlers to finish.
    ///
    /// count - number of values per batch; it must be positive, otherwise the
    ///         constructor raises an ArgumentException
    type BatchProcessor<'T>(count) =
      // With a non-positive count the inner 'for' loop below would be empty, so
      // the agent would spin forever reporting empty batches without ever
      // waiting for a message - reject such values up front
      do if count <= 0 then invalidArg "count" "Batch size must be positive."

      // Event used to report aggregated batches to the user
      let batchEvent = new Event<'T[]>()
      // Trigger event in a thread pool
      let reportBatch batch =
        // Create simple workflow & start it in the background
        async {
          try
            batchEvent.Trigger(batch)
          with e ->
            // Nobody observes the result of a workflow started with
            // 'Async.Start', so a failing handler has to be dealt with here
            printfn "Event handler failed: %A" e }
        |> Async.Start
    
      // Start an agent that implements the batching
      let agent = Agent<'T>.Start(fun inbox -> async {
        while true do
          // Repeatedly allocate a new queue, so that an array that was
          // already reported is never touched again
          let queue = new ResizeArray<_>()
          // Add specified number of messages to the queue
          for _ in 1 .. count do
            let! msg = inbox.Receive()
            queue.Add(msg)
          // Report the batch as an array
          reportBatch (queue.ToArray()) })
    
      /// Event that is triggered when a batch is collected
      member x.BatchProduced = batchEvent.Publish
      /// The method adds one object to the agent
      member x.Post(value) = agent.Post(value)

Reporting events using synchronization context

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
open System.Threading

/// Agent that implements batch processing (eventContext can 
/// be provided to specify synchronization context for event reporting)
///
/// count        - number of values per batch; it must be positive, otherwise
///                the constructor raises an ArgumentException
/// eventContext - optional context used to report batches; when it is omitted
///                (or null) the event is triggered directly by the agent
type BatchProcessor<'T>(count, ?eventContext:SynchronizationContext) =
  // With a non-positive count the inner 'for' loop below would be empty, so
  // the agent would spin forever reporting empty batches without ever
  // waiting for a message - reject such values up front
  do if count <= 0 then invalidArg "count" "Batch size must be positive."

  /// Event used to report aggregated batches to the user
  let batchEvent = new Event<'T[]>()

  /// Triggers event using the specified synchronization context
  /// (or directly, if no synchronization context is specified)
  let reportBatch batch =
    match eventContext with 
    | None | Some null -> 
        // No usable synchronization context ('SynchronizationContext.Current'
        // may be null) - trigger directly and keep a failing handler from
        // terminating the agent
        try
          batchEvent.Trigger(batch)
        with e ->
          printfn "Event handler failed: %A" e
    | Some ctx ->
        // Use the 'Post' method of the context to trigger the event
        ctx.Post((fun _ -> batchEvent.Trigger(batch)), null)

  // Start an agent that implements the batching
  let agent = Agent<'T>.Start(fun inbox -> async {
    while true do
      // Repeatedly allocate a new queue, so that an array that was
      // already reported is never touched again
      let queue = new ResizeArray<_>()
      // Add specified number of messages to the queue
      for _ in 1 .. count do
        let! msg = inbox.Receive()
        queue.Add(msg)
      // Report the batch as an array
      reportBatch (queue.ToArray()) })

  /// Event that is triggered when a batch is collected
  member x.BatchProduced = batchEvent.Publish
  /// The method adds one object to the agent
  member x.Post(value) = agent.Post(value)

Capturing current (user-interface) context

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
// Capture the synchronization context of the thread that runs this code
// (the GUI thread in a user-interface application) and hand it to the
// agent, so that batches of 10 values are reported through that context
let sync = SynchronizationContext.Current
let proc = BatchProcessor<_>(10, sync)

// Feed the agent from a background workflow; the values 0 .. 1000 are
// posted one by one and come back as batches via the captured context
Async.Start(async {
  for n in 0 .. 1000 do
    proc.Post(n) })
type Agent<'T> = MailboxProcessor<'T>

Full name: Demo.Template.Agent<_>


 Type alias that gives convenient name to F# agent type
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
Multiple items
type BatchProcessor<'T> =
  new : count:int -> BatchProcessor<'T>
  member Post : value:'T -> unit
  member BatchProduced : IEvent<'T []>

Full name: Demo.Template.BatchProcessor<_>


 Agent that implements batch processing


--------------------
new : count:int -> BatchProcessor<'T>
val count : int
val batchEvent : Event<'T []>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
val reportBatch : ('T [] -> unit)
val batch : 'T []
member Event.Trigger : arg:'T -> unit
val e : exn
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val agent : MailboxProcessor<'T>
val inbox : MailboxProcessor<'T>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val queue : ResizeArray<'T>
type ResizeArray<'T> = System.Collections.Generic.List<'T>

Full name: Microsoft.FSharp.Collections.ResizeArray<_>
val i : int
val msg : 'T
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
System.Collections.Generic.List.Add(item: 'T) : unit
System.Collections.Generic.List.ToArray() : 'T []
val x : BatchProcessor<'T>
member BatchProcessor.BatchProduced : IEvent<'T []>

Full name: Demo.Template.BatchProcessor`1.BatchProduced


 Event that is triggered when a batch is collected
property Event.Publish: IEvent<'T []>
member BatchProcessor.Post : value:'T -> unit

Full name: Demo.Template.BatchProcessor`1.Post


 The method adds one object to the agent
val value : 'T
member MailboxProcessor.Post : message:'Msg -> unit
Multiple items
type BatchProcessor<'T> =
  new : count:int -> BatchProcessor<'T>
  member Post : value:'T -> unit
  member BatchProduced : IEvent<'T []>

Full name: Demo.ThreadPool.BatchProcessor<_>


 Agent that implements batch processing


--------------------
new : count:int -> BatchProcessor<'T>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
type Agent<'T> = MailboxProcessor<'T>

Full name: Demo.Agent<_>


 Type alias that gives convenient name to F# agent type
member BatchProcessor.BatchProduced : IEvent<'T []>

Full name: Demo.ThreadPool.BatchProcessor`1.BatchProduced


 Event that is triggered when a batch is collected
member BatchProcessor.Post : value:'T -> unit

Full name: Demo.ThreadPool.BatchProcessor`1.Post


 The method adds one object to the agent
namespace System
namespace System.Threading
Multiple items
type BatchProcessor<'T> =
  new : count:int * ?eventContext:SynchronizationContext -> BatchProcessor<'T>
  member Post : value:'T -> unit
  member BatchProduced : IEvent<'T []>

Full name: Demo.BatchProcessor<_>


 Agent that implements batch processing (eventContext can
 be provided to specify synchronization context for event reporting)


--------------------
new : count:int * ?eventContext:SynchronizationContext -> BatchProcessor<'T>
val eventContext : SynchronizationContext option
Multiple items
type SynchronizationContext =
  new : unit -> SynchronizationContext
  member CreateCopy : unit -> SynchronizationContext
  member IsWaitNotificationRequired : unit -> bool
  member OperationCompleted : unit -> unit
  member OperationStarted : unit -> unit
  member Post : d:SendOrPostCallback * state:obj -> unit
  member Send : d:SendOrPostCallback * state:obj -> unit
  member Wait : waitHandles:nativeint[] * waitAll:bool * millisecondsTimeout:int -> int
  static member Current : SynchronizationContext
  static member SetSynchronizationContext : syncContext:SynchronizationContext -> unit

Full name: System.Threading.SynchronizationContext

--------------------
SynchronizationContext() : unit
val batchEvent : Event<'T []>


 Event used to report aggregated batches to the user
val reportBatch : ('T [] -> unit)


 Triggers event using the specified synchronization context
 (or directly, if no synchronization context is specified)
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val ctx : SynchronizationContext
SynchronizationContext.Post(d: SendOrPostCallback, state: obj) : unit
// Start an agent that groups incoming messages into batches of 'count' items
  let agent = Agent<'T>.Start(fun inbox ->
    // One pass gathers a single batch, reports it and then starts over
    let rec nextBatch () = async {
      // Every batch is collected into its own, freshly allocated buffer
      let buffer = new ResizeArray<_>()
      // Wait until the specified number of messages has arrived
      for _ in 1 .. count do
        let! item = inbox.Receive()
        buffer.Add(item)
      // Hand the finished batch over as an array and continue with the next
      reportBatch (buffer.ToArray())
      return! nextBatch () }
    nextBatch ())
member BatchProcessor.BatchProduced : IEvent<'T []>

Full name: Demo.BatchProcessor`1.BatchProduced


 Event that is triggered when a batch is collected
member BatchProcessor.Post : value:'T -> unit

Full name: Demo.BatchProcessor`1.Post


 The method adds one object to the agent
val sync : SynchronizationContext

Full name: Demo.sync
property SynchronizationContext.Current: SynchronizationContext
val proc : BatchProcessor<int>

Full name: Demo.proc
member BatchProcessor.Post : value:'T -> unit


 The method adds one object to the agent
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
Raw view Test code New version

More information

Link:http://fssnip.net/cK
Posted:11 years ago
Author:Tomas Petricek
Tags: synchronizationcontext , async , agent , mailboxprocessor , event