3 people like it.
Like the snippet!
"Fastest wins" parallel download with Hopac
One of the main features of Hopac - selective synchronization using "alternatives". In this snippet we download three web pages in parallel and the one that finishes first "wins" (or the timeout alternative becomes available for picking). What's nice in this solution is that the other two downloading jobs are cancelled immediately when the winner/timeout is available (i.e. an implicitly provided to the Asyncs CancellationTokens are cancelled).
Alts is highly composable and, for example, the whole Alt.choose [ ... ] thing could be nested in another Alt.choose or combined with <|> or <&> operators with another Alt and so on.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
|
open Hopac
open Hopac.Extensions
open Hopac.Alt.Infixes
open System.Net
open System
let download (url: string) : Alt<string * string> =
Async.toAlt <| async {
use webClient = new WebClient()
let! res = webClient.AsyncDownloadString (Uri url)
return url, res
}
Alt.choose [ download "http://yahoo.com"
download "http://google.com"
download "http://ya.ru"
Timer.Global.timeOut (TimeSpan.FromMilliseconds 3000.) >>%? ("timed", "out") ]
|> run
|
namespace Hopac
module Extensions
from Hopac
Multiple items
type Alt<'T> =
inherit Job<'T>
Full name: Hopac.Alt<_>
--------------------
module Alt
from Hopac
module Infixes
from Hopac
namespace System
namespace System.Net
val download : url:string -> Alt<string * string>
Full name: Script.download
val url : string
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
type Alt<'T> =
inherit Job<'T>
Full name: Hopac.Alt<_>
Multiple items
module Async
from Hopac.Extensions
--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
val toAlt : Async<'x> -> Alt<'x>
Full name: Hopac.Extensions.Async.toAlt
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val webClient : WebClient
Multiple items
type WebClient =
inherit Component
new : unit -> WebClient
member BaseAddress : string with get, set
member CachePolicy : RequestCachePolicy with get, set
member CancelAsync : unit -> unit
member Credentials : ICredentials with get, set
member DownloadData : address:string -> byte[] + 1 overload
member DownloadDataAsync : address:Uri -> unit + 1 overload
member DownloadFile : address:string * fileName:string -> unit + 1 overload
member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
member DownloadString : address:string -> string + 1 overload
...
Full name: System.Net.WebClient
--------------------
WebClient() : unit
val res : string
member WebClient.AsyncDownloadString : address:Uri -> Async<string>
Multiple items
type Uri =
new : uriString:string -> Uri + 5 overloads
member AbsolutePath : string
member AbsoluteUri : string
member Authority : string
member DnsSafeHost : string
member Equals : comparand:obj -> bool
member Fragment : string
member GetComponents : components:UriComponents * format:UriFormat -> string
member GetHashCode : unit -> int
member GetLeftPart : part:UriPartial -> string
...
Full name: System.Uri
--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
val choose : seq<#Alt<'x>> -> Alt<'x>
Full name: Hopac.Alt.choose
module Timer
from Hopac
module Global
from Hopac.Timer
val timeOut : TimeSpan -> Alt<unit>
Full name: Hopac.Timer.Global.timeOut
Multiple items
type TimeSpan =
struct
new : ticks:int64 -> TimeSpan + 3 overloads
member Add : ts:TimeSpan -> TimeSpan
member CompareTo : value:obj -> int + 1 overload
member Days : int
member Duration : unit -> TimeSpan
member Equals : value:obj -> bool + 1 overload
member GetHashCode : unit -> int
member Hours : int
member Milliseconds : int
member Minutes : int
...
end
Full name: System.TimeSpan
--------------------
TimeSpan()
TimeSpan(ticks: int64) : unit
TimeSpan(hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : unit
TimeSpan.FromMilliseconds(value: float) : TimeSpan
val run : Job<'x> -> 'x
Full name: Hopac.TopLevel.run
More information