4 people like it.

A Simple Abstraction for Event Sourced Transactions

A simple abstraction for performing event-sourced optimistic transactions.

Core Abstractions

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
/// A transaction over an event-sourced stream: a function that receives the
/// current state envelope and returns a result together with the array of
/// events it wants appended to the stream.
type EventTransaction<'State, 'Event, 'Result> = 
    Transaction of (Envelope<'State> -> 'Result * 'Event[])

/// A value tagged with the stream it belongs to and a version number.
/// In this file it wraps both recovered state (GetState) and the individual
/// committed events returned by runTransaction.
and Envelope<'State> =
    {
        /// Identifier of the stream the value belongs to.
        StreamId : string
        /// The wrapped payload.
        Value : 'State
        /// Version associated with the payload; used as the expected version
        /// when committing optimistically.
        Version : int64
    }

/// Storage operations required to run optimistic transactions.
and ITransactionContext<'State, 'Event> =
    /// Upper bound on retries after a failed commit; None means no bound is enforced.
    abstract MaxRetries : int option
    /// Asynchronously recovers the current state of the given stream.
    abstract GetState : streamId:string -> Async<Envelope<'State>>
    /// Attempts to append events to the stream, given the version the caller
    /// last observed. Returns false when the commit was rejected, which makes
    /// runTransaction re-read the state and try again.
    abstract TryCommit : streamId:string -> expectedVersion:int64 -> 'Event[] -> Async<bool>

// Helpers for building transaction instances.
module EventTransaction =

    /// Builds a transaction from a standard apply/exec pair of functions:
    /// `exec` decides which events the current state gives rise to, and
    /// `apply` folds a single event into a state. The transaction's result is
    /// the state obtained by folding every produced event into the input state.
    let fromApplyExec (apply : 'State -> 'Event -> 'State) (exec  : 'State -> #seq<'Event>) =
        let run (envelope : Envelope<'State>) =
            // Materialise the events once so the same array is folded and returned.
            let produced = Seq.toArray (exec envelope.Value)
            let folded = (envelope.Value, produced) ||> Array.fold apply
            folded, produced
        Transaction run

// Runs an optimistic transaction against a given context.
// Each attempt reads the current state, evaluates the transaction and tries to
// commit the resulting events at the version that was read. A rejected commit
// triggers another attempt; once the attempt counter exceeds the context's
// MaxRetries (when one is set) an InvalidOperationException is raised.
// On success, returns the transaction result together with the committed
// events, each wrapped in an envelope carrying its version in the stream.
let runTransaction 
    (context : ITransactionContext<'State, 'Event>) 
    (streamId : string) 
    (Transaction transaction) =

    // True when a retry bound exists and the attempt counter has gone past it.
    let retriesExhausted attempt =
        match context.MaxRetries with
        | Some limit -> attempt > limit
        | None -> false

    let rec attemptCommit attempt = async {
        if retriesExhausted attempt then
            invalidOp "exceeded max number of retries!"

        let! snapshot = context.GetState streamId
        let result, events = transaction snapshot
        let! committed = context.TryCommit streamId snapshot.Version events
        if committed then
            // Event versions continue on from the version of the state that was read.
            let envelopes =
                events
                |> Array.mapi (fun offset ev ->
                    { StreamId = streamId
                      Value = ev
                      Version = snapshot.Version + int64 offset + 1L })
            return result, envelopes
        else
            return! attemptCommit (attempt + 1)
    }

    attemptCommit 0

Dummy In-Memory Context

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
// Dummy in-memory transaction context. Every stream is kept as a growable list
// of events, and state is recovered by replaying the whole stream from `init`.
// A stream's version is its event count minus one, so an empty stream is at -1.
type InMemoryEventStore<'State, 'Event>(init : 'State, apply : 'State -> 'Event -> 'State) =
    let streams = System.Collections.Concurrent.ConcurrentDictionary<string, ResizeArray<'Event>>()
    // Looks up the event list for a stream, creating an empty one on first use.
    let streamFor streamId = streams.GetOrAdd(streamId, fun _ -> ResizeArray())
    interface ITransactionContext<'State, 'Event> with
        // No retry bound.
        member __.MaxRetries = None

        member __.GetState streamId = async {
            let stream = streamFor streamId
            // Copy the events while holding the stream's lock; the replay runs on the copy.
            let snapshot = lock stream (fun () -> stream.ToArray())
            let replayed = (init, snapshot) ||> Array.fold apply
            return { StreamId = streamId ; Value = replayed ; Version = snapshot.LongLength - 1L }
        }

        member __.TryCommit streamId expectedVersion events = async {
            let stream = streamFor streamId
            // The version check and the append happen under one lock acquisition.
            let tryAppend () =
                let currentVersion = int64 stream.Count - 1L
                if currentVersion = expectedVersion then
                    stream.AddRange events
                    true
                else
                    false
            return lock stream tryAppend
        }

Example: Event Sourced Stack

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
/// A stack of integers backed by a list; the head of the list is the top of the stack.
type Stack = Stack of int list
/// Commands that can be executed against a stack.
type Command = Push of int | Pop
/// Events recording changes to a stack; Popped carries the value that was removed.
type Event = Pushed of int | Popped of int

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Stack =
    /// The stack containing no elements.
    let empty = Stack []

    /// Folds a single event into a stack: Pushed prepends its value, Popped
    /// drops the top element and leaves an empty stack unchanged.
    let apply s e =
        match e, s with
        | Pushed i, Stack items -> Stack (i :: items)
        | Popped _, Stack (_ :: rest) -> Stack rest
        | Popped _, Stack [] -> s

    /// Decides which events a command produces for the given stack.
    /// Popping an empty stack produces no events.
    let exec cmd s =
        match cmd with
        | Push i -> [Pushed i]
        | Pop ->
            match s with
            | Stack [] -> []
            | Stack (top :: _) -> [Popped top]

    /// Turns a command into a transaction whose result is the updated stack.
    let cmd2Transaction cmd = exec cmd |> EventTransaction.fromApplyExec apply


// In-memory store for integer stacks, starting every stream from the empty stack.
let context = InMemoryEventStore(Stack.empty, Stack.apply)

// Push two values onto stream "foo", then pop the most recent one.
Push 1 |> Stack.cmd2Transaction |> runTransaction context "foo" |> Async.RunSynchronously
Push 2 |> Stack.cmd2Transaction |> runTransaction context "foo" |> Async.RunSynchronously
Pop |> Stack.cmd2Transaction |> runTransaction context "foo" |> Async.RunSynchronously

// running a custom transaction: returns the current head (if any) and emits
// the events that replace it with head + 1; an empty stack yields no events
let incrHead = 
    Transaction (fun envelope ->
        match envelope.Value with
        | Stack [] -> None, [||]
        | Stack (top :: _) -> Some top, [| Popped top ; Pushed (top + 1) |])

incrHead |> runTransaction context "bar" |> Async.RunSynchronously
Push 0 |> Stack.cmd2Transaction |> runTransaction context "bar" |> Async.RunSynchronously
incrHead |> runTransaction context "bar" |> Async.RunSynchronously
incrHead |> runTransaction context "bar" |> Async.RunSynchronously

// contended transactions: ten pushes run in parallel against the same stream,
// each delayed briefly before it starts
[ 1 .. 10 ]
|> Seq.map (fun i ->
    let job = Push i |> Stack.cmd2Transaction |> runTransaction context "bar"
    async {
        do! Async.Sleep 2
        return! job
    })
|> Async.Parallel
|> Async.RunSynchronously
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
union case EventTransaction.Transaction: (Envelope<'State> -> 'Result * 'Event []) -> EventTransaction<'State,'Event,'Result>
type Envelope<'State> =
  {StreamId: string;
   Value: 'State;
   Version: int64;}

Full name: Script.Envelope<_>
Envelope.StreamId: string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
Envelope.Value: 'State
Envelope.Version: int64
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int64

--------------------
type int64 = System.Int64

Full name: Microsoft.FSharp.Core.int64

--------------------
type int64<'Measure> = int64

Full name: Microsoft.FSharp.Core.int64<_>
type ITransactionContext<'State,'Event> =
  interface
    abstract member GetState : streamId:string -> Async<Envelope<'State>>
    abstract member TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>
    abstract member MaxRetries : int option
  end

Full name: Script.ITransactionContext<_,_>
abstract member ITransactionContext.MaxRetries : int option

Full name: Script.ITransactionContext`2.MaxRetries
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
abstract member ITransactionContext.GetState : streamId:string -> Async<Envelope<'State>>

Full name: Script.ITransactionContext`2.GetState
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
abstract member ITransactionContext.TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>

Full name: Script.ITransactionContext`2.TryCommit
type bool = System.Boolean

Full name: Microsoft.FSharp.Core.bool
type EventTransaction<'State,'Event,'Result> = | Transaction of (Envelope<'State> -> 'Result * 'Event [])

Full name: Script.EventTransaction<_,_,_>
val fromApplyExec : apply:('State -> 'Event -> 'State) -> exec:('State -> #seq<'Event>) -> EventTransaction<'State,'Event,'State>

Full name: Script.EventTransaction.fromApplyExec
val apply : ('State -> 'Event -> 'State)
val exec : ('State -> #seq<'Event>)
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val state : Envelope<'State>
val events : 'Event []
module Seq

from Microsoft.FSharp.Collections
val toArray : source:seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Seq.toArray
val state' : 'State
module Array

from Microsoft.FSharp.Collections
val fold : folder:('State -> 'T -> 'State) -> state:'State -> array:'T [] -> 'State

Full name: Microsoft.FSharp.Collections.Array.fold
val runTransaction : context:ITransactionContext<'State,'Event> -> streamId:string -> EventTransaction<'State,'Event,'a> -> Async<'a * Envelope<'Event> []>

Full name: Script.runTransaction
val context : ITransactionContext<'State,'Event>
val streamId : string
val transaction : (Envelope<'State> -> 'a * 'Event [])
val aux : (int -> Async<'a * Envelope<'Event> []>)
val numRetries : int
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
property ITransactionContext.MaxRetries: int option
module Option

from Microsoft.FSharp.Core
val exists : predicate:('T -> bool) -> option:'T option -> bool

Full name: Microsoft.FSharp.Core.Option.exists
val mr : int
val invalidOp : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.invalidOp
abstract member ITransactionContext.GetState : streamId:string -> Async<Envelope<'State>>
val result : 'a
val success : bool
abstract member ITransactionContext.TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val wrap : (int -> 'b -> Envelope<'b>)
val i : int
val e : 'b
val eventEnvs : Envelope<'Event> []
val mapi : mapping:(int -> 'T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.mapi
Multiple items
type InMemoryEventStore<'State,'Event> =
  interface ITransactionContext<'State,'Event>
  new : init:'State * apply:('State -> 'Event -> 'State) -> InMemoryEventStore<'State,'Event>

Full name: Script.InMemoryEventStore<_,_>

--------------------
new : init:'State * apply:('State -> 'Event -> 'State) -> InMemoryEventStore<'State,'Event>
val init : 'State
val eventStreams : System.Collections.Concurrent.ConcurrentDictionary<string,ResizeArray<'Event>>
namespace System
namespace System.Collections
namespace System.Collections.Concurrent
Multiple items
type ConcurrentDictionary<'TKey,'TValue> =
  new : unit -> ConcurrentDictionary<'TKey, 'TValue> + 6 overloads
  member AddOrUpdate : key:'TKey * addValueFactory:Func<'TKey, 'TValue> * updateValueFactory:Func<'TKey, 'TValue, 'TValue> -> 'TValue + 1 overload
  member Clear : unit -> unit
  member ContainsKey : key:'TKey -> bool
  member Count : int
  member GetEnumerator : unit -> IEnumerator<KeyValuePair<'TKey, 'TValue>>
  member GetOrAdd : key:'TKey * valueFactory:Func<'TKey, 'TValue> -> 'TValue + 1 overload
  member IsEmpty : bool
  member Item : 'TKey -> 'TValue with get, set
  member Keys : ICollection<'TKey>
  ...

Full name: System.Collections.Concurrent.ConcurrentDictionary<_,_>

--------------------
System.Collections.Concurrent.ConcurrentDictionary() : unit
System.Collections.Concurrent.ConcurrentDictionary(collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>) : unit
System.Collections.Concurrent.ConcurrentDictionary(comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, capacity: int) : unit
System.Collections.Concurrent.ConcurrentDictionary(collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, capacity: int, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
type ResizeArray<'T> = System.Collections.Generic.List<'T>

Full name: Microsoft.FSharp.Collections.ResizeArray<_>
val getStream : (string -> ResizeArray<'Event>)
System.Collections.Concurrent.ConcurrentDictionary.GetOrAdd(key: string, value: ResizeArray<'Event>) : ResizeArray<'Event>
System.Collections.Concurrent.ConcurrentDictionary.GetOrAdd(key: string, valueFactory: System.Func<string,ResizeArray<'Event>>) : ResizeArray<'Event>
override InMemoryEventStore.MaxRetries : int option

Full name: Script.InMemoryEventStore`2.MaxRetries
union case Option.None: Option<'T>
val __ : InMemoryEventStore<'State,'Event>
override InMemoryEventStore.GetState : streamId:string -> Async<Envelope<'State>>

Full name: Script.InMemoryEventStore`2.GetState
val eventStream : ResizeArray<'Event>
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
System.Collections.Generic.List.ToArray() : 'Event []
val state : 'State
property System.Array.LongLength: int64
override InMemoryEventStore.TryCommit : streamId:string -> expectedVersion:int64 -> events:'Event [] -> Async<bool>

Full name: Script.InMemoryEventStore`2.TryCommit
val expectedVersion : int64
property System.Collections.Generic.List.Count: int
System.Collections.Generic.List.AddRange(collection: System.Collections.Generic.IEnumerable<'Event>) : unit
Multiple items
union case Stack.Stack: int list -> Stack

--------------------
type Stack = | Stack of int list

Full name: Script.Stack
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
type Command =
  | Push of int
  | Pop

Full name: Script.Command
union case Command.Push: int -> Command
union case Command.Pop: Command
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event =
  | Pushed of int
  | Popped of int

Full name: Script.Event

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
union case Event.Pushed: int -> Event
union case Event.Popped: int -> Event
Multiple items
type CompilationRepresentationAttribute =
  inherit Attribute
  new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
  member Flags : CompilationRepresentationFlags

Full name: Microsoft.FSharp.Core.CompilationRepresentationAttribute

--------------------
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
type CompilationRepresentationFlags =
  | None = 0
  | Static = 1
  | Instance = 2
  | ModuleSuffix = 4
  | UseNullAsTrueValue = 8
  | Event = 16

Full name: Microsoft.FSharp.Core.CompilationRepresentationFlags
CompilationRepresentationFlags.ModuleSuffix: CompilationRepresentationFlags = 4
val empty : Stack

Full name: Script.StackModule.empty
val apply : s:Stack -> e:Event -> Stack

Full name: Script.StackModule.apply
val s : Stack
val e : Event
val s : int list
val h : int
val t : int list
val exec : cmd:Command -> s:Stack -> Event list

Full name: Script.StackModule.exec
val cmd : Command
val cmd2Transaction : cmd:Command -> EventTransaction<Stack,Event,Stack>

Full name: Script.StackModule.cmd2Transaction
Multiple items
module EventTransaction

from Script

--------------------
type EventTransaction<'State,'Event,'Result> = | Transaction of (Envelope<'State> -> 'Result * 'Event [])

Full name: Script.EventTransaction<_,_,_>
val context : InMemoryEventStore<Stack,Event>

Full name: Script.context
Multiple items
union case Stack.Stack: int list -> Stack

--------------------
module Stack

from Script

--------------------
type Stack = | Stack of int list

Full name: Script.Stack
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
val incrHead : EventTransaction<Stack,Event,int option>

Full name: Script.incrHead
union case Option.Some: Value: 'T -> Option<'T>
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.map
val j : Async<Stack * Envelope<Event> []>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
Raw view Test code New version

More information

Link:http://fssnip.net/7V8
Posted:5 years ago
Author:Eirik Tsarpalis
Tags: event sourcing