3 people like it.

ReplaySubject

The ReplaySubject type implements both IObserver and IObservable. It is functionally equivalent to the class of the same name in the Reactive Extensions (Rx) library, with a replay buffer of a specified size.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
open System
  
/// Represents an object that is both 
/// an observable sequence as well as 
/// an observer. The most recent `bufferSize` values passed to Next
/// are kept and replayed, oldest first, to every new subscriber.
/// Once Completed or Error has been called, further notifications are
/// ignored and late subscribers receive the replayed values followed by
/// the terminal notification.
type ReplaySubject<'T> (bufferSize:int) =
    // Circular buffer holding the most recent values.
    let buffer : 'T [] = Array.zeroCreate bufferSize
    // index: next slot to write; total: number of buffered values (<= bufferSize).
    let mutable index, total = 0, 0
    let mutable stopped = false
    // Set when the sequence ended through Error, so that observers
    // subscribing afterwards can be told how it terminated.
    let mutable failure : exn option = None
    let observers = System.Collections.Generic.List<IObserver<'T>>()
    // Notify over a snapshot: the lock is re-entrant, so an observer may
    // subscribe or dispose from inside its own callback, and mutating the
    // list while enumerating it would raise InvalidOperationException.
    let iter f = observers.ToArray() |> Array.iter f
    let onCompleted () =
        if not stopped then
            stopped <- true
            iter (fun observer -> observer.OnCompleted())
            // Nothing more will ever be sent; release the observers.
            observers.Clear()
    let onError ex () =
        if not stopped then
            stopped <- true
            failure <- Some ex
            iter (fun observer -> observer.OnError(ex))
            observers.Clear()
    let next value () =
        if not stopped then
            // A zero-sized buffer (plain Subject) stores nothing.
            if bufferSize > 0 then
                buffer.[index] <- value
                index <- (index + 1) % bufferSize
                total <- min (total + 1) bufferSize
            iter (fun observer -> observer.OnNext value)
    let add (observer:IObserver<'T>) () =
        // After termination the observer would never be notified again,
        // so it is not retained.
        if not stopped then observers.Add observer
        // When the buffer has wrapped, the oldest value sits at `index`.
        let start = if total = bufferSize then index else 0
        for i = 0 to total-1 do 
            buffer.[(start + i)%bufferSize] |> observer.OnNext
        // Late subscribers still need to learn that the sequence has ended.
        if stopped then
            match failure with
            | Some ex -> observer.OnError ex
            | None -> observer.OnCompleted()
    let remove observer () =
        observers.Remove observer |> ignore
    // Guards all mutable state above.
    let sync = obj()
    /// Publishes a value to all current observers and records it for replay.
    member x.Next value = lock sync <| next value
    /// Terminates the sequence with an error; later calls are ignored.
    member x.Error ex = lock sync <| onError ex
    /// Terminates the sequence normally; later calls are ignored.
    member x.Completed () = lock sync <| onCompleted
    interface IObserver<'T> with
        member x.OnCompleted() = x.Completed()
        member x.OnError ex = x.Error ex
        member x.OnNext value = x.Next value    
    interface IObservable<'T> with
        /// Replays the buffered values to the observer and registers it
        /// for future notifications. Disposing the result unsubscribes.
        member this.Subscribe(observer:IObserver<'T>) =
            lock sync <| add observer
            { new IDisposable with
                member this.Dispose() =
                    lock sync <| remove observer
            }
/// A subject without a replay buffer: it is a ReplaySubject created
/// with a buffer size of 0, so no past values are replayed on subscription.
and Subject<'T> () =
    inherit ReplaySubject<'T> (0)


// Demo: the first subscriber is attached before any value is published and
// prints each of the 16 values as it is sent. The second subscriber is
// attached afterwards and prints whatever the 10-slot buffer replays,
// prefixed with a quote.
do  let subject = ReplaySubject<int>(10)
    use live = subject.Subscribe(fun value -> Console.WriteLine(sprintf "%d" value))
    for value in 1 .. 16 do
        subject.Next value
    use replayed = subject.Subscribe(fun value -> Console.WriteLine(sprintf "'%d" value))
    ()
namespace System
Multiple items
type ReplaySubject<'T> =
  interface IObservable<'T>
  interface IObserver<'T>
  new : bufferSize:int -> ReplaySubject<'T>
  member Completed : unit -> unit
  member Error : ex:exn -> unit
  member Next : value:'T -> unit

Full name: Script.ReplaySubject<_>


 Represents an object that is both
 an observable sequence as well as
 an observer


--------------------
new : bufferSize:int -> ReplaySubject<'T>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val buffer : 'T []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable index : int
val mutable total : int
val mutable stopped : bool
val observers : Collections.Generic.List<IObserver<'T>>
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type List<'T> =
  new : unit -> List<'T> + 2 overloads
  member Add : item:'T -> unit
  member AddRange : collection:IEnumerable<'T> -> unit
  member AsReadOnly : unit -> ReadOnlyCollection<'T>
  member BinarySearch : item:'T -> int + 2 overloads
  member Capacity : int with get, set
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
  member CopyTo : array:'T[] -> unit + 2 overloads
  ...
  nested type Enumerator

Full name: System.Collections.Generic.List<_>

--------------------
Collections.Generic.List() : unit
Collections.Generic.List(capacity: int) : unit
Collections.Generic.List(collection: Collections.Generic.IEnumerable<'T>) : unit
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
val iter : ((IObserver<'T> -> unit) -> unit)
val f : (IObserver<'T> -> unit)
module Seq

from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit

Full name: Microsoft.FSharp.Collections.Seq.iter
val onCompleted : (unit -> unit)
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val observer : IObserver<'T>
IObserver.OnCompleted() : unit
val onError : (exn -> unit -> unit)
val ex : exn
IObserver.OnError(error: exn) : unit
val next : ('T -> unit -> unit)
val value : 'T
val min : e1:'T -> e2:'T -> 'T (requires comparison)

Full name: Microsoft.FSharp.Core.Operators.min
IObserver.OnNext(value: 'T) : unit
val add : (IObserver<'T> -> unit -> unit)
Collections.Generic.List.Add(item: IObserver<'T>) : unit
val start : int
val i : int
val remove : (IObserver<'T> -> unit -> unit)
Collections.Generic.List.Remove(item: IObserver<'T>) : bool
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val sync : Object
type obj = Object

Full name: Microsoft.FSharp.Core.obj
val x : ReplaySubject<'T>
member ReplaySubject.Next : value:'T -> unit

Full name: Script.ReplaySubject`1.Next
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
member ReplaySubject.Error : ex:exn -> unit

Full name: Script.ReplaySubject`1.Error
member ReplaySubject.Completed : unit -> unit

Full name: Script.ReplaySubject`1.Completed
override ReplaySubject.OnCompleted : unit -> unit

Full name: Script.ReplaySubject`1.OnCompleted
member ReplaySubject.Completed : unit -> unit
override ReplaySubject.OnError : ex:exn -> unit

Full name: Script.ReplaySubject`1.OnError
member ReplaySubject.Error : ex:exn -> unit
override ReplaySubject.OnNext : value:'T -> unit

Full name: Script.ReplaySubject`1.OnNext
member ReplaySubject.Next : value:'T -> unit
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val this : ReplaySubject<'T>
override ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Script.ReplaySubject`1.Subscribe
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
Multiple items
type Subject<'T> =
  inherit ReplaySubject<'T>
  new : unit -> Subject<'T>

Full name: Script.Subject<_>

--------------------
new : unit -> Subject<'T>
val s : ReplaySubject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
val x : int
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
val d' : IDisposable
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/5p
Posted:13 years ago
Author:Phillip Trelford
Tags: observable