7 people like it.
Like the snippet!
Async based MapReduce
Async is a very versatile structure that is used to compose CPU-bound and I/O-bound computations.
So it is very tempting to implement a MapReduce function based on Async, borrowing ideas from the theory of list homomorphisms.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
|
(Details)
// A balanced binary-tree representation of the parallel execution of mapReduce
// (f = mapF applied to one input element, r = reduceF combining two sub-results)
//
//          r
//        /   \
//      /       \
//     r         r
//    / \       / \
//   /   \     /   \
// f a1 f a2 f a3 f a4
//
// In list-homomorphism notation: (|f, ⊗|)
/// Applies `mapF` to every element of `input` and combines the mapped results with
/// `reduceF`, splitting the index range in half recursively so that the two halves of
/// each split are evaluated together through `<||>`.
///
/// Results are always combined in left-to-right order (`reduceF left right`), but the
/// grouping follows the shape of the split tree; `reduceF` is presumably expected to be
/// associative for the result to be independent of that grouping.
///
/// mapF    : asynchronous mapping of a single input element.
/// reduceF : asynchronous combination of two intermediate results.
/// input   : the elements to process; must contain at least one element.
///
/// The returned computation fails with ArgumentException when `input` is empty,
/// because there is no identity element that could be returned in that case.
let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> =
    // Processes the half-open index range [s, e); callers guarantee s < e.
    let rec mapReduce' s e =
        async {
            if s + 1 >= e then
                // A single element is left: this is a leaf of the tree.
                return! mapF input.[s]
            else
                // Written as s + (e - s) / 2 so the midpoint cannot overflow on huge arrays.
                let m = s + (e - s) / 2
                let! (left, right) = mapReduce' s m <||> mapReduce' m e
                return! reduceF left right
        }
    if input.Length = 0 then
        // Report the problem when the computation runs, with a message that names the
        // cause, instead of an index-out-of-range error raised from `input.[0]`.
        async { return invalidArg "input" "mapReduce requires a non-empty input array." }
    else
        mapReduce' 0 input.Length
// Example: the classic map/reduce word-count
/// Map step of the word-count example: downloads the document at `uri`, splits it into
/// words on spaces, full stops and commas, upper-cases the words, drops noise words and
/// words of three characters or fewer, and returns one (word, occurrences) pair per
/// distinct remaining word.
let mapF uri =
    async {
        let! text = Download(new Uri(uri))
        let words = text.Split([|' '; '.'; ','|], StringSplitOptions.RemoveEmptyEntries)
        // Upper-case the noise words once per document and keep them in a set, instead of
        // re-upper-casing and linearly scanning the whole noise list for every single word.
        let upperNoiseWords =
            noiseWords
            |> Seq.map (fun noiseWord -> noiseWord.ToUpper())
            |> set
        return
            words
            |> Seq.map (fun word -> word.ToUpper())
            // The cheap length test goes first; both conditions are pure, so order is free.
            |> Seq.filter (fun word -> word.Length > 3 && not (upperNoiseWords.Contains word))
            |> Seq.countBy id
            |> Seq.toList
    }
/// Reduce step of the word-count example: merges two (word, count) lists into one,
/// adding up the counts of words that occur in both. Words keep the order in which
/// they first appear in `left` followed by `right`.
let reduceF (left : (string * int) list) (right : (string * int) list) =
    async {
        let merged =
            [ for (word, entries) in Seq.groupBy fst (left @ right) ->
                (word, Seq.sumBy snd entries) ]
        return merged
    }
// Run the word count over every linked document and list the most frequent words first.
links
|> mapReduce mapF reduceF
|> Async.RunSynchronously
|> List.sortBy (fun (_, occurrences) -> -occurrences)
|
open System
open System.Net
open Microsoft.FSharp.Control.WebExtensions
// Words that are excluded from the word count. mapF upper-cases each entry before
// comparing it with the (already upper-cased) words of a document, so the casing used
// here does not matter.
let noiseWords = [|"a"; "about"; "above"; "all"; "along"; "also"; "although"; "am"; "an"; "any"; "are"; "aren't"; "as"; "at";
"be"; "because"; "been"; "but"; "by"; "can"; "cannot"; "could"; "couldn't";
"did"; "didn't"; "do"; "does"; "doesn't"; "e.g."; "either"; "etc"; "etc."; "even"; "ever";
"for"; "from"; "further"; "get"; "gets"; "got"; "had"; "hardly"; "has"; "hasn't"; "having"; "he";
"hence"; "her"; "here"; "hereby"; "herein"; "hereof"; "hereon"; "hereto"; "herewith"; "him";
"his"; "how"; "however"; "I"; "i.e."; "if"; "into"; "it"; "it's"; "its"; "me"; "more"; "most"; "mr"; "my";
"near"; "nor"; "now"; "of"; "onto"; "other"; "our"; "out"; "over"; "really"; "said"; "same"; "she"; "should";
"shouldn't"; "since"; "so"; "some"; "such"; "than"; "that"; "the"; "their"; "them"; "then"; "there"; "thereby";
"therefore"; "therefrom"; "therein"; "thereof"; "thereon"; "thereto"; "therewith"; "these"; "they"; "this";
"those"; "through"; "thus"; "to"; "too"; "under"; "until"; "unto"; "upon"; "us"; "very"; "viz"; "was"; "wasn't";
"we"; "were"; "what"; "when"; "where"; "whereby"; "wherein"; "whether"; "which"; "while"; "who"; "whom"; "whose";
"why"; "with"; "without"; "would"; "you"; "your" ; "have"; "thou"; "will"; "shall"|]
// Input of the word-count example: the URLs of the text documents that are passed to
// mapReduce, each of which mapF downloads and counts. Judging by the paths these are
// the texts of Shakespeare's plays (NOTE(review): availability of the host not verified).
let links =
[| "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/allswellthatendswell.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/amsnd.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/antandcleo.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/asyoulikeit.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/comedyoferrors.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/cymbeline.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/hamlet.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryiv1.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryiv2.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryv.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi1.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi2.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi3.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryviii.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/juliuscaesar.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/kingjohn.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/kinglear.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/loveslobourslost.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/maan.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/macbeth.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/measureformeasure.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/merchantofvenice.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/othello.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/richardii.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/richardiii.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/romeoandjuliet.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/tamingoftheshrew.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/tempest.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/themwofw.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/thetgofv.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/timon.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/titus.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/troilusandcressida.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/twelfthnight.txt";
"http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/winterstale.txt"|]
/// Asynchronously downloads the resource at `url` and returns its content as a string.
let Download(url : Uri) =
    async {
        // WebClient is IDisposable: binding it with `use` releases it as soon as the
        // download has finished (or failed) instead of leaving it to the finalizer.
        use client = new WebClient()
        let! html = client.AsyncDownloadString(url)
        return html
    }
/// Runs two computations of the same result type through Async.Parallel and returns
/// both results as a pair, in the order (first, second).
let (<||>) first second =
    async {
        let! both = [| first; second |] |> Async.Parallel
        return both.[0], both.[1]
    }
val mapReduce : mapF:('T -> Async<'R>) -> reduceF:('R -> 'R -> Async<'R>) -> input:'T [] -> Async<'R>
Full name: Script.mapReduce
val mapF : ('T -> Async<'R>)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
val reduceF : ('R -> 'R -> Async<'R>)
val input : 'T []
val mapReduce' : (int -> int -> Async<'R>)
val s : int
val e : int
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val m : int
val left : 'R
val right : 'R
property Array.Length: int
val mapF : uri:string -> Async<(string * int) list>
Full name: Script.mapF
val uri : string
val text : string
val Download : url:Uri -> Async<string>
Full name: Script.Download
Multiple items
type Uri =
new : uriString:string -> Uri + 5 overloads
member AbsolutePath : string
member AbsoluteUri : string
member Authority : string
member DnsSafeHost : string
member Equals : comparand:obj -> bool
member Fragment : string
member GetComponents : components:UriComponents * format:UriFormat -> string
member GetHashCode : unit -> int
member GetLeftPart : part:UriPartial -> string
...
Full name: System.Uri
--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
val words : string []
String.Split([<ParamArray>] separator: char []) : string []
String.Split(separator: string [], options: StringSplitOptions) : string []
String.Split(separator: char [], options: StringSplitOptions) : string []
String.Split(separator: char [], count: int) : string []
String.Split(separator: string [], count: int, options: StringSplitOptions) : string []
String.Split(separator: char [], count: int, options: StringSplitOptions) : string []
type StringSplitOptions =
| None = 0
| RemoveEmptyEntries = 1
Full name: System.StringSplitOptions
field StringSplitOptions.RemoveEmptyEntries = 1
module Seq
from Microsoft.FSharp.Collections
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>
Full name: Microsoft.FSharp.Collections.Seq.map
val word : string
String.ToUpper() : string
String.ToUpper(culture: Globalization.CultureInfo) : string
val filter : predicate:('T -> bool) -> source:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Collections.Seq.filter
val not : value:bool -> bool
Full name: Microsoft.FSharp.Core.Operators.not
val noiseWords : string []
Full name: Script.noiseWords
val exists : predicate:('T -> bool) -> source:seq<'T> -> bool
Full name: Microsoft.FSharp.Collections.Seq.exists
val noiseWord : string
val length : source:seq<'T> -> int
Full name: Microsoft.FSharp.Collections.Seq.length
val groupBy : projection:('T -> 'Key) -> source:seq<'T> -> seq<'Key * seq<'T>> (requires equality)
Full name: Microsoft.FSharp.Collections.Seq.groupBy
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
val key : string
val values : seq<string>
val toList : source:seq<'T> -> 'T list
Full name: Microsoft.FSharp.Collections.Seq.toList
val reduceF : left:(string * int) list -> right:(string * int) list -> Async<(string * int) list>
Full name: Script.reduceF
val left : (string * int) list
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
val right : (string * int) list
val fst : tuple:('T1 * 'T2) -> 'T1
Full name: Microsoft.FSharp.Core.Operators.fst
val values : seq<string * int>
val sumBy : projection:('T -> 'U) -> source:seq<'T> -> 'U (requires member ( + ) and member get_Zero)
Full name: Microsoft.FSharp.Collections.Seq.sumBy
val snd : tuple:('T1 * 'T2) -> 'T2
Full name: Microsoft.FSharp.Core.Operators.snd
val links : string []
Full name: Script.links
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val sortBy : projection:('T -> 'Key) -> list:'T list -> 'T list (requires comparison)
Full name: Microsoft.FSharp.Collections.List.sortBy
val count : int
More information