2 people like it.

Observable Agent

Extension to Control.Observable module to create an Observable linked to a MailboxProcessor. Messages posted to the mailbox are published to subscribers. Requires a cancelation token which when cancelled sends OnComplete to subscribers. Only the Post method is exposed from the internally created MailboxProcessor.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
module Observable 
open System

let createObservableAgent<'T> maxQueueDepth (token:System.Threading.CancellationToken) =
    let finished = ref false
    let subscribers = ref (Map.empty : Map<int, IObserver<'T>>)

    let inline publish msg = 
        !subscribers 
        |> Seq.iter (fun (KeyValue(_, sub)) ->
            try
                    sub.OnNext(msg)
            with ex -> 
                System.Diagnostics.Debug.Write(ex))

    let completed() = 
        lock subscribers (fun () ->
        finished := true
        !subscribers |> Seq.iter (fun (KeyValue(_, sub)) -> sub.OnCompleted())
        subscribers := Map.empty)

    token.Register(fun () -> completed()) |> ignore //callback for when token is cancelled
            
    let count = ref 0
    let agent =
        MailboxProcessor.Start
            ((fun inbox ->
                async {
                    while true do
                        let! msg = inbox.Receive()
                        publish msg} ),
                token)
    let obs = 
        { new IObservable<'T> with 
            member this.Subscribe(obs) =
                let key1 =
                    lock subscribers (fun () ->
                        if !finished then failwith "Observable has already completed"
                        let key1 = !count
                        count := !count + 1
                        subscribers := subscribers.Value.Add(key1, obs)
                        key1)
                { new IDisposable with  
                    member this.Dispose() = 
                        lock subscribers (fun () -> 
                            subscribers := subscribers.Value.Remove(key1)) } }
    let post x = if agent.CurrentQueueLength < maxQueueDepth then agent.Post x else printfn "queue depth exceed %d" maxQueueDepth
    obs,post
(*
#load "ObservableExtensions.fs"
open System
let cts = new System.Threading.CancellationTokenSource()
type Data = {Value:string}

let observable,fPost = Observable.createObservableAgent<Data> cts.Token

let sub1 = 
    observable.Subscribe
        ({new IObserver<Data> with
            member x.OnNext msg = printfn "sub1 received msg %A" msg
            member x.OnError(e) = ()
            member x.OnCompleted() = printfn "sub1 received OnCompleted"
        })
let sub2 = 
    observable.Subscribe
        ({new IObserver<Data> with
            member x.OnNext msg = printfn "sub2 received msg %A" msg
            member x.OnError(e) = ()
            member x.OnCompleted() = printfn "sub2 received OnCompleted"
        })

for i in 1 .. 10 do fPost {Value=i.ToString()}

sub1.Dispose()

for i in 11 .. 14 do fPost {Value=i.ToString()}

cts.Cancel() //sends OnCompleted

*)
Multiple items
module Observable

--------------------
module Observable

from Microsoft.FSharp.Control
namespace System
val createObservableAgent : maxQueueDepth:int -> token:Threading.CancellationToken -> IObservable<'T> * ('T -> unit)

Full name: Observable.createObservableAgent
val maxQueueDepth : int
val token : Threading.CancellationToken
namespace System.Threading
Multiple items
type CancellationToken =
  struct
    new : canceled:bool -> CancellationToken
    member CanBeCanceled : bool
    member Equals : other:CancellationToken -> bool + 1 overload
    member GetHashCode : unit -> int
    member IsCancellationRequested : bool
    member Register : callback:Action -> CancellationTokenRegistration + 3 overloads
    member ThrowIfCancellationRequested : unit -> unit
    member WaitHandle : WaitHandle
    static member None : CancellationToken
  end

Full name: System.Threading.CancellationToken

--------------------
Threading.CancellationToken()
Threading.CancellationToken(canceled: bool) : unit
val finished : bool ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val subscribers : Map<int,IObserver<'T>> ref
Multiple items
module Map

from Microsoft.FSharp.Collections

--------------------
type Map<'Key,'Value (requires comparison)> =
  interface IEnumerable
  interface IComparable
  interface IEnumerable<KeyValuePair<'Key,'Value>>
  interface ICollection<KeyValuePair<'Key,'Value>>
  interface IDictionary<'Key,'Value>
  new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
  member Add : key:'Key * value:'Value -> Map<'Key,'Value>
  member ContainsKey : key:'Key -> bool
  override Equals : obj -> bool
  member Remove : key:'Key -> Map<'Key,'Value>
  ...

Full name: Microsoft.FSharp.Collections.Map<_,_>

--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val empty<'Key,'T (requires comparison)> : Map<'Key,'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.empty
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
val publish : ('T -> unit)
val msg : 'T
module Seq

from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit

Full name: Microsoft.FSharp.Collections.Seq.iter
active recognizer KeyValue: Collections.Generic.KeyValuePair<'Key,'Value> -> 'Key * 'Value

Full name: Microsoft.FSharp.Core.Operators.( |KeyValue| )
val sub : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
val ex : exn
namespace System.Diagnostics
type Debug =
  static member Assert : condition:bool -> unit + 3 overloads
  static member AutoFlush : bool with get, set
  static member Close : unit -> unit
  static member Fail : message:string -> unit + 1 overload
  static member Flush : unit -> unit
  static member Indent : unit -> unit
  static member IndentLevel : int with get, set
  static member IndentSize : int with get, set
  static member Listeners : TraceListenerCollection
  static member Print : message:string -> unit + 1 overload
  ...

Full name: System.Diagnostics.Debug
Diagnostics.Debug.Write(value: obj) : unit
Diagnostics.Debug.Write(message: string) : unit
Diagnostics.Debug.Write(value: obj, category: string) : unit
Diagnostics.Debug.Write(message: string, category: string) : unit
val completed : (unit -> unit)
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
IObserver.OnCompleted() : unit
Threading.CancellationToken.Register(callback: Action) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val count : int ref
val agent : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<'T>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val obs : IObservable<'T>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val this : IObservable<'T>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T>) : IDisposable
val obs : IObserver<'T>
val key1 : int
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
property Ref.Value: Map<int,IObserver<'T>>
member Map.Add : key:'Key * value:'Value -> Map<'Key,'Value>
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
member Map.Remove : key:'Key -> Map<'Key,'Value>
val post : ('T -> unit)
val x : 'T
property MailboxProcessor.CurrentQueueLength: int
member MailboxProcessor.Post : message:'Msg -> unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn

More information

Link:http://fssnip.net/nC
Posted:8 years ago
Author:Faisal Waris
Tags: observable , mailboxprocessor