26 people like it.

Playing with async sequences

Attempt to reimplement functions AsyncRead/AsyncReadLines from 'Rx on the server ' articles (by Jeffrey van Gogh) using idea of AsyncSequence (by Tomas Petricek)

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
type AsyncSeq<'T> = Async<Chunk<'T>> 
and Chunk<'T> = 
    | Done
    | Value of 'T * AsyncSeq<'T>
 
module AsyncSeq =
    let map f (seq : AsyncSeq<_>) : AsyncSeq<_> = 
        let rec doMap s = async {
            let! chunk = s
            match chunk with
            | Done -> return Done
            | Value (v, next) -> return Value (f v, doMap next)
            }
        doMap seq
    
    let run action (seq : AsyncSeq<_>) = 
        let rec doRun s = async {
            let! chunk = s
            match chunk with
            | Done -> return ()
            | Value (v, next) -> action v; return! doRun next
            }
        doRun seq

    let filter f (seq : AsyncSeq<_>) : AsyncSeq<_> =
        let rec doFilter s = async { 
            let! chunk = s
            match chunk with
            | Value (value, next) -> 
                if f value then return Value(value, doFilter next)
                else return! doFilter next
            | x -> return x
            }
        doFilter seq

[<AutoOpen>]
module AsyncSeqExtensions = 
    
    open System.Text

    type System.IO.Stream with
        member this.AsyncReadSeq(?bufferSize) : AsyncSeq<byte[]> = 
            let bufferSize = defaultArg bufferSize (2 <<< 16)
            let temp : byte[] = Array.zeroCreate bufferSize
            let rec doRead () = async {
                let! count = this.AsyncRead(temp, 0, bufferSize)
                if count = 0 then return Done
                else 
                    let buf = Array.sub temp 0 count
                    return Value(buf, doRead ())
                }
            doRead ()
        
        member this.AsyncReadLines(?bufferSize) = 
            let sb = StringBuilder()
            let getText = AsyncSeq.map Encoding.UTF8.GetString

            let rec doRead (s : AsyncSeq<string>) = async {
                let! chunk = s
                match chunk with
                | Done -> 
                    if sb.Length <> 0 then return Value(sb.ToString(), async.Return Done )
                    else return Done
                | Value(part, next) -> 
                    return! doProcess part 0 next
                }
            and doProcess (text : string) n next = async {
                let (|Chars|) pos = 
                    if pos < text.Length - 1 then text.[pos], Some (text.[pos + 1])
                    else text.[pos], None
                
                let getLine newPos = 
                    let line = sb.ToString()
                    sb.Length <- 0
                    Some (line, newPos)

                let rec run n =
                    if n >= text.Length then None
                    else 
                        match n with
                        | Chars ('\r', Some '\n') -> getLine (n + 2)
                        | Chars ('\r', _) 
                        | Chars ('\n', _) -> getLine (n + 1)
                        | Chars (c, _) -> 
                            sb.Append(c) |> ignore
                            run(n + 1)
                match run n with
                | Some (line, pos) -> return Value (line, doProcess text pos next)
                | None -> return! doRead next
                }
                
            this.AsyncReadSeq(?bufferSize=bufferSize)
                |> AsyncSeq.map Encoding.UTF8.GetString
                |> doRead
        
        member this.AsyncWriteSeq(seq : AsyncSeq<byte[]>) = 
            let rec run s = async {
                let! chunk = s
                match chunk with
                | Done -> return ()
                | Value(data, next) ->
                    do! this.AsyncWrite(data)
                    return! run next
                }
            run seq       

open System.IO

let printWithPrefix path prefix = async {
    use f= File.Open(path, FileMode.Open)
    do! f.AsyncReadLines()
        |> AsyncSeq.map (sprintf "%s: %s" prefix)
        |> AsyncSeq.run (printfn "%s")
    }
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
union case Chunk.Done: Chunk<'T>
union case Chunk.Value: 'T * AsyncSeq<'T> -> Chunk<'T>
type AsyncSeq<'T> = Async<Chunk<'T>>

Full name: Script.AsyncSeq<_>
val map : f:('a -> 'b) -> seq:AsyncSeq<'a> -> AsyncSeq<'b>

Full name: Script.AsyncSeq.map
val f : ('a -> 'b)
Multiple items
val seq : AsyncSeq<'a>

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val doMap : (Async<Chunk<'a>> -> Async<Chunk<'b>>)
val s : Async<Chunk<'a>>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val chunk : Chunk<'a>
val v : 'a
val next : AsyncSeq<'a>
val run : action:('a -> unit) -> seq:AsyncSeq<'a> -> Async<unit>

Full name: Script.AsyncSeq.run
val action : ('a -> unit)
val doRun : (Async<Chunk<'a>> -> Async<unit>)
val filter : f:('a -> bool) -> seq:AsyncSeq<'a> -> AsyncSeq<'a>

Full name: Script.AsyncSeq.filter
val f : ('a -> bool)
val doFilter : (Async<Chunk<'a>> -> Async<Chunk<'a>>)
val value : 'a
val x : Chunk<'a>
Multiple items
type AutoOpenAttribute =
  inherit Attribute
  new : unit -> AutoOpenAttribute
  new : path:string -> AutoOpenAttribute
  member Path : string

Full name: Microsoft.FSharp.Core.AutoOpenAttribute

--------------------
new : unit -> AutoOpenAttribute
new : path:string -> AutoOpenAttribute
namespace System
namespace System.Text
namespace System.IO
type Stream =
  inherit MarshalByRefObject
  member BeginRead : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginWrite : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member CanRead : bool
  member CanSeek : bool
  member CanTimeout : bool
  member CanWrite : bool
  member Close : unit -> unit
  member CopyTo : destination:Stream -> unit + 1 overload
  member Dispose : unit -> unit
  member EndRead : asyncResult:IAsyncResult -> int
  ...

Full name: System.IO.Stream
val this : System.IO.Stream
member System.IO.Stream.AsyncReadSeq : ?bufferSize:int -> AsyncSeq<byte []>

Full name: Script.AsyncSeqExtensions.AsyncReadSeq
val bufferSize : int option
Multiple items
module AsyncSeq

from Script

--------------------
type AsyncSeq<'T> = Async<Chunk<'T>>

Full name: Script.AsyncSeq<_>
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = System.Byte

Full name: Microsoft.FSharp.Core.byte
val bufferSize : int
val defaultArg : arg:'T option -> defaultValue:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.defaultArg
val temp : byte []
module Array

from Microsoft.FSharp.Collections
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val doRead : (unit -> Async<Chunk<byte []>>)
val count : int
member System.IO.Stream.AsyncRead : count:int -> Async<byte []>
member System.IO.Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val buf : byte []
val sub : array:'T [] -> startIndex:int -> count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.sub
member System.IO.Stream.AsyncReadLines : ?bufferSize:int -> Async<Chunk<string>>

Full name: Script.AsyncSeqExtensions.AsyncReadLines
val sb : StringBuilder
Multiple items
type StringBuilder =
  new : unit -> StringBuilder + 5 overloads
  member Append : value:string -> StringBuilder + 18 overloads
  member AppendFormat : format:string * arg0:obj -> StringBuilder + 4 overloads
  member AppendLine : unit -> StringBuilder + 1 overload
  member Capacity : int with get, set
  member Chars : int -> char with get, set
  member Clear : unit -> StringBuilder
  member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
  member EnsureCapacity : capacity:int -> int
  member Equals : sb:StringBuilder -> bool
  ...

Full name: System.Text.StringBuilder

--------------------
StringBuilder() : unit
StringBuilder(capacity: int) : unit
StringBuilder(value: string) : unit
StringBuilder(value: string, capacity: int) : unit
StringBuilder(capacity: int, maxCapacity: int) : unit
StringBuilder(value: string, startIndex: int, length: int, capacity: int) : unit
val getText : (AsyncSeq<byte []> -> AsyncSeq<string>)
type Encoding =
  member BodyName : string
  member Clone : unit -> obj
  member CodePage : int
  member DecoderFallback : DecoderFallback with get, set
  member EncoderFallback : EncoderFallback with get, set
  member EncodingName : string
  member Equals : value:obj -> bool
  member GetByteCount : chars:char[] -> int + 3 overloads
  member GetBytes : chars:char[] -> byte[] + 5 overloads
  member GetCharCount : bytes:byte[] -> int + 2 overloads
  ...

Full name: System.Text.Encoding
property Encoding.UTF8: Encoding
Encoding.GetString(bytes: byte []) : string
Encoding.GetString(bytes: byte [], index: int, count: int) : string
val doRead : (AsyncSeq<string> -> Async<Chunk<string>>)
val s : AsyncSeq<string>
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
val chunk : Chunk<string>
property StringBuilder.Length: int
StringBuilder.ToString() : string
StringBuilder.ToString(startIndex: int, length: int) : string
member AsyncBuilder.Return : value:'T -> Async<'T>
val part : string
val next : AsyncSeq<string>
val doProcess : (string -> int -> AsyncSeq<string> -> Async<Chunk<string>>)
val text : string
val n : int
val pos : int
property System.String.Length: int
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val getLine : ('a -> (string * 'a) option)
val newPos : 'a
val line : string
val run : (int -> (string * int) option)
active recognizer Chars: int -> char * char option
val c : char
StringBuilder.Append(value: char []) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: obj) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: uint64) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: uint32) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: uint16) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: decimal) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: float) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: float32) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: int64) : StringBuilder
   (+0 other overloads)
StringBuilder.Append(value: int) : StringBuilder
   (+0 other overloads)
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
member System.IO.Stream.AsyncReadSeq : ?bufferSize:int -> AsyncSeq<byte []>
member System.IO.Stream.AsyncWriteSeq : seq:AsyncSeq<byte []> -> Async<unit>

Full name: Script.AsyncSeqExtensions.AsyncWriteSeq
Multiple items
val seq : AsyncSeq<byte []>

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val run : (Async<Chunk<byte []>> -> Async<unit>)
val s : Async<Chunk<byte []>>
val chunk : Chunk<byte []>
val data : byte []
val next : AsyncSeq<byte []>
member System.IO.Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val printWithPrefix : path:string -> prefix:string -> Async<unit>

Full name: Script.printWithPrefix
val path : string
val prefix : string
val f : FileStream
type File =
  static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
  static member AppendAllText : path:string * contents:string -> unit + 1 overload
  static member AppendText : path:string -> StreamWriter
  static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
  static member Create : path:string -> FileStream + 3 overloads
  static member CreateText : path:string -> StreamWriter
  static member Decrypt : path:string -> unit
  static member Delete : path:string -> unit
  static member Encrypt : path:string -> unit
  static member Exists : path:string -> bool
  ...

Full name: System.IO.File
File.Open(path: string, mode: FileMode) : FileStream
File.Open(path: string, mode: FileMode, access: FileAccess) : FileStream
File.Open(path: string, mode: FileMode, access: FileAccess, share: FileShare) : FileStream
type FileMode =
  | CreateNew = 1
  | Create = 2
  | Open = 3
  | OpenOrCreate = 4
  | Truncate = 5
  | Append = 6

Full name: System.IO.FileMode
field FileMode.Open = 3
member Stream.AsyncReadLines : ?bufferSize:int -> Async<Chunk<string>>
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Raw view New version

More information

Link:http://fssnip.net/1Y
Posted:5 years ago
Author:Vladimir Matveev
Tags: F# , Async , Async sequences