8 people like it.
Like the snippet!
Alternative Actor Implementation for Android
Actor implementation (much simplified) intended for use on mobile devices. It seems to provide better memory usage behavior than the native F# MailboxProcessor (MBP) on Android, although it is not as 'smooth' as the native F# MBP. Smoothness here refers to how processing is balanced between consumers and producers (most relevant to single-core machines).
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
|
namespace ActorModel
open System.Threading
open System
/// Handle given to an actor so it can answer a request.
/// Wraps the callback `replyf`; calling `Reply` invokes that callback with the reply value.
type ActorReplyChannel<'Reply>(replyf: 'Reply -> unit) =
    /// Passes `reply` to the callback supplied at construction.
    member this.Reply(reply) = reply |> replyf
/// Simplified actor mailbox: a lock-protected FIFO queue plus an auto-reset
/// event that wakes the receiver when a message is posted.
type ActorMailbox<'a> () =
    let queue = System.Collections.Generic.Queue<'a>()
    // Signalled by every Post; awaited by Receive when the queue looks empty.
    let gotMsg = new AutoResetEvent(false)
    // Message count maintained with Interlocked, separately from the queue lock.
    let mutable currentLength = 0
    let incr() = Interlocked.Increment(&currentLength) |> ignore
    let decr() = Interlocked.Decrement(&currentLength) |> ignore

    // Removes the head of the queue under the lock, then lowers the counter.
    let dequeue() =
        let v = lock queue (fun () -> queue.Dequeue())
        decr()
        v

    // Appends under the lock, raises the counter, signals the receiver and
    // then spins once before returning to the producer.
    let enqueue v =
        lock queue (fun () -> queue.Enqueue(v))
        incr()
        gotMsg.Set() |> ignore
        let sp = new System.Threading.SpinWait()
        sp.SpinOnce()

    /// Asynchronously returns the next message, waiting on the mailbox event
    /// whenever the counter reports no pending messages.
    member x.Receive() =
        let rec loop() =
            async {
                let sp = new SpinWait()
                sp.SpinOnce()
                if currentLength > 0 then
                    let v = dequeue()
                    return v
                else
                    let! _ = Async.AwaitWaitHandle gotMsg
                    return! loop() }
        loop()

    /// Enqueues `msg` and signals the receiver.
    member x.Post msg = enqueue msg

    /// Current value of the message counter.
    member x.CurrentQueueLength = currentLength

    /// Posts the message built by `mConstructor` and blocks the calling thread
    /// until the actor replies through the supplied channel.
    /// `timeout` is in milliseconds and defaults to `Timeout.Infinite`.
    /// Raises `TimeoutException` if no reply arrives within `timeout`.
    member x.PostAndReply (mConstructor, ?timeout:int) =
        let timeout = defaultArg timeout Timeout.Infinite
        let v = ref Unchecked.defaultof<_>
        use gotReply = new ManualResetEvent(false)
        let msg =
            mConstructor (new ActorReplyChannel<_>(fun reply ->
                v.Value <- reply
                // After a timeout the event is already disposed; a late reply
                // must not throw back into the actor that sent it.
                try gotReply.Set() |> ignore
                with :? ObjectDisposedException -> ()))
        x.Post(msg)
        // WaitOne(Timeout.Infinite) waits indefinitely, so a single call
        // covers both the infinite and the bounded case.
        if gotReply.WaitOne(timeout) then v.Value
        else raise (TimeoutException("actor timed out"))

    /// Posts the message built by `mConstructor` immediately and returns an
    /// async that completes with the actor's reply.
    /// `timeout` is in milliseconds and defaults to `Timeout.Infinite`.
    /// The async raises `TimeoutException` if no reply arrives within `timeout`.
    /// The returned async owns the reply event and disposes it when it
    /// finishes, so it should be run once.
    member x.PostAndAsyncReply (mConstructor, ?timeout:int) =
        let timeout = defaultArg timeout Timeout.Infinite
        let v = ref Unchecked.defaultof<_>
        let gotReply = new ManualResetEvent(false)
        let msg =
            mConstructor (new ActorReplyChannel<_>(fun reply ->
                v.Value <- reply
                // Ignore replies that arrive after the waiter has given up
                // and disposed the event.
                try gotReply.Set() |> ignore
                with :? ObjectDisposedException -> ()))
        x.Post(msg)
        async {
            try
                let! ok = Async.AwaitWaitHandle(gotReply, timeout)
                if ok then return v.Value
                else return raise (TimeoutException("actor timed out"))
            finally
                // Runs on success, timeout and cancellation alike.
                gotReply.Dispose() }

    /// Creates a mailbox, starts `f mailbox` with `Async.Start` under
    /// cancellation token `t`, and returns the mailbox.
    static member Start(f,t) =
        let mailbox = new ActorMailbox<'a>()
        Async.Start(f mailbox,t)
        mailbox

    interface IDisposable with
        /// Releases the event used to wake the receiver.
        member x.Dispose() = gotMsg.Dispose()
|
namespace System
namespace System.Threading
Multiple items
type ActorReplyChannel<'Reply> =
new : replyf:('Reply -> unit) -> ActorReplyChannel<'Reply>
member Reply : reply:'Reply -> unit
Full name: ActorModel.ActorReplyChannel<_>
--------------------
new : replyf:('Reply -> unit) -> ActorReplyChannel<'Reply>
val replyf : ('Reply -> unit)
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val x : ActorReplyChannel<'Reply>
member ActorReplyChannel.Reply : reply:'Reply -> unit
Full name: ActorModel.ActorReplyChannel`1.Reply
val reply : 'Reply
Multiple items
type ActorMailbox<'a> =
interface IDisposable
new : unit -> ActorMailbox<'a>
member Post : msg:'a -> unit
member PostAndAsyncReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> Async<'a0>
member PostAndReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> 'a0
member Receive : unit -> Async<'a>
member CurrentQueueLength : int
static member Start : f:(ActorMailbox<'a> -> Async<unit>) * t:CancellationToken -> ActorMailbox<'a>
Full name: ActorModel.ActorMailbox<_>
--------------------
new : unit -> ActorMailbox<'a>
val queue : Collections.Generic.Queue<'a>
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type Queue<'T> =
new : unit -> Queue<'T> + 2 overloads
member Clear : unit -> unit
member Contains : item:'T -> bool
member CopyTo : array:'T[] * arrayIndex:int -> unit
member Count : int
member Dequeue : unit -> 'T
member Enqueue : item:'T -> unit
member GetEnumerator : unit -> Enumerator<'T>
member Peek : unit -> 'T
member ToArray : unit -> 'T[]
...
nested type Enumerator
Full name: System.Collections.Generic.Queue<_>
--------------------
Collections.Generic.Queue() : unit
Collections.Generic.Queue(capacity: int) : unit
Collections.Generic.Queue(collection: Collections.Generic.IEnumerable<'T>) : unit
val gotMsg : AutoResetEvent
Multiple items
type AutoResetEvent =
inherit EventWaitHandle
new : initialState:bool -> AutoResetEvent
Full name: System.Threading.AutoResetEvent
--------------------
AutoResetEvent(initialState: bool) : unit
val mutable currentLength : int
val incr : (unit -> unit)
type Interlocked =
static member Add : location1:int * value:int -> int + 1 overload
static member CompareExchange : location1:int * value:int * comparand:int -> int + 6 overloads
static member Decrement : location:int -> int + 1 overload
static member Exchange : location1:int * value:int -> int + 6 overloads
static member Increment : location:int -> int + 1 overload
static member Read : location:int64 -> int64
Full name: System.Threading.Interlocked
Interlocked.Increment(location: byref<int64>) : int64
Interlocked.Increment(location: byref<int>) : int
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val decr : (unit -> unit)
Interlocked.Decrement(location: byref<int64>) : int64
Interlocked.Decrement(location: byref<int>) : int
val dequeue : (unit -> 'a)
val v : 'a
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)
Full name: Microsoft.FSharp.Core.Operators.lock
Collections.Generic.Queue.Dequeue() : 'a
val enqueue : ('a -> unit)
Collections.Generic.Queue.Enqueue(item: 'a) : unit
EventWaitHandle.Set() : bool
val sp : SpinWait
type SpinWait =
struct
member Count : int
member NextSpinWillYield : bool
member Reset : unit -> unit
member SpinOnce : unit -> unit
static member SpinUntil : condition:Func<bool> -> unit + 2 overloads
end
Full name: System.Threading.SpinWait
SpinWait.SpinOnce() : unit
val x : ActorMailbox<'a>
member ActorMailbox.Receive : unit -> Async<'a>
Full name: ActorModel.ActorMailbox`1.Receive
val loop : (unit -> Async<'a>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val b : bool
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
member ActorMailbox.Post : msg:'a -> unit
Full name: ActorModel.ActorMailbox`1.Post
val msg : 'a
member ActorMailbox.CurrentQueueLength : int
Full name: ActorModel.ActorMailbox`1.CurrentQueueLength
member ActorMailbox.PostAndReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> 'a0
Full name: ActorModel.ActorMailbox`1.PostAndReply
val mConstructor : (ActorReplyChannel<'a> -> 'a0)
val timeout : int option
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val timeout : int
val defaultArg : arg:'T option -> defaultValue:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.defaultArg
type Timeout =
static val Infinite : int
Full name: System.Threading.Timeout
field Timeout.Infinite = -1
val v : 'a ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
module Unchecked
from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T
Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
val gotReply : ManualResetEvent
Multiple items
type ManualResetEvent =
inherit EventWaitHandle
new : initialState:bool -> ManualResetEvent
Full name: System.Threading.ManualResetEvent
--------------------
ManualResetEvent(initialState: bool) : unit
val reply : 'a
member ActorMailbox.Post : msg:'a -> unit
WaitHandle.WaitOne() : bool
WaitHandle.WaitOne(timeout: TimeSpan) : bool
WaitHandle.WaitOne(millisecondsTimeout: int) : bool
WaitHandle.WaitOne(timeout: TimeSpan, exitContext: bool) : bool
WaitHandle.WaitOne(millisecondsTimeout: int, exitContext: bool) : bool
val ok : bool
val raise : exn:Exception -> 'T
Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type TimeoutException =
inherit SystemException
new : unit -> TimeoutException + 2 overloads
Full name: System.TimeoutException
--------------------
TimeoutException() : unit
TimeoutException(message: string) : unit
TimeoutException(message: string, innerException: exn) : unit
member ActorMailbox.PostAndAsyncReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> Async<'a0>
Full name: ActorModel.ActorMailbox`1.PostAndAsyncReply
WaitHandle.Dispose() : unit
static member ActorMailbox.Start : f:(ActorMailbox<'a> -> Async<unit>) * t:CancellationToken -> ActorMailbox<'a>
Full name: ActorModel.ActorMailbox`1.Start
val f : (ActorMailbox<'a> -> Async<unit>)
val t : CancellationToken
val mailbox : ActorMailbox<'a>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
type IDisposable =
member Dispose : unit -> unit
Full name: System.IDisposable
override ActorMailbox.Dispose : unit -> unit
Full name: ActorModel.ActorMailbox`1.Dispose
More information