6 people like it.
Like the snippet!
Heterogeneous Parallel Async
Perform parallel Async returning heterogeneous types.
(The solution presented here is based on a gist sent to me by [Anton Tayanovskyy](http://t0yv0.blogspot.com/ "Anton Tayanovskyy"), Twitter: [@t0yv0](https://twitter.com/t0yv0).)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
|
type Parallel<'T> =
private {
Compute : Async<obj>[]
Unpack : obj [] -> int -> 'T
}
static member ( <*> ) (f: Parallel<'A -> 'B>, x: Parallel<'A>) : Parallel<'B> =
{
Compute = Array.append f.Compute x.Compute
Unpack = fun xs pos ->
let fv = f.Unpack xs pos
let xv = x.Unpack xs (pos + f.Compute.Length)
fv xv
}
static member ( <*> ) (f: Parallel<'A ->' B>, x: Async<'A>) : Parallel<'B> =
f <*> Parallel.Await(x)
and Parallel =
static member Run<'T> (p: Parallel<'T>) : Async<'T> =
async {
let! results =
match p.Compute.Length with
| 0 -> async.Return [|box ()|]
| 1 -> async { let! r = p.Compute.[0]
return [| r |] }
| _ -> Async.Parallel p.Compute
return p.Unpack results 0
}
static member Await<'T> (x: Async<'T>) : Parallel<'T> =
{
Compute =
[|
async {
let! v = x
return box v
}
|]
Unpack = fun xs pos -> unbox xs.[pos]
}
static member Pure<'T>(x: 'T) : Parallel<'T> =
{
Compute = [||]
Unpack = fun _ _ -> x
}
let myInt, myChar, myBool, myString =
Parallel.Pure (fun w x y z -> (w, x, y, z))
<*> async { return 1 }
<*> async { return 'b' }
<*> async { return true }
<*> async { return "abc" }
|> Parallel.Run
|> Async.RunSynchronously
let test1 () =
Parallel.Pure (fun x y z -> (x, y, z))
<*> async { return 1 }
// unit -> Parallel<('b -> 'c -> int * 'b * 'c)>
<*> async { return 'b' }
// unit -> Parallel<('c -> int * char * 'c)>
<*> async { return true }
// unit -> Parallel<int * char * bool>
|> Parallel.Run
// unit -> Async<int * char * bool>
|> Async.RunSynchronously
// unit -> int * char * bool
|
Parallel.Compute: Async<obj> []
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
type obj = System.Object
Full name: Microsoft.FSharp.Core.obj
Parallel.Unpack: obj [] -> int -> 'T
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val f : Parallel<('A -> 'B)>
Multiple items
type Parallel =
static member Await : x:Async<'T> -> Parallel<'T>
static member Pure : x:'T -> Parallel<'T>
static member Run : p:Parallel<'T> -> Async<'T>
Full name: Script.Parallel
--------------------
type Parallel<'T> =
private {Compute: Async<obj> [];
Unpack: obj [] -> int -> 'T;}
static member ( <*> ) : f:Parallel<('A -> 'B)> * x:Parallel<'A> -> Parallel<'B>
static member ( <*> ) : f:Parallel<('A -> 'B)> * x:Async<'A> -> Parallel<'B>
Full name: Script.Parallel<_>
val x : Parallel<'A>
module Array
from Microsoft.FSharp.Collections
val append : array1:'T [] -> array2:'T [] -> 'T []
Full name: Microsoft.FSharp.Collections.Array.append
val xs : obj []
val pos : int
val fv : ('A -> 'B)
Parallel.Unpack: obj [] -> int -> 'A -> 'B
val xv : 'A
Parallel.Unpack: obj [] -> int -> 'A
property System.Array.Length: int
val x : Async<'A>
static member Parallel.Await : x:Async<'T> -> Parallel<'T>
static member Parallel.Run : p:Parallel<'T> -> Async<'T>
Full name: Script.Parallel.Run
val p : Parallel<'T>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val results : obj []
member AsyncBuilder.Return : value:'T -> Async<'T>
val box : value:'T -> obj
Full name: Microsoft.FSharp.Core.Operators.box
val r : obj
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member Parallel.Await : x:Async<'T> -> Parallel<'T>
Full name: Script.Parallel.Await
val x : Async<'T>
val v : 'T
val unbox : value:obj -> 'T
Full name: Microsoft.FSharp.Core.Operators.unbox
static member Parallel.Pure : x:'T -> Parallel<'T>
Full name: Script.Parallel.Pure
val x : 'T
val myInt : int
Full name: Script.myInt
val myChar : char
Full name: Script.myChar
val myBool : bool
Full name: Script.myBool
val myString : string
Full name: Script.myString
static member Parallel.Pure : x:'T -> Parallel<'T>
val w : int
val x : char
val y : bool
val z : string
static member Parallel.Run : p:Parallel<'T> -> Async<'T>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
val test1 : unit -> int * char * bool
Full name: Script.test1
val x : int
val y : char
val z : bool
More information