29 people like it.
Like the snippet!
Observable.Subject
The Subject type implements both IObserver and IObservable. It is functionally equivalent to the type of the same name in the Reactive Extensions (Rx) library.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
|
module Observable
open System
open System.Collections.Generic
type Subject<'T> () =
let mutable sync = obj()
let mutable stopped = false
let observers = List<IObserver<'T>>()
let iter f = observers |> Seq.iter f
let onCompleted () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnCompleted())
let onError ex () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnError(ex))
let next value () =
if not stopped then
iter (fun observer -> observer.OnNext(value))
let remove observer () =
observers.Remove observer |> ignore
member x.Next value = lock sync <| next value
member x.Error ex = lock sync <| onError ex
member x.Completed () = lock sync <| onCompleted
interface IObserver<'T> with
member x.OnCompleted() = x.Completed()
member x.OnError ex = x.Error ex
member x.OnNext value = x.Next value
interface IObservable<'T> with
member this.Subscribe(observer:IObserver<'T>) =
observers.Add observer
{ new IDisposable with
member this.Dispose() =
lock sync <| remove observer
}
do let s = Subject()
use d = s.Subscribe(fun x -> sprintf "%d" x |> Console.WriteLine)
[1..12] |> Seq.iter s.Next
|
Multiple items
module Observable
--------------------
module Observable
from Microsoft.FSharp.Control
namespace System
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type Subject<'T> =
interface IObservable<'T>
interface IObserver<'T>
new : unit -> Subject<'T>
member Completed : unit -> unit
member Error : ex:exn -> unit
member Next : value:'T -> unit
Full name: Observable.Subject<_>
--------------------
new : unit -> Subject<'T>
val mutable sync : Object
type obj = Object
Full name: Microsoft.FSharp.Core.obj
val mutable stopped : bool
val observers : List<IObserver<'T>>
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
Full name: System.Collections.Generic.List<_>
--------------------
List() : unit
List(capacity: int) : unit
List(collection: IEnumerable<'T>) : unit
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
val iter : ((IObserver<'T> -> unit) -> unit)
val f : (IObserver<'T> -> unit)
module Seq
from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit
Full name: Microsoft.FSharp.Collections.Seq.iter
val onCompleted : (unit -> unit)
val not : value:bool -> bool
Full name: Microsoft.FSharp.Core.Operators.not
val observer : IObserver<'T>
IObserver.OnCompleted() : unit
val onError : (exn -> unit -> unit)
val ex : exn
IObserver.OnError(error: exn) : unit
val next : ('T -> unit -> unit)
val value : 'T
IObserver.OnNext(value: 'T) : unit
val remove : (IObserver<'T> -> unit -> unit)
List.Remove(item: IObserver<'T>) : bool
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val x : Subject<'T>
member Subject.Next : value:'T -> unit
Full name: Observable.Subject`1.Next
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)
Full name: Microsoft.FSharp.Core.Operators.lock
member Subject.Error : ex:exn -> unit
Full name: Observable.Subject`1.Error
member Subject.Completed : unit -> unit
Full name: Observable.Subject`1.Completed
override Subject.OnCompleted : unit -> unit
Full name: Observable.Subject`1.OnCompleted
member Subject.Completed : unit -> unit
override Subject.OnError : ex:exn -> unit
Full name: Observable.Subject`1.OnError
member Subject.Error : ex:exn -> unit
override Subject.OnNext : value:'T -> unit
Full name: Observable.Subject`1.OnNext
member Subject.Next : value:'T -> unit
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
val this : Subject<'T>
override Subject.Subscribe : observer:IObserver<'T> -> IDisposable
Full name: Observable.Subject`1.Subscribe
List.Add(item: IObserver<'T>) : unit
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
val s : Subject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
val x : int
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.WriteLine() : unit
(+0 other overloads)
Console.WriteLine(value: string) : unit
(+0 other overloads)
Console.WriteLine(value: obj) : unit
(+0 other overloads)
Console.WriteLine(value: uint64) : unit
(+0 other overloads)
Console.WriteLine(value: int64) : unit
(+0 other overloads)
Console.WriteLine(value: uint32) : unit
(+0 other overloads)
Console.WriteLine(value: int) : unit
(+0 other overloads)
Console.WriteLine(value: float32) : unit
(+0 other overloads)
Console.WriteLine(value: float) : unit
(+0 other overloads)
Console.WriteLine(value: decimal) : unit
(+0 other overloads)
More information