4 people like it.
Like the snippet!
A Simple Abstraction for Event Sourced Transactions
A simple abstraction for performing event sourced optimistic transactions
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
|
/// A transaction over an event-sourced stream: a function that receives the
/// current state envelope and produces a result together with the events
/// that should be appended to the stream.
type EventTransaction<'State, 'Event, 'Result> =
    | Transaction of (Envelope<'State> -> 'Result * 'Event array)

/// A value tagged with the identifier of the stream it belongs to and a
/// version number.
and Envelope<'State> =
    { /// Identifier of the stream the value belongs to.
      StreamId : string
      /// The wrapped payload.
      Value : 'State
      /// Version number associated with the payload.
      Version : int64 }

/// Storage operations required to run an EventTransaction.
and ITransactionContext<'State, 'Event> =
    /// Optional limit on the number of retries; None places no limit.
    abstract MaxRetries : int option
    /// Asynchronously fetches the state envelope for the given stream.
    abstract GetState : streamId:string -> Async<Envelope<'State>>
    /// Asynchronously attempts to append events to the given stream, passing
    /// the version the caller expects the stream to be at; the boolean result
    /// reports whether the commit succeeded.
    abstract TryCommit : streamId:string -> expectedVersion:int64 -> 'Event array -> Async<bool>
// building a transaction instance from a standard apply/exec pair of functions
module EventTransaction =

    /// Builds a transaction from an apply/exec pair.
    ///
    /// `exec` is run against the current state value to obtain the new events,
    /// which are then folded over that same value with `apply`. The transaction
    /// yields the folded state as its result, alongside the events as an array.
    let fromApplyExec (apply : 'State -> 'Event -> 'State) (exec : 'State -> #seq<'Event>) =
        let run (envelope : Envelope<'State>) =
            // Materialise once so the same events are both folded and returned.
            let produced = Array.ofSeq (exec envelope.Value)
            let folded = produced |> Array.fold apply envelope.Value
            folded, produced
        Transaction run
// schedule an optimistic transaction against a given context
/// Runs a transaction against `streamId` using optimistic concurrency.
///
/// Each attempt fetches the current state, evaluates the transaction on it and
/// tries to commit the produced events at the fetched version. A rejected
/// commit causes the whole attempt to be repeated from a fresh state. When the
/// context defines MaxRetries and the retry count exceeds it, an
/// InvalidOperationException is raised (via `invalidOp`).
///
/// On success, returns the transaction result paired with the committed
/// events, each wrapped in an envelope whose version continues from the
/// fetched state version (first event = fetched version + 1).
let runTransaction
        (context : ITransactionContext<'State, 'Event>)
        (streamId : string)
        (Transaction transaction) =
    let rec attempt retries = async {
        // Bail out before touching the store once the retry budget is spent.
        match context.MaxRetries with
        | Some limit when retries > limit -> invalidOp "exceeded max number of retries!"
        | _ -> ()

        let! snapshot = context.GetState streamId
        let outcome, pending = transaction snapshot
        let! committed = context.TryCommit streamId snapshot.Version pending

        if committed then
            let envelopes =
                pending
                |> Array.mapi (fun offset ev ->
                    { StreamId = streamId
                      Value = ev
                      Version = snapshot.Version + int64 offset + 1L })
            return outcome, envelopes
        else
            // Commit was rejected: start over from a fresh state.
            return! attempt (retries + 1)
    }
    attempt 0
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
|
// Dummy In-memory implementation, replays the entire stream to recover an aggregate
/// In-memory transaction context keeping one event list per stream id.
/// State is recovered by folding `apply` over every stored event, starting
/// from `init`. A stream's version is its event count minus one, so an empty
/// stream has version -1.
type InMemoryEventStore<'State, 'Event>(init : 'State, apply : 'State -> 'Event -> 'State) =
    let eventStreams = System.Collections.Concurrent.ConcurrentDictionary<string, ResizeArray<'Event>>()

    // Looks up the event list for a stream, creating an empty one on first use.
    let getStream streamId = eventStreams.GetOrAdd(streamId, fun _ -> ResizeArray())

    interface ITransactionContext<'State, 'Event> with

        /// No retry limit.
        member __.MaxRetries = None

        /// Copies the stream's events under its lock, then replays them.
        member __.GetState streamId = async {
            let log = getStream streamId
            let recorded = lock log (fun () -> log.ToArray())
            let replayed = recorded |> Array.fold apply init
            return
                { StreamId = streamId
                  Value = replayed
                  Version = recorded.LongLength - 1L }
        }

        /// Appends the events only if the stream is still at `expectedVersion`;
        /// the check and the append happen under the stream's lock.
        member __.TryCommit streamId expectedVersion events = async {
            let log = getStream streamId
            let appendIfCurrent () =
                let currentVersion = int64 log.Count - 1L
                if expectedVersion = currentVersion then
                    log.AddRange events
                    true
                else
                    false
            return lock log appendIfCurrent
        }
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
|
/// A stack of integers, represented as a list with the top element first.
type Stack =
    | Stack of int list

/// Commands that can be issued against a stack.
type Command =
    | Push of int
    | Pop

/// Events recording what happened to a stack; each carries the affected value.
type Event =
    | Pushed of int
    | Popped of int
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Stack =

    /// The stack with no elements.
    let empty = Stack []

    /// Applies a single event to a stack.
    /// Pushed puts its value on top. Popped removes the top element (its
    /// payload is not inspected) and leaves an empty stack unchanged.
    let apply s e =
        let (Stack items) = s
        match e, items with
        | Pushed i, _ -> Stack(i :: items)
        | Popped _, [] -> s
        | Popped _, _ :: rest -> Stack rest

    /// Decides which events a command produces for the given stack.
    /// Popping an empty stack produces no events.
    let exec cmd s =
        match cmd with
        | Push i -> [Pushed i]
        | Pop ->
            match s with
            | Stack [] -> []
            | Stack (top :: _) -> [Popped top]

    /// Turns a command into a transaction using this module's apply/exec pair.
    let cmd2Transaction cmd =
        let decide = exec cmd
        EventTransaction.fromApplyExec apply decide
// Shared in-memory store for the examples below.
let context = InMemoryEventStore(init = Stack.empty, apply = Stack.apply)

// Push 1, push 2, then pop on stream "foo"; each call yields the resulting
// stack together with the committed event envelopes.
Stack.cmd2Transaction (Push 1) |> runTransaction context "foo" |> Async.RunSynchronously
Stack.cmd2Transaction (Push 2) |> runTransaction context "foo" |> Async.RunSynchronously
Stack.cmd2Transaction Pop |> runTransaction context "foo" |> Async.RunSynchronously
// running a custom transaction
/// Transaction that increments the top of the stack by popping it and pushing
/// its successor. Yields the previous top as Some, or None with no events when
/// the stack is empty.
let incrHead =
    Transaction(fun envelope ->
        match envelope.Value with
        | Stack (h :: _) -> Some h, [| Popped h; Pushed (h + 1) |]
        | Stack [] -> None, [||])
// On the still-empty "bar" stream incrHead has nothing to increment.
incrHead |> runTransaction context "bar" |> Async.RunSynchronously
// Seed the stream with 0, then increment the head twice.
Stack.cmd2Transaction (Push 0) |> runTransaction context "bar" |> Async.RunSynchronously
incrHead |> runTransaction context "bar" |> Async.RunSynchronously
incrHead |> runTransaction context "bar" |> Async.RunSynchronously
// contended transactions
// Ten pushes against the same stream, run in parallel after a short delay so
// that their commits compete and rejected ones are retried.
seq { for i in 1 .. 10 -> Push i |> Stack.cmd2Transaction |> runTransaction context "bar" }
|> Seq.map (fun job ->
    async {
        do! Async.Sleep 2
        return! job
    })
|> Async.Parallel
|> Async.RunSynchronously
|
Multiple items
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'T> =
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
Full name: Microsoft.FSharp.Control.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'T>
--------------------
new : unit -> Event<'Delegate,'Args>
union case EventTransaction.Transaction: (Envelope<'State> -> 'Result * 'Event []) -> EventTransaction<'State,'Event,'Result>
type Envelope<'State> =
{StreamId: string;
Value: 'State;
Version: int64;}
Full name: Script.Envelope<_>
Envelope.StreamId: string
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
Envelope.Value: 'State
Envelope.Version: int64
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = System.Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
type ITransactionContext<'State,'Event> =
interface
abstract member GetState : streamId:string -> Async<Envelope<'State>>
abstract member TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>
abstract member MaxRetries : int option
end
Full name: Script.ITransactionContext<_,_>
abstract member ITransactionContext.MaxRetries : int option
Full name: Script.ITransactionContext`2.MaxRetries
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
type 'T option = Option<'T>
Full name: Microsoft.FSharp.Core.option<_>
abstract member ITransactionContext.GetState : streamId:string -> Async<Envelope<'State>>
Full name: Script.ITransactionContext`2.GetState
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
abstract member ITransactionContext.TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>
Full name: Script.ITransactionContext`2.TryCommit
type bool = System.Boolean
Full name: Microsoft.FSharp.Core.bool
type EventTransaction<'State,'Event,'Result> = | Transaction of (Envelope<'State> -> 'Result * 'Event [])
Full name: Script.EventTransaction<_,_,_>
val fromApplyExec : apply:('State -> 'Event -> 'State) -> exec:('State -> #seq<'Event>) -> EventTransaction<'State,'Event,'State>
Full name: Script.EventTransaction.fromApplyExec
val apply : ('State -> 'Event -> 'State)
val exec : ('State -> #seq<'Event>)
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val state : Envelope<'State>
val events : 'Event []
module Seq
from Microsoft.FSharp.Collections
val toArray : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Seq.toArray
val state' : 'State
module Array
from Microsoft.FSharp.Collections
val fold : folder:('State -> 'T -> 'State) -> state:'State -> array:'T [] -> 'State
Full name: Microsoft.FSharp.Collections.Array.fold
val runTransaction : context:ITransactionContext<'State,'Event> -> streamId:string -> EventTransaction<'State,'Event,'a> -> Async<'a * Envelope<'Event> []>
Full name: Script.runTransaction
val context : ITransactionContext<'State,'Event>
val streamId : string
val transaction : (Envelope<'State> -> 'a * 'Event [])
val aux : (int -> Async<'a * Envelope<'Event> []>)
val numRetries : int
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
property ITransactionContext.MaxRetries: int option
module Option
from Microsoft.FSharp.Core
val exists : predicate:('T -> bool) -> option:'T option -> bool
Full name: Microsoft.FSharp.Core.Option.exists
val mr : int
val invalidOp : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.invalidOp
abstract member ITransactionContext.GetState : streamId:string -> Async<Envelope<'State>>
val result : 'a
val success : bool
abstract member ITransactionContext.TryCommit : streamId:string -> expectedVersion:int64 -> 'Event [] -> Async<bool>
val not : value:bool -> bool
Full name: Microsoft.FSharp.Core.Operators.not
val wrap : (int -> 'b -> Envelope<'b>)
val i : int
val e : 'b
val eventEnvs : Envelope<'Event> []
val mapi : mapping:(int -> 'T -> 'U) -> array:'T [] -> 'U []
Full name: Microsoft.FSharp.Collections.Array.mapi
Multiple items
type InMemoryEventStore<'State,'Event> =
interface ITransactionContext<'State,'Event>
new : init:'State * apply:('State -> 'Event -> 'State) -> InMemoryEventStore<'State,'Event>
Full name: Script.InMemoryEventStore<_,_>
--------------------
new : init:'State * apply:('State -> 'Event -> 'State) -> InMemoryEventStore<'State,'Event>
val init : 'State
val eventStreams : System.Collections.Concurrent.ConcurrentDictionary<string,ResizeArray<'Event>>
namespace System
namespace System.Collections
namespace System.Collections.Concurrent
Multiple items
type ConcurrentDictionary<'TKey,'TValue> =
new : unit -> ConcurrentDictionary<'TKey, 'TValue> + 6 overloads
member AddOrUpdate : key:'TKey * addValueFactory:Func<'TKey, 'TValue> * updateValueFactory:Func<'TKey, 'TValue, 'TValue> -> 'TValue + 1 overload
member Clear : unit -> unit
member ContainsKey : key:'TKey -> bool
member Count : int
member GetEnumerator : unit -> IEnumerator<KeyValuePair<'TKey, 'TValue>>
member GetOrAdd : key:'TKey * valueFactory:Func<'TKey, 'TValue> -> 'TValue + 1 overload
member IsEmpty : bool
member Item : 'TKey -> 'TValue with get, set
member Keys : ICollection<'TKey>
...
Full name: System.Collections.Concurrent.ConcurrentDictionary<_,_>
--------------------
System.Collections.Concurrent.ConcurrentDictionary() : unit
System.Collections.Concurrent.ConcurrentDictionary(collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>) : unit
System.Collections.Concurrent.ConcurrentDictionary(comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, capacity: int) : unit
System.Collections.Concurrent.ConcurrentDictionary(collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, collection: System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<'TKey,'TValue>>, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
System.Collections.Concurrent.ConcurrentDictionary(concurrencyLevel: int, capacity: int, comparer: System.Collections.Generic.IEqualityComparer<'TKey>) : unit
type ResizeArray<'T> = System.Collections.Generic.List<'T>
Full name: Microsoft.FSharp.Collections.ResizeArray<_>
val getStream : (string -> ResizeArray<'Event>)
System.Collections.Concurrent.ConcurrentDictionary.GetOrAdd(key: string, value: ResizeArray<'Event>) : ResizeArray<'Event>
System.Collections.Concurrent.ConcurrentDictionary.GetOrAdd(key: string, valueFactory: System.Func<string,ResizeArray<'Event>>) : ResizeArray<'Event>
override InMemoryEventStore.MaxRetries : int option
Full name: Script.InMemoryEventStore`2.MaxRetries
union case Option.None: Option<'T>
val __ : InMemoryEventStore<'State,'Event>
override InMemoryEventStore.GetState : streamId:string -> Async<Envelope<'State>>
Full name: Script.InMemoryEventStore`2.GetState
val eventStream : ResizeArray<'Event>
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)
Full name: Microsoft.FSharp.Core.Operators.lock
System.Collections.Generic.List.ToArray() : 'Event []
val state : 'State
property System.Array.LongLength: int64
override InMemoryEventStore.TryCommit : streamId:string -> expectedVersion:int64 -> events:'Event [] -> Async<bool>
Full name: Script.InMemoryEventStore`2.TryCommit
val expectedVersion : int64
property System.Collections.Generic.List.Count: int
System.Collections.Generic.List.AddRange(collection: System.Collections.Generic.IEnumerable<'Event>) : unit
Multiple items
union case Stack.Stack: int list -> Stack
--------------------
type Stack = | Stack of int list
Full name: Script.Stack
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
type Command =
| Push of int
| Pop
Full name: Script.Command
union case Command.Push: int -> Command
union case Command.Pop: Command
Multiple items
module Event
from Microsoft.FSharp.Control
--------------------
type Event =
| Pushed of int
| Popped of int
Full name: Script.Event
--------------------
type Event<'T> =
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
Full name: Microsoft.FSharp.Control.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'T>
--------------------
new : unit -> Event<'Delegate,'Args>
union case Event.Pushed: int -> Event
union case Event.Popped: int -> Event
Multiple items
type CompilationRepresentationAttribute =
inherit Attribute
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
member Flags : CompilationRepresentationFlags
Full name: Microsoft.FSharp.Core.CompilationRepresentationAttribute
--------------------
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
type CompilationRepresentationFlags =
| None = 0
| Static = 1
| Instance = 2
| ModuleSuffix = 4
| UseNullAsTrueValue = 8
| Event = 16
Full name: Microsoft.FSharp.Core.CompilationRepresentationFlags
CompilationRepresentationFlags.ModuleSuffix: CompilationRepresentationFlags = 4
val empty : Stack
Full name: Script.StackModule.empty
val apply : s:Stack -> e:Event -> Stack
Full name: Script.StackModule.apply
val s : Stack
val e : Event
val s : int list
val h : int
val t : int list
val exec : cmd:Command -> s:Stack -> Event list
Full name: Script.StackModule.exec
val cmd : Command
val cmd2Transaction : cmd:Command -> EventTransaction<Stack,Event,Stack>
Full name: Script.StackModule.cmd2Transaction
Multiple items
module EventTransaction
from Script
--------------------
type EventTransaction<'State,'Event,'Result> = | Transaction of (Envelope<'State> -> 'Result * 'Event [])
Full name: Script.EventTransaction<_,_,_>
val context : InMemoryEventStore<Stack,Event>
Full name: Script.context
Multiple items
union case Stack.Stack: int list -> Stack
--------------------
module Stack
from Script
--------------------
type Stack = | Stack of int list
Full name: Script.Stack
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
val incrHead : EventTransaction<Stack,Event,int option>
Full name: Script.incrHead
union case Option.Some: Value: 'T -> Option<'T>
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>
Full name: Microsoft.FSharp.Collections.Seq.map
val j : Async<Stack * Envelope<Event> []>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
More information