3 people like it.
Like the snippet!
Floating window over observables
Creates an observable that returns windows of size 'n' (or smaller at the start) containing up to 'n' of the most recent values produced by the observable 'source'. The order of items in the returned buffers is not guaranteed (the values are stored in a circular buffer).
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
|
open System
module Observable =

  /// Notifications forwarded from the source observer to the buffering agent.
  /// Routing all three kinds through the same mailbox keeps them in the order
  /// in which the source produced them.
  type private WindowMessage<'T> =
    | Next of 'T
    | Failed of exn
    | Finished

  /// Creates an observable that returns windows of size 'n' (or smaller at the start)
  /// containing up to 'n' most recent values produced by observable 'source'. The order
  /// of items in the returned buffers is not guaranteed (it's a circular buffer).
  ///
  /// Each subscription gets its own buffer and its own agent. Windows are delivered
  /// from the agent, i.e. asynchronously with respect to the source. Errors and
  /// completion are delivered only after all windows queued before them.
  /// Disposing the returned subscription unsubscribes from 'source' and stops the agent.
  ///
  /// Raises ArgumentException when 'n' is not positive.
  let windowed n (source:IObservable<'T>) =
    // Fail eagerly: a non-positive size would otherwise only blow up inside the
    // agent, where the exception is swallowed and the stream silently stalls.
    if n <= 0 then invalidArg "n" "The window size must be positive."
    { new IObservable<'T[]> with
        member x.Subscribe(obs) =
          // Used to stop the agent when the subscriber unsubscribes.
          let cts = new System.Threading.CancellationTokenSource()

          let body (inbox: MailboxProcessor<WindowMessage<'T>>) =
            let buffer : 'T[] = Array.zeroCreate n
            // 'index' is the next slot to overwrite; 'filled' is the number of
            // valid items in the buffer (capped at 'n' so it can never overflow).
            let rec loop index filled = async {
              let! msg = inbox.Receive()
              match msg with
              | Next v ->
                  buffer.[index] <- v
                  let filled = min n (filled + 1)
                  // Copy the valid prefix so subscribers never see later mutations.
                  Array.sub buffer 0 filled |> obs.OnNext
                  return! loop ((index + 1) % n) filled
              // Terminal notifications end the loop; nothing is emitted afterwards.
              | Failed e -> obs.OnError(e)
              | Finished -> obs.OnCompleted() }
            loop 0 0

          let agent = MailboxProcessor.Start(body, cts.Token)

          let subscription =
            { new IObserver<'T> with
                member x.OnNext(v) = agent.Post(Next v)
                member x.OnError(e) = agent.Post(Failed e)
                member x.OnCompleted() = agent.Post(Finished) } |> source.Subscribe

          { new IDisposable with
              member x.Dispose() =
                subscription.Dispose()
                // Cancel is safe to call repeatedly, so Dispose stays idempotent.
                cts.Cancel() } }
|
namespace System
Multiple items
module Observable
from Script
--------------------
module Observable
from Microsoft.FSharp.Control
val windowed : n:int -> source:IObservable<'T> -> IObservable<'T []>
Full name: Script.Observable.windowed
Creates an observable that returns windows of size 'n' (or smaller at the start)
containing 'n' past values produced by observable 'source'. The order of items in
the returned buffers is not guaranteed (it's a circular buffer).
val n : int
val source : IObservable<'T>
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
val x : IObservable<'T []>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T []>) : IDisposable
val obs : IObserver<'T []>
val agent : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<'T>
val buffer : 'T []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val loop : (int -> int -> Async<'a>)
val index : int
val count : int
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val v : 'T
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
module Seq
from Microsoft.FSharp.Collections
val truncate : count:int -> source:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Collections.Seq.truncate
val ofSeq : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Array.ofSeq
IObserver.OnNext(value: 'T []) : unit
type IObserver<'T> =
member OnCompleted : unit -> unit
member OnError : error:Exception -> unit
member OnNext : value:'T -> unit
Full name: System.IObserver<_>
val x : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
member MailboxProcessor.Post : message:'Msg -> unit
IObserver.OnError(error: exn) : unit
val e : exn
IObserver.OnCompleted() : unit
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T>) : IDisposable
More information