3 people like it.
Like the snippet!
ReplaySubject
The ReplaySubject type implements both IObserver and IObservable. It is functionally equivalent to the class of the same name in the Reactive Extensions (Rx) library with a replay buffer of a specified size .
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
|
open System
open System.Collections.Generic
type CircularBuffer<'T> (bufferSize:int) =
let buffer = Array.zeroCreate<'T> bufferSize
let mutable index = 0
let mutable total = 0
member this.Add value =
if bufferSize > 0 then
buffer.[index] <- value
index <- (index + 1) % bufferSize
total <- min (total + 1) bufferSize
member this.Iter f =
let start = if total = bufferSize then index else 0
for i = 0 to total - 1 do
buffer.[(start + i) % bufferSize] |> f
type message<'T> =
| Add of IObserver<'T>
| Remove of IObserver<'T>
| Next of 'T
| Completed
| Error of exn
let startAgent (bufferSize:int) =
let subscribers = LinkedList<_>()
let buffer = CircularBuffer bufferSize
MailboxProcessor.Start(fun inbox ->
let rec loop () = async {
let! message = inbox.Receive()
match message with
| Add observer ->
subscribers.AddLast observer |> ignore
buffer.Iter observer.OnNext
return! loop ()
| Remove observer ->
subscribers.Remove observer |> ignore
return! loop ()
| Next value ->
for subscriber in subscribers do
subscriber.OnNext value
buffer.Add value
return! loop ()
| Error e ->
for subscriber in subscribers do
subscriber.OnError e
| Completed ->
for subscriber in subscribers do
subscriber.OnCompleted ()
}
loop ()
)
type ReplaySubject<'T> (bufferSize:int) =
let bufferSize = max 0 bufferSize
let agent = startAgent bufferSize
let subscribe observer =
observer |> Add |> agent.Post
{ new System.IDisposable with
member this.Dispose () =
observer |> Remove |> agent.Post
}
member this.Next value = Next value |> agent.Post
member this.Error error = Error error |> agent.Post
member this.Completed () = Completed |> agent.Post
interface System.IObserver<'T> with
member this.OnNext value = Next value |> agent.Post
member this.OnError error = Error error |> agent.Post
member this.OnCompleted () = Completed |> agent.Post
member this.Subscribe(observer:System.IObserver<'T>) =
subscribe observer
interface System.IObservable<'T> with
member this.Subscribe observer = subscribe observer
and Subject<'T>() = inherit ReplaySubject<'T>(0)
do let subject = ReplaySubject(3)
use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x)
subject.Next(10)
subject.Next(11)
use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x)
System.Console.ReadLine() |> ignore
|
namespace System
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type CircularBuffer<'T> =
new : bufferSize:int -> CircularBuffer<'T>
member Add : value:'T -> unit
member Iter : f:('T -> unit) -> unit
Full name: Script.CircularBuffer<_>
--------------------
new : bufferSize:int -> CircularBuffer<'T>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val buffer : 'T []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable index : int
val mutable total : int
val this : CircularBuffer<'T>
member CircularBuffer.Add : value:'T -> unit
Full name: Script.CircularBuffer`1.Add
val value : 'T
val min : e1:'T -> e2:'T -> 'T (requires comparison)
Full name: Microsoft.FSharp.Core.Operators.min
member CircularBuffer.Iter : f:('T -> unit) -> unit
Full name: Script.CircularBuffer`1.Iter
val f : ('T -> unit)
val start : int
val i : int
type message<'T> =
| Add of IObserver<'T>
| Remove of IObserver<'T>
| Next of 'T
| Completed
| Error of exn
Full name: Script.message<_>
union case message.Add: IObserver<'T> -> message<'T>
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
union case message.Remove: IObserver<'T> -> message<'T>
union case message.Next: 'T -> message<'T>
union case message.Completed: message<'T>
union case message.Error: exn -> message<'T>
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
val startAgent : bufferSize:int -> MailboxProcessor<message<'a>>
Full name: Script.startAgent
val subscribers : LinkedList<IObserver<'a>>
Multiple items
type LinkedList<'T> =
new : unit -> LinkedList<'T> + 1 overload
member AddAfter : node:LinkedListNode<'T> * value:'T -> LinkedListNode<'T> + 1 overload
member AddBefore : node:LinkedListNode<'T> * value:'T -> LinkedListNode<'T> + 1 overload
member AddFirst : value:'T -> LinkedListNode<'T> + 1 overload
member AddLast : value:'T -> LinkedListNode<'T> + 1 overload
member Clear : unit -> unit
member Contains : value:'T -> bool
member CopyTo : array:'T[] * index:int -> unit
member Count : int
member Find : value:'T -> LinkedListNode<'T>
...
nested type Enumerator
Full name: System.Collections.Generic.LinkedList<_>
--------------------
LinkedList() : unit
LinkedList(collection: IEnumerable<'T>) : unit
val buffer : CircularBuffer<'a>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<message<'a>>
val loop : (unit -> Async<unit>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Multiple items
val message : message<'a>
--------------------
type message<'T> =
| Add of IObserver<'T>
| Remove of IObserver<'T>
| Next of 'T
| Completed
| Error of exn
Full name: Script.message<_>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val observer : IObserver<'a>
LinkedList.AddLast(node: LinkedListNode<IObserver<'a>>) : unit
LinkedList.AddLast(value: IObserver<'a>) : LinkedListNode<IObserver<'a>>
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
member CircularBuffer.Iter : f:('T -> unit) -> unit
IObserver.OnNext(value: 'a) : unit
LinkedList.Remove(node: LinkedListNode<IObserver<'a>>) : unit
LinkedList.Remove(value: IObserver<'a>) : bool
val value : 'a
val subscriber : IObserver<'a>
member CircularBuffer.Add : value:'T -> unit
val e : exn
IObserver.OnError(error: exn) : unit
IObserver.OnCompleted() : unit
Multiple items
type ReplaySubject<'T> =
interface IObservable<'T>
interface IObserver<'T>
new : bufferSize:int -> ReplaySubject<'T>
member Completed : unit -> unit
member Error : error:exn -> unit
member Next : value:'T -> unit
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Script.ReplaySubject<_>
--------------------
new : bufferSize:int -> ReplaySubject<'T>
val max : e1:'T -> e2:'T -> 'T (requires comparison)
Full name: Microsoft.FSharp.Core.Operators.max
val agent : MailboxProcessor<message<'T>>
val subscribe : (IObserver<'T> -> IDisposable)
val observer : IObserver<'T>
member MailboxProcessor.Post : message:'Msg -> unit
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
val this : ReplaySubject<'T>
member ReplaySubject.Next : value:'T -> unit
Full name: Script.ReplaySubject`1.Next
member ReplaySubject.Error : error:exn -> unit
Full name: Script.ReplaySubject`1.Error
val error : exn
member ReplaySubject.Completed : unit -> unit
Full name: Script.ReplaySubject`1.Completed
override ReplaySubject.OnNext : value:'T -> unit
Full name: Script.ReplaySubject`1.OnNext
override ReplaySubject.OnError : error:exn -> unit
Full name: Script.ReplaySubject`1.OnError
override ReplaySubject.OnCompleted : unit -> unit
Full name: Script.ReplaySubject`1.OnCompleted
member ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Script.ReplaySubject`1.Subscribe
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
override ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Script.ReplaySubject`1.Subscribe
Multiple items
type Subject<'T> =
inherit ReplaySubject<'T>
new : unit -> Subject<'T>
Full name: Script.Subject<_>
--------------------
new : unit -> Subject<'T>
val subject : ReplaySubject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
member ReplaySubject.Subscribe : observer:IObserver<'T> -> IDisposable
val x : int
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.WriteLine() : unit
(+0 other overloads)
Console.WriteLine(value: string) : unit
(+0 other overloads)
Console.WriteLine(value: obj) : unit
(+0 other overloads)
Console.WriteLine(value: uint64) : unit
(+0 other overloads)
Console.WriteLine(value: int64) : unit
(+0 other overloads)
Console.WriteLine(value: uint32) : unit
(+0 other overloads)
Console.WriteLine(value: int) : unit
(+0 other overloads)
Console.WriteLine(value: float32) : unit
(+0 other overloads)
Console.WriteLine(value: float) : unit
(+0 other overloads)
Console.WriteLine(value: decimal) : unit
(+0 other overloads)
member ReplaySubject.Next : value:'T -> unit
Console.ReadLine() : string
More information