open System

module Observable =
  /// Creates an observable that emits, for every value produced by 'source', a window
  /// (a freshly allocated array) holding up to 'n' of the most recent values. Windows
  /// are shorter than 'n' until 'n' values have been seen.
  ///
  /// Values are stored in a circular buffer and each window is a copy of the filled
  /// prefix of that buffer, so the items in a window are NOT in chronological order
  /// once the buffer has wrapped around.
  ///
  /// Every notification from 'source' (value, error, completion) is posted to a
  /// per-subscription agent and forwarded from there in arrival order, so a terminal
  /// notification can never overtake windows that are still queued.
  ///
  /// Raises ArgumentException when 'n' is not positive.
  let windowed n (source:IObservable<'T>) =
    // Fail fast: a non-positive size would otherwise only blow up inside the agent,
    // where the exception is lost and the subscriber silently receives nothing.
    if n <= 0 then invalidArg "n" "Window size must be positive."
    { new IObservable<'T[]> with
        member x.Subscribe(obs) =
          // Messages: Choice1Of3 = next value, Choice2Of3 = error, Choice3Of3 = completed.
          let agent = MailboxProcessor<Choice<'T, exn, unit>>.Start(fun inbox ->
            let buffer : 'T[] = Array.zeroCreate n
            // 'index' is the slot the next value is written to; 'filled' is how many
            // slots hold real values (capped at 'n' so the counter cannot overflow).
            let rec loop index filled = async {
              let! msg = inbox.Receive()
              match msg with
              | Choice1Of3 v ->
                  buffer.[index] <- v
                  let filled = min n (filled + 1)
                  // Copy the filled prefix so subscribers never observe later writes.
                  obs.OnNext(Array.sub buffer 0 filled)
                  return! loop ((index + 1) % n) filled
              // Terminal notifications end the loop; nothing is forwarded afterwards.
              | Choice2Of3 e -> obs.OnError(e)
              | Choice3Of3 () -> obs.OnCompleted() }
            loop 0 0)
          { new IObserver<'T> with
              member x.OnNext(v) = agent.Post(Choice1Of3 v)
              member x.OnError(e) = agent.Post(Choice2Of3 e)
              member x.OnCompleted() = agent.Post(Choice3Of3 ()) } |> source.Subscribe }