6 people like it.
Like the snippet!
Sliding window for Observable
Implements the Observable.windowed function that creates an observable returning a sliding window. The function is an observable version of Seq.observable. The implementation uses a simple F# agent that keeps partial windows and sends them to an observer.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
|
open System
module Observable =
/// Returns an observable that yields sliding windows of
/// containing elements drawn from the input observable.
/// Each window is returned as a fresh array.
let windowed (count:int) (source:IObservable<_>) =
{ new IObservable<_> with
member x.Subscribe(observer) =
// Start an agent that remembers partial windows of length
// smaller than the count (new agent for every observer)
let agent = MailboxProcessor.Start(fun agent ->
// The parameter 'lists' contains partial lists and their lengths
let rec loop lists = async {
// Receive the next value
let! value = agent.Receive()
// Add new empty list and then the new element to all lists.
// Then split the lists into 'full' that should be sent
// to the observer and 'partial' which need more elements.
let full, partial =
((0, []) :: lists)
|> List.map (fun (length, l) -> length + 1, value::l)
|> List.partition (fun (length, l) -> length = count)
// Send all full lists to the observer (as arrays)
for (_, l) in full do
observer.OnNext(l |> Array.ofSeq |> Array.rev)
// Continue looping with incomplete lists
return! loop partial }
// Start with an empty list of partial lists
loop [])
// Send incoming values to the agent
source.Subscribe(agent.Post) }
|
namespace System
Multiple items
module Observable
from Script
--------------------
module Observable
from Microsoft.FSharp.Control
val windowed : count:int -> source:IObservable<'a> -> IObservable<'a []>
Full name: Script.Observable.windowed
Returns an observable that yields sliding windows of
containing elements drawn from the input observable.
Each window is returned as a fresh array.
val count : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val source : IObservable<'a>
type IObservable<'T> =
member Subscribe : observer:IObserver<'T> -> IDisposable
Full name: System.IObservable<_>
val x : IObservable<'a []>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'a []>) : IDisposable
val observer : IObserver<'a []>
val agent : MailboxProcessor<'a>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val loop : ((int * 'a list) list -> Async<'b>)
val lists : (int * 'a list) list
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val value : 'a
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val full : (int * 'a list) list
val partial : (int * 'a list) list
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
Full name: Microsoft.FSharp.Collections.List.map
val length : int
val l : 'a list
val partition : predicate:('T -> bool) -> list:'T list -> 'T list * 'T list
Full name: Microsoft.FSharp.Collections.List.partition
IObserver.OnNext(value: 'a []) : unit
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val ofSeq : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Array.ofSeq
val rev : array:'T [] -> 'T []
Full name: Microsoft.FSharp.Collections.Array.rev
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'a>) : IDisposable
member MailboxProcessor.Post : message:'Msg -> unit
More information