0 people like it.

Porting of AsyncOneManyLock to F#

If you have read Jeffrey Richter's excellent book CLR via C# (4th ed.), you will have discovered that there are more efficient approaches to thread synchronization than the ones provided by the BCL. One of them is to use the new asynchronous capabilities to build an asynchronous synchronization primitive. The book presents AsyncOneManyLock, which is intended for thread synchronization in code with high demands on responsiveness and scalability. If you are an F# developer, you know that F# Asynchronous Workflows and the Task Parallel Library are different, so I decided to port this useful piece of code to F# and show you how to use it with an example.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
namespace AP.Threading

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

/// Access mode requested from AsyncOneManyLock.WaitAsync.
type OneManyMode =
    /// Request sole (writer) ownership of the lock.
    | Exclusive
    /// Request shared (reader) ownership of the lock.
    | Shared

/// An asynchronous one-writer / many-readers lock.
/// Callers never block a thread while waiting: WaitAsync hands back a task
/// that completes once the requested access has been granted, and Release
/// gives that access back.
[<Sealed>]
type AsyncOneManyLock() = 

    // SpinLock is a mutable struct. It MUST be bound with `let mutable`:
    // with an immutable binding F# invokes Enter/Exit on a defensive copy,
    // so the real lock would never be taken.
    let mutable _lock = new SpinLock(true)
    // Shared, already-completed task returned when access is granted at once.
    let _noContentionAccessGranter = Task.FromResult<Object>(null)
    // Writers waiting for exclusive access, in arrival order.
    let _qWaitingWriters = new Queue<TaskCompletionSource<Object>>()
    // Single signal shared by every reader currently waiting.
    let mutable _waitingReadersSignal = new TaskCompletionSource<Object>()
    let mutable _numWaitingReaders = 0
    // -1 = owned by a writer, 0 = free, > 0 = number of readers holding the lock.
    let mutable _state = 0

    let lock() =
        let mutable taken = false
        _lock.Enter(&taken)

    let unlock() =
        _lock.Exit()

    let isFree() = _state = 0
    let isOwnedByWriter() = _state = -1
    let isOwnedByReader() = _state > 0
    let addReaders(count: Int32) = _state <- _state + count
    let subtractReader() = _state <- _state - 1
    let makeWriter() = _state <- -1
    let makeFree() = _state <- 0

    /// Requests access in the given mode.
    /// Returns an already-completed task when access is granted immediately;
    /// otherwise a task that completes when a later Release grants access.
    member this.WaitAsync(mode: OneManyMode) =
        lock()
        try
            match mode with
            | Exclusive when isFree() ->
                makeWriter()
                _noContentionAccessGranter
            | Exclusive ->
                // Contention: queue this writer and let Release wake it up.
                let tcs = new TaskCompletionSource<Object>()
                _qWaitingWriters.Enqueue(tcs)
                tcs.Task
            | Shared when isFree() || (isOwnedByReader() && _qWaitingWriters.Count = 0) ->
                // A new reader joins only while no writer is queued.
                addReaders(1)
                _noContentionAccessGranter
            | Shared ->
                _numWaitingReaders <- _numWaitingReaders + 1
                // Each waiting reader receives its own continuation of the shared signal.
                _waitingReadersSignal.Task.ContinueWith(
                    Func<Task<Object>, Object>(fun t -> t.Result))
        finally
            // Always leave the spin lock, even if an allocation above throws.
            unlock()

    /// Releases one unit of ownership (the writer, or a single reader) and,
    /// when the lock becomes free, grants it to the next queued writer or,
    /// failing that, to all waiting readers at once.
    /// Raises InvalidOperationException when the lock is not currently held.
    member this.Release() =
        lock()
        let accessGranted =
            try
                // Releasing a free lock would drive the state to -1 and leave the
                // lock permanently "writer-owned" with no owner to release it.
                if isFree() then
                    invalidOp "AsyncOneManyLock.Release was called but the lock is not held."

                if isOwnedByWriter() then makeFree()
                else subtractReader()

                if not (isFree()) then
                    None
                elif _qWaitingWriters.Count > 0 then
                    makeWriter()
                    Some (_qWaitingWriters.Dequeue())
                elif _numWaitingReaders > 0 then
                    addReaders(_numWaitingReaders)
                    _numWaitingReaders <- 0
                    let signal = _waitingReadersSignal
                    // Fresh signal for readers that start waiting from now on.
                    _waitingReadersSignal <- new TaskCompletionSource<Object>()
                    Some signal
                else
                    None
            finally
                unlock()

        // Complete the waiter(s) outside the spin lock so that continuations
        // never execute while the lock is held.
        match accessGranted with
        | Some tcs -> tcs.SetResult(null)
        | None -> ()
namespace System.Threading
namespace System
namespace System.Collections
namespace System.Collections.Generic
namespace System.Threading.Tasks
type OneManyMode =
  | Exclusive
  | Shared

Full name: AP.Threading.OneManyMode
union case OneManyMode.Exclusive: OneManyMode
union case OneManyMode.Shared: OneManyMode
Multiple items
type SealedAttribute =
  inherit Attribute
  new : unit -> SealedAttribute
  new : value:bool -> SealedAttribute
  member Value : bool

Full name: Microsoft.FSharp.Core.SealedAttribute

--------------------
new : unit -> SealedAttribute
new : value:bool -> SealedAttribute
Multiple items
type AsyncOneManyLock =
  new : unit -> AsyncOneManyLock
  member Release : unit -> unit
  member WaitAsync : mode:OneManyMode -> Task<Object>

Full name: AP.Threading.AsyncOneManyLock

--------------------
new : unit -> AsyncOneManyLock
Multiple items
type Task =
  new : action:Action -> Task + 7 overloads
  member AsyncState : obj
  member ContinueWith : continuationAction:Action<Task> -> Task + 9 overloads
  member CreationOptions : TaskCreationOptions
  member Dispose : unit -> unit
  member Exception : AggregateException
  member Id : int
  member IsCanceled : bool
  member IsCompleted : bool
  member IsFaulted : bool
  ...

Full name: System.Threading.Tasks.Task

--------------------
type Task<'TResult> =
  inherit Task
  new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
  member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 9 overloads
  member Result : 'TResult with get, set
  static member Factory : TaskFactory<'TResult>

Full name: System.Threading.Tasks.Task<_>

--------------------
Task(action: Action) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit

--------------------
Task(function: Func<'TResult>) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Multiple items
type Object =
  new : unit -> obj
  member Equals : obj:obj -> bool
  member GetHashCode : unit -> int
  member GetType : unit -> Type
  member ToString : unit -> string
  static member Equals : objA:obj * objB:obj -> bool
  static member ReferenceEquals : objA:obj * objB:obj -> bool

Full name: System.Object

--------------------
Object() : unit
Multiple items
type Queue<'T> =
  new : unit -> Queue<'T> + 2 overloads
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member CopyTo : array:'T[] * arrayIndex:int -> unit
  member Count : int
  member Dequeue : unit -> 'T
  member Enqueue : item:'T -> unit
  member GetEnumerator : unit -> Enumerator<'T>
  member Peek : unit -> 'T
  member ToArray : unit -> 'T[]
  ...
  nested type Enumerator

Full name: System.Collections.Generic.Queue<_>

--------------------
Queue() : unit
Queue(capacity: int) : unit
Queue(collection: IEnumerable<'T>) : unit
Multiple items
type TaskCompletionSource<'TResult> =
  new : unit -> TaskCompletionSource<'TResult> + 3 overloads
  member SetCanceled : unit -> unit
  member SetException : exception:Exception -> unit + 1 overload
  member SetResult : result:'TResult -> unit
  member Task : Task<'TResult>
  member TrySetCanceled : unit -> bool
  member TrySetException : exception:Exception -> bool + 1 overload
  member TrySetResult : result:'TResult -> bool

Full name: System.Threading.Tasks.TaskCompletionSource<_>

--------------------
TaskCompletionSource() : unit
TaskCompletionSource(creationOptions: TaskCreationOptions) : unit
TaskCompletionSource(state: obj) : unit
TaskCompletionSource(state: obj, creationOptions: TaskCreationOptions) : unit
val lock : (unit -> 'a)
val mutable taken : bool
val _lock : obj
val unlock : (unit -> 'a)
val isFree : (unit -> bool)
val mutable _state : int
val isOwnedByWriter : (unit -> bool)
val isOwnedByReader : (unit -> bool)
val addReaders : (Int32 -> unit)
val count : Int32
type Int32 =
  struct
    member CompareTo : value:obj -> int + 1 overload
    member Equals : obj:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member GetTypeCode : unit -> TypeCode
    member ToString : unit -> string + 3 overloads
    static val MaxValue : int
    static val MinValue : int
    static member Parse : s:string -> int + 3 overloads
    static member TryParse : s:string * result:int -> bool + 1 overload
  end

Full name: System.Int32
val subtractReader : (unit -> unit)
val makeWriter : (unit -> unit)
val makeFree : (unit -> unit)
val this : AsyncOneManyLock
member AsyncOneManyLock.WaitAsync : mode:OneManyMode -> Task<Object>

Full name: AP.Threading.AsyncOneManyLock.WaitAsync
val mode : OneManyMode
val mutable accessGranter : Task<Object>
val _noContentionAccessGranter : Task<Object>
val tcs : TaskCompletionSource<Object>
val _qWaitingWriters : Queue<TaskCompletionSource<Object>>
Queue.Enqueue(item: TaskCompletionSource<Object>) : unit
property TaskCompletionSource.Task: Task<Object>
property Queue.Count: int
val mutable _numWaitingReaders : int
val mutable _waitingReadersSignal : TaskCompletionSource<Object>
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>) : Task
   (+0 other overloads)
Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<Object>,'TNewResult>) : Task<'TNewResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task<Object>>) : Task
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, cancellationToken: Threading.CancellationToken) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, cancellationToken: Threading.CancellationToken) : Task
   (+0 other overloads)
val t : Task<Object>
property Task.Result: Object
member AsyncOneManyLock.Release : unit -> unit

Full name: AP.Threading.AsyncOneManyLock.Release
val mutable accessGranted : TaskCompletionSource<Object> option
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
Queue.Dequeue() : TaskCompletionSource<Object>
property Option.IsSome: bool
property Option.Value: TaskCompletionSource<Object>
TaskCompletionSource.SetResult(result: Object) : unit

More information

Link:http://fssnip.net/li
Posted:11 years ago
Author:Antonio Parata
Tags: async , tpl , synchronization