12 people like it.

Rx vs Observable module vs Nessos Streams

Naive performance comparison of push model libraries: Reactive Extension (Rx), Observable module (built-in to F#) and Nessos Streams. Note: smaller numbers are better

Reactive Extensions (Rx)

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
open System
open System.Reactive.Linq

let rxValue =
   data
      .ToObservable()
      .Where(fun x -> x%2L = 0L)
      .Select(fun x -> x * x)
      .Sum()      
      .ToEnumerable()
      |> Seq.head
// Real: 00:00:02.702, CPU: 00:00:02.812, GC gen0: 121, gen1: 2, gen2: 1

Observable module

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
Observable module extensions

let obsValue =
   data
   |> Observable.ofSeq
   |> Observable.filter (fun x -> x%2L = 0L)
   |> Observable.map (fun x -> x * x)
   |> Observable.sum
   |> Observable.first
// Real: 00:00:00.458, CPU: 00:00:00.453, GC gen0: 18, gen1: 1, gen2: 0

Nessos Streams

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
open Nessos.Streams

let streamValue =
   data
   |> Stream.ofArray
   |> Stream.filter (fun x -> x%2L = 0L)
   |> Stream.map (fun x -> x * x)
   |> Stream.sum
// Real: 00:00:00.119, CPU: 00:00:00.109, GC gen0: 0, gen1: 0, gen2: 0
namespace System
namespace System.Reactive
namespace System.Reactive.Linq
val rxValue : int64

Full name: Script.rxValue
val data : int64 []

Full name: Script.data
val x : int64
module Seq

from Microsoft.FSharp.Collections
val head : source:seq<'T> -> 'T

Full name: Microsoft.FSharp.Collections.Seq.head
module Observable =
   open System
   let ofSeq (xs:'T seq) : IObservable<'T> =
      { new IObservable<'T> with
         member __.Subscribe(observer) =
            for x in xs do observer.OnNext(x)
            observer.OnCompleted()
            { new IDisposable with member __.Dispose() = ()}
      }
   let inline sum (observable:IObservable< ^T >) : IObservable< ^T >
         when ^T : (static member ( + ) : ^T * ^T -> ^T)
         and ^T : (static member Zero : ^T) =
      { new IObservable<'T> with
         member this.Subscribe(observer:IObserver<'T>) =
            let acc = ref (LanguagePrimitives.GenericZero)
            let accumulator =
               { new IObserver<'T> with
                  member __.OnNext(x) = acc := !acc + x
                  member __.OnCompleted() = observer.OnNext(!acc)
                  member __.OnError(_) = failwith "Not implemented"
               }
            observable.Subscribe(accumulator)
      }
   let first (observable:IObservable<'T>) : 'T =
      let value = ref (Unchecked.defaultof<'T>)
      let _ = observable.Subscribe(fun x -> value := x)
      !value
val obsValue : int64

Full name: Script.obsValue
type Observable =
  static member Aggregate<'TSource> : source:IObservable<'TSource> * accumulator:Func<'TSource, 'TSource, 'TSource> -> IObservable<'TSource> + 2 overloads
  static member All<'TSource> : source:IObservable<'TSource> * predicate:Func<'TSource, bool> -> IObservable<bool>
  static member Amb<'TSource> : [<ParamArray>] sources:IObservable<'TSource>[] -> IObservable<'TSource> + 2 overloads
  static member And<'TLeft, 'TRight> : left:IObservable<'TLeft> * right:IObservable<'TRight> -> Pattern<'TLeft, 'TRight>
  static member Any<'TSource> : source:IObservable<'TSource> -> IObservable<bool> + 1 overload
  static member AsObservable<'TSource> : source:IObservable<'TSource> -> IObservable<'TSource>
  static member Average : source:IObservable<float> -> IObservable<float> + 19 overloads
  static member Buffer<'TSource, 'TBufferClosing> : source:IObservable<'TSource> * bufferClosingSelector:Func<IObservable<'TBufferClosing>> -> IObservable<IList<'TSource>> + 10 overloads
  static member Case<'TValue, 'TResult> : selector:Func<'TValue> * sources:IDictionary<'TValue, IObservable<'TResult>> -> IObservable<'TResult> + 2 overloads
  static member Cast<'TResult> : source:IObservable<obj> -> IObservable<'TResult>
  ...

Full name: System.Reactive.Linq.Observable
val ofSeq : xs:seq<'T> -> IObservable<'T>

Full name: Script.Observable.ofSeq
val filter : predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T>

Full name: Microsoft.FSharp.Control.Observable.filter
val map : mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>

Full name: Microsoft.FSharp.Control.Observable.map
val sum : observable:IObservable<'T> -> IObservable<'T> (requires member get_Zero and member ( + ))

Full name: Script.Observable.sum
val first : observable:IObservable<'T> -> 'T

Full name: Script.Observable.first
namespace Nessos
namespace Nessos.Streams
val streamValue : int64

Full name: Script.streamValue
Multiple items
module Stream

from Nessos.Streams

--------------------
type Stream<'T> =
  private {Run: Context<'T> -> Iterable;}
  member private RunBulk : ctxt:Context<'T> -> unit
  override ToString : unit -> string

Full name: Nessos.Streams.Stream<_>
val ofArray : source:'T [] -> Stream<'T>

Full name: Nessos.Streams.Stream.ofArray
val filter : predicate:('T -> bool) -> stream:Stream<'T> -> Stream<'T>

Full name: Nessos.Streams.Stream.filter
val map : f:('T -> 'R) -> stream:Stream<'T> -> Stream<'R>

Full name: Nessos.Streams.Stream.map
val sum : stream:Stream<'T> -> 'T (requires member ( + ) and member get_Zero)

Full name: Nessos.Streams.Stream.sum
Raw view Test code New version

More information

Link:http://fssnip.net/ow
Posted:10 years ago
Author:Phillip Trelford
Tags: rx , observable , streams