5 people like it.

Event stream projection with actors

An example of using an actor (MailboxProcessor) to coordinate the projection of events from an event stream into a persisted projection. This example covers handling simple concurrency conflicts by also persisting the latest event ID with the projection and checking it when the projection is stored. The update command will take the current projection and apply all new events before persisting it back. The rebuild command will ignore an existing projection and replay all events in the stream.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
module Projection

/// Commands accepted by the projector actor.
type Message = 
    /// Continue from the stored projection and apply the events that follow its event id.
    | Update
    /// Ignore the stored projection and replay the stream from the first event.
    | Rebuild

/// <summary>Build an actor to coordinate taking events and transforming (projecting) them into a new representation</summary>
/// <param name="getNextEvent">Get the next event after the id if specified, if not specified get the first event</param>
/// <param name="getCurrentState">Get the current state of the projection, or None if not yet started</param>
/// <param name="updateState">Store the projection if the event id (if specified) matches the currently stored id</param>
/// <param name="applyEvent">Apply the event to previous projection to create the new projection</param>
/// <returns>A function that posts a Message to the (already started) actor</returns>
let buildProjector
    (getNextEvent : 'eventId option -> option<'eventId * 'event> Async)
    (getCurrentState : unit -> option<'eventId * 'projection> Async)
    (updateState : 'eventId option -> ('eventId * 'projection) -> unit Async)
    (applyEvent : 'event -> 'projection option -> 'projection Async) = 

    // Fold every event that follows `currentState` into the projection and
    // return the resulting (last event id, projection) pair. The result is
    // None only when there was no starting state and the stream had no events.
    let rec update currentState = 
        async { 
            let currentEventId = currentState |> Option.map fst
            let! nextEventOption = getNextEvent currentEventId
            match nextEventOption with
            | None -> return currentState
            | Some(eventId, event) -> 
                let! nextProjection = currentState
                                      |> Option.map snd
                                      |> applyEvent event
                return! update (Some(eventId, nextProjection))
        }

    // MailboxProcessor.Start both creates AND starts the agent. Constructing it
    // with `new` alone leaves it idle: Post would only queue messages and the
    // body below would never run.
    // NOTE(review): exceptions raised by the supplied callbacks are not caught
    // here, so a failing callback ends the message loop.
    let agent = 
        MailboxProcessor<Message>.Start(fun inbox -> 
            let rec processNextMessage() : Async<unit> = 
                async { 
                    let! message = inbox.Receive()
                    match message with
                    | Update -> 
                        let! state = getCurrentState()
                        return! applyUpdate state
                    | Rebuild -> 
                        // Starting from None makes `update` replay from the first event.
                        return! applyUpdate None
                }

            and applyUpdate state = 
                async { 
                    let! updated = update state
                    match updated with
                    | None -> return! processNextMessage()
                    | Some u -> 
                        // Pass the event id we started from so updateState can
                        // compare it with the id that is currently stored.
                        do! updateState (state |> Option.map fst) u
                        return! processNextMessage()
                }

            processNextMessage())

    agent.Post
module Projection
type Message =
  | Update
  | Rebuild

Full name: Projection.Message
union case Message.Update: Message
union case Message.Rebuild: Message
val buildProjector : getNextEvent:('eventId option -> Async<('eventId * 'event) option>) -> getCurrentState:(unit -> Async<('eventId * 'projection) option>) -> updateState:('eventId option -> 'eventId * 'projection -> Async<unit>) -> applyEvent:('event -> 'projection option -> Async<'projection>) -> (Message -> unit)

Full name: Projection.buildProjector


 <summary>Build an actor to coordinate taking events and transforming (projecting) them into a new representation</summary>
 <param name="getNextEvent">Get the next event after the id if specified, if not specified get the first event</param>
 <param name="getCurrentState">Get the current state of the projection, or None if not yet started</param>
 <param name="updateState">Store the projection if the event id (if specified) matches the currently stored id</param>
 <param name="applyEvent">Apply the event to previous projection to create the new projection</param>
val getNextEvent : ('eventId option -> Async<('eventId * 'event) option>)
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
val getCurrentState : (unit -> Async<('eventId * 'projection) option>)
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val updateState : ('eventId option -> 'eventId * 'projection -> Async<unit>)
val applyEvent : ('event -> 'projection option -> Async<'projection>)
val update : (('eventId * 'projection) option -> Async<('eventId * 'projection) option>)
val currentState : ('eventId * 'projection) option
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val currentEventId : 'eventId option
module Option

from Microsoft.FSharp.Core
val map : mapping:('T -> 'U) -> option:'T option -> 'U option

Full name: Microsoft.FSharp.Core.Option.map
val fst : tuple:('T1 * 'T2) -> 'T1

Full name: Microsoft.FSharp.Core.Operators.fst
val nextEventOption : ('eventId * 'event) option
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val eventId : 'eventId
val event : 'event
val nextProjection : 'projection
val snd : tuple:('T1 * 'T2) -> 'T2

Full name: Microsoft.FSharp.Core.Operators.snd
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<Message>
val processNextMessage : (unit -> Async<unit>)
val message : Message
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val state : ('eventId * 'projection) option
val applyUpdate : (('eventId * 'projection) option -> Async<unit>)
val state : 'a option
val updated : ('eventId * 'projection) option
val u : 'eventId * 'projection
Raw view Test code New version

More information

Link:http://fssnip.net/oc
Posted:9 years ago
Author:Daniel Bradley
Tags: events , event sourcing , cqrs , actors , mailboxprocessor