2 people like it.

Async Primitives

A set of Async primitives as described by Dave Thomas [1] (and derived from Stephen Toub [2]). [1] http://moiraesoftware.com/blog/2012/04/22/back-to-the-primitive-ii/ [2] http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx

Async primitives

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
type AsyncManualResetEvent() =
  let resultCell = ref <| AsyncResultCell<_>()

  member x.AsyncWait() = (!resultCell).AsyncResult
  member x.Set() = (!resultCell).RegisterResult(AsyncOk(true))
  member x.Reset() =
    let rec swap newVal =
      let currentValue = !resultCell
      let result = Interlocked.CompareExchange<_>(resultCell, newVal, currentValue)
      if obj.ReferenceEquals(result, currentValue) then () else
      Thread.SpinWait 20
      swap newVal
    swap <| AsyncResultCell<_>()

type AsyncAutoResetEvent(?reuseThread) =
  let mutable awaits = Queue<_>()
  let mutable signalled = false
  let completed = async.Return true
  let reuseThread = defaultArg reuseThread false

  member x.AsyncWait() =
    lock awaits (fun () ->
      if signalled then
        signalled <- false
        completed
      else
        let are = AsyncResultCell<_>()
        awaits.Enqueue are
        are.AsyncResult )

type AsyncCountdownEvent(initialCount) =
  let amre = AsyncManualResetEvent()
  do if initialCount <= 0 then raise (new ArgumentOutOfRangeException("initialCount"))
  let count = ref initialCount

  member x.AsyncWait() = amre.AsyncWait()
  member x.Signal() =
    if !count <= 0 then invalidOp ""

    let newCount = Interlocked.Decrement(count)
    if newCount = 0 then
      amre.Set()
    elif newCount < 0 then
      invalidOp ""
    else ()

  member x.SignalAndWait() =
    x.Signal()
    x.AsyncWait()

Sample usage

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
Console.WriteLine("AsyncManualResetEvent")

let amre = AsyncManualResetEvent()
let x = async{let! x = amre.AsyncWait()
              Console.WriteLine("First signalled")}

let y = async{let! x = amre.AsyncWait()
             Console.WriteLine("Second signalled")}

let z = async{let! x = amre.AsyncWait()
              Console.WriteLine("Third signalled")}
//start async workflows x and y
Async.Start x
Async.Start y

//reset the asyncManualResetEvent, this will test whether the async workflows x and y 
// are orphaned due to the AsyncResultCell being recycled.
amre.Reset()

//now start the async z
Async.Start z

//we set a single time, this should result in the three async workflows completing
amre.Set()

Console.WriteLine()

Console.WriteLine("AsyncCountdownEvent")

let ace = AsyncCountdownEvent(3)

let aceWait1 = async{
  let! _ = ace.SignalAndWait()
  Console.WriteLine("First signalled")
}

let aceWait2 = async{
  let! _ = ace.SignalAndWait()
  Console.WriteLine("Second signalled")
}

let aceWait3 = async {
  let! _ = ace.SignalAndWait()
  Console.WriteLine("Third signalled")
}

//start async workflows aceWait1 and aceWait2
Async.Start aceWait1
Async.Start aceWait2
Async.Start aceWait3
Multiple items
type AsyncManualResetEvent =
  new : unit -> AsyncManualResetEvent
  member AsyncWait : unit -> 'b
  member Reset : unit -> unit
  member Set : unit -> 'a

Full name: AsyncPrimitives.AsyncManualResetEvent

--------------------
new : unit -> AsyncManualResetEvent
val resultCell : obj ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val x : AsyncManualResetEvent
member AsyncManualResetEvent.AsyncWait : unit -> 'b

Full name: AsyncPrimitives.AsyncManualResetEvent.AsyncWait
Multiple items
member AsyncManualResetEvent.Set : unit -> 'a

Full name: AsyncPrimitives.AsyncManualResetEvent.Set

--------------------
module Set

from Microsoft.FSharp.Collections

--------------------
type Set<'T (requires comparison)> =
  interface IComparable
  interface IEnumerable
  interface IEnumerable<'T>
  interface ICollection<'T>
  new : elements:seq<'T> -> Set<'T>
  member Add : value:'T -> Set<'T>
  member Contains : value:'T -> bool
  override Equals : obj -> bool
  member IsProperSubsetOf : otherSet:Set<'T> -> bool
  member IsProperSupersetOf : otherSet:Set<'T> -> bool
  ...

Full name: Microsoft.FSharp.Collections.Set<_>

--------------------
new : elements:seq<'T> -> Set<'T>
member AsyncManualResetEvent.Reset : unit -> unit

Full name: AsyncPrimitives.AsyncManualResetEvent.Reset
val swap : (obj -> unit)
val newVal : obj
val currentValue : obj
val result : obj
type Interlocked =
  static member Add : location1:int * value:int -> int + 1 overload
  static member CompareExchange : location1:int * value:int * comparand:int -> int + 6 overloads
  static member Decrement : location:int -> int + 1 overload
  static member Exchange : location1:int * value:int -> int + 6 overloads
  static member Increment : location:int -> int + 1 overload
  static member Read : location:int64 -> int64

Full name: System.Threading.Interlocked
Interlocked.CompareExchange<'T (requires reference type)>(location1: byref<'T>, value: 'T, comparand: 'T) : 'T
Interlocked.CompareExchange(location1: byref<nativeint>, value: nativeint, comparand: nativeint) : nativeint
Interlocked.CompareExchange(location1: byref<obj>, value: obj, comparand: obj) : obj
Interlocked.CompareExchange(location1: byref<float>, value: float, comparand: float) : float
Interlocked.CompareExchange(location1: byref<float32>, value: float32, comparand: float32) : float32
Interlocked.CompareExchange(location1: byref<int64>, value: int64, comparand: int64) : int64
Interlocked.CompareExchange(location1: byref<int>, value: int, comparand: int) : int
type obj = Object

Full name: Microsoft.FSharp.Core.obj
Object.ReferenceEquals(objA: obj, objB: obj) : bool
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
Thread.SpinWait(iterations: int) : unit
Multiple items
type AsyncAutoResetEvent =
  new : ?reuseThread:bool -> AsyncAutoResetEvent
  member AsyncWait : unit -> Async<bool>

Full name: AsyncPrimitives.AsyncAutoResetEvent

--------------------
new : ?reuseThread:bool -> AsyncAutoResetEvent
val reuseThread : bool option
val mutable awaits : Queue<obj>
Multiple items
type Queue<'T> =
  new : unit -> Queue<'T> + 2 overloads
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member CopyTo : array:'T[] * arrayIndex:int -> unit
  member Count : int
  member Dequeue : unit -> 'T
  member Enqueue : item:'T -> unit
  member GetEnumerator : unit -> Enumerator<'T>
  member Peek : unit -> 'T
  member ToArray : unit -> 'T[]
  ...
  nested type Enumerator

Full name: System.Collections.Generic.Queue<_>

--------------------
Queue() : unit
Queue(capacity: int) : unit
Queue(collection: IEnumerable<'T>) : unit
val mutable signalled : bool
val completed : Async<bool>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member AsyncBuilder.Return : value:'T -> Async<'T>
val reuseThread : bool
val defaultArg : arg:'T option -> defaultValue:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.defaultArg
val x : AsyncAutoResetEvent
member AsyncAutoResetEvent.AsyncWait : unit -> Async<bool>

Full name: AsyncPrimitives.AsyncAutoResetEvent.AsyncWait
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
val are : obj
Queue.Enqueue(item: obj) : unit
Multiple items
type AsyncCountdownEvent =
  new : initialCount:int -> AsyncCountdownEvent
  member AsyncWait : unit -> 'b
  member Signal : unit -> unit
  member SignalAndWait : unit -> 'a

Full name: AsyncPrimitives.AsyncCountdownEvent

--------------------
new : initialCount:int -> AsyncCountdownEvent
val initialCount : int
val amre : AsyncManualResetEvent
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type ArgumentOutOfRangeException =
  inherit ArgumentException
  new : unit -> ArgumentOutOfRangeException + 4 overloads
  member ActualValue : obj
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member Message : string

Full name: System.ArgumentOutOfRangeException

--------------------
ArgumentOutOfRangeException() : unit
ArgumentOutOfRangeException(paramName: string) : unit
ArgumentOutOfRangeException(paramName: string, message: string) : unit
ArgumentOutOfRangeException(message: string, innerException: exn) : unit
ArgumentOutOfRangeException(paramName: string, actualValue: obj, message: string) : unit
val count : int ref
val x : AsyncCountdownEvent
member AsyncCountdownEvent.AsyncWait : unit -> 'b

Full name: AsyncPrimitives.AsyncCountdownEvent.AsyncWait
member AsyncManualResetEvent.AsyncWait : unit -> 'b
member AsyncCountdownEvent.Signal : unit -> unit

Full name: AsyncPrimitives.AsyncCountdownEvent.Signal
val invalidOp : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.invalidOp
val newCount : int
Interlocked.Decrement(location: byref<int64>) : int64
Interlocked.Decrement(location: byref<int>) : int
member AsyncManualResetEvent.Set : unit -> 'a
member AsyncCountdownEvent.SignalAndWait : unit -> 'a

Full name: AsyncPrimitives.AsyncCountdownEvent.SignalAndWait
member AsyncCountdownEvent.Signal : unit -> unit
member AsyncCountdownEvent.AsyncWait : unit -> 'b
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
val amre : AsyncManualResetEvent

Full name: AsyncPrimitives.amre
val x : Async<unit>

Full name: AsyncPrimitives.x
val x : obj
val y : Async<unit>

Full name: AsyncPrimitives.y
val z : Async<unit>

Full name: AsyncPrimitives.z
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
member AsyncManualResetEvent.Reset : unit -> unit
val ace : AsyncCountdownEvent

Full name: AsyncPrimitives.ace
val aceWait1 : Async<unit>

Full name: AsyncPrimitives.aceWait1
member AsyncCountdownEvent.SignalAndWait : unit -> 'a
val aceWait2 : Async<unit>

Full name: AsyncPrimitives.aceWait2
val aceWait3 : Async<unit>

Full name: AsyncPrimitives.aceWait3
Raw view Test code New version

More information

Link:http://fssnip.net/bV
Posted:11 years ago
Author:Ryan Riley
Tags: async , threading