26 people like it.
Like the snippet!
Playing with async sequences
An attempt to reimplement the AsyncRead/AsyncReadLines functions from the 'Rx on the server' articles (by Jeffrey van Gogh) using the idea of an AsyncSequence (by Tomas Petricek).
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
|
/// An asynchronous sequence: an `Async` computation that, when run,
/// produces the next `Chunk` of the sequence.
type AsyncSeq<'T> = Async<Chunk<'T>>
/// A single step of an `AsyncSeq`.
and Chunk<'T> =
/// The sequence has no further elements.
| Done
/// One element, paired with the `AsyncSeq` that produces the remainder.
| Value of 'T * AsyncSeq<'T>
/// Basic combinators over `AsyncSeq<'T>`.
module AsyncSeq =

    /// Builds a new asynchronous sequence whose elements are the results of
    /// applying `f` to the elements of `seq`. `f` is applied to an element
    /// at the moment the step carrying that element is run.
    let map f (seq : AsyncSeq<_>) : AsyncSeq<_> =
        let rec loop source =
            async {
                let! step = source
                let mapped =
                    match step with
                    | Value (item, rest) -> Value (f item, loop rest)
                    | Done -> Done
                return mapped
            }
        loop seq

    /// Returns an `Async<unit>` that walks `seq` to its end, invoking
    /// `action` on each element in order.
    let run action (seq : AsyncSeq<_>) =
        let rec consume source =
            async {
                let! step = source
                match step with
                | Value (item, rest) ->
                    action item
                    return! consume rest
                | Done ->
                    return ()
            }
        consume seq

    /// Builds a new asynchronous sequence containing only the elements of
    /// `seq` for which the predicate `f` returns true.
    let filter f (seq : AsyncSeq<_>) : AsyncSeq<_> =
        let rec keep source =
            async {
                let! step = source
                match step with
                | Value (item, rest) when f item ->
                    return Value (item, keep rest)
                | Value (_, rest) ->
                    // Rejected element: move straight on to the next step.
                    return! keep rest
                | Done ->
                    return Done
            }
        keep seq
/// Extension members that expose a `System.IO.Stream` as an `AsyncSeq`
/// (raw byte chunks or decoded text lines) and that write an `AsyncSeq`
/// of byte chunks back to a stream.
[<AutoOpen>]
module AsyncSeqExtensions =
    open System.Text

    type System.IO.Stream with

        /// Reads the stream as an asynchronous sequence of byte arrays.
        /// Each element holds exactly the bytes returned by one `AsyncRead`
        /// call; the sequence ends when a read returns 0 bytes.
        /// `bufferSize` is the maximum number of bytes requested per read
        /// (default `2 <<< 16`, i.e. 131072).
        member this.AsyncReadSeq(?bufferSize) : AsyncSeq<byte[]> =
            let bufferSize = defaultArg bufferSize (2 <<< 16)
            // Scratch buffer reused by every read; the bytes actually read
            // are copied out with Array.sub before being handed to the caller.
            let temp : byte[] = Array.zeroCreate bufferSize
            let rec doRead () =
                async {
                    let! count = this.AsyncRead(temp, 0, bufferSize)
                    if count = 0 then
                        return Done
                    else
                        let buf = Array.sub temp 0 count
                        return Value(buf, doRead ())
                }
            doRead ()

        /// Reads the stream as an asynchronous sequence of text lines,
        /// decoding the bytes as UTF-8. Lines are terminated by "\r\n",
        /// "\r" or "\n"; terminators are not included in the results. Text
        /// after the last terminator is returned as a final line when it is
        /// non-empty. `bufferSize` is forwarded to `AsyncReadSeq`.
        ///
        /// The returned sequence keeps mutable decoding state and is meant
        /// to be consumed once, from start to end.
        member this.AsyncReadLines(?bufferSize) : AsyncSeq<string> =
            // Accumulates the current, not yet terminated, line.
            let sb = StringBuilder()
            // A stateful decoder keeps the leading bytes of a multi-byte
            // character that is split across two reads, instead of decoding
            // each chunk in isolation and corrupting that character.
            let decoder = Encoding.UTF8.GetDecoder()
            // True when the previous chunk ended with '\r': a '\n' at the
            // start of the next chunk then belongs to the same "\r\n"
            // terminator and must not produce an extra empty line.
            let skipLf = ref false
            let lineBreaks = [| '\r'; '\n' |]

            // Decodes `bytes`, carrying incomplete sequences over to the next
            // call unless `flush` is true.
            let decode (flush : bool) (bytes : byte[]) =
                let chars : char[] =
                    Array.zeroCreate (decoder.GetCharCount(bytes, 0, bytes.Length, flush))
                let written = decoder.GetChars(bytes, 0, bytes.Length, chars, 0, flush)
                System.String(chars, 0, written)

            // Pulls the next decoded chunk of text and starts scanning it.
            let rec doRead (s : AsyncSeq<string>) : AsyncSeq<string> =
                async {
                    let! chunk = s
                    match chunk with
                    | Done ->
                        // End of stream: flush whatever the decoder still
                        // holds (e.g. a truncated trailing sequence).
                        sb.Append(decode true Array.empty) |> ignore
                        if sb.Length <> 0 then
                            let last = sb.ToString()
                            sb.Length <- 0
                            return Value(last, async.Return Done)
                        else
                            return Done
                    | Value(part, next) ->
                        let start =
                            if skipLf.Value && part.Length > 0 then
                                skipLf.Value <- false
                                if part.[0] = '\n' then 1 else 0
                            else
                                0
                        return! doProcess part start next
                }

            // Scans `text` from position `n`. Emits the next complete line,
            // or buffers the remainder and continues with the next chunk.
            and doProcess (text : string) n next : AsyncSeq<string> =
                async {
                    let idx = text.IndexOfAny(lineBreaks, n)
                    if idx < 0 then
                        // No terminator in the rest of this chunk.
                        sb.Append(text, n, text.Length - n) |> ignore
                        return! doRead next
                    else
                        sb.Append(text, n, idx - n) |> ignore
                        let line = sb.ToString()
                        sb.Length <- 0
                        let resume =
                            if text.[idx] = '\n' then
                                idx + 1
                            elif idx + 1 = text.Length then
                                // '\r' is the last char of the chunk: its
                                // '\n' partner may open the next chunk.
                                skipLf.Value <- true
                                idx + 1
                            elif text.[idx + 1] = '\n' then
                                idx + 2
                            else
                                idx + 1
                        return Value(line, doProcess text resume next)
                }

            this.AsyncReadSeq(?bufferSize = bufferSize)
            |> AsyncSeq.map (decode false)
            |> doRead

        /// Writes every byte array of `seq` to the stream, in order, and
        /// completes when the sequence is exhausted.
        member this.AsyncWriteSeq(seq : AsyncSeq<byte[]>) =
            let rec run s =
                async {
                    let! chunk = s
                    match chunk with
                    | Done ->
                        return ()
                    | Value(data, next) ->
                        do! this.AsyncWrite(data)
                        return! run next
                }
            run seq
open System.IO
/// Asynchronously reads the file at `path` line by line and prints each
/// line to standard output formatted as "<prefix>: <line>".
let printWithPrefix (path : string) prefix =
    async {
        // The file is only read, so request read-only access and allow other
        // readers; File.Open(path, FileMode.Open) asks for read/write access
        // with no sharing. useAsync = true opens the handle for asynchronous
        // I/O, which is what the AsyncRead-based line reader relies on.
        use f = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)
        let printing =
            f.AsyncReadLines()
            |> AsyncSeq.map (sprintf "%s: %s" prefix)
            |> AsyncSeq.run (printfn "%s")
        // do! (not return!) so the stream is disposed only after printing ends.
        do! printing
    }
|
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
union case Chunk.Done: Chunk<'T>
union case Chunk.Value: 'T * AsyncSeq<'T> -> Chunk<'T>
type AsyncSeq<'T> = Async<Chunk<'T>>
Full name: Script.AsyncSeq<_>
val map : f:('a -> 'b) -> seq:AsyncSeq<'a> -> AsyncSeq<'b>
Full name: Script.AsyncSeq.map
val f : ('a -> 'b)
Multiple items
val seq : AsyncSeq<'a>
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val doMap : (Async<Chunk<'a>> -> Async<Chunk<'b>>)
val s : Async<Chunk<'a>>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val chunk : Chunk<'a>
val v : 'a
val next : AsyncSeq<'a>
val run : action:('a -> unit) -> seq:AsyncSeq<'a> -> Async<unit>
Full name: Script.AsyncSeq.run
val action : ('a -> unit)
val doRun : (Async<Chunk<'a>> -> Async<unit>)
val filter : f:('a -> bool) -> seq:AsyncSeq<'a> -> AsyncSeq<'a>
Full name: Script.AsyncSeq.filter
val f : ('a -> bool)
val doFilter : (Async<Chunk<'a>> -> Async<Chunk<'a>>)
val value : 'a
val x : Chunk<'a>
Multiple items
type AutoOpenAttribute =
inherit Attribute
new : unit -> AutoOpenAttribute
new : path:string -> AutoOpenAttribute
member Path : string
Full name: Microsoft.FSharp.Core.AutoOpenAttribute
--------------------
new : unit -> AutoOpenAttribute
new : path:string -> AutoOpenAttribute
namespace System
namespace System.Text
namespace System.IO
type Stream =
inherit MarshalByRefObject
member BeginRead : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
member BeginWrite : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
member CanRead : bool
member CanSeek : bool
member CanTimeout : bool
member CanWrite : bool
member Close : unit -> unit
member CopyTo : destination:Stream -> unit + 1 overload
member Dispose : unit -> unit
member EndRead : asyncResult:IAsyncResult -> int
...
Full name: System.IO.Stream
val this : System.IO.Stream
member System.IO.Stream.AsyncReadSeq : ?bufferSize:int -> AsyncSeq<byte []>
Full name: Script.AsyncSeqExtensions.AsyncReadSeq
val bufferSize : int option
Multiple items
module AsyncSeq
from Script
--------------------
type AsyncSeq<'T> = Async<Chunk<'T>>
Full name: Script.AsyncSeq<_>
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = System.Byte
Full name: Microsoft.FSharp.Core.byte
val bufferSize : int
val defaultArg : arg:'T option -> defaultValue:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.defaultArg
val temp : byte []
module Array
from Microsoft.FSharp.Collections
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val doRead : (unit -> Async<Chunk<byte []>>)
val count : int
member System.IO.Stream.AsyncRead : count:int -> Async<byte []>
member System.IO.Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val buf : byte []
val sub : array:'T [] -> startIndex:int -> count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.sub
member System.IO.Stream.AsyncReadLines : ?bufferSize:int -> Async<Chunk<string>>
Full name: Script.AsyncSeqExtensions.AsyncReadLines
val sb : StringBuilder
Multiple items
type StringBuilder =
new : unit -> StringBuilder + 5 overloads
member Append : value:string -> StringBuilder + 18 overloads
member AppendFormat : format:string * arg0:obj -> StringBuilder + 4 overloads
member AppendLine : unit -> StringBuilder + 1 overload
member Capacity : int with get, set
member Chars : int -> char with get, set
member Clear : unit -> StringBuilder
member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
member EnsureCapacity : capacity:int -> int
member Equals : sb:StringBuilder -> bool
...
Full name: System.Text.StringBuilder
--------------------
StringBuilder() : unit
StringBuilder(capacity: int) : unit
StringBuilder(value: string) : unit
StringBuilder(value: string, capacity: int) : unit
StringBuilder(capacity: int, maxCapacity: int) : unit
StringBuilder(value: string, startIndex: int, length: int, capacity: int) : unit
val getText : (AsyncSeq<byte []> -> AsyncSeq<string>)
type Encoding =
member BodyName : string
member Clone : unit -> obj
member CodePage : int
member DecoderFallback : DecoderFallback with get, set
member EncoderFallback : EncoderFallback with get, set
member EncodingName : string
member Equals : value:obj -> bool
member GetByteCount : chars:char[] -> int + 3 overloads
member GetBytes : chars:char[] -> byte[] + 5 overloads
member GetCharCount : bytes:byte[] -> int + 2 overloads
...
Full name: System.Text.Encoding
property Encoding.UTF8: Encoding
Encoding.GetString(bytes: byte []) : string
Encoding.GetString(bytes: byte [], index: int, count: int) : string
val doRead : (AsyncSeq<string> -> Async<Chunk<string>>)
val s : AsyncSeq<string>
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
val chunk : Chunk<string>
property StringBuilder.Length: int
StringBuilder.ToString() : string
StringBuilder.ToString(startIndex: int, length: int) : string
member AsyncBuilder.Return : value:'T -> Async<'T>
val part : string
val next : AsyncSeq<string>
val doProcess : (string -> int -> AsyncSeq<string> -> Async<Chunk<string>>)
val text : string
val n : int
val pos : int
property System.String.Length: int
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val getLine : ('a -> (string * 'a) option)
val newPos : 'a
val line : string
val run : (int -> (string * int) option)
active recognizer Chars: int -> char * char option
val c : char
StringBuilder.Append(value: char []) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: obj) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint64) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint32) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint16) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: decimal) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: float) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: float32) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: int64) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: int) : StringBuilder
(+0 other overloads)
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
member System.IO.Stream.AsyncReadSeq : ?bufferSize:int -> AsyncSeq<byte []>
member System.IO.Stream.AsyncWriteSeq : seq:AsyncSeq<byte []> -> Async<unit>
Full name: Script.AsyncSeqExtensions.AsyncWriteSeq
Multiple items
val seq : AsyncSeq<byte []>
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val run : (Async<Chunk<byte []>> -> Async<unit>)
val s : Async<Chunk<byte []>>
val chunk : Chunk<byte []>
val data : byte []
val next : AsyncSeq<byte []>
member System.IO.Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val printWithPrefix : path:string -> prefix:string -> Async<unit>
Full name: Script.printWithPrefix
val path : string
val prefix : string
val f : FileStream
type File =
static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
static member AppendAllText : path:string * contents:string -> unit + 1 overload
static member AppendText : path:string -> StreamWriter
static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
static member Create : path:string -> FileStream + 3 overloads
static member CreateText : path:string -> StreamWriter
static member Decrypt : path:string -> unit
static member Delete : path:string -> unit
static member Encrypt : path:string -> unit
static member Exists : path:string -> bool
...
Full name: System.IO.File
File.Open(path: string, mode: FileMode) : FileStream
File.Open(path: string, mode: FileMode, access: FileAccess) : FileStream
File.Open(path: string, mode: FileMode, access: FileAccess, share: FileShare) : FileStream
type FileMode =
| CreateNew = 1
| Create = 2
| Open = 3
| OpenOrCreate = 4
| Truncate = 5
| Append = 6
Full name: System.IO.FileMode
field FileMode.Open = 3
member Stream.AsyncReadLines : ?bufferSize:int -> Async<Chunk<string>>
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
More information