Home
Insert
Update snippet 'More lazy AsyncSeq.groupBy'
Title
Description
A version of AsyncSeq.groupBy in which the consumer can synchronously request all groups, or all elements of each group while still iterating over the groups, without the operation deadlocking.
Source code
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control

/// Demo input: the numbers 0..10, one element produced per second.
let nums = asyncSeq {
  for i in 0 .. 10 do
    do! Async.Sleep 1000
    yield i }

/// Requests understood by the grouping agent created in 'groupBy'.
type Message<'K, 'V> =
  /// Ask for the next group that has not been handed out yet
  /// (the reply is None once the source is exhausted).
  | NextGroup of AsyncReplyChannel<option<'K * AsyncSeq<'V>>>
  /// Ask for the next value of the group with the given key
  /// (the reply is None once the source is exhausted).
  | NextValue of 'K * AsyncReplyChannel<option<'V>>

/// Lazily groups the elements of 'asq' by the key computed with 'f'.
///
/// Returns an AsyncSeq of (key, group) pairs where each group is itself an
/// AsyncSeq. All reads of the source go through a single MailboxProcessor,
/// so the consumer may drain an inner group before asking for the next outer
/// group (or the other way round): elements that belong to other groups are
/// buffered by the agent instead of blocking the reader.
///
/// Values inside a group are delivered in source order, and every source
/// element is delivered exactly once, including duplicates.
let groupBy f (asq:AsyncSeq<_>) =
  let agent = MailboxProcessor.Start(fun agent ->
    // NOTE(review): the enumerator is never disposed and the agent is never
    // stopped; confirm whether that matters for the sources used here.
    let en = asq.GetEnumerator()

    // Fetch the next element from 'en' and append it to the pending queue of
    // its group (creating the group if needed). Returns the updated state, or
    // None if there are no more elements.
    let fetchNext groups = async {
      let! next = en.MoveNext()
      match next with
      | Some n ->
          printfn "Fetched: %A" n
          let k = f n
          match Map.tryFind k groups with
          | Some (returned, pending) ->
              // Append so that buffered values keep their source order.
              // This is O(length of the pending queue), which only holds
              // values that have not been delivered yet.
              return Some (Map.add k (returned, pending @ [n]) groups)
          | None ->
              return Some (Map.add k (false, [n]) groups)
      | None -> return None }

    // Create an AsyncSeq that iterates over the elements of the group with
    // key 'k' by repeatedly asking the agent for the next value.
    let rec groupSeq k = asyncSeq {
      printfn "Next inner for: %A" k
      let! res = agent.PostAndAsyncReply(fun r -> NextValue(k, r))
      match res with
      | None -> ()
      | Some v ->
          yield v
          yield! groupSeq k }

    // The state 'groups' is a Map<'K, bool * list<'V>>.
    // The bool says whether the group has already been handed out to the
    // consumer; the list is a FIFO queue (oldest first) of values that were
    // fetched for the group but not delivered yet. Delivered values are
    // removed from the queue.
    let rec loop groups = async {
      let! msg = agent.Receive()
      match msg with
      | NextValue(k, repl) ->
          match Map.find k groups with
          | _, v :: rest ->
              // Deliver the oldest pending value and drop it from the queue.
              repl.Reply(Some v)
              return! loop (Map.add k (true, rest) groups)
          | _, [] ->
              // Nothing buffered for this group: pull one more element from
              // the source. If there was one, re-post the request and try
              // again; if the source is exhausted, the group is finished.
              let! fetched = fetchNext groups
              match fetched with
              | None ->
                  repl.Reply None
                  return! loop groups
              | Some groups ->
                  agent.Post(NextValue(k, repl))
                  return! loop groups
      | NextGroup repl ->
          // Same logic as for fetching the next value: see if there is a
          // group that has not been handed out yet. If several are waiting,
          // Map.tryFindKey picks the first one in key order.
          printfn "Next outer"
          match groups |> Map.tryFindKey (fun _ (returned, _) -> not returned) with
          | Some k ->
              // Hand the group out and mark it as returned.
              let _, pending = Map.find k groups
              repl.Reply(Some(k, groupSeq k))
              return! loop (Map.add k (true, pending) groups)
          | None ->
              // No waiting group: pull one more element from the source.
              // If there is none, there are no more groups; otherwise
              // re-post the request and try again.
              let! fetched = fetchNext groups
              match fetched with
              | None ->
                  repl.Reply None
                  return! loop groups
              | Some groups ->
                  agent.Post(NextGroup repl)
                  return! loop groups }
    loop Map.empty)

  // The outer sequence: keep asking the agent for the next group.
  let rec groups () = asyncSeq {
    let! next = agent.PostAndAsyncReply(NextGroup)
    match next with
    | None -> ()
    | Some g ->
        yield g
        yield! groups () }
  groups ()

// Demo: each inner group is drained synchronously while the outer sequence
// is still being iterated.
nums
|> groupBy (fun k -> k%3)
|> AsyncSeq.map (fun (k, v) -> k, AsyncSeq.toArraySynchronously v)
|> AsyncSeq.toArraySynchronously
Tags
asyncseq
asyncseq
Author
Link
Reference NuGet packages
If your snippet has external dependencies, enter the names of the NuGet packages to reference, separated by commas (`#r` directives are not required).
Update