2 people like it.
Like the snippet!
Observable Agent
Extension to Control.Observable module to create an Observable linked to a MailboxProcessor. Messages posted to the mailbox are published to subscribers. Requires a cancelation token which when cancelled sends OnComplete to subscribers. Only the Post method is exposed from the internally created MailboxProcessor.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
|
module Observable
open System
let createObservableAgent<'T> maxQueueDepth (token:System.Threading.CancellationToken) =
let finished = ref false
let subscribers = ref (Map.empty : Map<int, IObserver<'T>>)
let inline publish msg =
!subscribers
|> Seq.iter (fun (KeyValue(_, sub)) ->
try
sub.OnNext(msg)
with ex ->
System.Diagnostics.Debug.Write(ex))
let completed() =
lock subscribers (fun () ->
finished := true
!subscribers |> Seq.iter (fun (KeyValue(_, sub)) -> sub.OnCompleted())
subscribers := Map.empty)
token.Register(fun () -> completed()) |> ignore //callback for when token is cancelled
let count = ref 0
let agent =
MailboxProcessor.Start
((fun inbox ->
async {
while true do
let! msg = inbox.Receive()
publish msg} ),
token)
let obs =
{ new IObservable<'T> with
member this.Subscribe(obs) =
let key1 =
lock subscribers (fun () ->
if !finished then failwith "Observable has already completed"
let key1 = !count
count := !count + 1
subscribers := subscribers.Value.Add(key1, obs)
key1)
{ new IDisposable with
member this.Dispose() =
lock subscribers (fun () ->
subscribers := subscribers.Value.Remove(key1)) } }
let post x = if agent.CurrentQueueLength < maxQueueDepth then agent.Post x else printfn "queue depth exceed %d" maxQueueDepth
obs,post
(*
#load "ObservableExtensions.fs"
open System
let cts = new System.Threading.CancellationTokenSource()
type Data = {Value:string}
let observable,fPost = Observable.createObservableAgent<Data> cts.Token
let sub1 =
observable.Subscribe
({new IObserver<Data> with
member x.OnNext msg = printfn "sub1 received msg %A" msg
member x.OnError(e) = ()
member x.OnCompleted() = printfn "sub1 received OnCompleted"
})
let sub2 =
observable.Subscribe
({new IObserver<Data> with
member x.OnNext msg = printfn "sub2 received msg %A" msg
member x.OnError(e) = ()
member x.OnCompleted() = printfn "sub2 received OnCompleted"
})
for i in 1 .. 10 do fPost {Value=i.ToString()}
sub1.Dispose()
for i in 11 .. 14 do fPost {Value=i.ToString()}
cts.Cancel() //sends OnCompleted
*)
|
Multiple items
module Observable
--------------------
module Observable
from Microsoft.FSharp.Control
namespace System
val createObservableAgent : maxQueueDepth:int -> token:Threading.CancellationToken -> IObservable<'T> * ('T -> unit)
Full name: Observable.createObservableAgent
val maxQueueDepth : int
val token : Threading.CancellationToken
namespace System.Threading
Multiple items
type CancellationToken =
struct
new : canceled:bool -> CancellationToken
member CanBeCanceled : bool
member Equals : other:CancellationToken -> bool + 1 overload
member GetHashCode : unit -> int
member IsCancellationRequested : bool
member Register : callback:Action -> CancellationTokenRegistration + 3 overloads
member ThrowIfCancellationRequested : unit -> unit
member WaitHandle : WaitHandle
static member None : CancellationToken
end
Full name: System.Threading.CancellationToken
--------------------
Threading.CancellationToken()
Threading.CancellationToken(canceled: bool) : unit
val finished : bool ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val subscribers : Map<int,IObserver<'T>> ref
Multiple items
module Map
from Microsoft.FSharp.Collections
--------------------
type Map<'Key,'Value (requires comparison)> =
interface IEnumerable
interface IComparable
interface IEnumerable<KeyValuePair<'Key,'Value>>
interface ICollection<KeyValuePair<'Key,'Value>>
interface IDictionary<'Key,'Value>
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
member Add : key:'Key * value:'Value -> Map<'Key,'Value>
member ContainsKey : key:'Key -> bool
override Equals : obj -> bool
member Remove : key:'Key -> Map<'Key,'Value>
...
Full name: Microsoft.FSharp.Collections.Map<_,_>
--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val empty<'Key,'T (requires comparison)> : Map<'Key,'T> (requires comparison)
Full name: Microsoft.FSharp.Collections.Map.empty
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
val publish : ('T -> unit)
val msg : 'T
module Seq
from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit
Full name: Microsoft.FSharp.Collections.Seq.iter
active recognizer KeyValue: Collections.Generic.KeyValuePair<'Key,'Value> -> 'Key * 'Value
Full name: Microsoft.FSharp.Core.Operators.( |KeyValue| )
val sub : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
val ex : exn
namespace System.Diagnostics
type Debug =
static member Assert : condition:bool -> unit + 3 overloads
static member AutoFlush : bool with get, set
static member Close : unit -> unit
static member Fail : message:string -> unit + 1 overload
static member Flush : unit -> unit
static member Indent : unit -> unit
static member IndentLevel : int with get, set
static member IndentSize : int with get, set
static member Listeners : TraceListenerCollection
static member Print : message:string -> unit + 1 overload
...
Full name: System.Diagnostics.Debug
Diagnostics.Debug.Write(value: obj) : unit
Diagnostics.Debug.Write(message: string) : unit
Diagnostics.Debug.Write(value: obj, category: string) : unit
Diagnostics.Debug.Write(message: string, category: string) : unit
val completed : (unit -> unit)
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)
Full name: Microsoft.FSharp.Core.Operators.lock
IObserver.OnCompleted() : unit
Threading.CancellationToken.Register(callback: Action) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
Threading.CancellationToken.Register(callback: Action<obj>, state: obj, useSynchronizationContext: bool) : Threading.CancellationTokenRegistration
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val count : int ref
val agent : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<'T>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val obs : IObservable<'T>
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
val this : IObservable<'T>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T>) : IDisposable
val obs : IObserver<'T>
val key1 : int
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
property Ref.Value: Map<int,IObserver<'T>>
member Map.Add : key:'Key * value:'Value -> Map<'Key,'Value>
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
member Map.Remove : key:'Key -> Map<'Key,'Value>
val post : ('T -> unit)
val x : 'T
property MailboxProcessor.CurrentQueueLength: int
member MailboxProcessor.Post : message:'Msg -> unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
More information