3 people like it.

Different Actors implementation

Discussion here http://stackoverflow.com/a/21434034/801189

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
153: 
154: 
155: 
156: 
157: 
158: 
159: 
160: 
161: 
162: 
163: 
164: 
165: 
166: 
167: 
168: 
169: 
170: 
171: 
172: 
173: 
174: 
175: 
176: 
177: 
178: 
179: 
180: 
181: 
182: 
183: 
184: 
185: 
186: 
187: 
188: 
189: 
190: 
191: 
192: 
193: 
194: 
195: 
196: 
197: 
198: 
199: 
200: 
201: 
202: 
203: 
204: 
205: 
206: 
207: 
208: 
209: 
210: 
211: 
212: 
213: 
214: 
215: 
216: 
217: 
218: 
219: 
220: 
221: 
222: 
223: 
224: 
225: 
226: 
227: 
228: 
229: 
230: 
231: 
232: 
233: 
234: 
235: 
236: 
237: 
238: 
239: 
240: 
241: 
242: 
243: 
244: 
245: 
246: 
247: 
248: 
249: 
250: 
251: 
252: 
253: 
254: 
255: 
256: 
257: 
258: 
259: 
260: 
261: 
262: 
263: 
264: 
265: 
266: 
267: 
268: 
269: 
270: 
271: 
272: 
273: 
274: 
275: 
276: 
277: 
278: 
279: 
280: 
281: 
282: 
283: 
284: 
285: 
286: 
287: 
288: 
289: 
290: 
291: 
292: 
293: 
294: 
295: 
296: 
297: 
298: 
299: 
300: 
301: 
302: 
303: 
304: 
305: 
306: 
307: 
308: 
309: 
310: 
311: 
312: 
313: 
314: 
315: 
316: 
317: 
318: 
open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow
open System.Collections.Concurrent


/// Code to measure the number of messages the
/// agent can process per second on a number of threads.
let test name f g =
   let test countPerThread =
      let threadCount = System.Environment.ProcessorCount
      let msgCount = threadCount * countPerThread
      let watch = Stopwatch.StartNew()
      let incrementers =
         Array.Parallel.init threadCount (fun _ ->
            for i = 1 to countPerThread do f 100L)
      let expectedCount = int64 countPerThread * int64 threadCount * 100L 
      let finalCount = g() // expectedCount //
      watch.Stop()
      if finalCount <> expectedCount then
         failwith "Didn't work!"
      int (float msgCount / watch.Elapsed.TotalSeconds)
      
   // Warm up!
   test 10000 |> ignore<int>
   System.GC.Collect()
   // Real test
   let msgsPerSecond = test 500000
   printfn "%s processed %i msgs/sec" name msgsPerSecond


type CounterMsg =
   | Add of int64
   | GetAndReset of (int64 -> unit)

let vanillaCounter = 
   MailboxProcessor.Start <| fun inbox ->
      let rec loop count = async {
         let! msg = inbox.Receive()
         match msg with
         | Add n -> return! loop (count + n)
         | GetAndReset reply ->
            reply count
            return! loop 0L
      }
      loop 0L

//test "Vanilla Actor (MailboxProcessor)" 
//   (fun i -> vanillaCounter.Post <| Add i) 
//   (fun () -> vanillaCounter.PostAndReply(fun channel -> GetAndReset channel.Reply))



type 'a ISimpleActor =
   inherit IDisposable
   abstract Post : msg:'a -> unit
   abstract PostAndReply<'b> : msgFactory:(('b -> unit) -> 'a) -> 'b

type 'a SimpleMailbox() =
   let msgs = ConcurrentQueue<'a>()
   let onMsg = new AutoResetEvent(false)

   member __.Receive() =
      let rec await() = async {
         let mutable value = Unchecked.defaultof<_>
         let hasValue = msgs.TryDequeue(&value)
         if hasValue then return value
         else 
            let! _ = Async.AwaitWaitHandle onMsg
            return! await()        
      }
      await()

   member __.Post msg = 
      msgs.Enqueue msg
      onMsg.Set() |> ignore<bool>

   member __.PostAndReply<'b> msgFactory =
      let value = ref Unchecked.defaultof<'b>
      use onReply = new AutoResetEvent(false) 
      let msg = msgFactory (fun x ->
         value := x
         onReply.Set() |> ignore<bool>
      )
      __.Post msg
      onReply.WaitOne() |> ignore<bool>
      !value

   interface 'a ISimpleActor with
      member __.Post msg = __.Post msg
      member __.PostAndReply msgFactory = __.PostAndReply msgFactory
      member __.Dispose() = onMsg.Dispose()

module SimpleActor =
   let Start f =
      let mailbox = new SimpleMailbox<_>()
      f mailbox |> Async.Start
      mailbox :> _ ISimpleActor

let simpleActor = 
   SimpleActor.Start <| fun inbox ->
      let rec loop count = async {
         let! msg = inbox.Receive()
         match msg with
         | Add n -> return! loop (count + n)
         | GetAndReset reply ->
            reply count
            return! loop 0L
      }
      loop 0L

//test "Simple Actor" 
//   (fun i -> simpleActor.Post <| Add i) 
//   (fun () -> simpleActor.PostAndReply GetAndReset)


type 'a ISharedActor =
   abstract Post : msg:'a -> unit
   abstract PostAndReply<'b> : msgFactory:(('b -> unit) -> 'a) -> 'b

type 'a SharedMailbox() =
   let msgs = ConcurrentQueue<'a>()
   let mutable isStarted = false
   let mutable msgCount = 0
   let mutable react = Unchecked.defaultof<_>
   let mutable currentMessage = Unchecked.defaultof<_>

   let rec execute(isFirst) =

      let inline consumeAndLoop() =
         react currentMessage
         currentMessage <- Unchecked.defaultof<_>
         let newCount = Interlocked.Decrement &msgCount
         if newCount <> 0 then execute false

      if isFirst then consumeAndLoop()
      else
         let hasMessage = msgs.TryDequeue(&currentMessage)
         if hasMessage then consumeAndLoop()
         else 
            Thread.SpinWait 20
            execute false
   
   member __.Receive(callback) = 
      isStarted <- true
      react <- callback

   member __.Post msg =
      while not isStarted do Thread.SpinWait 20
      let newCount = Interlocked.Increment &msgCount
      if newCount = 1 then
         currentMessage <- msg
         // Might want to schedule this call on another thread.
         execute true
      else msgs.Enqueue msg
   
   member __.PostAndReply msgFactory =
      let value = ref Unchecked.defaultof<'b>
      use onReply = new AutoResetEvent(false)
      let msg = msgFactory (fun x ->
         value := x
         onReply.Set() |> ignore<bool>
      )
      __.Post msg
      onReply.WaitOne() |> ignore<bool>
      !value


   interface 'a ISharedActor with
      member __.Post msg = __.Post msg
      member __.PostAndReply msgFactory = __.PostAndReply msgFactory

module SharedActor =
   let Start f =
      let mailbox = new SharedMailbox<_>()
      f mailbox
      mailbox :> _ ISharedActor

let sharedActor = 
   SharedActor.Start <| fun inbox ->
      let rec loop count =
         inbox.Receive(fun msg ->
            match msg with
            | Add n -> loop (count + n)
            | GetAndReset reply ->
               reply count
               loop 0L)
      loop 0L

//test "Shared Actor" 
//   (fun i -> sharedActor.Post <| Add i) 
//   (fun () -> sharedActor.PostAndReply GetAndReset)




[<Sealed>]
type AsyncReplyChannel<'Reply> internal (replyf : 'Reply -> unit) =
    member x.Reply(reply) = replyf(reply)

[<Sealed>]
type internal AsyncResultCell<'a>() =
    let source = new TaskCompletionSource<'a>()

    member x.RegisterResult result = source.SetResult(result)

    member x.AsyncWaitResult =
        Async.FromContinuations(fun (cont,_,_) -> 
            let apply = fun (task:Task<_>) -> cont (task.Result)
            source.Task.ContinueWith(apply) |> ignore)

    member x.GetWaitHandle(timeout:int) =
        async { let waithandle = source.Task.Wait(timeout)
                return waithandle }

    member x.GrabResult() = source.Task.Result

    member x.TryWaitResultSynchronously(timeout:int) = 
        //early completion check
        if source.Task.IsCompleted then 
            Some source.Task.Result
        //now force a wait for the task to complete
        else 
            if source.Task.Wait(timeout) then 
                Some source.Task.Result
            else None

type DataflowAgent<'Msg>(initial, ?cancellationToken) =
    let cancellationToken =
        defaultArg cancellationToken Async.DefaultCancellationToken
    let mutable started = false
    let errorEvent = new Event<System.Exception>()
    let incomingMessages = new BufferBlock<'Msg>()
    let mutable defaultTimeout = Timeout.Infinite

    [<CLIEvent>]
    member this.Error = errorEvent.Publish

    member this.Start() =
        if started
            then raise (new InvalidOperationException("Already Started."))
        else
            started <- true
            let comp = async { try do! initial this
                               with error -> errorEvent.Trigger error }
            Async.Start(computation = comp, cancellationToken = cancellationToken)

    member this.Receive(?timeout) =
        Async.AwaitTask <| incomingMessages.ReceiveAsync()

    member this.Post(item) =
        let posted = incomingMessages.Post(item)
        if not posted then
            raise (InvalidOperationException("Incoming message buffer full."))

    member this.PostAndTryAsyncReply(replyChannelMsg, ?timeout) =
        let timeout = defaultArg timeout defaultTimeout
        let resultCell = AsyncResultCell<_>()
        let msg = replyChannelMsg(AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply)))
        let posted = incomingMessages.Post(msg)
        if posted then
            match timeout with
            |   Threading.Timeout.Infinite ->
                    async { let! result = resultCell.AsyncWaitResult
                            return Some(result) }
            |   _ ->
                    async { let! ok =  resultCell.GetWaitHandle(timeout)
                            let res = (if ok then Some(resultCell.GrabResult()) else None)
                            return res }
        else async{return None}

    member this.PostAndAsyncReply( replyChannelMsg, ?timeout) =
            let timeout = defaultArg timeout defaultTimeout
            match timeout with
            |   Threading.Timeout.Infinite ->
                let resCell = AsyncResultCell<_>()
                let msg = replyChannelMsg (AsyncReplyChannel<_>(fun reply -> resCell.RegisterResult(reply) ))
                let posted = incomingMessages.Post(msg)
                if posted then
                    resCell.AsyncWaitResult
                else
                    raise (InvalidOperationException("Incoming message buffer full."))
            |   _ ->
                let asyncReply = this.PostAndTryAsyncReply(replyChannelMsg, timeout=timeout)
                async { let! res = asyncReply
                        match res with
                        | None ->  return! raise (TimeoutException("PostAndAsyncReply TimedOut"))
                        | Some res -> return res }

    static member Start(initial, ?cancellationToken) =
        let dfa = DataflowAgent<'Msg>(initial, ?cancellationToken = cancellationToken)
        dfa.Start()
        dfa

let dataflowAgent = DataflowAgent.Start <| fun inbox ->
      let rec loop count = async {
         let! msg = inbox.Receive()
         match msg with
         | Add n -> return! loop (count + n)
         | GetAndReset reply ->
            reply count
            return! loop 0L
      }
      loop 0L

test "DataFlow Actor" 
   (fun i -> dataflowAgent.Post <| Add i) 
   (fun () -> 
        Async.RunSynchronously(
                dataflowAgent.PostAndAsyncReply(fun channel -> GetAndReset channel.Reply) 
        )
    )


Console.ReadLine() |> ignore
namespace System
namespace System.Diagnostics
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
namespace System.Threading
namespace System.Threading.Tasks
namespace System.Collections
namespace System.Collections.Concurrent
val test : name:string -> f:(int64 -> unit) -> g:(unit -> int64) -> unit

Full name: Script.test


 Code to measure the number of messages the
 agent can process per second on a number of threads.
val name : string
val f : (int64 -> unit)
val g : (unit -> int64)
val test : (int -> int)
val countPerThread : int
val threadCount : int
type Environment =
  static member CommandLine : string
  static member CurrentDirectory : string with get, set
  static member Exit : exitCode:int -> unit
  static member ExitCode : int with get, set
  static member ExpandEnvironmentVariables : name:string -> string
  static member FailFast : message:string -> unit + 1 overload
  static member GetCommandLineArgs : unit -> string[]
  static member GetEnvironmentVariable : variable:string -> string + 1 overload
  static member GetEnvironmentVariables : unit -> IDictionary + 1 overload
  static member GetFolderPath : folder:SpecialFolder -> string + 1 overload
  ...
  nested type SpecialFolder
  nested type SpecialFolderOption

Full name: System.Environment
property Environment.ProcessorCount: int
val msgCount : int
val watch : Stopwatch
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
Stopwatch.StartNew() : Stopwatch
val incrementers : unit []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
module Parallel

from Microsoft.FSharp.Collections.ArrayModule
val init : count:int -> initializer:(int -> 'T) -> 'T []

Full name: Microsoft.FSharp.Collections.ArrayModule.Parallel.init
val i : int
val expectedCount : int64
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int64

--------------------
type int64 = Int64

Full name: Microsoft.FSharp.Core.int64

--------------------
type int64<'Measure> = int64

Full name: Microsoft.FSharp.Core.int64<_>
val finalCount : int64
Stopwatch.Stop() : unit
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
property Stopwatch.Elapsed: TimeSpan
property TimeSpan.TotalSeconds: float
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
type GC =
  static member AddMemoryPressure : bytesAllocated:int64 -> unit
  static member CancelFullGCNotification : unit -> unit
  static member Collect : unit -> unit + 2 overloads
  static member CollectionCount : generation:int -> int
  static member GetGeneration : obj:obj -> int + 1 overload
  static member GetTotalMemory : forceFullCollection:bool -> int64
  static member KeepAlive : obj:obj -> unit
  static member MaxGeneration : int
  static member ReRegisterForFinalize : obj:obj -> unit
  static member RegisterForFullGCNotification : maxGenerationThreshold:int * largeObjectHeapThreshold:int -> unit
  ...

Full name: System.GC
GC.Collect() : unit
GC.Collect(generation: int) : unit
GC.Collect(generation: int, mode: GCCollectionMode) : unit
val msgsPerSecond : int
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
type CounterMsg =
  | Add of int64
  | GetAndReset of (int64 -> unit)

Full name: Script.CounterMsg
union case CounterMsg.Add: int64 -> CounterMsg
union case CounterMsg.GetAndReset: (int64 -> unit) -> CounterMsg
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val vanillaCounter : MailboxProcessor<CounterMsg>

Full name: Script.vanillaCounter
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<CounterMsg>
val loop : (int64 -> Async<'a>)
val count : int64
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : CounterMsg
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val n : int64
val reply : (int64 -> unit)
type 'a ISimpleActor =
  interface
    inherit IDisposable
    abstract member Post : msg:'a -> unit
    abstract member PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
  end

Full name: Script.ISimpleActor<_>
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
abstract member ISimpleActor.Post : msg:'a -> unit

Full name: Script.ISimpleActor`1.Post
abstract member ISimpleActor.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

Full name: Script.ISimpleActor`1.PostAndReply
Multiple items
type 'a SimpleMailbox =
  interface 'a ISimpleActor
  new : unit -> 'a SimpleMailbox
  member Post : msg:'a -> unit
  member PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
  member Receive : unit -> Async<'a>

Full name: Script.SimpleMailbox<_>

--------------------
new : unit -> 'a SimpleMailbox
val msgs : ConcurrentQueue<'a>
Multiple items
type ConcurrentQueue<'T> =
  new : unit -> ConcurrentQueue<'T> + 1 overload
  member CopyTo : array:'T[] * index:int -> unit
  member Count : int
  member Enqueue : item:'T -> unit
  member GetEnumerator : unit -> IEnumerator<'T>
  member IsEmpty : bool
  member ToArray : unit -> 'T[]
  member TryDequeue : result:'T -> bool
  member TryPeek : result:'T -> bool

Full name: System.Collections.Concurrent.ConcurrentQueue<_>

--------------------
ConcurrentQueue() : unit
ConcurrentQueue(collection: Collections.Generic.IEnumerable<'T>) : unit
val onMsg : AutoResetEvent
Multiple items
type AutoResetEvent =
  inherit EventWaitHandle
  new : initialState:bool -> AutoResetEvent

Full name: System.Threading.AutoResetEvent

--------------------
AutoResetEvent(initialState: bool) : unit
member SimpleMailbox.Receive : unit -> Async<'a>

Full name: Script.SimpleMailbox`1.Receive
val await : (unit -> Async<'a>)
val mutable value : 'a
module Unchecked

from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T

Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
val hasValue : bool
ConcurrentQueue.TryDequeue(result: byref<'a>) : bool
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
val __ : 'a SimpleMailbox
member SimpleMailbox.Post : msg:'a -> unit

Full name: Script.SimpleMailbox`1.Post
val msg : 'a
ConcurrentQueue.Enqueue(item: 'a) : unit
EventWaitHandle.Set() : bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
member SimpleMailbox.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

Full name: Script.SimpleMailbox`1.PostAndReply
val msgFactory : (('b -> unit) -> 'a)
val value : 'b ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val onReply : AutoResetEvent
val x : 'b
member SimpleMailbox.Post : msg:'a -> unit
WaitHandle.WaitOne() : bool
WaitHandle.WaitOne(timeout: TimeSpan) : bool
WaitHandle.WaitOne(millisecondsTimeout: int) : bool
WaitHandle.WaitOne(timeout: TimeSpan, exitContext: bool) : bool
WaitHandle.WaitOne(millisecondsTimeout: int, exitContext: bool) : bool
override SimpleMailbox.Post : msg:'a -> unit

Full name: Script.SimpleMailbox`1.Post
override SimpleMailbox.PostAndReply : msgFactory:(('a0 -> unit) -> 'a) -> 'a0

Full name: Script.SimpleMailbox`1.PostAndReply
val msgFactory : (('a -> unit) -> 'a0)
member SimpleMailbox.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
override SimpleMailbox.Dispose : unit -> unit

Full name: Script.SimpleMailbox`1.Dispose
WaitHandle.Dispose() : unit
val Start : f:('a SimpleMailbox -> Async<unit>) -> 'a ISimpleActor

Full name: Script.SimpleActor.Start
val f : ('a SimpleMailbox -> Async<unit>)
val mailbox : 'a SimpleMailbox
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
val simpleActor : CounterMsg ISimpleActor

Full name: Script.simpleActor
module SimpleActor

from Script
val inbox : CounterMsg SimpleMailbox
member SimpleMailbox.Receive : unit -> Async<'a>
type 'a ISharedActor =
  interface
    abstract member Post : msg:'a -> unit
    abstract member PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
  end

Full name: Script.ISharedActor<_>
abstract member ISharedActor.Post : msg:'a -> unit

Full name: Script.ISharedActor`1.Post
abstract member ISharedActor.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

Full name: Script.ISharedActor`1.PostAndReply
Multiple items
type 'a SharedMailbox =
  interface 'a ISharedActor
  new : unit -> 'a SharedMailbox
  member Post : msg:'a -> unit
  member PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
  member Receive : callback:('a -> unit) -> unit

Full name: Script.SharedMailbox<_>

--------------------
new : unit -> 'a SharedMailbox
val mutable isStarted : bool
val mutable msgCount : int
val mutable react : ('a -> unit)
val mutable currentMessage : 'a
val execute : (bool -> unit)
val isFirst : bool
val consumeAndLoop : (unit -> unit)
val newCount : int
type Interlocked =
  static member Add : location1:int * value:int -> int + 1 overload
  static member CompareExchange : location1:int * value:int * comparand:int -> int + 6 overloads
  static member Decrement : location:int -> int + 1 overload
  static member Exchange : location1:int * value:int -> int + 6 overloads
  static member Increment : location:int -> int + 1 overload
  static member Read : location:int64 -> int64

Full name: System.Threading.Interlocked
Interlocked.Decrement(location: byref<int64>) : int64
Interlocked.Decrement(location: byref<int>) : int
val hasMessage : bool
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
Thread.SpinWait(iterations: int) : unit
member SharedMailbox.Receive : callback:('a -> unit) -> unit

Full name: Script.SharedMailbox`1.Receive
val callback : ('a -> unit)
val __ : 'a SharedMailbox
member SharedMailbox.Post : msg:'a -> unit

Full name: Script.SharedMailbox`1.Post
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
Interlocked.Increment(location: byref<int64>) : int64
Interlocked.Increment(location: byref<int>) : int
member SharedMailbox.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

Full name: Script.SharedMailbox`1.PostAndReply
member SharedMailbox.Post : msg:'a -> unit
override SharedMailbox.Post : msg:'a -> unit

Full name: Script.SharedMailbox`1.Post
override SharedMailbox.PostAndReply : msgFactory:(('a0 -> unit) -> 'a) -> 'a0

Full name: Script.SharedMailbox`1.PostAndReply
member SharedMailbox.PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b
val Start : f:('a SharedMailbox -> unit) -> 'a ISharedActor

Full name: Script.SharedActor.Start
val f : ('a SharedMailbox -> unit)
val mailbox : 'a SharedMailbox
val sharedActor : CounterMsg ISharedActor

Full name: Script.sharedActor
module SharedActor

from Script
val inbox : CounterMsg SharedMailbox
val loop : (int64 -> unit)
member SharedMailbox.Receive : callback:('a -> unit) -> unit
Multiple items
type SealedAttribute =
  inherit Attribute
  new : unit -> SealedAttribute
  new : value:bool -> SealedAttribute
  member Value : bool

Full name: Microsoft.FSharp.Core.SealedAttribute

--------------------
new : unit -> SealedAttribute
new : value:bool -> SealedAttribute
Multiple items
type AsyncReplyChannel<'Reply> =
  internal new : replyf:('Reply -> unit) -> AsyncReplyChannel<'Reply>
  member Reply : reply:'Reply -> unit

Full name: Script.AsyncReplyChannel<_>

--------------------
internal new : replyf:('Reply -> unit) -> AsyncReplyChannel<'Reply>
val replyf : ('Reply -> unit)
val x : AsyncReplyChannel<'Reply>
member AsyncReplyChannel.Reply : reply:'Reply -> unit

Full name: Script.AsyncReplyChannel`1.Reply
val reply : 'Reply
Multiple items
type internal AsyncResultCell<'a> =
  new : unit -> AsyncResultCell<'a>
  member GetWaitHandle : timeout:int -> Async<bool>
  member GrabResult : unit -> 'a
  member RegisterResult : result:'a -> unit
  member TryWaitResultSynchronously : timeout:int -> 'a option
  member AsyncWaitResult : Async<'a>

Full name: Script.AsyncResultCell<_>

--------------------
internal new : unit -> AsyncResultCell<'a>
val source : TaskCompletionSource<'a>
Multiple items
type TaskCompletionSource<'TResult> =
  new : unit -> TaskCompletionSource<'TResult> + 3 overloads
  member SetCanceled : unit -> unit
  member SetException : exception:Exception -> unit + 1 overload
  member SetResult : result:'TResult -> unit
  member Task : Task<'TResult>
  member TrySetCanceled : unit -> bool
  member TrySetException : exception:Exception -> bool + 1 overload
  member TrySetResult : result:'TResult -> bool

Full name: System.Threading.Tasks.TaskCompletionSource<_>

--------------------
TaskCompletionSource() : unit
TaskCompletionSource(creationOptions: TaskCreationOptions) : unit
TaskCompletionSource(state: obj) : unit
TaskCompletionSource(state: obj, creationOptions: TaskCreationOptions) : unit
val x : AsyncResultCell<'a>
member internal AsyncResultCell.RegisterResult : result:'a -> unit

Full name: Script.AsyncResultCell`1.RegisterResult
val result : 'a
TaskCompletionSource.SetResult(result: 'a) : unit
member internal AsyncResultCell.AsyncWaitResult : Async<'a>

Full name: Script.AsyncResultCell`1.AsyncWaitResult
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val cont : ('a -> unit)
val apply : (Task<'a> -> unit)
val task : Task<'a>
Multiple items
type Task =
  new : action:Action -> Task + 7 overloads
  member AsyncState : obj
  member ContinueWith : continuationAction:Action<Task> -> Task + 9 overloads
  member CreationOptions : TaskCreationOptions
  member Dispose : unit -> unit
  member Exception : AggregateException
  member Id : int
  member IsCanceled : bool
  member IsCompleted : bool
  member IsFaulted : bool
  ...

Full name: System.Threading.Tasks.Task

--------------------
type Task<'TResult> =
  inherit Task
  new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
  member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 9 overloads
  member Result : 'TResult with get, set
  static member Factory : TaskFactory<'TResult>

Full name: System.Threading.Tasks.Task<_>

--------------------
Task(action: Action) : unit
Task(action: Action, cancellationToken: CancellationToken) : unit
Task(action: Action, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj) : unit
Task(action: Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken) : unit
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : unit

--------------------
Task(function: Func<'TResult>) : unit
Task(function: Func<'TResult>, cancellationToken: CancellationToken) : unit
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj) : unit
Task(function: Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : unit
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : unit
property Task.Result: 'a
property TaskCompletionSource.Task: Task<'a>
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>) : Task
   (+0 other overloads)
Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'a>,'TNewResult>) : Task<'TNewResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task<'a>>) : Task
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, cancellationToken: CancellationToken) : Task
   (+0 other overloads)
member internal AsyncResultCell.GetWaitHandle : timeout:int -> Async<bool>

Full name: Script.AsyncResultCell`1.GetWaitHandle
val timeout : int
val waithandle : bool
Task.Wait() : unit
Task.Wait(millisecondsTimeout: int) : bool
Task.Wait(cancellationToken: CancellationToken) : unit
Task.Wait(timeout: TimeSpan) : bool
Task.Wait(millisecondsTimeout: int, cancellationToken: CancellationToken) : bool
member internal AsyncResultCell.GrabResult : unit -> 'a

Full name: Script.AsyncResultCell`1.GrabResult
member internal AsyncResultCell.TryWaitResultSynchronously : timeout:int -> 'a option

Full name: Script.AsyncResultCell`1.TryWaitResultSynchronously
property Task.IsCompleted: bool
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
Multiple items
type DataflowAgent<'Msg> =
  new : initial:(DataflowAgent<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> DataflowAgent<'Msg>
  member Post : item:'a -> unit
  member PostAndAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a>
  member PostAndTryAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a option>
  member Receive : ?timeout:'a -> Async<'b>
  member Start : unit -> unit
  member add_Error : Handler<Exception> -> unit
  member Error : IEvent<Exception>
  member remove_Error : Handler<Exception> -> unit
  static member Start : initial:(DataflowAgent<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> DataflowAgent<'Msg>

Full name: Script.DataflowAgent<_>

--------------------
new : initial:(DataflowAgent<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> DataflowAgent<'Msg>
val initial : (DataflowAgent<'Msg> -> Async<unit>)
val cancellationToken : CancellationToken option
val cancellationToken : CancellationToken
val defaultArg : arg:'T option -> defaultValue:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.defaultArg
property Async.DefaultCancellationToken: CancellationToken
val mutable started : bool
val errorEvent : Event<Exception>
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
Multiple items
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
val incomingMessages : obj
val mutable defaultTimeout : int
type Timeout =
  static val Infinite : int

Full name: System.Threading.Timeout
field Timeout.Infinite = -1
Multiple items
type CLIEventAttribute =
  inherit Attribute
  new : unit -> CLIEventAttribute

Full name: Microsoft.FSharp.Core.CLIEventAttribute

--------------------
new : unit -> CLIEventAttribute
val this : DataflowAgent<'Msg>
member DataflowAgent.Error : IEvent<Exception>

Full name: Script.DataflowAgent`1.Error
property Event.Publish: IEvent<Exception>
member DataflowAgent.Start : unit -> unit

Full name: Script.DataflowAgent`1.Start
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type InvalidOperationException =
  inherit SystemException
  new : unit -> InvalidOperationException + 2 overloads

Full name: System.InvalidOperationException

--------------------
InvalidOperationException() : unit
InvalidOperationException(message: string) : unit
InvalidOperationException(message: string, innerException: exn) : unit
val comp : Async<unit>
val error : exn
member Event.Trigger : arg:'T -> unit
member DataflowAgent.Receive : ?timeout:'a -> Async<'b>

Full name: Script.DataflowAgent`1.Receive
val timeout : 'a option
static member Async.AwaitTask : task:Task<'T> -> Async<'T>
member DataflowAgent.Post : item:'a -> unit

Full name: Script.DataflowAgent`1.Post
val item : 'a
val posted : bool
member DataflowAgent.PostAndTryAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a option>

Full name: Script.DataflowAgent`1.PostAndTryAsyncReply
val replyChannelMsg : (AsyncReplyChannel<'a> -> 'b)
val timeout : int option
val resultCell : AsyncResultCell<'a>
val msg : 'b
val reply : 'a
member internal AsyncResultCell.RegisterResult : result:'a -> unit
property AsyncResultCell.AsyncWaitResult: Async<'a>
val ok : bool
member internal AsyncResultCell.GetWaitHandle : timeout:int -> Async<bool>
val res : 'a option
member internal AsyncResultCell.GrabResult : unit -> 'a
member DataflowAgent.PostAndAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a>

Full name: Script.DataflowAgent`1.PostAndAsyncReply
val resCell : AsyncResultCell<'a>
val asyncReply : Async<'a option>
member DataflowAgent.PostAndTryAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a option>
Multiple items
type TimeoutException =
  inherit SystemException
  new : unit -> TimeoutException + 2 overloads

Full name: System.TimeoutException

--------------------
TimeoutException() : unit
TimeoutException(message: string) : unit
TimeoutException(message: string, innerException: exn) : unit
val res : 'a
static member DataflowAgent.Start : initial:(DataflowAgent<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> DataflowAgent<'Msg>

Full name: Script.DataflowAgent`1.Start
val dfa : DataflowAgent<'Msg>
member DataflowAgent.Start : unit -> unit
val dataflowAgent : DataflowAgent<obj>

Full name: Script.dataflowAgent
static member DataflowAgent.Start : initial:(DataflowAgent<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> DataflowAgent<'Msg>
val inbox : DataflowAgent<obj>
member DataflowAgent.Receive : ?timeout:'a -> Async<'b>
val i : int64
member DataflowAgent.Post : item:'a -> unit
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
member DataflowAgent.PostAndAsyncReply : replyChannelMsg:(AsyncReplyChannel<'a> -> 'b) * ?timeout:int -> Async<'a>
val channel : AsyncReplyChannel<int64>
member AsyncReplyChannel.Reply : reply:'Reply -> unit
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.ReadLine() : string
Raw view Test code New version

More information

Link:http://fssnip.net/lt
Posted:10 years ago
Author:V.B.
Tags: actors