9 people like it.

Parallel fold

Idea from Guy L. Steele's talk "Organizing Functional Code for Parallel Execution; or, foldl and foldr Considered Slightly Harmful" (https://vimeo.com/6624203).

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
// Reduce / Aggregate / Fold 
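// The usual left fold builds one long chain of applications and relies on tail-call optimization:
// (a,b,c,d,e,f,g,h) => (((((((a + b) + c) + d) + e) + f) + g) + h)
// This version splits the input into halves recursively (divide and conquer, as in quicksort), so the halves can be reduced in parallel:
// (a,b,c,d,e,f,g,h) => (((a + b) + (c + d)) + ((e + f) + (g + h)))
// The two shapes give the same result only when the operator is associative.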

// F# has no higher-kinded types (as Haskell does), so List and Array get separate implementations.
open System.Threading.Tasks
module List =
  /// Reduces a non-empty list with the binary function `f`, combining
  /// elements as a balanced tree instead of a left-to-right chain:
  /// (a,b,c,d,e) => ((a f b) f (c f (d f e))).
  /// Both halves of every split of 3+ elements are evaluated on Task.Run
  /// tasks and joined with Task.WaitAll.
  /// The result equals List.reduce f only when `f` is associative.
  /// Fails (via failwith "Sequence contains no elements") on an empty list.
  let reduceParallel<'a> f (ie :'a list) =
    // Snapshot the list into an array once. List.take/List.skip are O(n)
    // and allocate, so splitting the list itself at every recursion level
    // costs O(n log n); index ranges over an array make each split O(1).
    let items = Array.ofList ie
    // Reduces the `len` consecutive elements of `items` starting at `start`.
    let rec reduceRange start len =
      match len with
      | 1 -> items.[start]
      | 2 -> f items.[start] items.[start + 1]
      | _ ->
        // Left half gets len / 2 elements, exactly as before, so the
        // shape of the combination tree (and thus the result) is unchanged.
        let h = len / 2
        let o1 = Task.Run(fun () -> reduceRange start h)
        let o2 = Task.Run(fun () -> reduceRange (start + h) (len - h))
        Task.WaitAll(o1, o2)
        f o1.Result o2.Result
    match items.Length with
    | 0 -> failwith "Sequence contains no elements"
    | c -> reduceRange 0 c

module Array =
  /// Reduces a non-empty array with the binary function `f`, combining
  /// elements as a balanced tree instead of a left-to-right chain:
  /// (a,b,c,d,e) => ((a f b) f (c f (d f e))).
  /// Both halves of every split of 3+ elements are evaluated on Task.Run
  /// tasks and joined with Task.WaitAll.
  /// The result equals Array.reduce f only when `f` is associative.
  /// Fails (via failwith "Sequence contains no elements") on an empty array.
  let reduceParallel<'a> f (ie :'a array) =
    // Reduces the `len` consecutive elements of `ie` starting at `start`.
    // Working on index ranges of the original array avoids the sub-array
    // copies Array.take/Array.skip would make at every recursion level.
    let rec reduceRange start len =
      match len with
      | 1 -> ie.[start]
      | 2 -> f ie.[start] ie.[start + 1]
      | _ ->
        // Left half gets len / 2 elements, exactly as before, so the
        // shape of the combination tree (and thus the result) is unchanged.
        let h = len / 2
        let o1 = Task.Run(fun () -> reduceRange start h)
        let o2 = Task.Run(fun () -> reduceRange (start + h) (len - h))
        Task.WaitAll(o1, o2)
        f o1.Result o2.Result
    match ie.Length with
    | 0 -> failwith "Sequence contains no elements"
    | c -> reduceRange 0 c

(*
Demonstration in F# Interactive with #time enabled (the reducer sleeps 30 ms per call):

> [1 .. 500] |> List.fold(fun a -> fun x -> System.Threading.Thread.Sleep(30);x+a) 0;;
Real: 00:00:15.654, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 125250
> [1 .. 500] |> List.reduceParallel(fun a -> fun x -> System.Threading.Thread.Sleep(30);x+a);;
Real: 00:00:02.710, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 125250
> [|1 .. 500|] |> Array.fold(fun a -> fun x -> System.Threading.Thread.Sleep(30);x+a) 0;;
Real: 00:00:15.639, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 125250
> [|1 .. 500|] |> Array.reduceParallel(fun a -> fun x -> System.Threading.Thread.Sleep(30);x+a);;
Real: 00:00:02.537, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 125250
*)
namespace System
namespace System.Threading
namespace System.Threading.Tasks
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val reduceParallel : f:('a -> 'a -> 'a) -> ie:'a list -> 'a

Full name: Script.List.reduceParallel
val f : ('a -> 'a -> 'a)
val ie : 'a list
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
val reduceRec : (('a -> 'a -> 'a) -> 'a list -> int -> 'a)
val len : int
val h : int
val o1 : obj
Multiple items
type Task =
  new : action:Action -> Task + 7 overloads
  member AsyncState : obj
  member ContinueWith : continuationAction:Action<Task> -> Task + 9 overloads
  member CreationOptions : TaskCreationOptions
  member Dispose : unit -> unit
  member Exception : AggregateException
  member Id : int
  member IsCanceled : bool
  member IsCompleted : bool
  member IsFaulted : bool
  ...

Full name: System.Threading.Tasks.Task

--------------------
type Task<'TResult> =
  inherit Task
  new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
  member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 9 overloads
  member Result : 'TResult with get, set
  static member Factory : TaskFactory<'TResult>

Full name: System.Threading.Tasks.Task<_>

--------------------
Task(action: System.Action) : unit
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken) : unit
Task(action: System.Action, creationOptions: TaskCreationOptions) : unit
Task(action: System.Action<obj>, state: obj) : unit
Task(action: System.Action, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken) : unit
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(action: System.Action<obj>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit

--------------------
Task(function: System.Func<'TResult>) : unit
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken) : unit
Task(function: System.Func<'TResult>, creationOptions: TaskCreationOptions) : unit
Task(function: System.Func<obj,'TResult>, state: obj) : unit
Task(function: System.Func<'TResult>, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken) : unit
Task(function: System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(function: System.Func<obj,'TResult>, state: obj, cancellationToken: System.Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
val o2 : obj
Task.WaitAll([<System.ParamArray>] tasks: Task []) : unit
Task.WaitAll(tasks: Task [], cancellationToken: System.Threading.CancellationToken) : unit
Task.WaitAll(tasks: Task [], millisecondsTimeout: int) : bool
Task.WaitAll(tasks: Task [], timeout: System.TimeSpan) : bool
Task.WaitAll(tasks: Task [], millisecondsTimeout: int, cancellationToken: System.Threading.CancellationToken) : bool
property List.Length: int
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val c : int
Multiple items
module Array

from Script

--------------------
module Array

from Microsoft.FSharp.Collections
val reduceParallel : f:('a -> 'a -> 'a) -> ie:'a array -> 'a

Full name: Script.Array.reduceParallel
val ie : 'a array
type 'T array = 'T []

Full name: Microsoft.FSharp.Core.array<_>
val reduceRec : (('a -> 'a -> 'a) -> 'a array -> int -> 'a)
module Array

from Microsoft.FSharp.Collections
property System.Array.Length: int
Raw view Test code New version

More information

Link:http://fssnip.net/pa
Posted:9 years ago
Author:Tuomas Hietanen
Tags: fold , aggregate , reduce , parallel , collections