module Observable =
    open System
    open System.Collections.Generic

    /// A subject: an object that is both an IObserver<'T> and an IObservable<'T>.
    /// Notifications pushed into it are re-broadcast to every subscribed observer.
    /// All state changes and broadcasts are serialized through a MailboxProcessor,
    /// so callers only ever post messages and never touch the state directly.
    module Subject =

        /// Subject state maintained inside of the mailbox loop
        module State =
            type t<'T> =
                { /// Subscribed observers, in subscription order.
                  observers : IObserver<'T> list
                  /// True once a terminal notification (OnError or OnCompleted)
                  /// has been broadcast.
                  stopped : bool }

            /// Initial state: no observers, not stopped.
            let empty() = { observers = []; stopped = false }

        /// Messages required for the mailbox loop
        module Message =
            type t<'T> =
                /// Subscribe an observer.
                | Add of IObserver<'T>
                /// Unsubscribe an observer.
                | Remove of IObserver<'T>
                /// Broadcast a value through OnNext.
                | Next of 'T
                /// Broadcast a failure through OnError (terminal).
                | Error of exn
                /// Broadcast OnCompleted (terminal).
                | Completed

        /// Type t that implements IObservable<'T> and IObserver<'T>
        type t<'T>() =

            /// Raised inside the mailbox loop when a message other than Remove
            /// arrives after the subject has terminated. Because it is raised on
            /// the agent rather than on the posting thread, it is not thrown to
            /// the caller of Next/Error/Completed/Subscribe; it ends the loop.
            let error() =
                raise (new System.InvalidOperationException("Subject already completed"))

            let mbox =
                MailboxProcessor<Message.t<'T>>.Start(fun inbox ->
                    let rec loop (t : State.t<'T>) =
                        async {
                            let! req = inbox.Receive()
                            if t.stopped then
                                match req with
                                | Message.Remove(_) ->
                                    // Disposing a subscription after termination is
                                    // legitimate and must not fault the agent.
                                    return! loop t
                                | Message.Add(_)
                                | Message.Next(_)
                                | Message.Error(_)
                                | Message.Completed ->
                                    error()
                            else
                                match req with
                                | Message.Add(observer) ->
                                    // Append so observers are notified in
                                    // subscription order.
                                    return! loop { t with observers = t.observers @ [observer] }
                                | Message.Remove(observer) ->
                                    let remaining =
                                        t.observers |> List.filter (fun o -> o <> observer)
                                    return! loop { t with observers = remaining }
                                | Message.Next(value) ->
                                    t.observers |> List.iter (fun o -> o.OnNext(value))
                                    return! loop t
                                | Message.Error(err) ->
                                    t.observers |> List.iter (fun o -> o.OnError(err))
                                    // OnError is terminal: stop broadcasting and
                                    // release the observer references.
                                    return! loop { t with observers = []; stopped = true }
                                | Message.Completed ->
                                    t.observers |> List.iter (fun o -> o.OnCompleted())
                                    // OnCompleted is terminal: stop broadcasting and
                                    // release the observer references.
                                    return! loop { t with observers = []; stopped = true }
                        }
                    loop (State.empty()))

            /// Raises OnNext in all the observers
            member x.Next value = Message.Next(value) |> mbox.Post

            /// Raises OnError in all the observers; no notification is
            /// delivered after this one.
            member x.Error ex = Message.Error(ex) |> mbox.Post

            /// Raises OnCompleted in all the observers; no notification is
            /// delivered after this one.
            member x.Completed() = Message.Completed |> mbox.Post

            interface IObserver<'T> with
                member x.OnNext value = x.Next(value)
                member x.OnError ex = x.Error(ex)
                member x.OnCompleted() = x.Completed()

            interface IObservable<'T> with
                /// Registers the observer and returns a handle whose Dispose
                /// unregisters it. Both operations are posted to the mailbox,
                /// so they take effect in order with other posted messages.
                member x.Subscribe(observer : IObserver<'T>) =
                    observer |> Message.Add |> mbox.Post
                    { new IDisposable with
                        member x.Dispose() =
                            observer |> Message.Remove |> mbox.Post }