109 people like it.
Like the snippet!
Asynchronous sequences
An asynchronous sequence is similar to the seq type, but the elements of the sequence are generated asynchronously without blocking the caller as in Async. This snippet declares asynchronous sequence and uses it to compare two files in 1k blocks.
1:
2:
3:
4:
5:
6:
7:
8:
|
open System.IO
/// Represents a sequence of values 'T where items
/// are generated asynchronously on-demand
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>
and AsyncSeqInner<'T> =
| Ended
| Item of 'T * AsyncSeq<'T>
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
|
/// Read file 'fn' in blocks of size 'size'
/// (returns on-demand asynchronous sequence)
let readInBlocks fn size = async {
let stream = File.OpenRead(fn)
let buffer = Array.zeroCreate size
/// Returns next block as 'Item' of async seq
let rec nextBlock() = async {
let! count = stream.AsyncRead(buffer, 0, size)
if count > 0 then return Ended
else
// Create buffer with the right size
let res =
if count = size then buffer
else buffer |> Seq.take count |> Array.ofSeq
return Item(res, nextBlock()) }
return! nextBlock() }
/// Asynchronous function that compares two asynchronous sequences
/// item by item. If an item doesn't match, 'false' is returned
/// immediately without generating the rest of the sequence. If the
/// lengths don't match, exception is thrown.
let rec compareAsyncSeqs seq1 seq2 = async {
let! item1 = seq1
let! item2 = seq2
match item1, item2 with
| Item(b1, ns1), Item(b2, ns2) when b1 <> b2 -> return false
| Item(b1, ns1), Item(b2, ns2) -> return! compareAsyncSeqs ns1 ns2
| Ended, Ended -> return true
| _ -> return failwith "Size doesn't match" }
/// Compare two files using 1k blocks
let s1 = readInBlocks "f1" 1000
let s2 = readInBlocks "f2" 1000
compareAsyncSeqs s1 s2
|
namespace System
namespace System.IO
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>
Full name: Script.AsyncSeq<_>
Represents a sequence of values 'T where items
are generated asynchronously on-demand
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
type AsyncSeqInner<'T> =
| Ended
| Item of 'T * AsyncSeq<'T>
Full name: Script.AsyncSeqInner<_>
union case AsyncSeqInner.Ended: AsyncSeqInner<'T>
union case AsyncSeqInner.Item: 'T * AsyncSeq<'T> -> AsyncSeqInner<'T>
val readInBlocks : fn:string -> size:int -> Async<AsyncSeqInner<byte []>>
Full name: Script.readInBlocks
Read file 'fn' in blocks of size 'size'
(returns on-demand asynchronous sequence)
val fn : string
val size : int
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val stream : FileStream
type File =
static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
static member AppendAllText : path:string * contents:string -> unit + 1 overload
static member AppendText : path:string -> StreamWriter
static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
static member Create : path:string -> FileStream + 3 overloads
static member CreateText : path:string -> StreamWriter
static member Decrypt : path:string -> unit
static member Delete : path:string -> unit
static member Encrypt : path:string -> unit
static member Exists : path:string -> bool
...
Full name: System.IO.File
File.OpenRead(path: string) : FileStream
val buffer : byte []
module Array
from Microsoft.FSharp.Collections
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val nextBlock : (unit -> Async<AsyncSeqInner<byte []>>)
Returns next block as 'Item' of async seq
val count : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val res : byte []
module Seq
from Microsoft.FSharp.Collections
val take : count:int -> source:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Collections.Seq.take
val ofSeq : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Array.ofSeq
val compareAsyncSeqs : seq1:Async<AsyncSeqInner<'a>> -> seq2:Async<AsyncSeqInner<'a>> -> Async<bool> (requires equality)
Full name: Script.compareAsyncSeqs
Asynchronous function that compares two asynchronous sequences
item by item. If an item doesn't match, 'false' is returned
immediately without generating the rest of the sequence. If the
lengths don't match, exception is thrown.
val seq1 : Async<AsyncSeqInner<'a>> (requires equality)
val seq2 : Async<AsyncSeqInner<'a>> (requires equality)
val item1 : AsyncSeqInner<'a> (requires equality)
val item2 : AsyncSeqInner<'a> (requires equality)
val b1 : 'a (requires equality)
val ns1 : AsyncSeq<'a> (requires equality)
val b2 : 'a (requires equality)
val ns2 : AsyncSeq<'a> (requires equality)
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
val s1 : Async<AsyncSeqInner<byte []>>
Full name: Script.s1
Compare two files using 1k blocks
val s2 : Async<AsyncSeqInner<byte []>>
Full name: Script.s2
More information