7 people like it.

Async based MapReduce

F#'s Async is a very versatile abstraction that has been used to compose both CPU-bound and I/O-bound computations. So it is tempting to implement a MapReduce function on top of Async, borrowing ideas from the theory of list homomorphisms.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
(Details)

// A balanced binary-tree representation of the parallel execution of mapReduce
//           r
//          / \
//         /   \
//        /     \
//       /       \
//      /         \
//     r           r
//    /\          /\
//   /  \        /  \
//  /    \      /    \
// f a1  f a2  f a3  f a4


/// (|f, ⊗|) -- a list homomorphism: maps every element of `input` with `mapF`,
/// then combines neighbouring results with `reduceF` along a balanced binary
/// split of the index range. The two halves of every split are evaluated in
/// parallel through the `<||>` combinator.
///
/// The bracketing of the `reduceF` calls is decided by the split points, so
/// `reduceF` should be associative for the result not to depend on how the
/// array happens to be split. The left half's result is always passed as the
/// first argument, so element order is preserved and `reduceF` does not need
/// to be commutative.
///
/// Running the returned computation on an empty `input` raises
/// ArgumentException: there are no elements and no identity value for
/// `reduceF`, so there is nothing meaningful to return.
let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> =
    // Reduces the non-empty index range [s, e) (invariant: s < e).
    let rec mapReduce' s e =
        async {
            if e - s = 1 then return! mapF input.[s]
            else
                // s + (e - s) / 2 cannot overflow, unlike (s + e) / 2.
                let m = s + (e - s) / 2
                let! (left, right) = mapReduce' s m <||> mapReduce' m e
                return! reduceF left right
        }
    let n = input.Length
    if n = 0 then
        // Still fails only when the computation is run, but with a descriptive
        // error instead of an IndexOutOfRangeException from input.[0].
        async { return invalidArg "input" "mapReduce requires a non-empty input array." }
    else
        mapReduce' 0 n


// Example: the classic map/reduce word-count

/// Map step of the word-count example: downloads the text at `uri`, splits it
/// into words, upper-cases them, drops words of three characters or fewer and
/// words listed in `noiseWords` (compared case-insensitively), and returns a
/// (WORD, occurrences) pair for every remaining distinct word, in order of
/// first occurrence.
let mapF uri =
    async {
        let! text = Download(new Uri(uri))
        // Line breaks, tabs and sentence punctuation also delimit words; splitting
        // only on ' ', '.' and ',' glued the last word of each line to the first
        // word of the next one.
        let separators = [| ' '; '.'; ','; '\r'; '\n'; '\t'; ';'; ':'; '!'; '?' |]
        let words = text.Split(separators, StringSplitOptions.RemoveEmptyEntries)
        // Upper-case the stop words once per document instead of once per
        // (word, stop word) comparison. The invariant culture keeps the result
        // independent of the machine's current culture (e.g. Turkish 'i').
        let noise = noiseWords |> Array.map (fun w -> w.ToUpperInvariant()) |> Set.ofArray
        return
            words
            |> Seq.map (fun word -> word.ToUpperInvariant())
            |> Seq.filter (fun word -> word.Length > 3 && not (Set.contains word noise))
            |> Seq.countBy id
            |> Seq.toList
    }

/// Reduce step of the word-count example: merges two partial (word, count)
/// lists, summing the counts of words present in both. Words appear in order
/// of first occurrence across `left` followed by `right`.
let reduceF (left : (string * int) list) (right : (string * int) list) =
    async {
        let combined = List.append left right
        let totals =
            combined
            |> List.groupBy (fun (word, _) -> word)
            |> List.map (fun (word, pairs) -> word, List.sumBy (fun (_, n) -> n) pairs)
        return totals
    }

// Run the word count over all links and list words by descending frequency
// (sortWith is stable, so words with equal counts keep their relative order).
Async.RunSynchronously(mapReduce mapF reduceF links)
|> List.sortWith (fun (_, a) (_, b) -> compare b a)
open System
open System.Net
open Microsoft.FSharp.Control.WebExtensions

// Stop words excluded from the word count; mapF compares them against
// upper-cased words, so the casing of the entries here does not matter.
// NOTE(review): mapF also drops words of 3 characters or fewer and splits on
// '.', so entries such as "a", "the", "e.g." or "i.e." can never match; they are
// harmless but redundant.
let noiseWords = [|"a"; "about"; "above"; "all"; "along"; "also"; "although"; "am"; "an"; "any"; "are"; "aren't"; "as"; "at";
            "be"; "because"; "been"; "but"; "by"; "can"; "cannot"; "could"; "couldn't";
            "did"; "didn't"; "do"; "does"; "doesn't"; "e.g."; "either"; "etc"; "etc."; "even"; "ever";
            "for"; "from"; "further"; "get"; "gets"; "got"; "had"; "hardly"; "has"; "hasn't"; "having"; "he";
            "hence"; "her"; "here"; "hereby"; "herein"; "hereof"; "hereon"; "hereto"; "herewith"; "him";
            "his"; "how"; "however"; "I"; "i.e."; "if"; "into"; "it"; "it's"; "its"; "me"; "more"; "most"; "mr"; "my";
            "near"; "nor"; "now"; "of"; "onto"; "other"; "our"; "out"; "over"; "really"; "said"; "same"; "she"; "should";
            "shouldn't"; "since"; "so"; "some"; "such"; "than"; "that"; "the"; "their"; "them"; "then"; "there"; "thereby";
            "therefore"; "therefrom"; "therein"; "thereof"; "thereon"; "thereto"; "therewith"; "these"; "they"; "this";
            "those"; "through"; "thus"; "to"; "too"; "under"; "until"; "unto"; "upon"; "us"; "very"; "viz"; "was"; "wasn't";
            "we"; "were"; "what"; "when"; "where"; "whereby"; "wherein"; "whether"; "which"; "while"; "who"; "whom"; "whose";
            "why"; "with"; "without"; "would"; "you"; "your" ; "have"; "thou"; "will"; "shall"|]

// Input documents for the word-count example: each URL is downloaded by mapF
// and the per-document counts are merged by reduceF via mapReduce.
// NOTE(review): these are remote plain-text files over http; their continued
// availability is not guaranteed -- confirm before relying on this list.
let links =
    [| "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/allswellthatendswell.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/amsnd.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/antandcleo.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/asyoulikeit.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/comedyoferrors.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/cymbeline.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/hamlet.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryiv1.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryiv2.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryv.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi1.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi2.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryvi3.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/henryviii.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/juliuscaesar.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/kingjohn.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/kinglear.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/loveslobourslost.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/maan.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/macbeth.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/measureformeasure.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/merchantofvenice.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/othello.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/richardii.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/richardiii.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/romeoandjuliet.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/tamingoftheshrew.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/tempest.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/themwofw.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/thetgofv.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/timon.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/titus.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/troilusandcressida.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/twelfthnight.txt";
        "http://www.cs.ukzn.ac.za/~hughm/ap/data/shakespeare/winterstale.txt"|]

/// Downloads the resource at `url` as a string without blocking a thread.
/// The WebClient is disposed once the download completes, fails or is
/// cancelled (previously it was never disposed).
/// NOTE(review): WebClient is marked obsolete in .NET 6+; consider HttpClient.
let Download(url : Uri) =
    async {
        use client = new WebClient()
        let! html = client.AsyncDownloadString(url)
        return html
    }

/// Runs two computations of the same result type in parallel (via
/// Async.Parallel) and returns their results as a (first, second) pair.
let (<||>) first second =
    async {
        let! both = Async.Parallel([| first; second |])
        let left = both.[0]
        let right = both.[1]
        return (left, right)
    }
val mapReduce : mapF:('T -> Async<'R>) -> reduceF:('R -> 'R -> Async<'R>) -> input:'T [] -> Async<'R>

Full name: Script.mapReduce
val mapF : ('T -> Async<'R>)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
val reduceF : ('R -> 'R -> Async<'R>)
val input : 'T []
val mapReduce' : (int -> int -> Async<'R>)
val s : int
val e : int
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val m : int
val left : 'R
val right : 'R
property Array.Length: int
val mapF : uri:string -> Async<(string * int) list>

Full name: Script.mapF
val uri : string
val text : string
val Download : url:Uri -> Async<string>

Full name: Script.Download
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
val words : string []
String.Split([<ParamArray>] separator: char []) : string []
String.Split(separator: string [], options: StringSplitOptions) : string []
String.Split(separator: char [], options: StringSplitOptions) : string []
String.Split(separator: char [], count: int) : string []
String.Split(separator: string [], count: int, options: StringSplitOptions) : string []
String.Split(separator: char [], count: int, options: StringSplitOptions) : string []
type StringSplitOptions =
  | None = 0
  | RemoveEmptyEntries = 1

Full name: System.StringSplitOptions
field StringSplitOptions.RemoveEmptyEntries = 1
module Seq

from Microsoft.FSharp.Collections
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.map
val word : string
String.ToUpper() : string
String.ToUpper(culture: Globalization.CultureInfo) : string
val filter : predicate:('T -> bool) -> source:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.filter
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val noiseWords : string []

Full name: Script.noiseWords
val exists : predicate:('T -> bool) -> source:seq<'T> -> bool

Full name: Microsoft.FSharp.Collections.Seq.exists
val noiseWord : string
val length : source:seq<'T> -> int

Full name: Microsoft.FSharp.Collections.Seq.length
val groupBy : projection:('T -> 'Key) -> source:seq<'T> -> seq<'Key * seq<'T>> (requires equality)

Full name: Microsoft.FSharp.Collections.Seq.groupBy
val id : x:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.id
val key : string
val values : seq<string>
val toList : source:seq<'T> -> 'T list

Full name: Microsoft.FSharp.Collections.Seq.toList
val reduceF : left:(string * int) list -> right:(string * int) list -> Async<(string * int) list>

Full name: Script.reduceF
val left : (string * int) list
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
val right : (string * int) list
val fst : tuple:('T1 * 'T2) -> 'T1

Full name: Microsoft.FSharp.Core.Operators.fst
val values : seq<string * int>
val sumBy : projection:('T -> 'U) -> source:seq<'T> -> 'U (requires member ( + ) and member get_Zero)

Full name: Microsoft.FSharp.Collections.Seq.sumBy
val snd : tuple:('T1 * 'T2) -> 'T2

Full name: Microsoft.FSharp.Core.Operators.snd
val links : string []

Full name: Script.links
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val sortBy : projection:('T -> 'Key) -> list:'T list -> 'T list (requires comparison)

Full name: Microsoft.FSharp.Collections.List.sortBy
val count : int
Raw view Test code New version

More information

Link:http://fssnip.net/73
Posted:13 years ago
Author:Nick Palladinos
Tags: async , mapreduce