6 people like it.

Asynchronous cancellation of a workflow

The snippet implements an Async.StartCancellable function that starts a given workflow and returns a token that can later be used to cancel it. Cancellation is asynchronous: awaiting the token requests cancellation and then waits, without blocking a thread, until the workflow has actually been cancelled.

Definitions from FSharp.AsyncExtensions or FSharpX

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
open System

/// Runs 'f', handing it a 'resume' function for use in CPS-style code.
/// The synchronization context that is current when 'synchronize' is
/// called gets captured; 'resume g' later posts 'g' to that context when
/// it is non-null and differs from the context current at resume time,
/// and otherwise simply invokes 'g' directly.
let internal synchronize f = 
  let captured = System.Threading.SynchronizationContext.Current 
  let resume g =
    let current = System.Threading.SynchronizationContext.Current 
    // Nothing to marshal back to, or we are already on the captured context
    if captured = null || captured = current then g()
    else captured.Post((fun _ -> g()), null)
  f resume

type Microsoft.FSharp.Control.Async with 

  /// Behaves like AwaitObservable, but calls the specified guarding function
  /// after a subscriber is registered with the observable.
  ///
  /// The first notification received from 'ev1' completes the workflow:
  /// OnNext resumes it with the value, OnError with the exception, and
  /// OnCompleted cancels it with an OperationCanceledException. Any later
  /// notification is ignored. The continuation is resumed via 'synchronize'
  /// and the subscription is disposed once a notification has been accepted.
  static member GuardedAwaitObservable (ev1:IObservable<'T1>) guardFunction =
    synchronize (fun f ->
      Async.FromContinuations((fun (cont,econt,ccont) -> 
        // Protects 'completed' and 'subscription' below
        let gate = new obj()
        // Becomes true when the first notification has been accepted
        let completed = ref false
        // Holds the subscription once Subscribe has returned (null before).
        // An explicit cell is used instead of a recursive value binding so
        // that an observable notifying from inside Subscribe does not touch
        // a not-yet-initialized value.
        let subscription : IDisposable ref = ref null
        let finish k value = 
          // Atomically claim completion and read the current subscription
          let claimed, current =
            lock gate (fun () ->
              let first = not completed.Value
              completed.Value <- true
              first, subscription.Value)
          if claimed then
            // 'current' is null when notified from inside Subscribe; that
            // case is cleaned up right after Subscribe returns (see below)
            if not (obj.ReferenceEquals(current, null)) then current.Dispose()
            f (fun () -> k value)
        let subscribed = 
          ev1.Subscribe
            ({ new IObserver<_> with
                  member x.OnNext(v) = finish cont v
                  member x.OnError(e) = finish econt e
                  member x.OnCompleted() = 
                    let msg = "Cancelling the workflow, because the " + 
                      "Observable awaited using AwaitObservable has completed."
                    finish ccont (new System.OperationCanceledException(msg)) }) 
        // Publish the subscription; if a notification was already accepted
        // before it became visible, nobody else will dispose it, so do it here
        let alreadyCompleted =
          lock gate (fun () ->
            subscription.Value <- subscribed
            completed.Value)
        if alreadyCompleted then subscribed.Dispose()
        guardFunction() )))

StartCancellable operation

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
open System.Threading

module Async = 
  /// Starts 'work' in the background and yields a second workflow (hence
  /// the result type 'Async<Async<unit>>') that serves as a cancellation
  /// token. Running that inner workflow requests cancellation of 'work'
  /// and then waits asynchronously until the cancellation notification of
  /// 'work' has fired; the cancellation source is disposed afterwards.
  ///
  /// NOTE(review): the notification is raised only from the cancellation
  /// handler attached to 'work', so it looks like the token would never
  /// complete when 'work' has already finished on its own - confirm before
  /// using this with short-lived workflows.
  let StartCancellable work = async {
    let cancellation = new CancellationTokenSource()
    // Raised when the started workflow reports that it was cancelled
    let cancelled = new Event<_>()
    // Attach the cancellation handler, then start the work under our token
    let observed = Async.TryCancelled(work, (fun _ -> cancelled.Trigger()))
    Async.Start(observed, cancellation.Token)
    // Subscribe to 'cancelled' first and only then request cancellation,
    // so that the notification cannot be missed
    let awaitCancellation =
      Async.GuardedAwaitObservable cancelled.Publish (fun () -> cancellation.Cancel())
    return async.TryFinally(awaitCancellation, (fun () -> cancellation.Dispose())) }

Example of use

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
/// Demo workflow: runs 10000 iterations, each printing a start message,
/// sleeping asynchronously for one second and printing a completion message
let loop = async {
  for i = 0 to 9999 do
    printfn "Starting: %d" i
    do! Async.Sleep 1000
    printfn "Done: %d" i }

// Demo: launch 'loop', let it run for 5.5 seconds, then request its
// cancellation and wait until the workflow reports that it was cancelled
async { 
  let! stopLoop = Async.StartCancellable loop
  printfn "started"
  do! Async.Sleep 5500
  printfn "cancelling"
  do! stopLoop
  printfn "done" }
|> Async.Start
namespace System
val internal synchronize : f:(((unit -> unit) -> unit) -> 'a) -> 'a

Full name: Script.synchronize


 Helper that can be used for writing CPS-style code that resumes
 on the same thread where the operation was started.
val f : (((unit -> unit) -> unit) -> 'a)
val ctx : Threading.SynchronizationContext
namespace System.Threading
Multiple items
type SynchronizationContext =
  new : unit -> SynchronizationContext
  member CreateCopy : unit -> SynchronizationContext
  member IsWaitNotificationRequired : unit -> bool
  member OperationCompleted : unit -> unit
  member OperationStarted : unit -> unit
  member Post : d:SendOrPostCallback * state:obj -> unit
  member Send : d:SendOrPostCallback * state:obj -> unit
  member Wait : waitHandles:nativeint[] * waitAll:bool * millisecondsTimeout:int -> int
  static member Current : SynchronizationContext
  static member SetSynchronizationContext : syncContext:SynchronizationContext -> unit

Full name: System.Threading.SynchronizationContext

--------------------
Threading.SynchronizationContext() : unit
property Threading.SynchronizationContext.Current: Threading.SynchronizationContext
val g : (unit -> unit)
val nctx : Threading.SynchronizationContext
Threading.SynchronizationContext.Post(d: Threading.SendOrPostCallback, state: obj) : unit
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async
static member Async.GuardedAwaitObservable : ev1:IObservable<'T1> -> guardFunction:(unit -> unit) -> Async<'T1>

Full name: Script.GuardedAwaitObservable


 Behaves like AwaitObservable, but calls the specified guarding function
 after a subscriber is registered with the observable.
val ev1 : IObservable<'T1>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val guardFunction : (unit -> unit)
val f : ((unit -> unit) -> unit)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val cont : ('T1 -> unit)
val econt : (exn -> unit)
val ccont : (OperationCanceledException -> unit)
val finish : (('a -> unit) -> 'a -> unit)
val cont : ('a -> unit)
val value : 'a
val remover : IDisposable
IDisposable.Dispose() : unit
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'T1>) : IDisposable
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit

Full name: System.IObserver<_>
val x : IObserver<'T1>
IObserver.OnNext(value: 'T1) : unit
val v : 'T1
IObserver.OnError(error: exn) : unit
val e : exn
IObserver.OnCompleted() : unit
val msg : string
Multiple items
type OperationCanceledException =
  inherit SystemException
  new : unit -> OperationCanceledException + 5 overloads
  member CancellationToken : CancellationToken with get, set

Full name: System.OperationCanceledException

--------------------
OperationCanceledException() : unit
OperationCanceledException(message: string) : unit
OperationCanceledException(token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn) : unit
OperationCanceledException(message: string, token: Threading.CancellationToken) : unit
OperationCanceledException(message: string, innerException: exn, token: Threading.CancellationToken) : unit
val StartCancellable : work:Async<unit> -> Async<Async<unit>>

Full name: Script.Async.StartCancellable


 Returns an asynchronous workflow 'Async<Async<unit>>'. When called
 using 'let!', it starts the workflow provided as an argument and returns
 a token that can be used to cancel the started work - this is an
 (asynchronously) blocking operation that waits until the workflow
 is actually cancelled
val work : Async<unit>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
val evt : Event<unit>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member Async.TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
member Event.Trigger : arg:'T -> unit
property CancellationTokenSource.Token: CancellationToken
val waitForCancel : Async<unit>
static member Async.GuardedAwaitObservable : ev1:IObservable<'T1> -> guardFunction:(unit -> unit) -> Async<'T1>


 Behaves like AwaitObservable, but calls the specified guarding function
 after a subscriber is registered with the observable.
property Event.Publish: IEvent<unit>
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
member AsyncBuilder.TryFinally : computation:Async<'T> * compensation:(unit -> unit) -> Async<'T>
CancellationTokenSource.Dispose() : unit
val loop : Async<unit>

Full name: Script.loop


 Sample workflow that repeatedly starts and stops long running operation
val i : int
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Multiple items
module Async

from Script

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val cancelToken : Async<unit>

More information

Link:http://fssnip.net/d4
Posted:12 years ago
Author:Tomas Petricek
Tags: async , workflow , cancellation