3 people like it.

ReplaySubject

The ReplaySubject type implements both IObserver and IObservable. It is functionally equivalent to the class of the same name in the Reactive Extensions (Rx) library with a replay buffer of a specified size .

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
open System
open System.Collections.Generic

type CircularBuffer<'T> (bufferSize:int) =
    let buffer = Array.zeroCreate<'T> bufferSize
    let mutable index = 0
    let mutable total = 0
    member this.Add value =
        if bufferSize > 0 then
            buffer.[index] <- value
            index <- (index + 1) % bufferSize
            total <- min (total + 1) bufferSize
    member this.Iter f =     
        let start = if total = bufferSize then index else 0
        for i = 0 to total - 1 do 
            buffer.[(start + i) % bufferSize] |> f                 

type message<'T> =
    | Add of IObserver<'T>
    | Remove of IObserver<'T>
    | Next of 'T
    | Completed
    | Error of exn

let startAgent (bufferSize:int) =
    let subscribers = LinkedList<_>()
    let buffer = CircularBuffer bufferSize               
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! message = inbox.Receive()
            match message with
            | Add observer ->                    
                subscribers.AddLast observer |> ignore
                buffer.Iter observer.OnNext
                return! loop ()
            | Remove observer ->
                subscribers.Remove observer |> ignore
                return! loop ()
            | Next value ->                                       
                for subscriber in subscribers do
                    subscriber.OnNext value
                buffer.Add value
                return! loop () 
            | Error e ->
                for subscriber in subscribers do
                    subscriber.OnError e
            | Completed ->
                for subscriber in subscribers do
                    subscriber.OnCompleted ()
        }
        loop ()
    )

type ReplaySubject<'T> (bufferSize:int) =
    let bufferSize = max 0 bufferSize
    let agent = startAgent bufferSize    
    let subscribe observer =
        observer |> Add |> agent.Post
        { new System.IDisposable with
            member this.Dispose () =
                observer |> Remove |> agent.Post
        }
    member this.Next value = Next value |> agent.Post
    member this.Error error = Error error |> agent.Post
    member this.Completed () = Completed |> agent.Post    
    interface System.IObserver<'T> with
        member this.OnNext value = Next value |> agent.Post
        member this.OnError error = Error error |> agent.Post
        member this.OnCompleted () = Completed |> agent.Post
    member this.Subscribe(observer:System.IObserver<'T>) =
        subscribe observer
    interface System.IObservable<'T> with
        member this.Subscribe observer = subscribe observer                   
and Subject<'T>() = inherit ReplaySubject<'T>(0)

do  let subject = ReplaySubject(3)            
    use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x)
    subject.Next(10)
    subject.Next(11)
    use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x)
    System.Console.ReadLine() |> ignore
namespace System
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type CircularBuffer<'T> =
  new : bufferSize:int -> CircularBuffer<'T>
  member Add : value:'T -> unit
  member Iter : f:('T -> unit) -> unit

Full name: Script.CircularBuffer<_>

--------------------
new : bufferSize:int -> CircularBuffer<'T>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val buffer : 'T []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable index : int
val mutable total : int
val this : CircularBuffer<'T>
member CircularBuffer.Add : value:'T -> unit

Full name: Script.CircularBuffer`1.Add
val value : 'T
val min : e1:'T -> e2:'T -> 'T (requires comparison)

Full name: Microsoft.FSharp.Core.Operators.min
member CircularBuffer.Iter : f:('T -> unit) -> unit

Full name: Script.CircularBuffer`1.Iter
val f : ('T -> unit)
val start : int
val i : int
type message<'T> =
  | Add of IObserver<'T>
  | Remove of IObserver<'T>
  | Next of 'T
  | Completed
  | Error of exn

Full name: Script.message<_>
union case message.Add: IObserver<'T> -> message<'T>
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
union case message.Remove: IObserver<'T> -> message<'T>
union case message.Next: 'T -> message<'T>
union case message.Completed: message<'T>
union case message.Error: exn -> message<'T>
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
val startAgent : bufferSize:int -> MailboxProcessor<message<'a>>

Full name: Script.startAgent
val subscribers : LinkedList<IObserver<'a>>
Multiple items
type LinkedList<'T> =
  new : unit -> LinkedList<'T> + 1 overload
  member AddAfter : node:LinkedListNode<'T> * value:'T -> LinkedListNode<'T> + 1 overload
  member AddBefore : node:LinkedListNode<'T> * value:'T -> LinkedListNode<'T> + 1 overload
  member AddFirst : value:'T -> LinkedListNode<'T> + 1 overload
  member AddLast : value:'T -> LinkedListNode<'T> + 1 overload
  member Clear : unit -> unit
  member Contains : value:'T -> bool
  member CopyTo : array:'T[] * index:int -> unit
  member Count : int
  member Find : value:'T -> LinkedListNode<'T>
  ...
  nested type Enumerator

Full name: System.Collections.Generic.LinkedList<_>

--------------------
LinkedList() : unit
LinkedList(collection: IEnumerable<'T>) : unit
val buffer : CircularBuffer<'a>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<message<'a>>
val loop : (unit -> Async<unit>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Multiple items
val message : message<'a>

--------------------
type message<'T> =
  | Add of IObserver<'T>
  | Remove of IObserver<'T>
  | Next of 'T
  | Completed
  | Error of exn

Full name: Script.message<_>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val observer : IObserver<'a>
LinkedList.AddLast(node: LinkedListNode<IObserver<'a>>) : unit
LinkedList.AddLast(value: IObserver<'a>) : LinkedListNode<IObserver<'a>>
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
member CircularBuffer.Iter : f:('T -> unit) -> unit
IObserver.OnNext(value: 'a) : unit
LinkedList.Remove(node: LinkedListNode<IObserver<'a>>) : unit
LinkedList.Remove(value: IObserver<'a>) : bool
val value : 'a
val subscriber : IObserver<'a>
member CircularBuffer.Add : value:'T -> unit
val e : exn
IObserver.OnError(error: exn) : unit
IObserver.OnCompleted() : unit
Multiple items
type ReplaySubject<'T> =
  interface IObservable<'T>
  interface IObserver<'T>
  new : bufferSize:int -> ReplaySubject<'T>
  member Completed : unit -> unit
  member Error : error:exn -> unit
  member Next : value:'T -> unit
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Script.ReplaySubject<_>

--------------------
new : bufferSize:int -> ReplaySubject<'T>
val max : e1:'T -> e2:'T -> 'T (requires comparison)

Full name: Microsoft.FSharp.Core.Operators.max
val agent : MailboxProcessor<message<'T>>
val subscribe : (IObserver<'T> -> IDisposable)
val observer : IObserver<'T>
member MailboxProcessor.Post : message:'Msg -> unit
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
val this : ReplaySubject<'T>
member ReplaySubject.Next : value:'T -> unit

Full name: Script.ReplaySubject`1.Next
member ReplaySubject.Error : error:exn -> unit

Full name: Script.ReplaySubject`1.Error
val error : exn
member ReplaySubject.Completed : unit -> unit

Full name: Script.ReplaySubject`1.Completed
override ReplaySubject.OnNext : value:'T -> unit

Full name: Script.ReplaySubject`1.OnNext
override ReplaySubject.OnError : error:exn -> unit

Full name: Script.ReplaySubject`1.OnError
override ReplaySubject.OnCompleted : unit -> unit

Full name: Script.ReplaySubject`1.OnCompleted
member ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Script.ReplaySubject`1.Subscribe
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
override ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Script.ReplaySubject`1.Subscribe
Multiple items
type Subject<'T> =
  inherit ReplaySubject<'T>
  new : unit -> Subject<'T>

Full name: Script.Subject<_>

--------------------
new : unit -> Subject<'T>
val subject : ReplaySubject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
member ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable
val x : int
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
member ReplaySubject.Next : value:'T -> unit
Console.ReadLine() : string

More information

Link:http://fssnip.net/5p
Posted:12 years ago
Author:Phillip Trelford
Tags: observable