2 people like it.

Ping and Pong go Chunking Along

Parameterizing pong allows us to do even more fun things. Here we use a few message types to allow stateful consumption of data sent by ping to its pongs.

Processing chunks of input with agents

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
/// Shorthand alias for `MailboxProcessor<'a>`.
type Agent<'a> = MailboxProcessor<'a>

/// Outcome of feeding one input to a step function.
type State<'a> =
  /// More input is wanted; carries the function to apply to the next input.
  | Continue of ('a -> State<'a>)
  /// Processing has finished; carries the final value.
  | Done of 'a

/// Reply sent back to `ping` through the callback that accompanies each chunk.
type Message<'a> =
  /// The step function returned `Done`; carries its final value.
  | Result of 'a
  /// The step function returned `Continue`; another chunk may be sent.
  | NeedInput
  /// A failure, described by the carried text; `ping` prints it.
  | Error of string

/// Starts an agent that feeds every received chunk to a step function.
///
/// Each message is a pair of the chunk and a reply callback. The agent begins
/// with `f` and answers every message through its callback:
///   * `Done x`      -> replies `Result x` and stops processing further messages;
///   * `Continue f'` -> replies `NeedInput` and handles the next chunk with `f'`;
///   * exception     -> replies `Error` with the exception message and keeps the
///                      current step function, so the sender is never left
///                      waiting for a reply that will not come.
let pong f = Agent<string * (Message<_> -> unit)>.Start(fun inbox ->
  let rec loop handler = async {
    let! (chunk, reply) = inbox.Receive()
    // Run the step function outside the reply logic so that a throwing handler
    // is reported to the sender instead of silently killing the agent.
    let outcome =
      try Choice1Of2 (handler chunk)
      with ex -> Choice2Of2 ex.Message
    match outcome with
    | Choice1Of2 (Done x) -> reply (Result x)
    | Choice1Of2 (Continue next) ->
        reply NeedInput
        return! loop next
    | Choice2Of2 description ->
        reply (Error description)
        return! loop handler
  }
  loop f )

/// Starts the driving agent.
///
/// Posts the strings "1" .. "10" one at a time, waiting for a reply after each
/// and printing it. Chunks go to `target1` until a `Result` arrives; that value
/// is remembered and later chunks go to `target2`. Finally an empty string is
/// posted and the last reply is printed, followed (for a `Result`) by the
/// remembered value and the value carried by that last reply.
let ping (target1: Agent<_>) (target2: Agent<_>) = Agent<Message<_>>.Start(fun inbox ->
  // Ref cells, because both values are updated from inside the async block.
  let current = ref target1
  let firstResult = ref ""
  // Posts one chunk to the current target, passing this agent's Post as the reply callback.
  let send (chunk: string) = current.Value.Post(chunk, inbox.Post)
  async {
    for i in 1 .. 10 do
      send (string i)
      let! reply = inbox.Receive()
      match reply with
      | Result value ->
          current.Value <- target2
          firstResult.Value <- value
      | Error description -> System.Console.WriteLine description
      | NeedInput -> ()
      System.Console.WriteLine reply

    System.Console.WriteLine "Sending \"\""
    send ""
    let! lastReply = inbox.Receive()
    System.Console.WriteLine lastReply
    match lastReply with
    | Result rest ->
        System.Console.WriteLine(firstResult.Value)
        System.Console.WriteLine rest
    | Error description -> System.Console.WriteLine description
    | NeedInput -> System.Console.WriteLine "Something went wrong"
  })

/// Builds a step function that concatenates incoming string chunks.
///
/// For `n <= 0` the returned function answers `Done ""` for any input.
/// Otherwise every call prints "Received " followed by the chunk, then:
///   * an empty chunk finishes with what has been accumulated so far;
///   * while fewer than `n` chunks have been accumulated, the chunk is appended
///     and `Continue` is returned;
///   * the next chunk after that is appended as well and finishes with `Done`.
/// So up to n + 1 chunks are collected: `take 2` fed "1", "2", "3" ends with "123".
/// NOTE(review): confirm that collecting n + 1 rather than n chunks is intended.
let take n =
  let rec consume seen acc (chunk: string) =
    System.Console.WriteLine("Received " + chunk)
    match chunk with
    | "" -> Done acc
    | _ when seen < n -> Continue (consume (seen + 1) (acc + chunk))
    | _ -> Done (acc + chunk)
  if n > 0 then
    consume 0 ""
  else
    // Nothing to take: finish immediately, whatever the input.
    fun _ -> Done ""

Usage

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
// Two consumers: ping feeds f1 until it reports a Result, then switches to f2.
let f1 = pong <| take 2
let f2 = pong <| take 10
// The started agent is not needed afterwards; discard it explicitly so the
// value is not implicitly ignored.
ping f1 f2 |> ignore
// Recorded console output:
// Received 1
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 2
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 3
// FSI_0085+Message`1+Result[System.String]
// Received 4
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 5
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 6
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 7
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 8
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 9
// FSI_0085+Message`1+_NeedInput[System.String]
// Received 10
// FSI_0085+Message`1+_NeedInput[System.String]
// Sending ""
// Received 
// FSI_0085+Message`1+Result[System.String]
// 123
// 45678910
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
type State<'a> =
  | Continue of ('a -> State<'a>)
  | Done of 'a

Full name: Script.State<_>
union case State.Continue: ('a -> State<'a>) -> State<'a>
union case State.Done: 'a -> State<'a>
type Message<'a> =
  | Result of 'a
  | NeedInput
  | Error of string

Full name: Script.Message<_>
union case Message.Result: 'a -> Message<'a>
union case Message.NeedInput: Message<'a>
union case Message.Error: string -> Message<'a>
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
val pong : f:(string -> State<string>) -> MailboxProcessor<string * (Message<string> -> unit)>

Full name: Script.pong
val f : (string -> State<string>)
type Agent<'a> = MailboxProcessor<'a>

Full name: Script.Agent<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val inbox : MailboxProcessor<string * (Message<string> -> unit)>
val loop : ((string -> State<string>) -> Async<unit>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : string * (Message<string> -> unit)
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val m : string
val cont : (Message<string> -> unit)
val x : string
val f' : (string -> State<string>)
val ping : target1:Agent<string * (Message<string> -> unit)> -> target2:Agent<string * (Message<string> -> unit)> -> MailboxProcessor<Message<string>>

Full name: Script.ping
val target1 : Agent<string * (Message<string> -> unit)>
val target2 : Agent<string * (Message<string> -> unit)>
val inbox : MailboxProcessor<Message<string>>
val target : Agent<string * (Message<string> -> unit)> ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val state : string ref
val x : int
System.Int32.ToString() : string
System.Int32.ToString(provider: System.IFormatProvider) : string
System.Int32.ToString(format: string) : string
System.Int32.ToString(format: string, provider: System.IFormatProvider) : string
member MailboxProcessor.Post : message:'Msg -> unit
val msg : Message<string>
val v : string
val e : string
namespace System
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
System.Console.WriteLine() : unit
   (+0 other overloads)
System.Console.WriteLine(value: string) : unit
   (+0 other overloads)
System.Console.WriteLine(value: obj) : unit
   (+0 other overloads)
System.Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
System.Console.WriteLine(value: int64) : unit
   (+0 other overloads)
System.Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
System.Console.WriteLine(value: int) : unit
   (+0 other overloads)
System.Console.WriteLine(value: float32) : unit
   (+0 other overloads)
System.Console.WriteLine(value: float) : unit
   (+0 other overloads)
System.Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
val result : Message<string>
val take : n:int -> (string -> State<string>)

Full name: Script.take
val n : int
val step : (int -> string -> string -> State<string>)
val count : int
val state : string
val str : string
val f1 : MailboxProcessor<string * (Message<string> -> unit)>

Full name: Script.f1
val f2 : MailboxProcessor<string * (Message<string> -> unit)>

Full name: Script.f2
Raw view Test code New version

More information

Link:http://fssnip.net/bO
Posted:11 years ago
Author:Ryan Riley
Tags: mailboxprocessor , agent , ping pong