6 people like it.
Like the snippet!
Circular Buffer
A Circular, or Ring, Buffer that flattens incoming arrays and allows consumers to take arbitrary-sized chunks. Improvements and suggestions welcome. Fork my gist at https://gist.github.com/1648579.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
|
type CircularBuffer<'a> (bufferSize: int) =
do if bufferSize <= 0 then invalidArg "bufferSize" "The bufferSize must be greater than 0."
let buffer = Array.zeroCreate<'a> bufferSize
let mutable head = bufferSize - 1
let mutable tail = 0
let mutable length = 0
let rec nextBuffer offset count =
seq {
let overflow = count + offset - bufferSize
if overflow > 0 then
yield (offset, bufferSize - offset)
yield! nextBuffer 0 overflow
else
yield (offset, count)
}
member __.Dequeue(count) =
if length = 0 then invalidOp "Queue exhausted."
if count > length then invalidOp "Requested count is too large."
let dequeued = Array.concat [| for o, c in nextBuffer tail count -> buffer.[o..o+c-1] |]
tail <- (tail + count) % bufferSize
length <- length - count
dequeued
member __.Enqueue(value: _[], offset, count) =
let mutable offset = offset
// NOTE: We could save a lot by just pulling the last
// bufferSize elements, but we'll be converting to a
// blocking agent eventually.
head <- (head + 1) % bufferSize
for x, y in nextBuffer head count do
Array.blit value offset buffer x y
offset <- offset + y
if length = bufferSize then
tail <- (tail + count) % bufferSize
else
let overflow = length + count - bufferSize
if overflow > 0 then
tail <- (tail + overflow) % bufferSize
length <- min (length + count) bufferSize
member __.Enqueue(value: _[]) =
__.Enqueue(value, 0, value.Length)
member __.Enqueue(value: _[], offset) =
__.Enqueue(value, offset, value.Length - offset)
member __.Enqueue(value: ArraySegment<_>) =
__.Enqueue(value.Array, value.Offset, value.Count)
member __.Enqueue(value) =
__.Enqueue([|value|], 0, 1)
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
|
let queue = CircularBuffer(5)
let stopwatch = Stopwatch.StartNew()
// Printing from a queue 1..5
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
Debug.Assert([|1;2;3;4;5|] = queue.Dequeue(5))
// Printing from a queue 1..8, twice
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5) // <---
queue.Enqueue(6)
queue.Enqueue(7)
queue.Enqueue(8)
queue.Enqueue(1)
queue.Enqueue(2) // <---
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
queue.Enqueue(6)
queue.Enqueue(7) // <---
queue.Enqueue(8)
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
// Printing from a queue 1..5
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Clear out the rest
queue.Dequeue(2)
// Printing from a queue 1..3
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5) // <---
queue.Enqueue(6)
queue.Enqueue(7)
queue.Enqueue(8)
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
printfn "Enqueue(value) tests passed in %d ms" stopwatch.ElapsedMilliseconds
stopwatch.Reset()
stopwatch.Start()
// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
Debug.Assert([|1;2;3;4;5|] = queue.Dequeue(5))
// Printing from a queue 1..8, twice
queue.Enqueue([|1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8|])
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Clear out the rest
queue.Dequeue(2)
// Printing from a queue 1..3
queue.Enqueue([|1;2;3|])
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
queue.Enqueue([|1;2;3;4;5;6;7;8|])
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
queue.Enqueue([|1;2;3|])
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
printfn "Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds
stopwatch.Reset()
stopwatch.Start()
// Consider a large array with various, incoming array segments.
let source =
[| 1;2;3;4;5
1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8
1;2;3;4;5
1;2;3
1;2;3;4;5;6;7;8
1;2;3 |]
let incoming =
let generator =
seq { yield ArraySegment<_>(source,0,5)
// Threading.Thread.Sleep(1)
yield ArraySegment<_>(source,5,16)
// Threading.Thread.Sleep(2)
yield ArraySegment<_>(source,21,5)
// Threading.Thread.Sleep(1)
yield ArraySegment<_>(source,26,3)
// Threading.Thread.Sleep(1)
yield ArraySegment<_>(source,29,8)
// Threading.Thread.Sleep(1)
yield ArraySegment<_>(source,37,3) }
in generator.GetEnumerator()
let enqueueNext() =
incoming.MoveNext() |> ignore
queue.Enqueue(incoming.Current)
// Printing from a queue 1..5
enqueueNext()
Debug.Assert([|1;2;3;4;5|] = queue.Dequeue(5))
// Printing from a queue 1..8, twice
enqueueNext()
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
// Printing from a queue 1..5
enqueueNext()
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Clear out the rest
queue.Dequeue(2)
// Printing from a queue 1..3
enqueueNext()
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
enqueueNext()
Debug.Assert([|4;5;6;7;8|] = queue.Dequeue(5))
enqueueNext()
Debug.Assert([|1;2;3|] = queue.Dequeue(3))
printfn "Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds
|
Multiple items
type CircularBuffer<'a> =
new : bufferSize:int -> CircularBuffer<'a>
member Dequeue : count:int -> 'a []
member Enqueue : value:'a [] -> unit
member Enqueue : value:ArraySegment<'a> -> unit
member Enqueue : value:'a -> unit
member Enqueue : value:'a [] * offset:int -> unit
member Enqueue : value:'a [] * offset:int * count:int -> unit
Full name: Script.CircularBuffer<_>
--------------------
new : bufferSize:int -> CircularBuffer<'a>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val invalidArg : argumentName:string -> message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.invalidArg
val buffer : 'a []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable head : int
val mutable tail : int
val mutable length : int
val nextBuffer : (int -> int -> seq<int * int>)
val offset : int
val count : int
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
val overflow : int
member CircularBuffer.Dequeue : count:int -> 'a []
Full name: Script.CircularBuffer`1.Dequeue
val invalidOp : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.invalidOp
val dequeued : 'a []
val concat : arrays:seq<'T []> -> 'T []
Full name: Microsoft.FSharp.Collections.Array.concat
val o : int
val c : int
val __ : CircularBuffer<'a>
member CircularBuffer.Enqueue : value:'a [] * offset:int * count:int -> unit
Full name: Script.CircularBuffer`1.Enqueue
val value : 'a []
val mutable offset : int
val x : int
val y : int
val blit : source:'T [] -> sourceIndex:int -> target:'T [] -> targetIndex:int -> count:int -> unit
Full name: Microsoft.FSharp.Collections.Array.blit
val min : e1:'T -> e2:'T -> 'T (requires comparison)
Full name: Microsoft.FSharp.Core.Operators.min
member CircularBuffer.Enqueue : value:'a [] -> unit
Full name: Script.CircularBuffer`1.Enqueue
member CircularBuffer.Enqueue : value:'a [] -> unit
member CircularBuffer.Enqueue : value:ArraySegment<'a> -> unit
member CircularBuffer.Enqueue : value:'a -> unit
member CircularBuffer.Enqueue : value:'a [] * offset:int -> unit
member CircularBuffer.Enqueue : value:'a [] * offset:int * count:int -> unit
property Array.Length: int
member CircularBuffer.Enqueue : value:'a [] * offset:int -> unit
Full name: Script.CircularBuffer`1.Enqueue
member CircularBuffer.Enqueue : value:ArraySegment<'a> -> unit
Full name: Script.CircularBuffer`1.Enqueue
val value : ArraySegment<'a>
Multiple items
type ArraySegment<'T> =
struct
new : array:'T[] -> ArraySegment<'T> + 1 overload
member Array : 'T[]
member Count : int
member Equals : obj:obj -> bool + 1 overload
member GetHashCode : unit -> int
member Offset : int
end
Full name: System.ArraySegment<_>
--------------------
ArraySegment()
ArraySegment(array: 'T []) : unit
ArraySegment(array: 'T [], offset: int, count: int) : unit
property ArraySegment.Array: 'a []
property ArraySegment.Offset: int
property ArraySegment.Count: int
member CircularBuffer.Enqueue : value:'a -> unit
Full name: Script.CircularBuffer`1.Enqueue
val value : 'a
val queue : CircularBuffer<int>
Full name: Script.queue
val stopwatch : Stopwatch
Full name: Script.stopwatch
Multiple items
type Stopwatch =
new : unit -> Stopwatch
member Elapsed : TimeSpan
member ElapsedMilliseconds : int64
member ElapsedTicks : int64
member IsRunning : bool
member Reset : unit -> unit
member Restart : unit -> unit
member Start : unit -> unit
member Stop : unit -> unit
static val Frequency : int64
...
Full name: System.Diagnostics.Stopwatch
--------------------
Stopwatch() : unit
Stopwatch.StartNew() : Stopwatch
type Debug =
static member Assert : condition:bool -> unit + 3 overloads
static member AutoFlush : bool with get, set
static member Close : unit -> unit
static member Fail : message:string -> unit + 1 overload
static member Flush : unit -> unit
static member Indent : unit -> unit
static member IndentLevel : int with get, set
static member IndentSize : int with get, set
static member Listeners : TraceListenerCollection
static member Print : message:string -> unit + 1 overload
...
Full name: System.Diagnostics.Debug
Debug.Assert(condition: bool) : unit
Debug.Assert(condition: bool, message: string) : unit
Debug.Assert(condition: bool, message: string, detailMessage: string) : unit
Debug.Assert(condition: bool, message: string, detailMessageFormat: string, [<ParamArray>] args: obj []) : unit
member CircularBuffer.Dequeue : count:int -> 'a []
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Stopwatch.ElapsedMilliseconds: int64
Stopwatch.Reset() : unit
Stopwatch.Start() : unit
val source : int []
Full name: Script.source
val incoming : Collections.Generic.IEnumerator<ArraySegment<int>>
Full name: Script.incoming
val generator : seq<ArraySegment<int>>
Collections.Generic.IEnumerable.GetEnumerator() : Collections.Generic.IEnumerator<ArraySegment<int>>
val enqueueNext : unit -> unit
Full name: Script.enqueueNext
Collections.IEnumerator.MoveNext() : bool
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
property Collections.Generic.IEnumerator.Current: ArraySegment<int>
More information