6 people like it.

Observable Async Subject

Simple Async Observable Subject<'T> based on MailboxProcessor. Type declaration is more ML like, but the idea is represented in a simple way!

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
85: 
    module Observable =
        open System
        open System.Collections.Generic

        module Subject =
            /// Subject state maintained inside of the mailbox loop
            module State =
                type t<'T> = {
                    observers : IObserver<'T> list
                    stopped   : bool
                }

                let empty() = {observers=[]; stopped=false}

            /// Messages required for the mailbox loop
            module Message =
                type t<'T> =
                | Add       of IObserver<'T>
                | Remove    of IObserver<'T>
                | Next      of 'T
                | Error     of exn
                | Completed

            /// Type t that implements IObservable<'T> and IObserver<'T>
            type t<'T>() =

                let error() = raise(new System.InvalidOperationException("Subject already completed"))

                let mbox = MailboxProcessor<Message.t<'T>>.Start(fun inbox ->
                    let rec loop(t:State.t<'T>) = async {
                        let! req = inbox.Receive()

                        match req with
                        | Message.Add(observer) ->
                            if not(t.stopped) then
                                return! loop ({t with observers = t.observers @ [observer]})
                            else error()

                        | Message.Remove(observer) ->
                            if not(t.stopped) then
                                let t = {t with observers = t.observers |> List.filter(fun f -> f <> observer)}
                                return! loop t
                            else error()

                        | Message.Next(value) ->
                            if not(t.stopped) then
                                t.observers |> List.iter(fun o -> o.OnNext(value))
                                return! loop t
                            else error()

                        | Message.Error(err) ->
                            if not(t.stopped) then
                                t.observers |> List.iter(fun o -> o.OnError(err))
                                return! loop t
                            else error()

                        | Message.Completed ->
                            if not(t.stopped) then
                                t.observers |> List.iter(fun o -> o.OnCompleted())
                                let t = {t with stopped = true}
                                return! loop t
                            else error()
                    }

                    loop (State.empty())
                )

                /// Raises OnNext in all the observers
                member x.Next value  = Message.Next(value)  |> mbox.Post
                /// Raises OnError in all the observers
                member x.Error ex    = Message.Error(ex)    |> mbox.Post
                /// Raises OnCompleted in all the observers
                member x.Completed() = Message.Completed    |> mbox.Post

                interface IObserver<'T> with
                    member x.OnNext value   = x.Next(value)
                    member x.OnError ex     = x.Error(ex)
                    member x.OnCompleted()  = x.Completed()

                interface IObservable<'T> with
                    member x.Subscribe(observer:IObserver<'T>) =
                        observer |> Message.Add |> mbox.Post
                        { new IDisposable with
                            member x.Dispose() =
                                observer |> Message.Remove |> mbox.Post }
module Observable

from Microsoft.FSharp.Control
namespace System
namespace System.Collections
namespace System.Collections.Generic
module Subject

from Script.Observable
type t<'T> =
  {observers: IObserver<'T> list;
   stopped: bool;}

Full name: Script.Observable.Subject.State.t<_>
t.observers: IObserver<'T> list
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
t.stopped: bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
val empty : unit -> t<'a>

Full name: Script.Observable.Subject.State.empty
type t<'T> =
  | Add of IObserver<'T>
  | Remove of IObserver<'T>
  | Next of 'T
  | Error of exn
  | Completed

Full name: Script.Observable.Subject.Message.t<_>
union case t.Add: IObserver<'T> -> t<'T>
union case t.Remove: IObserver<'T> -> t<'T>
union case t.Next: 'T -> t<'T>
union case t.Error: exn -> t<'T>
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
union case t.Completed: t<'T>
type t<'T> =
  interface IObservable<'T>
  interface IObserver<'T>
  new : unit -> t<'T>
  member Completed : unit -> unit
  member Error : ex:exn -> unit
  member Next : value:'T -> unit

Full name: Script.Observable.Subject.t<_>


 Type t that implements IObservable<'T> and IObserver<'T>
val error : (unit -> 'b)
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type InvalidOperationException =
  inherit SystemException
  new : unit -> InvalidOperationException + 2 overloads

Full name: System.InvalidOperationException

--------------------
InvalidOperationException() : unit
InvalidOperationException(message: string) : unit
InvalidOperationException(message: string, innerException: exn) : unit
val mbox : MailboxProcessor<Message.t<'T>>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
module Message

from Script.Observable.Subject


 Messages required for the mailbox loop
val inbox : MailboxProcessor<Message.t<'T>>
val loop : (State.t<'T> -> Async<unit>)
val t : State.t<'T>
module State

from Script.Observable.Subject


 Subject state maintained inside of the mailbox loop
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val req : Message.t<'T>
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
union case Message.t.Add: IObserver<'T> -> Message.t<'T>
val observer : IObserver<'T>
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
State.t.stopped: bool
State.t.observers: IObserver<'T> list
union case Message.t.Remove: IObserver<'T> -> Message.t<'T>
Multiple items
type List<'T> =
  new : unit -> List<'T> + 2 overloads
  member Add : item:'T -> unit
  member AddRange : collection:IEnumerable<'T> -> unit
  member AsReadOnly : unit -> ReadOnlyCollection<'T>
  member BinarySearch : item:'T -> int + 2 overloads
  member Capacity : int with get, set
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
  member CopyTo : array:'T[] -> unit + 2 overloads
  ...
  nested type Enumerator

Full name: System.Collections.Generic.List<_>

--------------------
List() : unit
List(capacity: int) : unit
List(collection: IEnumerable<'T>) : unit
val filter : predicate:('T -> bool) -> list:'T list -> 'T list

Full name: Microsoft.FSharp.Collections.List.filter
val f : IObserver<'T>
union case Message.t.Next: 'T -> Message.t<'T>
val value : 'T
val iter : action:('T -> unit) -> list:'T list -> unit

Full name: Microsoft.FSharp.Collections.List.iter
val o : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
union case Message.t.Error: exn -> Message.t<'T>
val err : exn
IObserver.OnError(error: exn) : unit
union case Message.t.Completed: Message.t<'T>
IObserver.OnCompleted() : unit
val empty : unit -> State.t<'a>

Full name: Script.Observable.Subject.State.empty
val x : t<'T>
member t.Next : value:'T -> unit

Full name: Script.Observable.Subject.t`1.Next


 Raises OnNext in all the observers
member MailboxProcessor.Post : message:'Msg -> unit
member t.Error : ex:exn -> unit

Full name: Script.Observable.Subject.t`1.Error


 Raises OnError in all the observers
val ex : exn
member t.Completed : unit -> unit

Full name: Script.Observable.Subject.t`1.Completed


 Raises OnCompleted in all the observers
override t.OnNext : value:'T -> unit

Full name: Script.Observable.Subject.t`1.OnNext
member t.Next : value:'T -> unit


 Raises OnNext in all the observers
override t.OnError : ex:exn -> unit

Full name: Script.Observable.Subject.t`1.OnError
member t.Error : ex:exn -> unit


 Raises OnError in all the observers
override t.OnCompleted : unit -> unit

Full name: Script.Observable.Subject.t`1.OnCompleted
member t.Completed : unit -> unit


 Raises OnCompleted in all the observers
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
override t.Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Script.Observable.Subject.t`1.Subscribe
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val x : IDisposable
IDisposable.Dispose() : unit
Raw view Test code New version

More information

Link:http://fssnip.net/9F
Posted:12 years ago
Author:Fahad
Tags: observable , rx