open System open System.Collections.Generic type CircularBuffer<'T> (bufferSize:int) = let buffer = Array.zeroCreate<'T> bufferSize let mutable index = 0 let mutable total = 0 member this.Add value = if bufferSize > 0 then buffer.[index] <- value index <- (index + 1) % bufferSize total <- min (total + 1) bufferSize member this.Iter f = let start = if total = bufferSize then index else 0 for i = 0 to total - 1 do buffer.[(start + i) % bufferSize] |> f type message<'T> = | Add of IObserver<'T> | Remove of IObserver<'T> | Next of 'T | Completed | Error of exn let startAgent (bufferSize:int) = let subscribers = LinkedList<_>() let buffer = CircularBuffer bufferSize MailboxProcessor.Start(fun inbox -> let rec loop () = async { let! message = inbox.Receive() match message with | Add observer -> subscribers.AddLast observer |> ignore buffer.Iter observer.OnNext return! loop () | Remove observer -> subscribers.Remove observer |> ignore return! loop () | Next value -> for subscriber in subscribers do subscriber.OnNext value buffer.Add value return! loop () | Error e -> for subscriber in subscribers do subscriber.OnError e | Completed -> for subscriber in subscribers do subscriber.OnCompleted () } loop () ) type ReplaySubject<'T> (bufferSize:int) = let bufferSize = max 0 bufferSize let agent = startAgent bufferSize let subscribe observer = observer |> Add |> agent.Post { new System.IDisposable with member this.Dispose () = observer |> Remove |> agent.Post } member this.Next value = Next value |> agent.Post member this.Error error = Error error |> agent.Post member this.Completed () = Completed |> agent.Post interface System.IObserver<'T> with member this.OnNext value = Next value |> agent.Post member this.OnError error = Error error |> agent.Post member this.OnCompleted () = Completed |> agent.Post member this.Subscribe(observer:System.IObserver<'T>) = subscribe observer interface System.IObservable<'T> with member this.Subscribe observer = subscribe observer and Subject<'T>() = inherit ReplaySubject<'T>(0) do let subject = ReplaySubject(3) use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x) subject.Next(10) subject.Next(11) use d = subject.Subscribe(fun (x:int) -> System.Console.WriteLine x) System.Console.ReadLine() |> ignore