7 people like it.

Fast Async Stream Copy

This is a simple implementation for copying Streams that leverages the Async workflow. After the first read, the write and read operations run in parallel. I also used an array of arrays as the buffer to avoid false sharing. Another possible improvement would be to increase the buffer size when the copy is done on the same machine.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
open System.IO
open System

/// Runs two asynchronous computations side by side and yields both results as a pair.
///
/// job1, job2: the computations to run; their result types may differ.
/// Returns an async whose result is (result of job1, result of job2).
let inline Parallel2 (job1, job2) =
    async {
        // Start both computations as children before waiting on either one,
        // so the second is already running while the first is awaited.
        let! pendingFirst = Async.StartChild job1
        let! pendingSecond = Async.StartChild job2

        let! firstResult = pendingFirst
        let! secondResult = pendingSecond
        return firstResult, secondResult
    }


/// Copies the content of `source` into `destination`, overlapping each write
/// with the read of the next chunk.
///
/// source:      stream to read from; rewound to position 0 when it is seekable.
/// destination: stream to write to; rewound to position 0 when it is seekable.
/// Returns an async yielding the same (source, destination) pair so the caller
/// can dispose both streams once the copy has finished. Neither stream is
/// flushed or closed here.
let fastCopy (source:System.IO.Stream) (destination:System.IO.Stream) = async {
        // Setting Position throws NotSupportedException on non-seekable streams,
        // so only rewind the streams that support it.
        if source.CanSeek then source.Position <- 0L
        if destination.CanSeek then destination.Position <- 0L

        // Two separate 4096-byte buffers: one is written out while the other is filled.
        let buffer = Array.init 2 (fun _ -> Array.zeroCreate<byte> 4096)
        let! read = source.AsyncRead(buffer.[1])
        if read > 0 then
            // `index` selects the buffer holding the `read` bytes that still have to be
            // written; `index ^^^ 1` toggles between 0 and 1 to pick the other buffer.
            let rec copyAsync (s:Stream) (d:Stream) (buff:byte[][]) index read = async {
                let! (_, read') = Parallel2(d.AsyncWrite(buff.[index], 0, read),
                                            s.AsyncRead(buff.[index ^^^ 1]))
                // A read of 0 bytes marks the end of the source; the last chunk
                // has already been written by the call above.
                if read' > 0 then
                    return! copyAsync s d buff (index ^^^ 1) read' }
            return! copyAsync source destination buffer 1 read
        return (source, destination)}

// Example usage: copy one file to another with asynchronous file streams.
let fileSource = @"c:\Temp\sourceFile.jpg"
let fileDest = @"c:\Temp\destinationFile.jpg"

// The final `true` argument opens each FileStream in asynchronous mode;
// 0x1000 is the stream's internal buffer size.
let streamSource = new FileStream(fileSource, FileMode.Open,
      FileAccess.Read,FileShare.Read,0x1000, true)
let streamDest = new FileStream(fileDest, FileMode.Create,
      FileAccess.Write, FileShare.Write,0x1000,true)

// Releases both file handles; called on every completion path below so that
// a failed or cancelled copy does not leave the files open.
let closeStreams () =
    streamSource.Dispose()
    streamDest.Dispose()

Async.StartWithContinuations(fastCopy streamSource streamDest,
    // Success: the copy completed, close both files.
    (fun _ -> closeStreams ()),
    // Failure: close the files and report the error instead of dropping it silently.
    (fun ex ->
        closeStreams ()
        eprintfn "Stream copy failed: %s" ex.Message),
    // Cancellation: still close the files.
    (fun _ -> closeStreams ()))
namespace System
namespace System.IO
val Parallel2 : job1:Async<'a> * job2:Async<'b> -> Async<'a * 'b>

Full name: Script.Parallel2
val job1 : Async<'a>
val job2 : Async<'b>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val task1 : Async<'a>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
val task2 : Async<'b>
val res1 : 'a
val res2 : 'b
val fastCopy : source:Stream -> destination:Stream -> Async<Stream * Stream>

Full name: Script.fastCopy
val source : Stream
type Stream =
  inherit MarshalByRefObject
  member BeginRead : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginWrite : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member CanRead : bool
  member CanSeek : bool
  member CanTimeout : bool
  member CanWrite : bool
  member Close : unit -> unit
  member CopyTo : destination:Stream -> unit + 1 overload
  member Dispose : unit -> unit
  member EndRead : asyncResult:IAsyncResult -> int
  ...

Full name: System.IO.Stream
val destination : Stream
property Stream.Position: int64
val buffer : byte [] []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val init : count:int -> initializer:(int -> 'T) -> 'T []

Full name: Microsoft.FSharp.Collections.Array.init
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
val index : int ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val read : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
val copyAsync : (Stream -> Stream -> byte [] [] -> int -> Async<unit>)
val s : Stream
val d : Stream
val buff : byte [] []
val read' : int
member Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val fileSource : string

Full name: Script.fileSource
val fileDest : string

Full name: Script.fileDest
val streamSource : FileStream

Full name: Script.streamSource
Multiple items
type FileStream =
  inherit Stream
  new : path:string * mode:FileMode -> FileStream + 14 overloads
  member BeginRead : array:byte[] * offset:int * numBytes:int * userCallback:AsyncCallback * stateObject:obj -> IAsyncResult
  member BeginWrite : array:byte[] * offset:int * numBytes:int * userCallback:AsyncCallback * stateObject:obj -> IAsyncResult
  member CanRead : bool
  member CanSeek : bool
  member CanWrite : bool
  member EndRead : asyncResult:IAsyncResult -> int
  member EndWrite : asyncResult:IAsyncResult -> unit
  member Flush : unit -> unit + 1 overload
  member GetAccessControl : unit -> FileSecurity
  ...

Full name: System.IO.FileStream

--------------------
FileStream(path: string, mode: FileMode) : unit
   (+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess) : unit
   (+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess, bufferSize: int) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare) : unit
   (+0 other overloads)
FileStream(handle: Win32.SafeHandles.SafeFileHandle, access: FileAccess, bufferSize: int, isAsync: bool) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int, options: FileOptions) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, access: FileAccess, share: FileShare, bufferSize: int, useAsync: bool) : unit
   (+0 other overloads)
FileStream(path: string, mode: FileMode, rights: Security.AccessControl.FileSystemRights, share: FileShare, bufferSize: int, options: FileOptions) : unit
   (+0 other overloads)
type FileMode =
  | CreateNew = 1
  | Create = 2
  | Open = 3
  | OpenOrCreate = 4
  | Truncate = 5
  | Append = 6

Full name: System.IO.FileMode
field FileMode.Open = 3
type FileAccess =
  | Read = 1
  | Write = 2
  | ReadWrite = 3

Full name: System.IO.FileAccess
field FileAccess.Read = 1
type FileShare =
  | None = 0
  | Read = 1
  | Write = 2
  | ReadWrite = 3
  | Delete = 4
  | Inheritable = 16

Full name: System.IO.FileShare
field FileShare.Read = 1
val streamDest : FileStream

Full name: Script.streamDest
field FileMode.Create = 2
field FileAccess.Write = 2
field FileShare.Write = 2
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:Threading.CancellationToken -> unit
val comp : Stream * Stream
val dest : Stream
Stream.Dispose() : unit
Raw view Test code New version

More information

Link:http://fssnip.net/pp
Posted:9 years ago
Author:Riccardo Terrell
Tags: async , stream