12 people like it.
Like the snippet!
Rx vs Observable module vs Nessos Streams
Naive performance comparison of push model libraries: Reactive Extension (Rx), Observable module (built-in to F#) and Nessos Streams. Note: smaller numbers are better
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
|
open System
open System.Reactive.Linq
let rxValue =
data
.ToObservable()
.Where(fun x -> x%2L = 0L)
.Select(fun x -> x * x)
.Sum()
.ToEnumerable()
|> Seq.head
// Real: 00:00:02.702, CPU: 00:00:02.812, GC gen0: 121, gen1: 2, gen2: 1
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
Observable module extensions
let obsValue =
data
|> Observable.ofSeq
|> Observable.filter (fun x -> x%2L = 0L)
|> Observable.map (fun x -> x * x)
|> Observable.sum
|> Observable.first
// Real: 00:00:00.458, CPU: 00:00:00.453, GC gen0: 18, gen1: 1, gen2: 0
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
|
open Nessos.Streams
let streamValue =
data
|> Stream.ofArray
|> Stream.filter (fun x -> x%2L = 0L)
|> Stream.map (fun x -> x * x)
|> Stream.sum
// Real: 00:00:00.119, CPU: 00:00:00.109, GC gen0: 0, gen1: 0, gen2: 0
|
namespace System
namespace System.Reactive
namespace System.Reactive.Linq
val rxValue : int64
Full name: Script.rxValue
val data : int64 []
Full name: Script.data
val x : int64
module Seq
from Microsoft.FSharp.Collections
val head : source:seq<'T> -> 'T
Full name: Microsoft.FSharp.Collections.Seq.head
module Observable =
open System
let ofSeq (xs:'T seq) : IObservable<'T> =
{ new IObservable<'T> with
member __.Subscribe(observer) =
for x in xs do observer.OnNext(x)
observer.OnCompleted()
{ new IDisposable with member __.Dispose() = ()}
}
let inline sum (observable:IObservable< ^T >) : IObservable< ^T >
when ^T : (static member ( + ) : ^T * ^T -> ^T)
and ^T : (static member Zero : ^T) =
{ new IObservable<'T> with
member this.Subscribe(observer:IObserver<'T>) =
let acc = ref (LanguagePrimitives.GenericZero)
let accumulator =
{ new IObserver<'T> with
member __.OnNext(x) = acc := !acc + x
member __.OnCompleted() = observer.OnNext(!acc)
member __.OnError(_) = failwith "Not implemented"
}
observable.Subscribe(accumulator)
}
let first (observable:IObservable<'T>) : 'T =
let value = ref (Unchecked.defaultof<'T>)
let _ = observable.Subscribe(fun x -> value := x)
!value
val obsValue : int64
Full name: Script.obsValue
type Observable =
static member Aggregate<'TSource> : source:IObservable<'TSource> * accumulator:Func<'TSource, 'TSource, 'TSource> -> IObservable<'TSource> + 2 overloads
static member All<'TSource> : source:IObservable<'TSource> * predicate:Func<'TSource, bool> -> IObservable<bool>
static member Amb<'TSource> : [<ParamArray>] sources:IObservable<'TSource>[] -> IObservable<'TSource> + 2 overloads
static member And<'TLeft, 'TRight> : left:IObservable<'TLeft> * right:IObservable<'TRight> -> Pattern<'TLeft, 'TRight>
static member Any<'TSource> : source:IObservable<'TSource> -> IObservable<bool> + 1 overload
static member AsObservable<'TSource> : source:IObservable<'TSource> -> IObservable<'TSource>
static member Average : source:IObservable<float> -> IObservable<float> + 19 overloads
static member Buffer<'TSource, 'TBufferClosing> : source:IObservable<'TSource> * bufferClosingSelector:Func<IObservable<'TBufferClosing>> -> IObservable<IList<'TSource>> + 10 overloads
static member Case<'TValue, 'TResult> : selector:Func<'TValue> * sources:IDictionary<'TValue, IObservable<'TResult>> -> IObservable<'TResult> + 2 overloads
static member Cast<'TResult> : source:IObservable<obj> -> IObservable<'TResult>
...
Full name: System.Reactive.Linq.Observable
val ofSeq : xs:seq<'T> -> IObservable<'T>
Full name: Script.Observable.ofSeq
val filter : predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T>
Full name: Microsoft.FSharp.Control.Observable.filter
val map : mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>
Full name: Microsoft.FSharp.Control.Observable.map
val sum : observable:IObservable<'T> -> IObservable<'T> (requires member get_Zero and member ( + ))
Full name: Script.Observable.sum
val first : observable:IObservable<'T> -> 'T
Full name: Script.Observable.first
namespace Nessos
namespace Nessos.Streams
val streamValue : int64
Full name: Script.streamValue
Multiple items
module Stream
from Nessos.Streams
--------------------
type Stream<'T> =
private {Run: Context<'T> -> Iterable;}
member private RunBulk : ctxt:Context<'T> -> unit
override ToString : unit -> string
Full name: Nessos.Streams.Stream<_>
val ofArray : source:'T [] -> Stream<'T>
Full name: Nessos.Streams.Stream.ofArray
val filter : predicate:('T -> bool) -> stream:Stream<'T> -> Stream<'T>
Full name: Nessos.Streams.Stream.filter
val map : f:('T -> 'R) -> stream:Stream<'T> -> Stream<'R>
Full name: Nessos.Streams.Stream.map
val sum : stream:Stream<'T> -> 'T (requires member ( + ) and member get_Zero)
Full name: Nessos.Streams.Stream.sum
More information