module Projection
/// Messages accepted by the projector's mailbox (see buildProjector).
type Message =
/// Resume from the state returned by getCurrentState and apply any events that follow it.
| Update
/// Start again with no prior state (None), so events are re-applied from the first one.
| Rebuild
/// <summary>
/// Build an actor that coordinates taking events and transforming (projecting)
/// them into a new representation.
/// </summary>
/// <param name="getNextEvent">
/// Get the next event after the given id; if no id is specified, get the first event.
/// Returns None when there is no further event.
/// </param>
/// <param name="getCurrentState">
/// Get the current state of the projection, or None if not yet started.
/// </param>
/// <param name="updateState">
/// Store the projection if the event id (if specified) matches the currently stored id.
/// </param>
/// <param name="applyEvent">
/// Apply the event to the previous projection (None if there is none yet) to create
/// the new projection.
/// </param>
/// <returns>
/// The actor's Post function: send Update to continue from the stored state, or
/// Rebuild to re-project starting from no prior state.
/// </returns>
let buildProjector
    (getNextEvent : 'eventId option -> option<'eventId * 'event> Async)
    (getCurrentState : unit -> option<'eventId * 'projection> Async)
    (updateState : 'eventId option -> ('eventId * 'projection) -> unit Async)
    (applyEvent : 'event -> 'projection option -> 'projection Async) =
    // Repeatedly fetch the event that follows the current one and fold it into the
    // projection, stopping (and returning the accumulated state) once getNextEvent
    // yields None.
    let rec update currentState =
        async {
            let currentEventId = currentState |> Option.map fst
            let! nextEventOption = getNextEvent currentEventId
            match nextEventOption with
            | None -> return currentState
            | Some(eventId, event) ->
                let! nextProjection =
                    currentState
                    |> Option.map snd
                    |> applyEvent event
                return! update (Some(eventId, nextProjection))
        }
    // MailboxProcessor.Start both creates AND starts the agent. Constructing it with
    // `new` alone leaves it idle: messages are queued but the body never runs.
    let agent =
        MailboxProcessor<Message>.Start(fun inbox ->
            let rec processNextMessage() : Async<unit> =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Update ->
                        let! state = getCurrentState()
                        return! state |> applyUpdate
                    | Rebuild ->
                        // Rebuilding ignores whatever is stored and starts from scratch.
                        let state = None
                        return! state |> applyUpdate
                }
            // Bring `state` up to date, persist the result (if any), then wait for the
            // next message.
            and applyUpdate state =
                async {
                    let! updated = state |> update
                    match updated with
                    | None -> return! processNextMessage()
                    | Some u ->
                        // Pass the id we started from so updateState can check it
                        // against the currently stored id before writing.
                        do! updateState (state |> Option.map fst) u
                        return! processNextMessage()
                }
            processNextMessage())
    agent.Post