109 people like it.

Asynchronous sequences

An asynchronous sequence is similar to the seq type, but the elements of the sequence are generated asynchronously without blocking the caller as in Async. This snippet declares asynchronous sequence and uses it to compare two files in 1k blocks.

Declaration of asynchronous sequence

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
open System.IO

/// Represents a sequence of values 'T where items 
/// are generated asynchronously on-demand
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>> 
and AsyncSeqInner<'T> =
  | Ended
  | Item of 'T * AsyncSeq<'T>

Using asynchronous sequences

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
/// Read file 'fn' in blocks of size 'size'
/// (returns on-demand asynchronous sequence)
let readInBlocks fn size = async {
  let stream = File.OpenRead(fn)
  let buffer = Array.zeroCreate size
  
  /// Returns next block as 'Item' of async seq
  let rec nextBlock() = async {
    let! count = stream.AsyncRead(buffer, 0, size)
    if count > 0 then return Ended
    else 
      // Create buffer with the right size
      let res = 
        if count = size then buffer
        else buffer |> Seq.take count |> Array.ofSeq
      return Item(res, nextBlock()) }

  return! nextBlock() }

/// Asynchronous function that compares two asynchronous sequences
/// item by item. If an item doesn't match, 'false' is returned
/// immediately without generating the rest of the sequence. If the
/// lengths don't match, exception is thrown.
let rec compareAsyncSeqs seq1 seq2 = async {
  let! item1 = seq1
  let! item2 = seq2
  match item1, item2 with 
  | Item(b1, ns1), Item(b2, ns2) when b1 <> b2 -> return false
  | Item(b1, ns1), Item(b2, ns2) -> return! compareAsyncSeqs ns1 ns2
  | Ended, Ended -> return true
  | _ -> return failwith "Size doesn't match" }

/// Compare two files using 1k blocks
let s1 = readInBlocks "f1" 1000
let s2 = readInBlocks "f2" 1000
compareAsyncSeqs s1 s2
namespace System
namespace System.IO
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>

Full name: Script.AsyncSeq<_>


 Represents a sequence of values 'T where items
 are generated asynchronously on-demand
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type AsyncSeqInner<'T> =
  | Ended
  | Item of 'T * AsyncSeq<'T>

Full name: Script.AsyncSeqInner<_>
union case AsyncSeqInner.Ended: AsyncSeqInner<'T>
union case AsyncSeqInner.Item: 'T * AsyncSeq<'T> -> AsyncSeqInner<'T>
val readInBlocks : fn:string -> size:int -> Async<AsyncSeqInner<byte []>>

Full name: Script.readInBlocks


 Read file 'fn' in blocks of size 'size'
 (returns on-demand asynchronous sequence)
val fn : string
val size : int
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val stream : FileStream
type File =
  static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
  static member AppendAllText : path:string * contents:string -> unit + 1 overload
  static member AppendText : path:string -> StreamWriter
  static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
  static member Create : path:string -> FileStream + 3 overloads
  static member CreateText : path:string -> StreamWriter
  static member Decrypt : path:string -> unit
  static member Delete : path:string -> unit
  static member Encrypt : path:string -> unit
  static member Exists : path:string -> bool
  ...

Full name: System.IO.File
File.OpenRead(path: string) : FileStream
val buffer : byte []
module Array

from Microsoft.FSharp.Collections
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val nextBlock : (unit -> Async<AsyncSeqInner<byte []>>)


 Returns next block as 'Item' of async seq
val count : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val res : byte []
module Seq

from Microsoft.FSharp.Collections
val take : count:int -> source:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.take
val ofSeq : source:seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.ofSeq
val compareAsyncSeqs : seq1:Async<AsyncSeqInner<'a>> -> seq2:Async<AsyncSeqInner<'a>> -> Async<bool> (requires equality)

Full name: Script.compareAsyncSeqs


 Asynchronous function that compares two asynchronous sequences
 item by item. If an item doesn't match, 'false' is returned
 immediately without generating the rest of the sequence. If the
 lengths don't match, exception is thrown.
val seq1 : Async<AsyncSeqInner<'a>> (requires equality)
val seq2 : Async<AsyncSeqInner<'a>> (requires equality)
val item1 : AsyncSeqInner<'a> (requires equality)
val item2 : AsyncSeqInner<'a> (requires equality)
val b1 : 'a (requires equality)
val ns1 : AsyncSeq<'a> (requires equality)
val b2 : 'a (requires equality)
val ns2 : AsyncSeq<'a> (requires equality)
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val s1 : Async<AsyncSeqInner<byte []>>

Full name: Script.s1


 Compare two files using 1k blocks
val s2 : Async<AsyncSeqInner<byte []>>

Full name: Script.s2

More information

Link:http://fssnip.net/1k
Posted:14 years ago
Author:Tomas Petricek
Tags: async , asynchronous , seq , sequence , files