3 people like it.

Cooperative cancellation in Async workflows

A simple implementation that protects the nested workflow from external cancellation. The external cancellation token is passed as an argument for cooperative cancellation. Kudos to Gian Ntzik for this one.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
open System.Threading

type Async with
    /// Runs the workflow produced by `f` as a separately started task and
    /// waits for its result.
    ///
    /// The cancellation token of the calling workflow is not used to start the
    /// nested workflow; it is only handed to `f` as an argument, so the nested
    /// workflow can poll it and decide for itself when to stop.
    ///
    /// `f` receives the caller's cancellation token and returns the workflow
    /// to run. The returned workflow completes with the nested workflow's result.
    static member Isolate(f : CancellationToken -> Async<'T>) : Async<'T> =
        async {
            // Capture the token the *calling* workflow is running under.
            let! outerToken = Async.CancellationToken

            // Start the nested workflow without forwarding that token, then
            // wait for the resulting task to finish.
            return!
                f outerToken
                |> Async.StartAsTask
                |> Async.AwaitTask
        }

// example

/// Example workflow: prints "looping..." and then sleeps for one second,
/// repeating until `ct` reports that cancellation has been requested, at
/// which point it prints a farewell message and completes.
let loop (ct : CancellationToken) : Async<unit> =
    async {
        printfn "looping..."

        // Cooperative cancellation: the token is polled once per iteration,
        // right after each "looping..." message.
        while not ct.IsCancellationRequested do
            do! Async.Sleep 1000
            printfn "looping..."

        printfn "external ct has been cancelled, exiting.."
    }

// Source of the external cancellation token used by the example.
let cts = new CancellationTokenSource()

// Start the isolated loop in the background with cts.Token as the workflow's
// cancellation token; Async.Isolate reads that token and passes it to `loop`.
Async.Start(Async.Isolate loop, cancellationToken = cts.Token)

// Request cancellation; `loop` checks ct.IsCancellationRequested and exits itself.
// NOTE(review): Cancel is issued immediately after Start — presumably the
// workflow may be cancelled before it has begun running; confirm if that matters.
cts.Cancel()
namespace System
namespace System.Threading
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Isolate : f:(CancellationToken -> Async<'T>) -> Async<'T>

Full name: Script.Isolate
val f : (CancellationToken -> Async<'T>)
Multiple items
type CancellationToken =
  struct
    new : canceled:bool -> CancellationToken
    member CanBeCanceled : bool
    member Equals : other:CancellationToken -> bool + 1 overload
    member GetHashCode : unit -> int
    member IsCancellationRequested : bool
    member Register : callback:Action -> CancellationTokenRegistration + 3 overloads
    member ThrowIfCancellationRequested : unit -> unit
    member WaitHandle : WaitHandle
    static member None : CancellationToken
  end

Full name: System.Threading.CancellationToken

--------------------
CancellationToken()
CancellationToken(canceled: bool) : unit
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val ct : CancellationToken
property Async.CancellationToken: Async<CancellationToken>
val isolatedTask : Tasks.Task<'T>
static member Async.StartAsTask : computation:Async<'T> * ?taskCreationOptions:Tasks.TaskCreationOptions * ?cancellationToken:CancellationToken -> Tasks.Task<'T>
static member Async.AwaitTask : task:Tasks.Task<'T> -> Async<'T>
val loop : ct:CancellationToken -> Async<unit>

Full name: Script.loop
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property CancellationToken.IsCancellationRequested: bool
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val cts : CancellationTokenSource

Full name: Script.cts
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member Async.Isolate : f:(CancellationToken -> Async<'T>) -> Async<'T>
property CancellationTokenSource.Token: CancellationToken
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
Raw view Test code New version

More information

Link: http://fssnip.net/gu
Posted: 12 years ago
Author: Eirik Tsarpalis
Tags: async, cancellation