7 people like it.
Like the snippet!
Fast Async Stream Copy
This is a simple code implementation to copy Streams leveraging the Async workflow.
After the first read, each write runs in parallel with the next read. I also used an array of arrays (two separate buffers) to avoid false sharing of memory. Another possible improvement would be to increase the buffer size when the copy is done on the same machine.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
|
open System.IO
open System
/// Runs two asynchronous computations side by side and yields both results
/// as a tuple, in argument order.
///
/// Both computations are started as children (via Async.StartChild) before
/// either result is awaited, so the two jobs can overlap.
let inline Parallel2 (job1, job2) =
    async {
        // Kick off both children first; awaiting happens only afterwards.
        let! firstHandle = Async.StartChild job1
        let! secondHandle = Async.StartChild job2
        // Collect the results in the same order the jobs were passed in.
        let! firstResult = firstHandle
        let! secondResult = secondHandle
        return firstResult, secondResult
    }
/// Asynchronously copies the contents of `source` into `destination`,
/// overlapping each write with the read of the next chunk.
///
/// Two 4096-byte buffers are used alternately: while the chunk held in one
/// buffer is being written, the next chunk is read into the other.
///
/// Streams that report CanSeek are rewound to position 0 before copying;
/// streams that cannot seek are copied from their current position.
///
/// Returns the same (source, destination) pair so the caller can dispose
/// them in a continuation. The streams are not closed here.
let fastCopy (source:System.IO.Stream) (destination:System.IO.Stream) = async {
    // Assigning Position on a stream that cannot seek raises
    // NotSupportedException, so only rewind when seeking is supported.
    if source.CanSeek then source.Position <- 0L
    if destination.CanSeek then destination.Position <- 0L
    let buffer = Array.init 2 (fun _ -> Array.zeroCreate<byte> 4096)
    // The first chunk goes into slot 1; the loop then alternates 1, 0, 1, ...
    let firstSlot = 1
    let! read = source.AsyncRead(buffer.[firstSlot])
    if read > 0 then
        // `slot` is the buffer holding the chunk to write; `slot ^^^ 1` is the
        // other buffer, which receives the next read. Passing the slot as a
        // parameter avoids a shared mutable cell.
        let rec copyAsync (s:Stream) (d:Stream) (buff:byte[][]) slot read = async {
            let! (_, read') = Parallel2(d.AsyncWrite(buff.[slot], 0, read),
                                        s.AsyncRead(buff.[slot ^^^ 1]))
            // A read of 0 bytes means end of stream; the chunk in `slot` has
            // already been written by the parallel step above.
            if read' > 0 then
                return! copyAsync s d buff (slot ^^^ 1) read' }
        do! copyAsync source destination buffer firstSlot read
    return (source, destination) }
// Example driver: copies one file to another with fastCopy and releases both
// file handles whether the copy completes, fails, or is cancelled.
let fileSource = @"c:\Temp\sourceFile.jpg"
let fileDest = @"c:\Temp\destinationFile.jpg"
// The final `true` argument opens the file for asynchronous I/O.
let streamSource = new FileStream(fileSource, FileMode.Open,
                                  FileAccess.Read, FileShare.Read, 0x1000, true)
let streamDest =
    try
        new FileStream(fileDest, FileMode.Create,
                       FileAccess.Write, FileShare.Write, 0x1000, true)
    with _ ->
        // Opening the destination failed: release the source handle that is
        // already open before propagating the error.
        streamSource.Dispose()
        reraise ()

// Releases both file handles; called from every continuation below.
let closeStreams () =
    streamSource.Dispose()
    streamDest.Dispose()

Async.StartWithContinuations(fastCopy streamSource streamDest,
    (fun _ -> closeStreams ()),
    (fun ex ->
        // Do not swallow failures: clean up, then report what went wrong.
        closeStreams ()
        eprintfn "Copy from %s to %s failed: %s" fileSource fileDest ex.Message),
    (fun _ ->
        closeStreams ()
        eprintfn "Copy from %s to %s was cancelled." fileSource fileDest))
|
namespace System
namespace System.IO
val Parallel2 : job1:Async<'a> * job2:Async<'b> -> Async<'a * 'b>
Full name: Script.Parallel2
val job1 : Async<'a>
val job2 : Async<'b>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val task1 : Async<'a>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
val task2 : Async<'b>
val res1 : 'a
val res2 : 'b
val fastCopy : source:Stream -> destination:Stream -> Async<Stream * Stream>
Full name: Script.fastCopy
val source : Stream
type Stream =
inherit MarshalByRefObject
member BeginRead : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
member BeginWrite : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
member CanRead : bool
member CanSeek : bool
member CanTimeout : bool
member CanWrite : bool
member Close : unit -> unit
member CopyTo : destination:Stream -> unit + 1 overload
member Dispose : unit -> unit
member EndRead : asyncResult:IAsyncResult -> int
...
Full name: System.IO.Stream
val destination : Stream
property Stream.Position: int64
val buffer : byte [] []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val init : count:int -> initializer:(int -> 'T) -> 'T []
Full name: Microsoft.FSharp.Collections.Array.init
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
val index : int ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val read : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val copyAsync : (Stream -> Stream -> byte [] [] -> int -> Async<unit>)
val s : Stream
val d : Stream
val buff : byte [] []
val read' : int
member Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val fileSource : string
Full name: Script.fileSource
val fileDest : string
Full name: Script.fileDest
val streamSource : FileStream
Full name: Script.streamSource
Multiple items
type FileStream =
inherit Stream
new : path:string * mode:FileMode -> FileStream + 14 overloads
member BeginRead : array:byte[] * offset:int * numBytes:int * userCallback:AsyncCallback * stateObject:obj -> IAsyncResult
member BeginWrite : array:byte[] * offset:int * numBytes:int * userCallback:AsyncCallback * stateObject:obj -> IAsyncResult
member CanRead : bool
member CanSeek : bool
member CanWrite : bool
member EndRead : asyncResult:IAsyncResult -> int
member EndWrite : asyncResult:IAsyncResult -> unit
member Flush : unit -> unit + 1 overload
member GetAccessControl : unit -> FileSecurity
...
Full name: System.IO.FileStream
--------------------
FileStream(path: string, mode: FileMode) : unit
(+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess) : unit
(+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess, bufferSize: int) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare) : unit
(+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess, bufferSize: int, isAsync: bool) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int, options: FileOptions) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int, useAsync: bool) : unit
(+0 other overloads)
FileStream(path: string, mode: FileMode, rights: Security.AccessControl.FileSystemRights, share: FileShare, bufferSize: int, options: FileOptions) : unit
(+0 other overloads)
type FileMode =
| CreateNew = 1
| Create = 2
| Open = 3
| OpenOrCreate = 4
| Truncate = 5
| Append = 6
Full name: System.IO.FileMode
field FileMode.Open = 3
type FileAccess =
| Read = 1
| Write = 2
| ReadWrite = 3
Full name: System.IO.FileAccess
field FileAccess.Read = 1
type FileShare =
| None = 0
| Read = 1
| Write = 2
| ReadWrite = 3
| Delete = 4
| Inheritable = 16
Full name: System.IO.FileShare
field FileShare.Read = 1
val streamDest : FileStream
Full name: Script.streamDest
field FileMode.Create = 2
field FileAccess.Write = 2
field FileShare.Write = 2
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:Threading.CancellationToken -> unit
val comp : Stream * Stream
val dest : Stream
Stream.Dispose() : unit
More information