5 people like it.
Like the snippet!
Event stream projection with actors
An example of using an actor (MailboxProcessor) to coordinate the projection of events from an event stream into a persisted projection. This example covers handling simple concurrency conflicts by also persisting the latest event ID with the projection and checking it on each write.
The update command will take the current projection and apply all new events before persisting it back.
The rebuild command will ignore an existing projection and replay all events in the stream.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
|
module Projection

/// Commands understood by the projector actor built by buildProjector.
type Message =
    /// Load the stored projection and apply every event that follows it.
    | Update
    /// Ignore any stored projection and replay the stream from the first event.
    | Rebuild

/// <summary>Build an actor to coordinate taking events and transforming (projecting) them into a new representation</summary>
/// <param name="getNextEvent">Get the next event after the id if specified, if not specified get the first event</param>
/// <param name="getCurrentState">Get the current state of the projection, or None if not yet started</param>
/// <param name="updateState">Store the projection if the event id (if specified) matches the currently stored id</param>
/// <param name="applyEvent">Apply the event to previous projection to create the new projection</param>
/// <returns>
/// A function that posts a Message to the actor. Posting returns immediately; the
/// actor handles one message at a time, in the order they were posted.
/// </returns>
/// <remarks>
/// After a message has been handled, updateState is called whenever the resulting state
/// is Some, including the case where no new events were found and the state is unchanged.
/// The first argument passed to updateState is the event id the run started from
/// (None for Rebuild, or when nothing was stored yet).
/// </remarks>
let buildProjector
    (getNextEvent : 'eventId option -> option<'eventId * 'event> Async)
    (getCurrentState : unit -> option<'eventId * 'projection> Async)
    (updateState : 'eventId option -> ('eventId * 'projection) -> unit Async)
    (applyEvent : 'event -> 'projection option -> 'projection Async) =

    // Repeatedly fetch the event following the one recorded in currentState and fold it
    // into the projection, until getNextEvent reports that there are no more events.
    let rec update currentState =
        async {
            let currentEventId = currentState |> Option.map fst
            let! nextEventOption = getNextEvent currentEventId
            match nextEventOption with
            | None -> return currentState
            | Some (eventId, event) ->
                let! nextProjection =
                    currentState
                    |> Option.map snd
                    |> applyEvent event
                return! update (Some (eventId, nextProjection))
        }

    // The actor must be started, otherwise its body never runs and posted messages just
    // pile up in the queue. MailboxProcessor.Start constructs and starts it in one step
    // (the bare constructor used previously only constructs it).
    let agent =
        MailboxProcessor<Message>.Start(fun inbox ->
            let rec processNextMessage () : Async<unit> =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Update ->
                        let! state = getCurrentState ()
                        return! state |> applyUpdate
                    | Rebuild ->
                        // Starting from None makes update replay the stream from the first event.
                        let state = None
                        return! state |> applyUpdate
                }
            // Bring the given starting state up to date, persist it, then wait for the next message.
            and applyUpdate state =
                async {
                    let! updated = state |> update
                    match updated with
                    | None ->
                        // No stored state and no events at all: nothing to persist.
                        return! processNextMessage ()
                    | Some u ->
                        // Pass the id we started from so the store can detect a concurrent write.
                        do! updateState (state |> Option.map fst) u
                        return! processNextMessage ()
                }
            processNextMessage ())

    agent.Post
|
module Projection
type Message =
| Update
| Rebuild
Full name: Projection.Message
union case Message.Update: Message
union case Message.Rebuild: Message
val buildProjector : getNextEvent:('eventId option -> Async<('eventId * 'event) option>) -> getCurrentState:(unit -> Async<('eventId * 'projection) option>) -> updateState:('eventId option -> 'eventId * 'projection -> Async<unit>) -> applyEvent:('event -> 'projection option -> Async<'projection>) -> (Message -> unit)
Full name: Projection.buildProjector
<summary>Build an actor to coordinate taking events and transforming (projecting) them into a new representation</summary>
<param name="getNextEvent">Get the next event after the id if specified, if not specified get the first event</param>
<param name="getCurrentState">Get the current state of the projection, or None if not yet started</param>
<param name="updateState">Store the projection if the event id (if specified) matches the currently stored id</param>
<param name="applyEvent">Apply the event to previous projection to create the new projection</param>
val getNextEvent : ('eventId option -> Async<('eventId * 'event) option>)
type 'T option = Option<'T>
Full name: Microsoft.FSharp.Core.option<_>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
val getCurrentState : (unit -> Async<('eventId * 'projection) option>)
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val updateState : ('eventId option -> 'eventId * 'projection -> Async<unit>)
val applyEvent : ('event -> 'projection option -> Async<'projection>)
val update : (('eventId * 'projection) option -> Async<('eventId * 'projection) option>)
val currentState : ('eventId * 'projection) option
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val currentEventId : 'eventId option
module Option
from Microsoft.FSharp.Core
val map : mapping:('T -> 'U) -> option:'T option -> 'U option
Full name: Microsoft.FSharp.Core.Option.map
val fst : tuple:('T1 * 'T2) -> 'T1
Full name: Microsoft.FSharp.Core.Operators.fst
val nextEventOption : ('eventId * 'event) option
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val eventId : 'eventId
val event : 'event
val nextProjection : 'projection
val snd : tuple:('T1 * 'T2) -> 'T2
Full name: Microsoft.FSharp.Core.Operators.snd
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<Message>
val processNextMessage : (unit -> Async<unit>)
val message : Message
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val state : ('eventId * 'projection) option
val applyUpdate : (('eventId * 'projection) option -> Async<unit>)
val state : 'a option
val updated : ('eventId * 'projection) option
val u : 'eventId * 'projection
More information