2 people like it.
Like the snippet!
Async workflow with asynchronous "finally" clause
The F# Core library offers async.TryFinally which where a synchronous compensation function (of type unit -> unit) is run after an error or cancellation. However, it offers no way to start an asynchronous compensation. The TryFinallyAsync method defined below offers a way around this.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
153:
154:
155:
156:
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
|
module Application
open System
open System.Threading
type Microsoft.FSharp.Control.Async with
static member TryFinallyAsync comp deferred =
let finish (compResult, deferredResult) (cont, econt, ccont) =
match (compResult, deferredResult) with
| (Choice1Of3 (), Choice1Of3 ()) -> cont ()
| (Choice2Of3 compExn, Choice1Of3 ()) -> econt compExn
| (Choice3Of3 compExn, Choice1Of3 ()) -> ccont compExn
| (Choice1Of3 (), Choice2Of3 deferredExn) -> econt deferredExn
| (Choice2Of3 compExn, Choice2Of3 deferredExn) -> econt <| new Exception(deferredExn.Message, compExn)
| (Choice3Of3 compExn, Choice2Of3 deferredExn) -> econt deferredExn
| (_, Choice3Of3 deferredExn) -> econt <| new Exception("Unexpected cancellation.", deferredExn)
let startDeferred compResult (cont, econt, ccont) =
Async.StartWithContinuations(deferred,
(fun () -> finish (compResult, Choice1Of3 ()) (cont, econt, ccont)),
(fun exn -> finish (compResult, Choice2Of3 exn) (cont, econt, ccont)),
(fun exn -> finish (compResult, Choice3Of3 exn) (cont, econt, ccont)))
let startComp ct (cont, econt, ccont) =
Async.StartWithContinuations(comp,
(fun () -> startDeferred (Choice1Of3 ()) (cont, econt, ccont)),
(fun exn -> startDeferred (Choice2Of3 exn) (cont, econt, ccont)),
(fun exn -> startDeferred (Choice3Of3 exn) (cont, econt, ccont)),
ct)
async { let! ct = Async.CancellationToken
do! Async.FromContinuations (startComp ct) }
let continueMsg = "\nPress enter to continue..."
[<EntryPoint>]
let main _ =
printfn "Demo - no cancellation or error"
let workflow1 =
async { printfn "Starting work..."
do! Async.Sleep 1000
printfn "Finished work." }
|> Async.TryFinallyAsync <|
async {
printfn "Starting 'finally' clause."
do! Async.Sleep 1000
printfn "Completed 'finally' clause." }
Async.StartWithContinuations(workflow1,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg))
Console.ReadLine() |> ignore
printfn "Demo - error in main workflow"
let workflow2 =
async { do! Async.Sleep 1000
printfn "Starting work..."
failwith "Failed to do the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
do! Async.Sleep 1000
printfn "Completed with 'finally' clause." }
Async.StartWithContinuations(workflow2,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg))
Console.ReadLine() |> ignore
printfn "Demo - cancellation in main workflow:"
let workflow3 =
async { printfn "Starting work...."
do! Async.Sleep 1000
printfn "Finished the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
do! Async.Sleep 1000
printfn "Completed with 'finally' clause." }
let cancellationCapability1 = new CancellationTokenSource()
Async.StartWithContinuations(workflow3,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg),
cancellationCapability1.Token)
Thread.Sleep 500
cancellationCapability1.Cancel()
Console.ReadLine() |> ignore
printfn "Demo - cancellation after main workflow:"
let workflow4 =
async { printfn "Starting work...."
do! Async.Sleep 1000
printfn "Finished the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
do! Async.Sleep 1000
printfn "Completed with 'finally' clause." }
let cancellationCapability2 = new CancellationTokenSource()
Async.StartWithContinuations(workflow4,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg),
cancellationCapability2.Token)
Thread.Sleep 1900
cancellationCapability2.Cancel()
Console.ReadLine() |> ignore
printfn "Demo - error during finally clause:"
let workflow5 =
async { printfn "Starting work...."
do! Async.Sleep 2000
printfn "Finished the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
failwith "Failed during 'finally' clause." }
Async.StartWithContinuations(workflow5,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg))
Console.ReadLine() |> ignore
printfn "Demo - error during both clauses:"
let workflow6 =
async { printfn "Starting work...."
failwith "Failed to do the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
failwith "Failed during 'finally' clause." }
Async.StartWithContinuations(workflow6,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg))
Console.ReadLine() |> ignore
printfn "Demo - cancellation during main workflow and error in 'finally' clause:"
let workflow7 =
async { printfn "Starting work...."
do! Async.Sleep 1000
printfn "Finished the work." }
|> Async.TryFinallyAsync <|
async { printfn "Starting 'finally' clause."
failwith "Failed during 'finally' clause." }
let cancellationCapability1 = new CancellationTokenSource()
Async.StartWithContinuations(workflow7,
(fun () -> printfn "Success!%s" continueMsg),
(fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
(fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg),
cancellationCapability1.Token)
Thread.Sleep 500
cancellationCapability1.Cancel()
Console.ReadLine() |> ignore
0
|
module Application
namespace System
namespace System.Threading
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
Multiple items
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
static member Async.TryFinallyAsync : comp:Async<unit> -> deferred:Async<unit> -> Async<unit>
Full name: Application.TryFinallyAsync
val comp : Async<unit>
val deferred : Async<unit>
val finish : (Choice<unit,Exception,'a> * Choice<unit,Exception,#exn> -> (unit -> 'c) * (Exception -> 'c) * ('a -> 'c) -> 'c)
val compResult : Choice<unit,Exception,'a>
val deferredResult : Choice<unit,Exception,#exn>
val cont : (unit -> 'c)
val econt : (Exception -> 'c)
val ccont : ('a -> 'c)
union case Choice.Choice1Of3: 'T1 -> Choice<'T1,'T2,'T3>
union case Choice.Choice2Of3: 'T2 -> Choice<'T1,'T2,'T3>
val compExn : Exception
union case Choice.Choice3Of3: 'T3 -> Choice<'T1,'T2,'T3>
val compExn : 'a
val deferredExn : Exception
Multiple items
type Exception =
new : unit -> Exception + 2 overloads
member Data : IDictionary
member GetBaseException : unit -> Exception
member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
member GetType : unit -> Type
member HelpLink : string with get, set
member InnerException : Exception
member Message : string
member Source : string with get, set
member StackTrace : string
...
Full name: System.Exception
--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
property Exception.Message: string
val deferredExn : #exn
val startDeferred : (Choice<unit,Exception,'a> -> (unit -> unit) * (Exception -> unit) * ('a -> unit) -> unit)
val cont : (unit -> unit)
val econt : (Exception -> unit)
val ccont : ('a -> unit)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
Multiple items
val exn : exn
--------------------
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
Multiple items
val exn : OperationCanceledException
--------------------
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
val startComp : (CancellationToken -> (unit -> unit) * (Exception -> unit) * (OperationCanceledException -> unit) -> unit)
val ct : CancellationToken
val ccont : (OperationCanceledException -> unit)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
property Async.CancellationToken: Async<CancellationToken>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val continueMsg : string
Full name: Application.continueMsg
Multiple items
type EntryPointAttribute =
inherit Attribute
new : unit -> EntryPointAttribute
Full name: Microsoft.FSharp.Core.EntryPointAttribute
--------------------
new : unit -> EntryPointAttribute
val main : string [] -> int
Full name: Application.main
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val workflow1 : Async<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.TryFinallyAsync : comp:Async<unit> -> deferred:Async<unit> -> Async<unit>
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.ReadLine() : string
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val workflow2 : Async<unit>
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
val workflow3 : Async<unit>
val cancellationCapability1 : CancellationTokenSource
Multiple items
type CancellationTokenSource =
new : unit -> CancellationTokenSource
member Cancel : unit -> unit + 1 overload
member Dispose : unit -> unit
member IsCancellationRequested : bool
member Token : CancellationToken
static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload
Full name: System.Threading.CancellationTokenSource
--------------------
CancellationTokenSource() : unit
property CancellationTokenSource.Token: CancellationToken
Multiple items
type Thread =
inherit CriticalFinalizerObject
new : start:ThreadStart -> Thread + 3 overloads
member Abort : unit -> unit + 1 overload
member ApartmentState : ApartmentState with get, set
member CurrentCulture : CultureInfo with get, set
member CurrentUICulture : CultureInfo with get, set
member DisableComObjectEagerCleanup : unit -> unit
member ExecutionContext : ExecutionContext
member GetApartmentState : unit -> ApartmentState
member GetCompressedStack : unit -> CompressedStack
member GetHashCode : unit -> int
...
Full name: System.Threading.Thread
--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
Thread.Sleep(timeout: TimeSpan) : unit
Thread.Sleep(millisecondsTimeout: int) : unit
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val workflow4 : Async<unit>
val cancellationCapability2 : CancellationTokenSource
val workflow5 : Async<unit>
val workflow6 : Async<unit>
val workflow7 : Async<unit>
More information