Asynchronous sequences

An asynchronous sequence is similar to the seq<T> type, but the elements of the sequence are generated asynchronously without blocking the caller as in Async<T>. This snippet declares asynchronous sequence and uses it to compare two files in 1k blocks.

Copy Source
Copy Link
Tools:

Declaration of asynchronous sequence

1: open System.IO
2: 
3: /// Represents a sequence of values 'T where items 
4: /// are generated asynchronously on-demand
5: type AsyncSeq<'T> = Async<AsyncSeqInner<'T>> 
6: and AsyncSeqInner<'T> =
7:   | Ended
8:   | Item of 'T * AsyncSeq<'T>

Using asynchronous sequences

 1: /// Read file 'fn' in blocks of size 'size'
 2: /// (returns on-demand asynchronous sequence)
 3: let readInBlocks fn size = async {
 4:   let stream = File.OpenRead(fn)
 5:   let buffer = Array.zeroCreate size
 6:   
 7:   /// Returns next block as 'Item' of async seq
 8:   let rec nextBlock() = async {
 9:     let! count = stream.AsyncRead(buffer, 0, size)
10:     if count = 0 then return Ended
11:     else 
12:       // Create buffer with the right size
13:       let res = 
14:         if count = size then buffer
15:         else buffer |> Seq.take count |> Array.ofSeq
16:       return Item(res, nextBlock()) }
17: 
18:   return! nextBlock() }
19: 
20: /// Asynchronous function that compares two asynchronous sequences
21: /// item by item. If an item doesn't match, 'false' is returned
22: /// immediately without generating the rest of the sequence. If the
23: /// lengths don't match, exception is thrown.
24: let rec compareAsyncSeqs seq1 seq2 = async {
25:   let! item1 = seq1
26:   let! item2 = seq2
27:   match item1, item2 with 
28:   | Item(b1, ns1), Item(b2, ns2) when b1 <> b2 -> return false
29:   | Item(b1, ns1), Item(b2, ns2) -> return! compareAsyncSeqs ns1 ns2
30:   | Ended, Ended -> return true
31:   | _ -> return failwith "Size doesn't match" }
32: 
33: /// Compare two files using 1k blocks
34: let s1 = readInBlocks "f1" 1000
35: let s2 = readInBlocks "f2" 1000
36: compareAsyncSeqs s1 s2
namespace System
namespace System.IO
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>

Full name: Test.AsyncSeq<_>

Represents a sequence of values 'T where items
 are generated asynchronously on-demand

Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------

type Async
with
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * System.AsyncCallback * obj -> System.IAsyncResult) * (System.IAsyncResult -> 'T) * (System.IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> System.Delegate)
  static member AwaitIAsyncResult : iar:System.IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:System.Threading.Tasks.Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:System.Threading.WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member FromBeginEnd : beginAction:(System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * System.AsyncCallback * obj -> System.IAsyncResult) * endAction:(System.IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (System.OperationCanceledException -> unit) -> unit) -> Async<'T>
  static member Ignore : computation:Async<'T> -> Async<unit>
  static member OnCancel : interruption:(unit -> unit) -> Async<System.IDisposable>
  static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
  static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
  static member Sleep : millisecondsDueTime:int -> Async<unit>
  static member Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
  static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:System.Threading.Tasks.TaskCreationOptions * ?cancellationToken:System.Threading.CancellationToken -> System.Threading.Tasks.Task<'T>
  static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
  static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:System.Threading.Tasks.TaskCreationOptions -> Async<System.Threading.Tasks.Task<'T>>
  static member StartImmediate : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
  static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(System.OperationCanceledException -> unit) * ?cancellationToken:System.Threading.CancellationToken -> unit
  static member SwitchToContext : syncContext:System.Threading.SynchronizationContext -> Async<unit>
  static member SwitchToNewThread : unit -> Async<unit>
  static member SwitchToThreadPool : unit -> Async<unit>
  static member TryCancelled : computation:Async<'T> * compensation:(System.OperationCanceledException -> unit) -> Async<'T>
  static member CancellationToken : Async<System.Threading.CancellationToken>
  static member DefaultCancellationToken : System.Threading.CancellationToken
end

Full name: Microsoft.FSharp.Control.Async
type AsyncSeqInner<'T> =
  | Ended
  | Item of 'T * AsyncSeq<'T>

Full name: Test.AsyncSeqInner<_>
union case AsyncSeqInner.Ended: AsyncSeqInner<'T>
union case AsyncSeqInner.Item: 'T * AsyncSeq<'T> -> AsyncSeqInner<'T>
val readInBlocks : string -> int -> Async<AsyncSeqInner<byte []>>

Full name: Test.readInBlocks

Read file 'fn' in blocks of size 'size'
 (returns on-demand asynchronous sequence)

val fn : string

  type: string
  implements: System.IComparable
  implements: System.ICloneable
  implements: System.IConvertible
  implements: System.IComparable<string>
  implements: seq<char>
  implements: System.Collections.IEnumerable
  implements: System.IEquatable<string>
val size : int

  type: int
  implements: System.IComparable
  implements: System.IFormattable
  implements: System.IConvertible
  implements: System.IComparable<int>
  implements: System.IEquatable<int>
  inherits: System.ValueType
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val stream : FileStream

  type: FileStream
  implements: System.IDisposable
  inherits: Stream
  inherits: System.MarshalByRefObject
type File =
  class
    static member AppendAllLines : string * System.Collections.Generic.IEnumerable<string> -> unit
    static member AppendAllLines : string * System.Collections.Generic.IEnumerable<string> * System.Text.Encoding -> unit
    static member AppendAllText : string * string -> unit
    static member AppendAllText : string * string * System.Text.Encoding -> unit
    static member AppendText : string -> System.IO.StreamWriter
    static member Copy : string * string -> unit
    static member Copy : string * string * bool -> unit
    static member Create : string -> System.IO.FileStream
    static member Create : string * int -> System.IO.FileStream
    static member Create : string * int * System.IO.FileOptions -> System.IO.FileStream
    static member Create : string * int * System.IO.FileOptions * System.Security.AccessControl.FileSecurity -> System.IO.FileStream
    static member CreateText : string -> System.IO.StreamWriter
    static member Decrypt : string -> unit
    static member Delete : string -> unit
    static member Encrypt : string -> unit
    static member Exists : string -> bool
    static member GetAccessControl : string -> System.Security.AccessControl.FileSecurity
    static member GetAccessControl : string * System.Security.AccessControl.AccessControlSections -> System.Security.AccessControl.FileSecurity
    static member GetAttributes : string -> System.IO.FileAttributes
    static member GetCreationTime : string -> System.DateTime
    static member GetCreationTimeUtc : string -> System.DateTime
    static member GetLastAccessTime : string -> System.DateTime
    static member GetLastAccessTimeUtc : string -> System.DateTime
    static member GetLastWriteTime : string -> System.DateTime
    static member GetLastWriteTimeUtc : string -> System.DateTime
    static member Move : string * string -> unit
    static member Open : string * System.IO.FileMode -> System.IO.FileStream
    static member Open : string * System.IO.FileMode * System.IO.FileAccess -> System.IO.FileStream
    static member Open : string * System.IO.FileMode * System.IO.FileAccess * System.IO.FileShare -> System.IO.FileStream
    static member OpenRead : string -> System.IO.FileStream
    static member OpenText : string -> System.IO.StreamReader
    static member OpenWrite : string -> System.IO.FileStream
    static member ReadAllBytes : string -> System.Byte []
    static member ReadAllLines : string -> string []
    static member ReadAllLines : string * System.Text.Encoding -> string []
    static member ReadAllText : string -> string
    static member ReadAllText : string * System.Text.Encoding -> string
    static member ReadLines : string -> System.Collections.Generic.IEnumerable<string>
    static member ReadLines : string * System.Text.Encoding -> System.Collections.Generic.IEnumerable<string>
    static member Replace : string * string * string -> unit
    static member Replace : string * string * string * bool -> unit
    static member SetAccessControl : string * System.Security.AccessControl.FileSecurity -> unit
    static member SetAttributes : string * System.IO.FileAttributes -> unit
    static member SetCreationTime : string * System.DateTime -> unit
    static member SetCreationTimeUtc : string * System.DateTime -> unit
    static member SetLastAccessTime : string * System.DateTime -> unit
    static member SetLastAccessTimeUtc : string * System.DateTime -> unit
    static member SetLastWriteTime : string * System.DateTime -> unit
    static member SetLastWriteTimeUtc : string * System.DateTime -> unit
    static member WriteAllBytes : string * System.Byte [] -> unit
    static member WriteAllLines : string * string [] -> unit
    static member WriteAllLines : string * System.Collections.Generic.IEnumerable<string> -> unit
    static member WriteAllLines : string * string [] * System.Text.Encoding -> unit
    static member WriteAllLines : string * System.Collections.Generic.IEnumerable<string> * System.Text.Encoding -> unit
    static member WriteAllText : string * string -> unit
    static member WriteAllText : string * string * System.Text.Encoding -> unit
  end

Full name: System.IO.File
File.OpenRead(path: string) : FileStream
val buffer : byte []

  type: byte []
  implements: System.ICloneable
  implements: System.Collections.IList
  implements: System.Collections.ICollection
  implements: System.Collections.IStructuralComparable
  implements: System.Collections.IStructuralEquatable
  implements: System.Collections.Generic.IList<byte>
  implements: System.Collections.Generic.ICollection<byte>
  implements: seq<byte>
  implements: System.Collections.IEnumerable
  inherits: System.Array
module Array

from Microsoft.FSharp.Collections
val zeroCreate : int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val nextBlock : (unit -> Async<AsyncSeqInner<byte []>>)

Returns next block as 'Item' of async seq
val count : int

  type: int
  implements: System.IComparable
  implements: System.IFormattable
  implements: System.IConvertible
  implements: System.IComparable<int>
  implements: System.IEquatable<int>
  inherits: System.ValueType
Multiple overloads
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val res : byte []

  type: byte []
  implements: System.ICloneable
  implements: System.Collections.IList
  implements: System.Collections.ICollection
  implements: System.Collections.IStructuralComparable
  implements: System.Collections.IStructuralEquatable
  implements: System.Collections.Generic.IList<byte>
  implements: System.Collections.Generic.ICollection<byte>
  implements: seq<byte>
  implements: System.Collections.IEnumerable
  inherits: System.Array
module Seq

from Microsoft.FSharp.Collections
val take : int -> seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.take
val ofSeq : seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.ofSeq
val compareAsyncSeqs : Async<AsyncSeqInner<'a>> -> Async<AsyncSeqInner<'a>> -> Async<bool> (requires equality)

Full name: Test.compareAsyncSeqs

Asynchronous function that compares two asynchronous sequences
 item by item. If an item doesn't match, 'false' is returned
 immediately without generating the rest of the sequence. If the
 lengths don't match, exception is thrown.

val seq1 : Async<AsyncSeqInner<'a>> (requires equality)
val seq2 : Async<AsyncSeqInner<'a>> (requires equality)
val item1 : AsyncSeqInner<'a> (requires equality)
val item2 : AsyncSeqInner<'a> (requires equality)
val b1 : 'a (requires equality)
val ns1 : AsyncSeq<'a> (requires equality)
val b2 : 'a (requires equality)
val ns2 : AsyncSeq<'a> (requires equality)
val failwith : string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val s1 : Async<AsyncSeqInner<byte []>>

Full name: Test.s1

Compare two files using 1k blocks
val s2 : Async<AsyncSeqInner<byte []>>

Full name: Test.s2

More information

Link: http://fssnip.net/1k
Posted: 3 years ago
Author: Tomas Petricek (website)
Tags: async, asynchronous, seq, sequence, files