3 people like it.

Async vs sync benchmark

A quick benchmark comparing the throughput of an F# MailboxProcessor, which uses asynchronous workflows, with a synchronous agent that uses a lock and a busy loop. The latter is 9x faster on this machine.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
/// Messages understood by both benchmark agents (the MailboxProcessor-based
/// one and the lock-based one).
type Message<'a> =
  /// Carries one value for the agent to store in its buffer.
  | Enqueue of 'a
  /// Carries a callback; the agent invokes it with the buffered values and
  /// then stops processing.
  | Dequeue of (seq<'a> -> unit)

// Run both benchmarks back to back, forever, so successive readings can be
// compared as the runtime warms up.
while true do
  // Benchmark 1: asynchronous agent built on MailboxProcessor.
  // Posts n Enqueue messages followed by a single Dequeue request, waits for
  // the reply, and prints the throughput in messages per second.
  do
    let n = 1000000
    // MailboxProcessor implements IDisposable. Binding with `use` disposes the
    // agent when this `do` block ends, so the enclosing infinite loop does not
    // leave one undisposed agent behind per iteration.
    use agent =
      MailboxProcessor.Start(fun inbox ->
        // Buffer that receives every enqueued payload.
        let msgs = Array.zeroCreate n
        // Next free slot in msgs. A ref cell is required because the value is
        // captured by the async closure below.
        let i = ref 0
        let rec loop() =
          async { let! msg = inbox.Receive()
                  match msg with
                  | Enqueue x ->
                      msgs.[i.Value] <- x
                      i.Value <- i.Value + 1
                      return! loop()
                  | Dequeue reply ->
                      // Hand the buffer back and finish: no further Receive.
                      reply msgs }
        loop())
    // Timing starts after the agent has been started, so only posting and
    // processing are measured.
    let timer = System.Diagnostics.Stopwatch.StartNew()
    for i=1 to n do
      agent.Post(Enqueue i)
    // Blocks until the agent has drained the mailbox up to the Dequeue message.
    agent.PostAndReply(fun reply -> Dequeue reply.Reply)
    |> ignore
    printfn "%f msgs/s" (float n / timer.Elapsed.TotalSeconds)

  // Benchmark 2: synchronous agent on a dedicated thread.
  // The producer appends messages to a shared ResizeArray under a lock; the
  // worker repeatedly (busy loop) takes the whole pending batch under the same
  // lock and processes it. A two-participant Barrier signals completion.
  do
    let n = 1000000
    let queue = ResizeArray()
    use barrier = new System.Threading.Barrier(2)
    let worker =
      System.Threading.Thread(fun () ->
        // Buffer that receives every enqueued payload.
        let msgs = Array.zeroCreate n
        // Next free slot in msgs (ref cell: captured by the closures below).
        let i = ref 0
        let rec loop() =
          // Atomically take a snapshot of the pending messages and empty the queue.
          let xs = lock queue (fun () -> let xs = queue.ToArray() in queue.Clear(); xs)
          let rec iter j =
            // Batch exhausted: go back and poll the queue again.
            if j = xs.Length then loop() else
              match xs.[j] with
              | Enqueue x ->
                  msgs.[i.Value] <- x
                  i.Value <- i.Value + 1
                  iter (j+1)
              | Dequeue reply ->
                  // Publish the buffer, then rendezvous with the producer and stop.
                  reply msgs
                  barrier.SignalAndWait()
          iter 0
        loop())
    worker.Start()
    let timer = System.Diagnostics.Stopwatch.StartNew()
    for i=1 to n do
      lock queue (fun () -> queue.Add(Enqueue i))
    // Filled in by the worker through the Dequeue callback.
    let msgs = ref Seq.empty
    lock queue (fun () -> queue.Add(Dequeue(fun xs -> msgs.Value <- xs)))
    barrier.SignalAndWait()
    let t = timer.Elapsed.TotalSeconds
    // Wait for the worker to leave SignalAndWait before `use barrier` disposes
    // the Barrier at the end of this scope: Barrier.Dispose must not run
    // concurrently with other Barrier members. The join happens after the
    // elapsed time is captured, so it does not affect the measurement.
    worker.Join()
    printfn "%f msgs/s" (float (Seq.length msgs.Value) / t)
union case Message.Enqueue: 'a -> Message<'a>
union case Message.Dequeue: (seq<'a> -> unit) -> Message<'a>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val n : int
val agent : MailboxProcessor<Message<int>>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<Message<int>>
val msgs : int []
module Array

from Microsoft.FSharp.Collections
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val i : int ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val loop : (unit -> Async<unit>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : Message<int>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val x : int
val incr : cell:int ref -> unit

Full name: Microsoft.FSharp.Core.Operators.incr
val reply : (seq<int> -> unit)
val timer : System.Diagnostics.Stopwatch
namespace System
namespace System.Diagnostics
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
System.Diagnostics.Stopwatch() : unit
System.Diagnostics.Stopwatch.StartNew() : System.Diagnostics.Stopwatch
val i : int
member MailboxProcessor.Post : message:'Msg -> unit
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val reply : AsyncReplyChannel<seq<int>>
member AsyncReplyChannel.Reply : value:'Reply -> unit
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = System.Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
property System.Diagnostics.Stopwatch.Elapsed: System.TimeSpan
property System.TimeSpan.TotalSeconds: float
val queue : System.Collections.Generic.List<Message<int>>
type ResizeArray<'T> = System.Collections.Generic.List<'T>

Full name: Microsoft.FSharp.Collections.ResizeArray<_>
val barrier : System.Threading.Barrier
namespace System.Threading
Multiple items
type Barrier =
  new : participantCount:int -> Barrier + 1 overload
  member AddParticipant : unit -> int64
  member AddParticipants : participantCount:int -> int64
  member CurrentPhaseNumber : int64
  member Dispose : unit -> unit
  member ParticipantCount : int
  member ParticipantsRemaining : int
  member RemoveParticipant : unit -> unit
  member RemoveParticipants : participantCount:int -> unit
  member SignalAndWait : unit -> unit + 5 overloads

Full name: System.Threading.Barrier

--------------------
System.Threading.Barrier(participantCount: int) : unit
System.Threading.Barrier(participantCount: int, postPhaseAction: System.Action<System.Threading.Barrier>) : unit
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
System.Threading.Thread(start: System.Threading.ThreadStart) : unit
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart) : unit
System.Threading.Thread(start: System.Threading.ThreadStart, maxStackSize: int) : unit
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart, maxStackSize: int) : unit
val msg : obj ref
module Unchecked

from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T

Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
val loop : (unit -> unit)
val xs : Message<int> []
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
System.Collections.Generic.List.ToArray() : Message<int> []
System.Collections.Generic.List.Clear() : unit
val iter : (int -> unit)
val j : int
property System.Array.Length: int
System.Threading.Barrier.SignalAndWait() : unit
System.Threading.Barrier.SignalAndWait(millisecondsTimeout: int) : bool
System.Threading.Barrier.SignalAndWait(timeout: System.TimeSpan) : bool
System.Threading.Barrier.SignalAndWait(cancellationToken: System.Threading.CancellationToken) : unit
System.Threading.Barrier.SignalAndWait(millisecondsTimeout: int, cancellationToken: System.Threading.CancellationToken) : bool
System.Threading.Barrier.SignalAndWait(timeout: System.TimeSpan, cancellationToken: System.Threading.CancellationToken) : bool
System.Collections.Generic.List.Add(item: Message<int>) : unit
val msgs : seq<int> ref
module Seq

from Microsoft.FSharp.Collections
val empty<'T> : seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.empty
val xs : seq<int>
val t : float
val length : source:seq<'T> -> int

Full name: Microsoft.FSharp.Collections.Seq.length
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/cQ
Posted:5 years ago
Author:Jon Harrop
Tags: asynchronous workflows , benchmark