29 people like it.

Observable.Subject

The Subject type implements both IObserver and IObservable. It is functionally equivalent to the type of the same name in the Reactive Extensions (Rx) library.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
module Observable

open System
open System.Collections.Generic

/// A minimal subject: an object that is both an IObserver<'T> (values can be
/// pushed into it) and an IObservable<'T> (observers can subscribe to it).
/// Every notification pushed in is forwarded to each currently subscribed
/// observer. After the first Completed or Error call the subject is stopped
/// and all further notifications are ignored.
type Subject<'T> () =
   // Single lock object guarding both `stopped` and `observers`; it is never
   // reassigned, so it must not be mutable.
   let sync = obj()
   let mutable stopped = false
   let observers = List<IObserver<'T>>()
   // Iterate over a snapshot rather than the live list. The monitor taken by
   // `lock` is re-entrant, so an observer may dispose its subscription (or
   // subscribe a new observer) from inside a callback; enumerating the live
   // List<T> would then fail with InvalidOperationException.
   let iter f = observers.ToArray() |> Array.iter f
   let onCompleted () =
      if not stopped then
         stopped <- true
         iter (fun observer -> observer.OnCompleted())
   let onError ex () =
      if not stopped then
         stopped <- true
         iter (fun observer -> observer.OnError(ex))
   let next value () =
      if not stopped then
         iter (fun observer -> observer.OnNext(value))
   let remove observer () =
      observers.Remove observer |> ignore
   /// Forwards `value` to every subscribed observer unless the subject has stopped.
   member x.Next value = lock sync <| next value
   /// Forwards the error to every subscribed observer and stops the subject.
   member x.Error ex = lock sync <| onError ex
   /// Signals completion to every subscribed observer and stops the subject.
   member x.Completed () = lock sync <| onCompleted
   interface IObserver<'T> with
      member x.OnCompleted() = x.Completed()
      member x.OnError ex = x.Error ex
      member x.OnNext value = x.Next value
   interface IObservable<'T> with
      /// Registers `observer` and returns a handle whose Dispose removes it again.
      member this.Subscribe(observer:IObserver<'T>) =
         // Take the same lock as Next/Error/Completed/Dispose so the list is
         // never mutated concurrently with a read or another mutation.
         lock sync (fun () -> observers.Add observer)
         { new IDisposable with
            member this.Dispose() =
               lock sync <| remove observer
         }

// Demo: subscribe a console printer to a fresh subject, then push the
// integers 1 through 12 into it; the subscription is disposed when the
// block exits.
do  let subject = Subject()
    let printValue (n: int) = Console.WriteLine(sprintf "%d" n)
    use subscription = subject.Subscribe(printValue)
    for n in [1..12] do
        subject.Next n
Multiple items
module Observable

--------------------
module Observable

from Microsoft.FSharp.Control
namespace System
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type Subject<'T> =
  interface IObservable<'T>
  interface IObserver<'T>
  new : unit -> Subject<'T>
  member Completed : unit -> unit
  member Error : ex:exn -> unit
  member Next : value:'T -> unit

Full name: Observable.Subject<_>

--------------------
new : unit -> Subject<'T>
val mutable sync : Object
type obj = Object

Full name: Microsoft.FSharp.Core.obj
val mutable stopped : bool
val observers : List<IObserver<'T>>
Multiple items
type List<'T> =
  new : unit -> List<'T> + 2 overloads
  member Add : item:'T -> unit
  member AddRange : collection:IEnumerable<'T> -> unit
  member AsReadOnly : unit -> ReadOnlyCollection<'T>
  member BinarySearch : item:'T -> int + 2 overloads
  member Capacity : int with get, set
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
  member CopyTo : array:'T[] -> unit + 2 overloads
  ...
  nested type Enumerator

Full name: System.Collections.Generic.List<_>

--------------------
List() : unit
List(capacity: int) : unit
List(collection: IEnumerable<'T>) : unit
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
val iter : ((IObserver<'T> -> unit) -> unit)
val f : (IObserver<'T> -> unit)
module Seq

from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> source:seq<'T> -> unit

Full name: Microsoft.FSharp.Collections.Seq.iter
val onCompleted : (unit -> unit)
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val observer : IObserver<'T>
IObserver.OnCompleted() : unit
val onError : (exn -> unit -> unit)
val ex : exn
IObserver.OnError(error: exn) : unit
val next : ('T -> unit -> unit)
val value : 'T
IObserver.OnNext(value: 'T) : unit
val remove : (IObserver<'T> -> unit -> unit)
List.Remove(item: IObserver<'T>) : bool
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val x : Subject<'T>
member Subject.Next : value:'T -> unit

Full name: Observable.Subject`1.Next
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
member Subject.Error : ex:exn -> unit

Full name: Observable.Subject`1.Error
member Subject.Completed : unit -> unit

Full name: Observable.Subject`1.Completed
override Subject.OnCompleted : unit -> unit

Full name: Observable.Subject`1.OnCompleted
member Subject.Completed : unit -> unit
override Subject.OnError : ex:exn -> unit

Full name: Observable.Subject`1.OnError
member Subject.Error : ex:exn -> unit
override Subject.OnNext : value:'T -> unit

Full name: Observable.Subject`1.OnNext
member Subject.Next : value:'T -> unit
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val this : Subject<'T>
override Subject.Subscribe : observer:IObserver<'T> -> IDisposable

Full name: Observable.Subject`1.Subscribe
List.Add(item: IObserver<'T>) : unit
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
val this : IDisposable
IDisposable.Dispose() : unit
val s : Subject<int>
val d : IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
val x : int
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/2E
Posted:13 years ago
Author:Phillip Trelford
Tags: observable