7 people like it.

Return the first result using Async.Choice

The snippet implements an Async.Choice method that takes several workflows and creates a workflow that returns the first result that was computed. After one workflow completes, the remaining workflows are cancelled using the F# async cancellation mechanism. (The method doesn't handle exceptions.)

Async.Choice implementation

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
open System.Threading

type Microsoft.FSharp.Control.Async with
  /// Takes several asynchronous workflows and returns the outcome of the
  /// first workflow that finishes.
  ///
  /// Every workflow is started on its own via Async.Start under a private
  /// CancellationTokenSource. As soon as one of them finishes, that source
  /// is cancelled so the remaining workflows stop at their next
  /// cancellation check.
  ///
  /// If the first workflow to finish produced a value, the returned workflow
  /// yields that value. If it raised an exception, the returned workflow
  /// raises the same exception. Outcomes of workflows that finish later are
  /// discarded.
  ///
  /// If `workflows` is empty, nothing is started and the returned workflow
  /// never completes.
  static member Choice(workflows : seq<Async<'T>>) : Async<'T> = 
    Async.FromContinuations(fun (cont, econt, _) ->
      let cts = new CancellationTokenSource()
      let completed = ref false
      let lockObj = new obj()
    
      /// Runs 'notify' only for the first caller; the lock guarantees that
      /// exactly one continuation of the resulting workflow is ever invoked
      let completeOnce notify =
        let run =
          lock lockObj (fun () ->
            if completed.Value then false
            else
              completed.Value <- true
              true)
        if run then notify ()
    
      /// Workflow that will be started for each argument - run the operation,
      /// cancel pending workflows and then report the value or the exception
      let runWorkflow workflow = async {
        // Async.Catch turns a failure into a value, so an exception raised by
        // 'workflow' reaches the caller instead of escaping from the
        // detached computation started by Async.Start
        let! outcome = Async.Catch workflow
        cts.Cancel()
        match outcome with
        | Choice1Of2 res -> completeOnce (fun () -> cont res)
        | Choice2Of2 err -> completeOnce (fun () -> econt err) }
    
      // Start all workflows using the shared cancellation token
      for work in workflows do
        Async.Start(runWorkflow work, cts.Token) )

Sample

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
/// Builds a workflow that waits roughly `n` milliseconds, prints a
/// "returning ..." line for `s` and then yields `s` as its result
let delayReturn n s =
  async {
    // Two consecutive sleeps: each one is an async operation, so the
    // workflow passes two points where a pending cancellation can take effect
    do! Async.Sleep n
    do! Async.Sleep 1
    printfn "returning %s" s
    return s
  }

// Race two 'delayReturn' workflows against each other and wait for
// the winner. Only 'returning First!' is expected on the console,
// since the slower workflow is cancelled once the faster one finishes
[ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
|> Async.Choice
|> Async.RunSynchronously
namespace System
namespace System.Threading
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async
Multiple items
static member Async.Choice : workflows:seq<Async<'a>> -> Async<'a>

Full name: Script.Choice


 Takes several asynchronous workflows and returns
 the result of the first workflow that successfully completes


--------------------
type Choice<'T1,'T2> =
  | Choice1Of2 of 'T1
  | Choice2Of2 of 'T2

Full name: Microsoft.FSharp.Core.Choice<_,_>

--------------------
type Choice<'T1,'T2,'T3> =
  | Choice1Of3 of 'T1
  | Choice2Of3 of 'T2
  | Choice3Of3 of 'T3

Full name: Microsoft.FSharp.Core.Choice<_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4> =
  | Choice1Of4 of 'T1
  | Choice2Of4 of 'T2
  | Choice3Of4 of 'T3
  | Choice4Of4 of 'T4

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5> =
  | Choice1Of5 of 'T1
  | Choice2Of5 of 'T2
  | Choice3Of5 of 'T3
  | Choice4Of5 of 'T4
  | Choice5Of5 of 'T5

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6> =
  | Choice1Of6 of 'T1
  | Choice2Of6 of 'T2
  | Choice3Of6 of 'T3
  | Choice4Of6 of 'T4
  | Choice5Of6 of 'T5
  | Choice6Of6 of 'T6

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_>

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6,'T7> =
  | Choice1Of7 of 'T1
  | Choice2Of7 of 'T2
  | Choice3Of7 of 'T3
  | Choice4Of7 of 'T4
  | Choice5Of7 of 'T5
  | Choice6Of7 of 'T6
  | Choice7Of7 of 'T7

Full name: Microsoft.FSharp.Core.Choice<_,_,_,_,_,_,_>
val workflows : seq<Async<'a>>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (System.OperationCanceledException -> unit) -> unit) -> Async<'T>
val cont : ('a -> unit)
val cts : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
val completed : bool ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val lockObj : obj
type obj = System.Object

Full name: Microsoft.FSharp.Core.obj
val synchronized : ((unit -> 'b) -> 'b)
val f : (unit -> 'b)
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
val completeOnce : ('a -> unit)


 Called when a result is available - the function uses locks
 to make sure that it calls the continuation only once
val res : 'a
val run : bool
property Ref.Value: bool
val runWorkflow : (Async<'a> -> Async<unit>)


 Workflow that will be started for each argument - run the
 operation, cancel pending workflows and then return result
val workflow : Async<'a>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val work : Async<'a>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
property CancellationTokenSource.Token: CancellationToken
val delayReturn : n:int -> s:string -> Async<string>

Full name: Script.delayReturn


 Simple function that sleeps for some time
 and then returns a specified value
val n : int
val s : string
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.Choice : workflows:seq<Async<'a>> -> Async<'a>


 Takes several asynchronous workflows and returns
 the result of the first workflow that successfully completes
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T

More information

Link:http://fssnip.net/6D
Posted:13 years ago
Author:Tomas Petricek
Tags: async , asynchronous workflows , choice , non-determinism