0 people like it.
Like the snippet!
Porting of AsyncOneManyLock to F#
If you have read Jeffrey Richter's excellent book CLR via C# (4th ed.), you know that there are more efficient approaches to thread synchronization than the ones provided by the BCL. One of them uses the new asynchronous capabilities to build an asynchronous synchronization primitive. The book presents an AsyncOneManyLock, which is used for thread synchronization in code with high demands for responsiveness and scalability.
If you are an F# developer, you know that F# Asynchronous Workflows and the Task Parallel Library are different, so I decided to port this useful piece of code to F# and show you how to use it with an example.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
|
namespace AP.Threading
open System
open System.Collections.Generic
open System.Threading.Tasks
/// Access mode requested when acquiring an AsyncOneManyLock.
type OneManyMode =
    /// Single-owner (writer) access.
    | Exclusive
    /// Access that may be held by several owners (readers) at once.
    | Shared
/// Asynchronous one-writer / many-readers lock.
/// WaitAsync returns a task that completes once the requested access has been
/// granted; Release gives the access back and hands the lock to any waiters.
[<Sealed>]
type AsyncOneManyLock() =
    // SpinLock is a mutable value type. It must be bound with `let mutable`:
    // on an immutable binding F# invokes Enter/Exit on a defensive copy, so the
    // real lock state would never change and no mutual exclusion would happen.
    // The type is fully qualified because System.Threading is not opened here.
    let mutable _lock = System.Threading.SpinLock(true)
    // Pre-completed task handed out whenever access is granted immediately.
    let _noContentionAccessGranter = Task.FromResult<Object>(null)
    // Writers waiting for exclusive access, served in FIFO order.
    let _qWaitingWriters = new Queue<TaskCompletionSource<Object>>()
    // Single completion source shared by all currently waiting readers.
    let mutable _waitingReadersSignal = new TaskCompletionSource<Object>()
    let mutable _numWaitingReaders = 0
    // 0 = free, -1 = owned by a writer, n > 0 = owned by n readers.
    let mutable _state = 0

    /// Acquires the internal spin lock guarding the fields above.
    let lock() =
        let mutable taken = false
        _lock.Enter(&taken)

    /// Releases the internal spin lock.
    let unlock() =
        _lock.Exit()

    let isFree() = _state = 0
    let isOwnedByWriter() = _state = -1
    let isOwnedByReader() = _state > 0
    let addReaders(count: Int32) = _state <- _state + count
    let subtractReader() = _state <- _state - 1
    let makeWriter() = _state <- -1
    let makeFree() = _state <- 0

    /// Requests access in the given mode.
    /// Returns an already-completed task when access is granted immediately,
    /// otherwise a task that completes when a later Release grants the access.
    member this.WaitAsync(mode: OneManyMode) : Task<Object> =
        let mutable accessGranter = _noContentionAccessGranter
        lock()
        try
            match mode with
            | Exclusive ->
                if isFree() then
                    makeWriter()
                else
                    // Contention: queue this writer and hand back its pending task.
                    let tcs = new TaskCompletionSource<Object>()
                    _qWaitingWriters.Enqueue(tcs)
                    accessGranter <- tcs.Task
            | Shared ->
                // A reader enters right away only if no writer owns the lock
                // and no writer is queued.
                if isFree() || (isOwnedByReader() && _qWaitingWriters.Count = 0) then
                    addReaders(1)
                else
                    _numWaitingReaders <- _numWaitingReaders + 1
                    accessGranter <-
                        _waitingReadersSignal.Task.ContinueWith(
                            Func<Task<Object>, Object>(fun t -> t.Result))
        finally
            // Always release the spin lock, even if the code above throws.
            unlock()
        accessGranter

    /// Gives back the access previously obtained through WaitAsync.
    /// When the lock becomes free, one queued writer is granted access if any
    /// is waiting; otherwise all waiting readers are granted access together.
    /// NOTE(review): calling Release while the lock is free drives the state
    /// to -1 (writer-owned); callers must pair every Release with a WaitAsync.
    member this.Release() =
        let mutable accessGranted: TaskCompletionSource<Object> option = None
        lock()
        try
            if isOwnedByWriter() then makeFree()
            else subtractReader()
            if isFree() then
                if _qWaitingWriters.Count > 0 then
                    makeWriter()
                    accessGranted <- Some(_qWaitingWriters.Dequeue())
                elif _numWaitingReaders > 0 then
                    addReaders(_numWaitingReaders)
                    _numWaitingReaders <- 0
                    accessGranted <- Some _waitingReadersSignal
                    // Later readers wait on a fresh signal.
                    _waitingReadersSignal <- new TaskCompletionSource<Object>()
        finally
            unlock()
        // Complete the waiter's task only after the spin lock has been released.
        match accessGranted with
        | Some tcs -> tcs.SetResult(null)
        | None -> ()
|
namespace System.Threading
namespace System
namespace System.Collections
namespace System.Collections.Generic
namespace System.Threading.Tasks
type OneManyMode =
| Exclusive
| Shared
Full name: AP.Threading.OneManyMode
union case OneManyMode.Exclusive: OneManyMode
union case OneManyMode.Shared: OneManyMode
Multiple items
type SealedAttribute =
inherit Attribute
new : unit -> SealedAttribute
new : value:bool -> SealedAttribute
member Value : bool
Full name: Microsoft.FSharp.Core.SealedAttribute
--------------------
new : unit -> SealedAttribute
new : value:bool -> SealedAttribute
Multiple items
type AsyncOneManyLock =
new : unit -> AsyncOneManyLock
member Release : unit -> unit
member WaitAsync : mode:OneManyMode -> Task<Object>
Full name: AP.Threading.AsyncOneManyLock
--------------------
new : unit -> AsyncOneManyLock
Multiple items
type Task =
new : action:Action -> Task + 7 overloads
member AsyncState : obj
member ContinueWith : continuationAction:Action<Task> -> Task + 9 overloads
member CreationOptions : TaskCreationOptions
member Dispose : unit -> unit
member Exception : AggregateException
member Id : int
member IsCanceled : bool
member IsCompleted : bool
member IsFaulted : bool
...
Full name: System.Threading.Tasks.Task
--------------------
type Task<'TResult> =
inherit Task
new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 9 overloads
member Result : 'TResult with get, set
static member Factory : TaskFactory<'TResult>
Full name: System.Threading.Tasks.Task<_>
--------------------
Task(action: Action) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
--------------------
Task(function: Func<'TResult>) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Multiple items
type Object =
new : unit -> obj
member Equals : obj:obj -> bool
member GetHashCode : unit -> int
member GetType : unit -> Type
member ToString : unit -> string
static member Equals : objA:obj * objB:obj -> bool
static member ReferenceEquals : objA:obj * objB:obj -> bool
Full name: System.Object
--------------------
Object() : unit
Multiple items
type Queue<'T> =
new : unit -> Queue<'T> + 2 overloads
member Clear : unit -> unit
member Contains : item:'T -> bool
member CopyTo : array:'T[] * arrayIndex:int -> unit
member Count : int
member Dequeue : unit -> 'T
member Enqueue : item:'T -> unit
member GetEnumerator : unit -> Enumerator<'T>
member Peek : unit -> 'T
member ToArray : unit -> 'T[]
...
nested type Enumerator
Full name: System.Collections.Generic.Queue<_>
--------------------
Queue() : unit
Queue(capacity: int) : unit
Queue(collection: IEnumerable<'T>) : unit
Multiple items
type TaskCompletionSource<'TResult> =
new : unit -> TaskCompletionSource<'TResult> + 3 overloads
member SetCanceled : unit -> unit
member SetException : exception:Exception -> unit + 1 overload
member SetResult : result:'TResult -> unit
member Task : Task<'TResult>
member TrySetCanceled : unit -> bool
member TrySetException : exception:Exception -> bool + 1 overload
member TrySetResult : result:'TResult -> bool
Full name: System.Threading.Tasks.TaskCompletionSource<_>
--------------------
TaskCompletionSource() : unit
TaskCompletionSource(creationOptions: TaskCreationOptions) : unit
TaskCompletionSource(state: obj) : unit
TaskCompletionSource(state: obj, creationOptions: TaskCreationOptions) : unit
val lock : (unit -> 'a)
val mutable taken : bool
val _lock : obj
val unlock : (unit -> 'a)
val isFree : (unit -> bool)
val mutable _state : int
val isOwnedByWriter : (unit -> bool)
val isOwnedByReader : (unit -> bool)
val addReaders : (Int32 -> unit)
val count : Int32
type Int32 =
struct
member CompareTo : value:obj -> int + 1 overload
member Equals : obj:obj -> bool + 1 overload
member GetHashCode : unit -> int
member GetTypeCode : unit -> TypeCode
member ToString : unit -> string + 3 overloads
static val MaxValue : int
static val MinValue : int
static member Parse : s:string -> int + 3 overloads
static member TryParse : s:string * result:int -> bool + 1 overload
end
Full name: System.Int32
val subtractReader : (unit -> unit)
val makeWriter : (unit -> unit)
val makeFree : (unit -> unit)
val this : AsyncOneManyLock
member AsyncOneManyLock.WaitAsync : mode:OneManyMode -> Task<Object>
Full name: AP.Threading.AsyncOneManyLock.WaitAsync
val mode : OneManyMode
val mutable accessGranter : Task<Object>
val _noContentionAccessGranter : Task<Object>
val tcs : TaskCompletionSource<Object>
val _qWaitingWriters : Queue<TaskCompletionSource<Object>>
Queue.Enqueue(item: TaskCompletionSource<Object>) : unit
property TaskCompletionSource.Task: Task<Object>
property Queue.Count: int
val mutable _numWaitingReaders : int
val mutable _waitingReadersSignal : TaskCompletionSource<Object>
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>
(+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>) : Task
(+0 other overloads)
Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<Object>,'TNewResult>) : Task<'TNewResult>
(+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task<Object>>) : Task
(+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>
(+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>
(+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, cancellationToken: Threading.CancellationToken) : Task<'TResult>
(+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task
(+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task
(+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, cancellationToken: Threading.CancellationToken) : Task
(+0 other overloads)
val t : Task<Object>
property Task.Result: Object
member AsyncOneManyLock.Release : unit -> unit
Full name: AP.Threading.AsyncOneManyLock.Release
val mutable accessGranted : TaskCompletionSource<Object> option
type 'T option = Option<'T>
Full name: Microsoft.FSharp.Core.option<_>
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
Queue.Dequeue() : TaskCompletionSource<Object>
property Option.IsSome: bool
property Option.Value: TaskCompletionSource<Object>
TaskCompletionSource.SetResult(result: Object) : unit
More information