10 people like it.
Like the snippet!
Reducers
A simple yet powerful library for parallel collection processing. Inspired by Clojure's Reducers.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
|
// http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html
module Reducer =
open System
open System.Text
open System.Collections.Generic
type ReduceFunc<'T,'R> =
abstract Invoke : 'T * 'R -> 'R
type CombineFunc<'R> =
abstract Invoke : 'R * 'R -> 'R
type Reducer<'T> =
abstract Apply<'R> : ReduceFunc<'T, 'R> * CombineFunc<'R> * (unit -> 'R) -> 'R
// helper functions
let inline toReduceFunc (f : 'T -> 'R -> 'R) =
{
new ReduceFunc<'T, 'R> with
member self.Invoke(value : 'T, acc : 'R) = f value acc
}
let inline toCombineFunc (f : 'R -> 'R -> 'R) =
{
new CombineFunc<'R> with
member self.Invoke(left : 'R, right : 'R) = f left right
}
// executor functions
let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
{
new Reducer<'T> with
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
let mutable r = init ()
for value in values do
r <- rf.Invoke(value, r)
r
}
let inline toParallelReducer (seqReduceCount : int) (values : 'T []) : Reducer<'T> =
{
new Reducer<'T> with
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, cf : CombineFunc<'R>, init : unit -> 'R) : 'R =
let rec reduceCombine s e =
async {
if e - s <= seqReduceCount then
let s' = if s > 0 then s + 1 else s
let result = { s' .. e } |> Seq.fold (fun r i -> rf.Invoke(values.[i], r)) (init ())
return result
else
let m = (s + e) / 2
let! result = Async.Parallel [| reduceCombine s m; reduceCombine m e |]
return cf.Invoke(result.[0], result.[1])
}
reduceCombine 0 (values.Length - 1) |> Async.RunSynchronously
}
// transform functions
let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
{
new Reducer<'B> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> (f a).Apply(rf, cf, fun () -> r)), cf, init)
}
let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
{
new Reducer<'B> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> rf.Invoke(f a, r)), cf, init)
}
let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
{
new Reducer<'A> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> if p a then rf.Invoke(a, r) else r), cf, init)
}
// reduce functions
let inline reduce (reducef : 'T -> 'R -> 'R) (combineF : 'R -> 'R -> 'R) (init : (unit -> 'R)) (reducer : Reducer<'T>) : 'R =
reducer.Apply(toReduceFunc reducef, toCombineFunc combineF, init)
let inline sum (reducer : Reducer<int>) : int =
reduce (+) (+) (fun () -> 0) reducer
let inline length (reducer : Reducer<'T>) : int =
reduce (fun _ r -> r + 1) (+) (fun () -> 0) reducer
let inline concat (reducer : Reducer<string>) : string =
let result =
reduce (fun (v : string) (builder : StringBuilder) -> builder.Append(v))
(fun (left : StringBuilder) (right : StringBuilder) -> left.Append(right))
(fun () -> new StringBuilder())
reducer
result.ToString()
let inline toArray (reducer : Reducer<'T>) : 'T [] =
let result =
reduce (fun v (list : List<'T>) -> list.Add(v); list)
(fun (left : List<'T>) (right : List<'T>) -> left.AddRange(right); left)
(fun () -> new List<'T>())
reducer
result.ToArray()
let inline groupBy (selectorF : 'T -> 'Key)
(transformF : 'T -> 'Elem)
(aggregateF : 'Key * seq<'Elem> -> 'Elem)
(reducer : Reducer<'T>) : seq<'Key * 'Elem> =
let inline reduceF (v : 'T) (r : Dictionary<'Key, List<'Elem>>) =
let key = selectorF v
let elem = transformF v
if r.ContainsKey(key) then
r.[key].Add(elem)
else
r.Add(key, new List<_>([| elem |]))
r
let inline combineF (left : Dictionary<'Key, List<'Elem>>) (right : Dictionary<'Key, List<'Elem>>) =
for keyValue in right |> Seq.toArray do
if left.ContainsKey(keyValue.Key) then
left.[keyValue.Key].AddRange(right.[keyValue.Key])
else
left.[keyValue.Key] <- new List<_>([| (keyValue.Key, keyValue.Value) |> aggregateF |])
left
let result =
reduce reduceF combineF
(fun () -> new Dictionary<'Key, List<'Elem>>())
reducer
result |> Seq.map (fun keyValue -> (keyValue.Key, (keyValue.Key, keyValue.Value) |> aggregateF ))
// Example - wordcount
let lines = System.IO.File.ReadAllLines("largefile.txt")
lines
|> Reducer.toParallelReducer 10
|> Reducer.collect (fun line -> Reducer.toSeqReducer <| line.Split(' '))
|> Reducer.groupBy id (fun _ -> 1) (fun (_, items) -> Seq.sum items)
|
namespace System
namespace System.Text
namespace System.Collections
namespace System.Collections.Generic
type ReduceFunc<'T,'R> =
interface
abstract member Invoke : 'T * 'R -> 'R
end
Full name: Script.Reducer.ReduceFunc<_,_>
abstract member ReduceFunc.Invoke : 'T * 'R -> 'R
Full name: Script.Reducer.ReduceFunc`2.Invoke
type CombineFunc<'R> =
interface
abstract member Invoke : 'R * 'R -> 'R
end
Full name: Script.Reducer.CombineFunc<_>
abstract member CombineFunc.Invoke : 'R * 'R -> 'R
Full name: Script.Reducer.CombineFunc`1.Invoke
type Reducer<'T> =
interface
abstract member Apply : ReduceFunc<'T,'R> * CombineFunc<'R> * (unit -> 'R) -> 'R
end
Full name: Script.Reducer.Reducer<_>
abstract member Reducer.Apply : ReduceFunc<'T,'R> * CombineFunc<'R> * (unit -> 'R) -> 'R
Full name: Script.Reducer.Reducer`1.Apply
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
val toReduceFunc : f:('T -> 'R -> 'R) -> ReduceFunc<'T,'R>
Full name: Script.Reducer.toReduceFunc
val f : ('T -> 'R -> 'R)
val self : ReduceFunc<'T,'R>
abstract member ReduceFunc.Invoke : 'T * 'R -> 'R
val value : 'T
val acc : 'R
val toCombineFunc : f:('R -> 'R -> 'R) -> CombineFunc<'R>
Full name: Script.Reducer.toCombineFunc
val f : ('R -> 'R -> 'R)
val self : CombineFunc<'R>
abstract member CombineFunc.Invoke : 'R * 'R -> 'R
val left : 'R
val right : 'R
val toSeqReducer : values:seq<'T> -> Reducer<'T>
Full name: Script.Reducer.toSeqReducer
val values : seq<'T>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val self : Reducer<'T>
abstract member Reducer.Apply : ReduceFunc<'T,'R> * CombineFunc<'R> * (unit -> 'R) -> 'R
val rf : ReduceFunc<'T,'R>
val init : (unit -> 'R)
val mutable r : 'R
val toParallelReducer : seqReduceCount:int -> values:'T [] -> Reducer<'T>
Full name: Script.Reducer.toParallelReducer
val seqReduceCount : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val values : 'T []
val cf : CombineFunc<'R>
val reduceCombine : (int -> int -> Async<'R>)
val s : int
val e : int
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val s' : int
val result : 'R
module Seq
from Microsoft.FSharp.Collections
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:seq<'T> -> 'State
Full name: Microsoft.FSharp.Collections.Seq.fold
val r : 'R
val i : int
val m : int
val result : 'R []
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
property Array.Length: int
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
val collect : f:('A -> Reducer<'B>) -> input:Reducer<'A> -> Reducer<'B>
Full name: Script.Reducer.collect
val f : ('A -> Reducer<'B>)
val input : Reducer<'A>
val self : Reducer<'B>
val rf : ReduceFunc<'B,'R>
val a : 'A
val map : f:('A -> 'B) -> input:Reducer<'A> -> Reducer<'B>
Full name: Script.Reducer.map
val f : ('A -> 'B)
val filter : p:('A -> bool) -> input:Reducer<'A> -> Reducer<'A>
Full name: Script.Reducer.filter
val p : ('A -> bool)
type bool = Boolean
Full name: Microsoft.FSharp.Core.bool
val self : Reducer<'A>
val rf : ReduceFunc<'A,'R>
val reduce : reducef:('T -> 'R -> 'R) -> combineF:('R -> 'R -> 'R) -> init:(unit -> 'R) -> reducer:Reducer<'T> -> 'R
Full name: Script.Reducer.reduce
val reducef : ('T -> 'R -> 'R)
val combineF : ('R -> 'R -> 'R)
val reducer : Reducer<'T>
val sum : reducer:Reducer<int> -> int
Full name: Script.Reducer.sum
val reducer : Reducer<int>
val length : reducer:Reducer<'T> -> int
Full name: Script.Reducer.length
val r : int
val concat : reducer:Reducer<string> -> string
Full name: Script.Reducer.concat
val reducer : Reducer<string>
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
val result : StringBuilder
val v : string
val builder : StringBuilder
Multiple items
type StringBuilder =
new : unit -> StringBuilder + 5 overloads
member Append : value:string -> StringBuilder + 18 overloads
member AppendFormat : format:string * arg0:obj -> StringBuilder + 4 overloads
member AppendLine : unit -> StringBuilder + 1 overload
member Capacity : int with get, set
member Chars : int -> char with get, set
member Clear : unit -> StringBuilder
member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
member EnsureCapacity : capacity:int -> int
member Equals : sb:StringBuilder -> bool
...
Full name: System.Text.StringBuilder
--------------------
StringBuilder() : unit
StringBuilder(capacity: int) : unit
StringBuilder(value: string) : unit
StringBuilder(value: string, capacity: int) : unit
StringBuilder(capacity: int, maxCapacity: int) : unit
StringBuilder(value: string, startIndex: int, length: int, capacity: int) : unit
StringBuilder.Append(value: char []) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: obj) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint64) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint32) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: uint16) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: decimal) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: float) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: float32) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: int64) : StringBuilder
(+0 other overloads)
StringBuilder.Append(value: int) : StringBuilder
(+0 other overloads)
val left : StringBuilder
val right : StringBuilder
StringBuilder.ToString() : string
StringBuilder.ToString(startIndex: int, length: int) : string
val toArray : reducer:Reducer<'T> -> 'T []
Full name: Script.Reducer.toArray
val result : List<'T>
val v : 'T
Multiple items
val list : List<'T>
--------------------
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
Full name: System.Collections.Generic.List<_>
--------------------
List() : unit
List(capacity: int) : unit
List(collection: IEnumerable<'T>) : unit
List.Add(item: 'T) : unit
val left : List<'T>
val right : List<'T>
List.AddRange(collection: IEnumerable<'T>) : unit
List.ToArray() : 'T []
val groupBy : selectorF:('T -> 'Key) -> transformF:('T -> 'Elem) -> aggregateF:('Key * seq<'Elem> -> 'Elem) -> reducer:Reducer<'T> -> seq<'Key * 'Elem> (requires equality)
Full name: Script.Reducer.groupBy
val selectorF : ('T -> 'Key) (requires equality)
val transformF : ('T -> 'Elem)
val aggregateF : ('Key * seq<'Elem> -> 'Elem) (requires equality)
val reduceF : ('T -> Dictionary<'Key,List<'Elem>> -> Dictionary<'Key,List<'Elem>>) (requires equality)
val r : Dictionary<'Key,List<'Elem>> (requires equality)
Multiple items
type Dictionary<'TKey,'TValue> =
new : unit -> Dictionary<'TKey, 'TValue> + 5 overloads
member Add : key:'TKey * value:'TValue -> unit
member Clear : unit -> unit
member Comparer : IEqualityComparer<'TKey>
member ContainsKey : key:'TKey -> bool
member ContainsValue : value:'TValue -> bool
member Count : int
member GetEnumerator : unit -> Enumerator<'TKey, 'TValue>
member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
member Item : 'TKey -> 'TValue with get, set
...
nested type Enumerator
nested type KeyCollection
nested type ValueCollection
Full name: System.Collections.Generic.Dictionary<_,_>
--------------------
Dictionary() : unit
Dictionary(capacity: int) : unit
Dictionary(comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>) : unit
Dictionary(capacity: int, comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>, comparer: IEqualityComparer<'TKey>) : unit
val key : 'Key (requires equality)
val elem : 'Elem
Dictionary.ContainsKey(key: 'Key) : bool
Dictionary.Add(key: 'Key, value: List<'Elem>) : unit
val combineF : (Dictionary<'Key,List<'Elem>> -> Dictionary<'Key,List<'Elem>> -> Dictionary<'Key,List<'Elem>>) (requires equality)
val left : Dictionary<'Key,List<'Elem>> (requires equality)
val right : Dictionary<'Key,List<'Elem>> (requires equality)
val keyValue : KeyValuePair<'Key,List<'Elem>> (requires equality)
val toArray : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Seq.toArray
property KeyValuePair.Key: 'Key
property KeyValuePair.Value: List<'Elem>
val result : Dictionary<'Key,List<'Elem>> (requires equality)
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>
Full name: Microsoft.FSharp.Collections.Seq.map
val lines : string []
Full name: Script.lines
namespace System.IO
type File =
static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
static member AppendAllText : path:string * contents:string -> unit + 1 overload
static member AppendText : path:string -> StreamWriter
static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
static member Create : path:string -> FileStream + 3 overloads
static member CreateText : path:string -> StreamWriter
static member Decrypt : path:string -> unit
static member Delete : path:string -> unit
static member Encrypt : path:string -> unit
static member Exists : path:string -> bool
...
Full name: System.IO.File
System.IO.File.ReadAllLines(path: string) : string []
System.IO.File.ReadAllLines(path: string, encoding: System.Text.Encoding) : string []
module Reducer
from Script
val toParallelReducer : seqReduceCount:int -> values:'T [] -> Reducer.Reducer<'T>
Full name: Script.Reducer.toParallelReducer
val collect : f:('A -> Reducer.Reducer<'B>) -> input:Reducer.Reducer<'A> -> Reducer.Reducer<'B>
Full name: Script.Reducer.collect
val line : string
val toSeqReducer : values:seq<'T> -> Reducer.Reducer<'T>
Full name: Script.Reducer.toSeqReducer
System.String.Split([<System.ParamArray>] separator: char []) : string []
System.String.Split(separator: string [], options: System.StringSplitOptions) : string []
System.String.Split(separator: char [], options: System.StringSplitOptions) : string []
System.String.Split(separator: char [], count: int) : string []
System.String.Split(separator: string [], count: int, options: System.StringSplitOptions) : string []
System.String.Split(separator: char [], count: int, options: System.StringSplitOptions) : string []
val groupBy : selectorF:('T -> 'Key) -> transformF:('T -> 'Elem) -> aggregateF:('Key * seq<'Elem> -> 'Elem) -> reducer:Reducer.Reducer<'T> -> seq<'Key * 'Elem> (requires equality)
Full name: Script.Reducer.groupBy
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
val items : seq<int>
val sum : source:seq<'T> -> 'T (requires member ( + ) and member get_Zero)
Full name: Microsoft.FSharp.Collections.Seq.sum
More information