3 people like it.

Floating window over observables

Creates an observable that returns windows of size 'n' (or smaller at the start, before 'n' values have arrived) containing up to 'n' most recent values produced by the observable 'source'. The order of items in the returned buffers is not guaranteed, because the values are stored in a circular buffer.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
open System

module Observable =
  /// Messages handled by the agent inside 'windowed'. Values, errors and completion
  /// all travel through the same queue so that they reach the subscriber in the
  /// order in which the source produced them.
  type private WindowMessage<'T> =
    | Next of 'T
    | Failed of exn
    | Done

  /// Creates an observable that returns windows of size 'n' (or smaller at the start)
  /// containing up to 'n' most recent values produced by observable 'source'. The order
  /// of items in the returned buffers is not guaranteed (it's a circular buffer).
  ///
  /// Every subscription gets its own agent and buffer. Windows are emitted from the
  /// agent, i.e. asynchronously with respect to the thread on which 'source' produces
  /// values. Errors and completion of 'source' are forwarded only after all windows
  /// for previously produced values have been emitted. Disposing the subscription
  /// unsubscribes from 'source' and stops the agent (pending windows are dropped).
  ///
  /// Raises ArgumentException when 'n' is not positive.
  let windowed n (source:IObservable<'T>) =
    // Fail early: with a non-positive size the agent would crash silently on the
    // first value and the subscriber would never be notified of anything.
    if n <= 0 then invalidArg "n" "The window size must be positive."
    { new IObservable<'T[]> with
        member x.Subscribe(obs) =
          // Cancelling this token stops the agent when the subscription is disposed.
          let cts = new System.Threading.CancellationTokenSource()
          let body (inbox:MailboxProcessor<WindowMessage<'T>>) =
            let buffer : 'T[] = Array.zeroCreate n
            // 'index' is the slot for the next value, 'filled' is the number of
            // valid slots (capped at 'n', so it cannot overflow on long streams).
            let rec loop index filled = async {
              let! msg = inbox.Receive()
              match msg with
              | Next v ->
                  buffer.[index] <- v
                  let filled = min n (filled + 1)
                  // Hand out a copy, the buffer itself is overwritten later.
                  obs.OnNext(Array.sub buffer 0 filled)
                  return! loop ((index + 1) % n) filled
              // Terminal notifications end the loop, so nothing is emitted after them.
              | Failed e -> obs.OnError(e)
              | Done -> obs.OnCompleted() }
            loop 0 0
          let agent = MailboxProcessor.Start(body, cts.Token)
          let subscription =
            { new IObserver<'T> with
                member x.OnNext(v) = agent.Post(Next v)
                member x.OnError(e) = agent.Post(Failed e)
                member x.OnCompleted() = agent.Post(Done) } |> source.Subscribe
          { new IDisposable with
              member x.Dispose() =
                subscription.Dispose()
                cts.Cancel() } }
namespace System
Multiple items
module Observable

from Script

--------------------
module Observable

from Microsoft.FSharp.Control
val windowed : n:int -> source:IObservable<'T> -> IObservable<'T []>

Full name: Script.Observable.windowed


 Creates an observable that returns windows of size 'n' (or smaller at the start)
 containing 'n' past values produced by observable 'source'. The order of items in
 the returned buffers is not guaranteed (it's a circular buffer).
val n : int
val source : IObservable<'T>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val x : IObservable<'T []>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T []>) : IDisposable
val obs : IObserver<'T []>
val agent : MailboxProcessor<'T>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<'T>
val buffer : 'T []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val loop : (int -> int -> Async<'a>)
val index : int
val count : int
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val v : 'T
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
module Seq

from Microsoft.FSharp.Collections
val truncate : count:int -> source:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.truncate
val ofSeq : source:seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.ofSeq
IObserver.OnNext(value: 'T []) : unit
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
val x : IObserver<'T>
IObserver.OnNext(value: 'T) : unit
member MailboxProcessor.Post : message:'Msg -> unit
IObserver.OnError(error: exn) : unit
val e : exn
IObserver.OnCompleted() : unit
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T>) : IDisposable
Raw view Test code New version

More information

Link:http://fssnip.net/7PX
Posted:7 years ago
Author:Tomas Petricek
Tags: agents , observable , reactive , sliding window , windowed