0 people like it.

More lazy AsyncSeq.groupBy

A version of AsyncSeq.groupBy in which the consumer can synchronously request all of the groups, or all of the elements of each group while iterating over the groups, without the operation deadlocking.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control

/// Sample input: the integers 0 through 10, each produced after a
/// one-second asynchronous pause.
let nums = 
  let delayMs = 1000
  let first, last = 0, 10
  asyncSeq {
    for i in first .. last do
      do! Async.Sleep delayMs
      yield i
  }

/// Requests understood by the agent that backs `groupBy`.
type Message<'K, 'V> =
  /// Ask for the next group; the reply is `None` when there are no more
  /// groups, otherwise the group's key and a sequence of its values.
  | NextGroup of AsyncReplyChannel<('K * AsyncSeq<'V>) option>
  /// Ask for the next value of the group with the given key; the reply is
  /// `None` when the group has no more values.
  | NextValue of 'K * AsyncReplyChannel<'V option>

/// Groups the elements of `asq` by the key computed with `f`.
///
/// The result is a sequence of `(key, values)` pairs where `values` is itself
/// an asynchronous sequence. A single agent owns the source enumerator and
/// serves both "next group" and "next value of group k" requests, buffering
/// elements that were fetched on behalf of one request but belong to another
/// group.
///
/// Values of a group are delivered in the order in which they appear in the
/// source, and every source element is delivered exactly once (duplicates
/// included).
let groupBy f (asq:AsyncSeq<_>) = 
  let agent = MailboxProcessor.Start(fun agent ->
    let en = asq.GetEnumerator()

    // Fetch the next element from 'en' and append it to the pending values
    // of its group (creating the group if the key is new). Returns the
    // updated state, or None if the source has no more elements.
    let fetchNext groups = async {
      let! next = en.MoveNext()
      match next with 
      | Some n ->
          printfn "Fetched: %A" n
          let k = f n
          match Map.tryFind k groups with
          | Some (returned, pending) ->
              // Append so that 'pending' stays in source order (oldest first)
              return Some (Map.add k (returned, pending @ [n]) groups)
          | None ->
              return Some (Map.add k (false, [n]) groups)
      | None ->
          return None }

    // Create AsyncSeq for iterating over the elements of the group
    // with key 'k'. Each step asks the agent for the next value.
    let rec groupSeq k = asyncSeq {
      printfn "Next inner for: %A" k
      let! res = agent.PostAndAsyncReply(fun r -> NextValue(k, r))
      match res with 
      | None -> ()
      | Some v ->
          yield v
          yield! groupSeq k }

    // The state 'groups' is a Map<'K, bool * list<'V>>.
    // The 'bool' says whether the group has already been handed out to the
    // consumer; the list holds the values of the group that have been
    // fetched but not yet delivered, oldest first.
    //
    // Every branch ends with its own 'return! loop ...' so that the
    // recursive call is always in tail position.
    let rec loop groups = async {
      let! msg = agent.Receive()
      match msg with 
      | NextValue(k, repl) ->
          match Map.find k groups with
          | returned, (v :: rest) ->
              // A buffered value exists: deliver the oldest one and drop it
              repl.Reply(Some v)
              return! loop (Map.add k (returned, rest) groups)
          | _, [] ->
              // Nothing buffered: fetch one more element and retry by
              // re-posting the request; at the end of the source reply None
              let! fetched = fetchNext groups
              match fetched with 
              | None ->
                  repl.Reply None
                  return! loop groups
              | Some groups ->
                  agent.Post(NextValue(k, repl))
                  return! loop groups

      | NextGroup repl ->
          // Same logic as for fetching the next value: see if there is a
          // group that has not been handed out yet.
          printfn "Next outer"
          match groups |> Map.tryFindKey (fun _ (returned, _) -> not returned) with
          | Some k ->
              // Hand the group out and mark it as returned
              let _, pending = Map.find k groups
              repl.Reply(Some(k, groupSeq k))
              return! loop (Map.add k (true, pending) groups)
          | None ->
              // Fetch one more element and retry; at the end of the source
              // reply None
              let! fetched = fetchNext groups
              match fetched with 
              | None ->
                  repl.Reply None
                  return! loop groups
              | Some groups ->
                  agent.Post(NextGroup repl)
                  return! loop groups }
    loop Map.empty)
    
  // Outer sequence: keep asking the agent for groups until it replies None
  let rec groups () = asyncSeq {
    let! next = agent.PostAndAsyncReply(NextGroup)
    match next with 
    | None -> ()
    | Some g -> 
        yield g
        yield! groups () }

  groups ()

// Demo: group the sample numbers by their remainder modulo 3 and
// synchronously materialise every group (and then the groups themselves)
// into arrays.
nums
|> groupBy (fun n -> n % 3)
|> AsyncSeq.map (fun (key, items) -> key, AsyncSeq.toArraySynchronously items)
|> AsyncSeq.toArraySynchronously
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
val nums : AsyncSeq<int>
val asyncSeq : AsyncSeq.AsyncSeqBuilder
val i : int
Multiple items
type Async =
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Task -> Async<unit>
  static member AwaitTask : task:Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  ...

--------------------
type Async<'T> =
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
type Message<'K,'V> =
  | NextGroup of AsyncReplyChannel<('K * AsyncSeq<'V>) option>
  | NextValue of 'K * AsyncReplyChannel<'V option>
union case Message.NextGroup: AsyncReplyChannel<('K * AsyncSeq<'V>) option> -> Message<'K,'V>
type AsyncReplyChannel<'Reply> =
  member Reply : value:'Reply -> unit
type 'T option = Option<'T>
Multiple items
module AsyncSeq

from FSharp.Control

--------------------
type AsyncSeq<'T> = IAsyncEnumerable<'T>
union case Message.NextValue: 'K * AsyncReplyChannel<'V option> -> Message<'K,'V>
val groupBy : f:('a -> 'b) -> asq:AsyncSeq<'a> -> AsyncSeq<'b * AsyncSeq<'a>> (requires equality and comparison)
val f : ('a -> 'b) (requires equality and comparison)
val asq : AsyncSeq<'a> (requires equality)
val agent : MailboxProcessor<Message<'b,'a>> (requires comparison and equality)
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val en : IAsyncEnumerator<'a> (requires equality)
abstract member IAsyncEnumerable.GetEnumerator : unit -> IAsyncEnumerator<'T>
val fetchNext : (Map<'b,(bool * (bool * 'a) list)> -> Async<Map<'b,(bool * (bool * 'a) list)> option>) (requires comparison and equality)
val groups : Map<'b,(bool * (bool * 'a) list)> (requires comparison and equality)
val async : AsyncBuilder
val next : 'a option (requires equality)
abstract member IAsyncEnumerator.MoveNext : unit -> Async<'T option>
union case Option.Some: Value: 'T -> Option<'T>
val n : 'a (requires equality)
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
val k : 'b (requires comparison)
Multiple items
module Map

from Microsoft.FSharp.Collections

--------------------
type Map<'Key,'Value (requires comparison)> =
  interface IReadOnlyDictionary<'Key,'Value>
  interface IReadOnlyCollection<KeyValuePair<'Key,'Value>>
  interface IEnumerable
  interface IComparable
  interface IEnumerable<KeyValuePair<'Key,'Value>>
  interface ICollection<KeyValuePair<'Key,'Value>>
  interface IDictionary<'Key,'Value>
  new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
  member Add : key:'Key * value:'Value -> Map<'Key,'Value>
  member ContainsKey : key:'Key -> bool
  ...

--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val containsKey : key:'Key -> table:Map<'Key,'T> -> bool (requires comparison)
val returned : bool
val g : (bool * 'a) list (requires equality)
val ng : (bool * 'a) list (requires equality)
val add : key:'Key -> value:'T -> table:Map<'Key,'T> -> Map<'Key,'T> (requires comparison)
union case Option.None: Option<'T>
val groupSeq : ('b -> AsyncSeq<'a>) (requires comparison and equality)
val res : 'a option (requires equality)
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val r : AsyncReplyChannel<'a option> (requires equality)
val v : 'a (requires equality)
val loop : (Map<'b,(bool * (bool * 'a) list)> -> Async<unit>) (requires comparison and equality)
val msg : Message<'b,'a> (requires comparison and equality)
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val repl : AsyncReplyChannel<'a option> (requires equality)
val find : key:'Key -> table:Map<'Key,'T> -> 'T (requires comparison)
val v : (bool * 'a) option (requires equality)
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
    interface IReadOnlyList<'T>
    interface IReadOnlyCollection<'T>
    interface IEnumerable
    interface IEnumerable<'T>
    member GetReverseIndex : rank:int * offset:int -> int
    member GetSlice : startIndex:int option * endIndex:int option -> 'T list
    member Head : 'T
    member IsEmpty : bool
    member Item : index:int -> 'T with get
    member Length : int
    ...
val tryFind : predicate:('T -> bool) -> list:'T list -> 'T option
val not : value:bool -> bool
val groups : Map<'b,(bool * (bool * 'a) list)> option (requires comparison and equality)
member AsyncReplyChannel.Reply : value:'Reply -> unit
member MailboxProcessor.Post : message:'Msg -> unit
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val v' : 'a (requires equality)
val repl : AsyncReplyChannel<('b * AsyncSeq<'a>) option> (requires comparison and equality)
val k : 'b option (requires comparison)
val tryFindKey : predicate:('Key -> 'T -> bool) -> table:Map<'Key,'T> -> 'Key option (requires comparison)
val empty<'Key,'T (requires comparison)> : Map<'Key,'T> (requires comparison)
val groups : (unit -> AsyncSeq<'b * AsyncSeq<'a>>) (requires comparison and equality)
val next : ('b * AsyncSeq<'a>) option (requires comparison and equality)
val g : 'b * AsyncSeq<'a> (requires comparison and equality)
val k : int
val map : folder:('T -> 'U) -> source:AsyncSeq<'T> -> AsyncSeq<'U>
val v : AsyncSeq<int>
val toArraySynchronously : source:AsyncSeq<'T> -> 'T []
Raw view Test code New version

More information

Link:http://fssnip.net/862
Posted:2 years ago
Author:Tomas Petricek
Tags: asyncseq