12 people like it.

Rx CombineLatest for list of IObservable<'a> to IObservable<'a array>.

Rx CombineLatest for list of IObservable<'a> to IObservable<'a array>.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
//Open proper namespaces

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
            for i in se do
                s.OnNext(i |> Array.toList |> List.toArray)
            s.OnCompleted()
        with
        | :? Exception as ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>
val combineLatestToArray : list:'a list -> 'b

Full name: Script.combineLatestToArray
Multiple items
val list : 'a list

--------------------
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
val s : obj
type 'T array = 'T []

Full name: Microsoft.FSharp.Core.array<_>
val arr : obj []
module Array

from Microsoft.FSharp.Collections
val init : count:int -> initializer:(int -> 'T) -> 'T []

Full name: Microsoft.FSharp.Collections.Array.init
property List.Length: int
module Unchecked

from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T

Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
val main : obj
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val mapi : mapping:(int -> 'T -> 'U) -> list:'T list -> 'U list

Full name: Microsoft.FSharp.Collections.List.mapi
val i : int
val o : 'a
module Observable

from Microsoft.FSharp.Control
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val se : seq<obj []>
module Seq

from Microsoft.FSharp.Collections
val scan : folder:('State -> 'T -> 'State) -> state:'State -> source:seq<'T> -> seq<'State>

Full name: Microsoft.FSharp.Collections.Seq.scan
val ar : obj []
val t : obj
val set : array:'T [] -> index:int -> value:'T -> unit

Full name: Microsoft.FSharp.Collections.Array.set
val i : obj []
val toList : array:'T [] -> 'T list

Full name: Microsoft.FSharp.Collections.Array.toList
val toArray : list:'T list -> 'T []

Full name: Microsoft.FSharp.Collections.List.toArray
val ex : exn
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
Raw view Test code New version

More information

Link:http://fssnip.net/7r
Posted:12 years ago
Author:Ankur Dhama
Tags: rx