0 people like it.
Like the snippet!
More lazy AsyncSeq.groupBy
A version of AsyncSeq.groupBy that lets the caller synchronously request all groups, and also all elements of each group, while iterating, without the operation deadlocking.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
|
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control
// Demo input: the integers 0 through 10, each one produced after
// a one-second asynchronous delay.
let nums =
  let rec countFrom i = asyncSeq {
    if i <= 10 then
      do! Async.Sleep 1000
      yield i
      yield! countFrom (i + 1) }
  countFrom 0
// Requests understood by the grouping agent. Every request carries the
// reply channel on which the agent posts its answer; the agent answers
// None when the source has no more elements to offer.
type Message<'K, 'V> =
  // Ask for the next group: answered with its key and the
  // asynchronous sequence of that group's values.
  | NextGroup of AsyncReplyChannel<('K * AsyncSeq<'V>) option>
  // Ask for the next value of the group with the given key.
  | NextValue of 'K * AsyncReplyChannel<'V option>
// Groups the elements of 'asq' by the key computed with 'f'.
//
// Returns an AsyncSeq of (key, group) pairs where each group is itself
// an AsyncSeq of the elements with that key. A single MailboxProcessor
// owns the source enumerator and buffers fetched elements, so the outer
// sequence and the inner sequences can be pulled in any interleaving:
// whichever consumer asks first drives the source forward and elements
// belonging to other groups are queued until their group asks for them.
//
// Elements of a group are handed out in the order in which they were
// read from the source, and every element is handed out exactly once
// (equal elements are not collapsed). A value is removed from the
// buffer when it is handed out, so each inner sequence is meant to be
// enumerated once.
let groupBy f (asq:AsyncSeq<_>) =
  let agent = MailboxProcessor.Start(fun agent ->
    let en = asq.GetEnumerator()

    // The state 'groups' is a Map<'K, bool * list<'V> * list<'V>>.
    // The 'bool' says whether the group has already been returned to
    // the consumer of the outer sequence. The two lists form a FIFO
    // queue of values that were fetched but not handed out yet:
    // 'front' holds the oldest values in arrival order and 'back'
    // holds newer values, newest first.

    // Fetch the next element from 'en' and enqueue it in its group.
    // Returns the updated map, or None if there are no more elements.
    let fetchNext groups = async {
      let! next = en.MoveNext()
      match next with
      | Some n ->
          printfn "Fetched: %A" n
          let k = f n
          let updated =
            match Map.tryFind k groups with
            | Some (returned, front, back) ->
                Map.add k (returned, front, n :: back) groups
            | None ->
                Map.add k (false, [n], []) groups
          return Some updated
      | None ->
          return None }

    // Create the AsyncSeq that iterates over the elements of the
    // group with key 'k' by repeatedly asking the agent for a value.
    let rec groupSeq k = asyncSeq {
      printfn "Next inner for: %A" k
      let! res = agent.PostAndAsyncReply(fun r -> NextValue(k, r))
      match res with
      | None -> ()
      | Some v ->
          yield v
          yield! groupSeq k }

    // Message loop. Every branch ends with an explicit tail call so
    // the agent keeps serving requests after the source is exhausted.
    let rec loop groups = async {
      let! msg = agent.Receive()
      match msg with
      | NextValue(k, repl) ->
          // Do we have a pending value for the group with key k?
          let _, front, back = Map.find k groups
          // Refill 'front' from 'back' when it runs dry, so that the
          // oldest pending value is always at the head of 'front'.
          let front, back =
            match front with
            | [] -> List.rev back, []
            | _ -> front, back
          match front with
          | v :: rest ->
              // Hand out the oldest pending value and drop it
              repl.Reply(Some v)
              return! loop (Map.add k (true, rest, back) groups)
          | [] ->
              // Nothing pending: fetch the next source element and
              // retry the same request; at the end reply 'None'.
              let! fetched = fetchNext groups
              match fetched with
              | Some updated ->
                  agent.Post(NextValue(k, repl))
                  return! loop updated
              | None ->
                  repl.Reply None
                  return! loop groups

      | NextGroup repl ->
          // Same logic as for fetching the next value. See if we
          // have a group that has not been returned yet.
          printfn "Next outer"
          let unreturned =
            groups |> Map.tryFindKey (fun _ (returned, _, _) -> not returned)
          match unreturned with
          | Some k ->
              // Return the group and mark it as returned
              let _, front, back = Map.find k groups
              repl.Reply(Some(k, groupSeq k))
              return! loop (Map.add k (true, front, back) groups)
          | None ->
              // No such group: fetch the next source element and
              // retry the same request; at the end reply 'None'.
              let! fetched = fetchNext groups
              match fetched with
              | Some updated ->
                  agent.Post(NextGroup repl)
                  return! loop updated
              | None ->
                  repl.Reply None
                  return! loop groups }

    loop Map.empty)

  // Outer sequence: keep asking the agent for groups until it says None
  let rec groups () = asyncSeq {
    let! next = agent.PostAndAsyncReply(NextGroup)
    match next with
    | None -> ()
    | Some g ->
        yield g
        yield! groups () }
  groups ()
// Demo: group the numbers by their remainder modulo 3. Each group is
// drained synchronously inside the mapping function, i.e. while the
// outer sequence is still being iterated, and the outer sequence is
// then collected synchronously as well.
AsyncSeq.toArraySynchronously
  (nums
   |> groupBy (fun n -> n % 3)
   |> AsyncSeq.map (fun (key, members) ->
        key, AsyncSeq.toArraySynchronously members))
|
Multiple items
namespace FSharp
--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control
--------------------
namespace Microsoft.FSharp.Control
val nums : AsyncSeq<int>
val asyncSeq : AsyncSeq.AsyncSeqBuilder
val i : int
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
...
--------------------
type Async<'T> =
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
type Message<'K,'V> =
| NextGroup of AsyncReplyChannel<('K * AsyncSeq<'V>) option>
| NextValue of 'K * AsyncReplyChannel<'V option>
union case Message.NextGroup: AsyncReplyChannel<('K * AsyncSeq<'V>) option> -> Message<'K,'V>
type AsyncReplyChannel<'Reply> =
member Reply : value:'Reply -> unit
type 'T option = Option<'T>
Multiple items
module AsyncSeq
from FSharp.Control
--------------------
type AsyncSeq<'T> = IAsyncEnumerable<'T>
union case Message.NextValue: 'K * AsyncReplyChannel<'V option> -> Message<'K,'V>
val groupBy : f:('a -> 'b) -> asq:AsyncSeq<'a> -> AsyncSeq<'b * AsyncSeq<'a>> (requires equality and comparison)
val f : ('a -> 'b) (requires equality and comparison)
val asq : AsyncSeq<'a> (requires equality)
val agent : MailboxProcessor<Message<'b,'a>> (requires comparison and equality)
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val en : IAsyncEnumerator<'a> (requires equality)
abstract member IAsyncEnumerable.GetEnumerator : unit -> IAsyncEnumerator<'T>
val fetchNext : (Map<'b,(bool * (bool * 'a) list)> -> Async<Map<'b,(bool * (bool * 'a) list)> option>) (requires comparison and equality)
val groups : Map<'b,(bool * (bool * 'a) list)> (requires comparison and equality)
val async : AsyncBuilder
val next : 'a option (requires equality)
abstract member IAsyncEnumerator.MoveNext : unit -> Async<'T option>
union case Option.Some: Value: 'T -> Option<'T>
val n : 'a (requires equality)
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
val k : 'b (requires comparison)
Multiple items
module Map
from Microsoft.FSharp.Collections
--------------------
type Map<'Key,'Value (requires comparison)> =
interface IReadOnlyDictionary<'Key,'Value>
interface IReadOnlyCollection<KeyValuePair<'Key,'Value>>
interface IEnumerable
interface IComparable
interface IEnumerable<KeyValuePair<'Key,'Value>>
interface ICollection<KeyValuePair<'Key,'Value>>
interface IDictionary<'Key,'Value>
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
member Add : key:'Key * value:'Value -> Map<'Key,'Value>
member ContainsKey : key:'Key -> bool
...
--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val containsKey : key:'Key -> table:Map<'Key,'T> -> bool (requires comparison)
val returned : bool
val g : (bool * 'a) list (requires equality)
val ng : (bool * 'a) list (requires equality)
val add : key:'Key -> value:'T -> table:Map<'Key,'T> -> Map<'Key,'T> (requires comparison)
union case Option.None: Option<'T>
val groupSeq : ('b -> AsyncSeq<'a>) (requires comparison and equality)
val res : 'a option (requires equality)
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val r : AsyncReplyChannel<'a option> (requires equality)
val v : 'a (requires equality)
val loop : (Map<'b,(bool * (bool * 'a) list)> -> Async<unit>) (requires comparison and equality)
val msg : Message<'b,'a> (requires comparison and equality)
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val repl : AsyncReplyChannel<'a option> (requires equality)
val find : key:'Key -> table:Map<'Key,'T> -> 'T (requires comparison)
val v : (bool * 'a) option (requires equality)
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IReadOnlyList<'T>
interface IReadOnlyCollection<'T>
interface IEnumerable
interface IEnumerable<'T>
member GetReverseIndex : rank:int * offset:int -> int
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
...
val tryFind : predicate:('T -> bool) -> list:'T list -> 'T option
val not : value:bool -> bool
val groups : Map<'b,(bool * (bool * 'a) list)> option (requires comparison and equality)
member AsyncReplyChannel.Reply : value:'Reply -> unit
member MailboxProcessor.Post : message:'Msg -> unit
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
val v' : 'a (requires equality)
val repl : AsyncReplyChannel<('b * AsyncSeq<'a>) option> (requires comparison and equality)
val k : 'b option (requires comparison)
val tryFindKey : predicate:('Key -> 'T -> bool) -> table:Map<'Key,'T> -> 'Key option (requires comparison)
val empty<'Key,'T (requires comparison)> : Map<'Key,'T> (requires comparison)
val groups : (unit -> AsyncSeq<'b * AsyncSeq<'a>>) (requires comparison and equality)
val next : ('b * AsyncSeq<'a>) option (requires comparison and equality)
val g : 'b * AsyncSeq<'a> (requires comparison and equality)
val k : int
val map : folder:('T -> 'U) -> source:AsyncSeq<'T> -> AsyncSeq<'U>
val v : AsyncSeq<int>
val toArraySynchronously : source:AsyncSeq<'T> -> 'T []
More information