3 people like it.
Like the snippet!
ReplaySubject
The ReplaySubject type implements both IObserver and IObservable. It is functionally equivalent to the class of the same name in the Reactive Extensions (Rx) library with a replay buffer of a specified size .
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
|
open System
/// Represents an object that is both
/// an observable sequence as well as
/// an observer
type ReplaySubject<'T> (bufferSize:int) =
let buffer = Array.zeroCreate bufferSize
let mutable index, total = 0, 0
let mutable stopped = false
let observers = System.Collections.Generic.List<IObserver<'T>>()
let iter f = observers |> Seq.iter f
let onCompleted () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnCompleted())
let onError ex () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnError(ex))
let next value () =
if not stopped then
if bufferSize > 0 then
buffer.[index] <- value
index <- (index + 1) % bufferSize
total <- min (total + 1) bufferSize
iter (fun observer -> observer.OnNext value)
let add observer () =
observers.Add observer
let start = if total = bufferSize then index else 0
for i = 0 to total-1 do
buffer.[(start + i)%bufferSize] |> observer.OnNext
let remove observer () =
observers.Remove observer |> ignore
let sync = obj()
member x.Next value = lock sync <| next value
member x.Error ex = lock sync <| onError ex
member x.Completed () = lock sync <| onCompleted
interface IObserver<'T> with
member x.OnCompleted() = x.Completed()
member x.OnError ex = x.Error ex
member x.OnNext value = x.Next value
interface IObservable<'T> with
member this.Subscribe(observer:IObserver<'T>) =
lock sync <| add observer
{ new IDisposable with
member this.Dispose() =
lock sync <| remove observer
}
and Subject<'T> () =
inherit ReplaySubject<'T> (0)
do let s = ReplaySubject(10)
use d = s.Subscribe(fun x -> sprintf "%d" x |> Console.WriteLine)
[1..16] |> Seq.iter s.Next
use d' = s.Subscribe(fun x -> sprintf "'%d" x |> Console.WriteLine)
()
|
namespace System
Multiple items
type ReplaySubject<'T> =
interface IObservable<'T>
interface IObserver<'T>
new : bufferSize:int -> ReplaySubject<'T>
member Completed : unit -> unit
member Error : ex:exn -> unit
member Next : value:'T -> unit
Full name: Script.ReplaySubject<_>
Represents an object that is both
an observable sequence as well as
an observer
--------------------
new : bufferSize:int -> ReplaySubject<'T>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val buffer : 'T []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable index : int
val mutable total : int
val mutable stopped : bool
val observers : Collections.Generic.List<IObserver<'T>>
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
Full name: System.Collections.Generic.List<_>
--------------------
Collections.Generic.List() : unit
Collections.Generic.List(capacity: int) : unit
Collections.Generic.List(collection: Collections.Generic.IEnumerable<'T>) : unit
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
val iter : ((IObserver<'T> -> unit) -> unit)
val f : (IObserver<'T> -> unit)
module Seq
from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit
Full name: Microsoft.FSharp.Collections.Seq.iter
val onCompleted : (unit -> unit)
val not : value:bool -> bool
Full name: Microsoft.FSharp.Core.Operators.not
val observer : IObserver<'T>
IObserver.OnCompleted() : unit
val onError : (exn -> unit -> unit)
val ex : exn
IObserver.OnError(error: exn) : unit
val next : ('T -> unit -> unit)
val value : 'T
val min : e1:'T -> e2:'T -> 'T (requires comparison)
Full name: Microsoft.FSharp.Core.Operators.min
IObserver.OnNext(value: 'T) : unit
val add : (IObserver<'T> -> unit -> unit)
Collections.Generic.List.Add(item: IObserver<'T>) : unit
val start : int
val i : int
val remove : (IObserver<'T> -> unit -> unit)
Collections.Generic.List.Remove(item: IObserver<'T>) : bool
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val sync : Object
type obj = Object
Full name: Microsoft.FSharp.Core.obj
val x : ReplaySubject<'T>
member ReplaySubject.Next : value:'T -> unit
Full name: Script.ReplaySubject`1.Next
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)
Full name: Microsoft.FSharp.Core.Operators.lock
member ReplaySubject.Error : ex:exn -> unit
Full name: Script.ReplaySubject`1.Error
member ReplaySubject.Completed : unit -> unit
Full name: Script.ReplaySubject`1.Completed
override ReplaySubject.OnCompleted : unit -> unit
Full name: Script.ReplaySubject`1.OnCompleted
member ReplaySubject.Completed : unit -> unit
override ReplaySubject.OnError : ex:exn -> unit
Full name: Script.ReplaySubject`1.OnError
member ReplaySubject.Error : ex:exn -> unit
override ReplaySubject.OnNext : value:'T -> unit
Full name: Script.ReplaySubject`1.OnNext
member ReplaySubject.Next : value:'T -> unit
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
val this : ReplaySubject<'T>
override ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Script.ReplaySubject`1.Subscribe
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
Multiple items
type Subject<'T> =
inherit ReplaySubject<'T>
new : unit -> Subject<'T>
Full name: Script.Subject<_>
--------------------
new : unit -> Subject<'T>
val s : ReplaySubject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
val x : int
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.WriteLine() : unit
(+0 other overloads)
Console.WriteLine(value: string) : unit
(+0 other overloads)
Console.WriteLine(value: obj) : unit
(+0 other overloads)
Console.WriteLine(value: uint64) : unit
(+0 other overloads)
Console.WriteLine(value: int64) : unit
(+0 other overloads)
Console.WriteLine(value: uint32) : unit
(+0 other overloads)
Console.WriteLine(value: int) : unit
(+0 other overloads)
Console.WriteLine(value: float32) : unit
(+0 other overloads)
Console.WriteLine(value: float) : unit
(+0 other overloads)
Console.WriteLine(value: decimal) : unit
(+0 other overloads)
val d' : IDisposable
More information