3 people like it.
Like the snippet!
Ping and Pong go Chunking Along
Parameterizing pong allows us to do even more fun things. Here we use a few message types to allow stateful consumption of data sent by ping to its pongs.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
|
/// Shorthand alias for MailboxProcessor, the agent type used by `pong` and `ping`.
type Agent<'a> = MailboxProcessor<'a>
/// Outcome of feeding one input to a step function.
type State<'a> =
/// More input is wanted; carries the function to apply to the next input.
| Continue of ('a -> State<'a>)
/// Processing has finished; carries the final value.
| Done of 'a
/// Reply delivered to the sender of a chunk through the callback posted with it.
type Message<'a> =
/// The consumer finished and produced this value.
| Result of 'a
/// The consumer accepted the chunk and is waiting for another one.
| NeedInput
/// A failure, described by a message.
| Error of string
/// Starts an agent that consumes chunks with the step function `f`.
///
/// Every message is a pair of a chunk and a reply callback, and every message
/// receives exactly one reply:
///   * `NeedInput` when the step returned `Continue`; the returned function
///     is used for the next chunk.
///   * `Result x`  when the step returned `Done x`.
///   * `Error msg` when the step raised an exception (the current step function
///     is kept, so the sender may try again), or when a chunk arrives after the
///     result has already been delivered.
let pong f = Agent<string * (Message<_> -> unit)>.Start(fun inbox ->
    // After `Done` there is no step function left to run. Keep answering so a
    // late sender is told about it instead of waiting for a reply forever.
    let rec finished () = async {
        let! late = inbox.Receive()
        let _, cont = late
        cont (Error "pong has already delivered its result")
        return! finished ()
    }
    let rec loop f = async {
        let! msg = inbox.Receive()
        let m, cont = msg
        // Run the step before matching on its outcome so that an exception
        // raised by `f` is reported to the sender rather than killing the agent.
        let outcome =
            try Choice1Of2 (f m)
            with ex -> Choice2Of2 ex.Message
        match outcome with
        | Choice1Of2 (Done x) ->
            cont (Result x)
            return! finished ()
        | Choice1Of2 (Continue f') ->
            cont NeedInput
            return! loop f'
        | Choice2Of2 reason ->
            cont (Error reason)
            return! loop f
    }
    loop f)
/// Starts an agent that streams the chunks "1" .. "10" to a consumer, one at a
/// time, waiting for the reply to each chunk before sending the next.
///
/// Chunks go to `target1` until it answers with `Result`; that value is stored
/// and all later chunks go to `target2`. After the tenth chunk an empty chunk
/// is posted to the current target and its reply is printed; if that reply is a
/// `Result`, the stored value and the new value are printed on separate lines.
///
/// NOTE(review): every `Result` received inside the loop overwrites the stored
/// value and re-selects `target2`, so the loop appears to assume that `target2`
/// does not finish before the tenth chunk — confirm against intended usage.
let ping (target1: Agent<_>) (target2: Agent<_>) = Agent<Message<_>>.Start(fun inbox ->
    // Ref cells (not `let mutable`) because both values are captured by the
    // closures that the async block is compiled into.
    let target = ref target1
    let state = ref ""
    async {
        for x = 1 to 10 do
            // The agent's own Post is handed over as the reply callback.
            target.Value.Post(x.ToString(), inbox.Post)
            let! msg = inbox.Receive()
            match msg with
            | Result v ->
                target.Value <- target2
                state.Value <- v
            | Error e -> System.Console.WriteLine e
            | NeedInput -> ()
            System.Console.WriteLine msg
        // An empty chunk asks the current consumer to finish with what it has.
        System.Console.WriteLine "Sending \"\""
        target.Value.Post("", inbox.Post)
        let! result = inbox.Receive()
        System.Console.WriteLine result
        match result with
        | Result x ->
            System.Console.WriteLine(state.Value)
            System.Console.WriteLine x
        | Error x -> System.Console.WriteLine x
        | NeedInput -> System.Console.WriteLine "Something went wrong"
    })
/// Builds a step function that concatenates the chunks it is fed.
///
/// For `n > 0` each chunk is echoed as "Received <chunk>" and then:
///   * an empty chunk finishes immediately with what has been gathered so far;
///   * while fewer than `n` chunks have been gathered, the chunk is appended
///     and more input is requested;
///   * otherwise the chunk is appended and the step finishes.
/// The finishing chunk is included in the result, so up to `n + 1` chunks are
/// consumed: `take 2` fed "1", "2", "3" finishes with "123".
///
/// For `n <= 0` the first chunk is discarded without being echoed and the
/// result is the empty string.
let take n =
    let rec step count state (str: string) =
        System.Console.WriteLine("Received " + str)
        match str with
        | "" -> Done state
        | _ when count < n -> Continue (step (count + 1) (state + str))
        | _ -> Done (state + str)
    if n > 0 then step 0 ""
    else fun _ -> Done ""
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
|
// Demo: f1 finishes on the third chunk, after which ping switches to f2;
// f2 is still gathering when the ten chunks run out and is flushed by the
// final empty chunk. The output of a run is recorded below.
let f1 = pong (take 2)
let f2 = pong (take 10)
ping f1 f2
// Received 1
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 2
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 3
// FSI_0085+Message`1+Result[System.String]
// Received 4
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 5
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 6
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 7
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 8
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 9
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 10
// FSI_0085+Message`1+_NeedInput[System.String]
// Sending ""
// Received
// FSI_0085+Message`1+Result[System.String]
// 123
// 45678910
|
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
type State<'a> =
| Continue of ('a -> State<'a>)
| Done of 'a
Full name: Script.State<_>
union case State.Continue: ('a -> State<'a>) -> State<'a>
union case State.Done: 'a -> State<'a>
type Message<'a> =
| Result of 'a
| NeedInput
| Error of string
Full name: Script.Message<_>
union case Message.Result: 'a -> Message<'a>
union case Message.NeedInput: Message<'a>
union case Message.Error: string -> Message<'a>
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
val pong : f:(string -> State<string>) -> MailboxProcessor<string * (Message<string> -> unit)>
Full name: Script.pong
val f : (string -> State<string>)
type Agent<'a> = MailboxProcessor<'a>
Full name: Script.Agent<_>
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val inbox : MailboxProcessor<string * (Message<string> -> unit)>
val loop : ((string -> State<string>) -> Async<unit>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : string * (Message<string> -> unit)
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val m : string
val cont : (Message<string> -> unit)
val x : string
val f' : (string -> State<string>)
val ping : target1:Agent<string * (Message<string> -> unit)> -> target2:Agent<string * (Message<string> -> unit)> -> MailboxProcessor<Message<string>>
Full name: Script.ping
val target1 : Agent<string * (Message<string> -> unit)>
val target2 : Agent<string * (Message<string> -> unit)>
val inbox : MailboxProcessor<Message<string>>
val target : Agent<string * (Message<string> -> unit)> ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val state : string ref
val x : int
System.Int32.ToString() : string
System.Int32.ToString(provider: System.IFormatProvider) : string
System.Int32.ToString(format: string) : string
System.Int32.ToString(format: string, provider: System.IFormatProvider) : string
member MailboxProcessor.Post : message:'Msg -> unit
val msg : Message<string>
val v : string
val e : string
namespace System
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
System.Console.WriteLine() : unit
(+0 other overloads)
System.Console.WriteLine(value: string) : unit
(+0 other overloads)
System.Console.WriteLine(value: obj) : unit
(+0 other overloads)
System.Console.WriteLine(value: uint64) : unit
(+0 other overloads)
System.Console.WriteLine(value: int64) : unit
(+0 other overloads)
System.Console.WriteLine(value: uint32) : unit
(+0 other overloads)
System.Console.WriteLine(value: int) : unit
(+0 other overloads)
System.Console.WriteLine(value: float32) : unit
(+0 other overloads)
System.Console.WriteLine(value: float) : unit
(+0 other overloads)
System.Console.WriteLine(value: decimal) : unit
(+0 other overloads)
val result : Message<string>
val take : n:int -> (string -> State<string>)
Full name: Script.take
val n : int
val step : (int -> string -> string -> State<string>)
val count : int
val state : string
val str : string
val f1 : MailboxProcessor<string * (Message<string> -> unit)>
Full name: Script.f1
val f2 : MailboxProcessor<string * (Message<string> -> unit)>
Full name: Script.f2
More information