6 people like it.
Like the snippet!
Observable Async Subject
A simple asynchronous observable Subject<'T> based on MailboxProcessor. The type declarations follow an ML-like style (each module exposes a type named t), but the idea is represented in a simple way!
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
|
module Observable =
    open System
    open System.Collections.Generic

    module Subject =
        /// Subject state maintained inside of the mailbox loop.
        module State =
            type t<'T> = {
                /// Subscribed observers, in subscription order.
                observers : IObserver<'T> list
                /// True once the subject has terminated (completed or failed).
                stopped : bool
            }
            /// Initial state: no observers and not stopped.
            let empty() = {observers=[]; stopped=false}

        /// Messages required for the mailbox loop.
        module Message =
            type t<'T> =
                | Add of IObserver<'T>
                | Remove of IObserver<'T>
                | Next of 'T
                | Error of exn
                | Completed

        /// Type t that implements IObservable<'T> and IObserver<'T>.
        ///
        /// Every operation is posted to a MailboxProcessor, so notifications are
        /// delivered to observers one at a time, in posting order, on the agent.
        /// After OnCompleted or OnError the subject is stopped: further
        /// Next/Error/Completed messages are ignored, and observers that subscribe
        /// afterwards immediately receive the terminal notification.
        type t<'T>() =
            /// Delivers the terminal notification to an observer that subscribed
            /// after the subject stopped: the stored failure if there was one,
            /// otherwise completion.
            let replayTerminal (failure : exn option) (observer : IObserver<'T>) =
                match failure with
                | Some ex -> observer.OnError ex
                | None -> observer.OnCompleted()

            /// Handles one message and returns the next state together with the
            /// failure (if any) that terminated the subject.
            ///
            /// This function never raises on its own for "already stopped" input:
            /// an exception escaping the agent body would stop the MailboxProcessor
            /// without ever reaching the caller that posted the message.
            let step (state : State.t<'T>) (failure : exn option) (msg : Message.t<'T>) =
                match msg with
                | Message.Add observer ->
                    if state.stopped then
                        replayTerminal failure observer
                        state, failure
                    else
                        { state with observers = state.observers @ [observer] }, failure
                | Message.Remove observer ->
                    // Allowed even after termination: disposing a subscription is
                    // routine clean-up and releases the reference to the observer.
                    // Observers are compared by reference identity.
                    let remaining =
                        state.observers
                        |> List.filter (fun o -> not (Object.ReferenceEquals(o, observer)))
                    { state with observers = remaining }, failure
                | Message.Next value ->
                    if not state.stopped then
                        state.observers |> List.iter (fun o -> o.OnNext value)
                    state, failure
                | Message.Error err ->
                    if state.stopped then
                        state, failure
                    else
                        state.observers |> List.iter (fun o -> o.OnError err)
                        // OnError is terminal; remember the exception for late subscribers.
                        { state with stopped = true }, Some err
                | Message.Completed ->
                    if state.stopped then
                        state, failure
                    else
                        state.observers |> List.iter (fun o -> o.OnCompleted())
                        { state with stopped = true }, None

            let mbox = MailboxProcessor<Message.t<'T>>.Start(fun inbox ->
                let rec loop (state : State.t<'T>) (failure : exn option) = async {
                    let! msg = inbox.Receive()
                    let nextState, nextFailure = step state failure msg
                    return! loop nextState nextFailure
                }
                loop (State.empty()) None
            )

            /// Raises OnNext in all the observers
            member x.Next value = Message.Next(value) |> mbox.Post
            /// Raises OnError in all the observers
            member x.Error ex = Message.Error(ex) |> mbox.Post
            /// Raises OnCompleted in all the observers
            member x.Completed() = Message.Completed |> mbox.Post

            interface IObserver<'T> with
                member x.OnNext value = x.Next(value)
                member x.OnError ex = x.Error(ex)
                member x.OnCompleted() = x.Completed()

            interface IObservable<'T> with
                /// Registers the observer and returns a handle whose Dispose
                /// unregisters it. Both steps are posted to the agent.
                member x.Subscribe(observer:IObserver<'T>) =
                    observer |> Message.Add |> mbox.Post
                    { new IDisposable with
                        member x.Dispose() =
                            observer |> Message.Remove |> mbox.Post }
|
module Observable
from Microsoft.FSharp.Control
namespace System
namespace System.Collections
namespace System.Collections.Generic
module Subject
from Script.Observable
type t<'T> =
{observers: IObserver<'T> list;
stopped: bool;}
Full name: Script.Observable.Subject.State.t<_>
t.observers: IObserver<'T> list
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
t.stopped: bool
type bool = Boolean
Full name: Microsoft.FSharp.Core.bool
val empty : unit -> t<'a>
Full name: Script.Observable.Subject.State.empty
type t<'T> =
| Add of IObserver<'T>
| Remove of IObserver<'T>
| Next of 'T
| Error of exn
| Completed
Full name: Script.Observable.Subject.Message.t<_>
union case t.Add: IObserver<'T> -> t<'T>
union case t.Remove: IObserver<'T> -> t<'T>
union case t.Next: 'T -> t<'T>
union case t.Error: exn -> t<'T>
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
union case t.Completed: t<'T>
type t<'T> =
interface IObservable<'T>
interface IObserver<'T>
new : unit -> t<'T>
member Completed : unit -> unit
member Error : ex:exn -> unit
member Next : value:'T -> unit
Full name: Script.Observable.Subject.t<_>
Type t that implements IObservable<'T> and IObserver<'T>
val error : (unit -> 'b)
val raise : exn:Exception -> 'T
Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type InvalidOperationException =
inherit SystemException
new : unit -> InvalidOperationException + 2 overloads
Full name: System.InvalidOperationException
--------------------
InvalidOperationException() : unit
InvalidOperationException(message: string) : unit
InvalidOperationException(message: string, innerException: exn) : unit
val mbox : MailboxProcessor<Message.t<'T>>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
module Message
from Script.Observable.Subject
Messages required for the mailbox loop
val inbox : MailboxProcessor<Message.t<'T>>
val loop : (State.t<'T> -> Async<unit>)
val t : State.t<'T>
module State
from Script.Observable.Subject
Subject state maintained inside of the mailbox loop
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val req : Message.t<'T>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
union case Message.t.Add: IObserver<'T> -> Message.t<'T>
val observer : IObserver<'T>
val not : value:bool -> bool
Full name: Microsoft.FSharp.Core.Operators.not
State.t.stopped: bool
State.t.observers: IObserver<'T> list
union case Message.t.Remove: IObserver<'T> -> Message.t<'T>
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
Full name: System.Collections.Generic.List<_>
--------------------
List() : unit
List(capacity: int) : unit
List(collection: IEnumerable<'T>) : unit
val filter : predicate:('T -> bool) -> list:'T list -> 'T list
Full name: Microsoft.FSharp.Collections.List.filter
val f : IObserver<'T>
union case Message.t.Next: 'T -> Message.t<'T>
val value : 'T
val iter : action:('T -> unit) -> list:'T list -> unit
Full name: Microsoft.FSharp.Collections.List.iter
val o : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
union case Message.t.Error: exn -> Message.t<'T>
val err : exn
IObserver.OnError(error: exn) : unit
union case Message.t.Completed: Message.t<'T>
IObserver.OnCompleted() : unit
val empty : unit -> State.t<'a>
Full name: Script.Observable.Subject.State.empty
val x : t<'T>
member t.Next : value:'T -> unit
Full name: Script.Observable.Subject.t`1.Next
Raises OnNext in all the observers
member MailboxProcessor.Post : message:'Msg -> unit
member t.Error : ex:exn -> unit
Full name: Script.Observable.Subject.t`1.Error
Raises OnError in all the observers
val ex : exn
member t.Completed : unit -> unit
Full name: Script.Observable.Subject.t`1.Completed
Raises OnCompleted in all the observers
override t.OnNext : value:'T -> unit
Full name: Script.Observable.Subject.t`1.OnNext
member t.Next : value:'T -> unit
Raises OnNext in all the observers
override t.OnError : ex:exn -> unit
Full name: Script.Observable.Subject.t`1.OnError
member t.Error : ex:exn -> unit
Raises OnError in all the observers
override t.OnCompleted : unit -> unit
Full name: Script.Observable.Subject.t`1.OnCompleted
member t.Completed : unit -> unit
Raises OnCompleted in all the observers
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
override t.Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Script.Observable.Subject.t`1.Subscribe
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val x : IDisposable
IDisposable.Dispose() : unit
More information