12 people like it.
    Like the snippet!
  
  Rx vs Observable module vs Nessos Streams
  A naive performance comparison of push-model libraries: Reactive Extensions (Rx), the Observable module (built into F#), and Nessos Streams. Note: smaller numbers are better.
  
|  1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
 | open System
open System.Reactive.Linq
let rxValue =
   // Rx query with each stage bound to a name: keep the even values,
   // then square them.
   let evens = data.ToObservable().Where(fun n -> n % 2L = 0L)
   let squares = evens.Select(fun n -> n * n)
   // Sum() stays inside the observable world, so the result is pulled out
   // by enumerating the summed observable and taking its head.
   squares.Sum().ToEnumerable() |> Seq.head
// Real: 00:00:02.702, CPU: 00:00:02.812, GC gen0: 121, gen1: 2, gen2: 1
 | 
|  1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
 | Observable module extensions
let obsValue =
   // Build the filtered/mapped observable first, then reduce it.
   let evenSquares =
      Observable.ofSeq data
      |> Observable.filter (fun n -> n % 2L = 0L)
      |> Observable.map (fun n -> n * n)
   // sum produces an observable of the total; first extracts that value.
   evenSquares |> Observable.sum |> Observable.first
// Real: 00:00:00.458, CPU: 00:00:00.453, GC gen0: 18, gen1: 1, gen2: 0
 | 
| 1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
 | open Nessos.Streams
let streamValue =
   // Same stream pipeline, split into named stages: even values, then squares.
   let evens = data |> Stream.ofArray |> Stream.filter (fun n -> n % 2L = 0L)
   let squares = evens |> Stream.map (fun n -> n * n)
   // Stream.sum returns the total directly rather than another stream.
   Stream.sum squares
// Real: 00:00:00.119, CPU: 00:00:00.109, GC gen0: 0, gen1: 0, gen2: 0
 | 
namespace System
namespace System.Reactive
namespace System.Reactive.Linq
val rxValue : int64
Full name: Script.rxValue
val data : int64 []
Full name: Script.data
val x : int64
module Seq
from Microsoft.FSharp.Collections
val head : source:seq<'T> -> 'T
Full name: Microsoft.FSharp.Collections.Seq.head
/// Small push-based helpers for the benchmark: lift a sequence into an
/// IObservable, sum an observable's values, and extract an observable's
/// first value.
module Observable =
   open System
   /// Wraps a sequence as an observable. Every element is pushed to the
   /// observer synchronously inside Subscribe, followed by OnCompleted.
   /// The returned IDisposable does nothing, because all notifications have
   /// already been delivered by the time Subscribe returns.
   let ofSeq (xs:'T seq) : IObservable<'T> =
      { new IObservable<'T> with
         member __.Subscribe(observer) =
            for x in xs do observer.OnNext(x)
            observer.OnCompleted()
            { new IDisposable with member __.Dispose() = ()}           
      }
   /// Produces an observable that emits a single value: the sum of everything
   /// the source emitted, starting from the type's Zero. The total is emitted
   /// when the source completes, and completion is then passed downstream.
   /// Source errors are forwarded to the downstream observer.
   let inline sum (observable:IObservable< ^T >) : IObservable< ^T >
         when ^T : (static member ( + ) : ^T * ^T -> ^T) 
         and  ^T : (static member Zero : ^T) = 
      { new IObservable<'T> with 
         member this.Subscribe(observer:IObserver<'T>) =
            // A fresh accumulator per subscription, so separate subscribers
            // never share a running total.
            let acc = ref (LanguagePrimitives.GenericZero)
            let accumulator =
               { new IObserver<'T> with 
                  member __.OnNext(x) = acc.Value <- acc.Value + x
                  member __.OnCompleted() =
                     // Emit the total, then signal the end of the sequence
                     // so downstream operators can finish as well.
                     observer.OnNext(acc.Value)
                     observer.OnCompleted()
                  // Pass failures downstream instead of throwing from inside
                  // the source's notification call.
                  member __.OnError(error) = observer.OnError(error)
               }
            observable.Subscribe(accumulator)
      }
   /// Returns the first value the observable pushes while Subscribe is
   /// running. Later values are ignored. If nothing is pushed during
   /// Subscribe, the result is Unchecked.defaultof<'T>.
   let first (observable:IObservable<'T>) : 'T =
      let value = ref (Unchecked.defaultof<'T>)
      // Explicit flag: a legitimate first value may equal the default.
      let hasValue = ref false
      let subscription =
         observable.Subscribe(fun x ->
            if not hasValue.Value then
               value.Value <- x
               hasValue.Value <- true)
      // The result is read immediately, so release the subscription rather
      // than leaving it attached to the source.
      subscription.Dispose()
      value.Value
val obsValue : int64
Full name: Script.obsValue
type Observable =
  static member Aggregate<'TSource> : source:IObservable<'TSource> * accumulator:Func<'TSource, 'TSource, 'TSource> -> IObservable<'TSource> + 2 overloads
  static member All<'TSource> : source:IObservable<'TSource> * predicate:Func<'TSource, bool> -> IObservable<bool>
  static member Amb<'TSource> : [<ParamArray>] sources:IObservable<'TSource>[] -> IObservable<'TSource> + 2 overloads
  static member And<'TLeft, 'TRight> : left:IObservable<'TLeft> * right:IObservable<'TRight> -> Pattern<'TLeft, 'TRight>
  static member Any<'TSource> : source:IObservable<'TSource> -> IObservable<bool> + 1 overload
  static member AsObservable<'TSource> : source:IObservable<'TSource> -> IObservable<'TSource>
  static member Average : source:IObservable<float> -> IObservable<float> + 19 overloads
  static member Buffer<'TSource, 'TBufferClosing> : source:IObservable<'TSource> * bufferClosingSelector:Func<IObservable<'TBufferClosing>> -> IObservable<IList<'TSource>> + 10 overloads
  static member Case<'TValue, 'TResult> : selector:Func<'TValue> * sources:IDictionary<'TValue, IObservable<'TResult>> -> IObservable<'TResult> + 2 overloads
  static member Cast<'TResult> : source:IObservable<obj> -> IObservable<'TResult>
  ...
Full name: System.Reactive.Linq.Observable
val ofSeq : xs:seq<'T> -> IObservable<'T>
Full name: Script.Observable.ofSeq
val filter : predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T>
Full name: Microsoft.FSharp.Control.Observable.filter
val map : mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>
Full name: Microsoft.FSharp.Control.Observable.map
val sum : observable:IObservable<'T> -> IObservable<'T> (requires member get_Zero and member ( + ))
Full name: Script.Observable.sum
val first : observable:IObservable<'T> -> 'T
Full name: Script.Observable.first
namespace Nessos
namespace Nessos.Streams
val streamValue : int64
Full name: Script.streamValue
Multiple items
module Stream
from Nessos.Streams
--------------------
type Stream<'T> =
  private {Run: Context<'T> -> Iterable;}
  member private RunBulk : ctxt:Context<'T> -> unit
  override ToString : unit -> string
Full name: Nessos.Streams.Stream<_>
val ofArray : source:'T [] -> Stream<'T>
Full name: Nessos.Streams.Stream.ofArray
val filter : predicate:('T -> bool) -> stream:Stream<'T> -> Stream<'T>
Full name: Nessos.Streams.Stream.filter
val map : f:('T -> 'R) -> stream:Stream<'T> -> Stream<'R>
Full name: Nessos.Streams.Stream.map
val sum : stream:Stream<'T> -> 'T (requires member ( + ) and member get_Zero)
Full name: Nessos.Streams.Stream.sum
  
  
  More information