7 people like it.

Circular Buffer

A Circular, or Ring, Buffer that flattens incoming arrays and allows consumers to take arbitrary-sized chunks. Improvements and suggestions welcome. Fork my gist at https://gist.github.com/1648579.

Circular Buffer

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
/// A fixed-capacity circular (ring) buffer that flattens incoming arrays and
/// lets consumers take arbitrary-sized chunks.
///
/// Once the buffer is full, newly enqueued elements overwrite the oldest
/// unread ones, so it always holds the most recent `bufferSize` elements.
/// Construction raises ArgumentException when `bufferSize` is not positive.
type CircularBuffer<'a> (bufferSize: int) =
    do if bufferSize <= 0 then invalidArg "bufferSize" "The bufferSize must be greater than 0."

    let buffer = Array.zeroCreate<'a> bufferSize
    // Index of the most recently written slot. It starts on the last slot so
    // that the very first write lands on index 0.
    let mutable head = bufferSize - 1
    // Index of the oldest unread element.
    let mutable tail = 0
    // Number of unread elements currently stored (0..bufferSize).
    let mutable length = 0

    // Splits a logical run of `count` slots starting at `offset` into
    // contiguous (index, size) pieces that never cross the end of the backing
    // array, wrapping around to index 0 as many times as needed.
    let rec nextBuffer offset count =
        seq {
            let overflow = count + offset - bufferSize
            if overflow > 0 then
                yield (offset, bufferSize - offset)
                yield! nextBuffer 0 overflow
            else
                yield (offset, count)
        }

    /// Removes and returns the `count` oldest elements, oldest first.
    /// Raises InvalidOperationException when the buffer is empty or holds
    /// fewer than `count` elements, and ArgumentException when `count` is
    /// negative.
    member __.Dequeue(count: int) =
        // A negative count would otherwise move `tail` backwards and grow `length`.
        if count < 0 then invalidArg "count" "The count must not be negative."
        if length = 0 then invalidOp "Queue exhausted."
        if count > length then invalidOp "Requested count is too large."

        // Copy each contiguous piece straight into the result array.
        let dequeued = Array.zeroCreate<'a> count
        let mutable written = 0
        for index, size in nextBuffer tail count do
            Array.blit buffer index dequeued written size
            written <- written + size

        tail <- (tail + count) % bufferSize
        length <- length - count
        dequeued

    /// Appends `count` elements of `value`, starting at `offset`. When the
    /// data does not fit, the oldest unread elements are overwritten.
    /// Raises ArgumentNullException when `value` is null and ArgumentException
    /// when `offset`/`count` do not describe a valid range of `value`.
    member __.Enqueue(value: 'a[], offset: int, count: int) =
        // Validate everything before touching any state, so a bad call cannot
        // leave head/tail/length half-updated.
        if obj.ReferenceEquals(value, null) then nullArg "value"
        if offset < 0 then invalidArg "offset" "The offset must not be negative."
        if count < 0 then invalidArg "count" "The count must not be negative."
        if offset > value.Length - count then
            invalidArg "count" "The offset and count must describe a range inside the value array."

        if count > 0 then
            let mutable source = offset

            // NOTE: We could save a lot by just pulling the last
            // bufferSize elements, but we'll be converting to a
            // blocking agent eventually.
            let start = (head + 1) % bufferSize
            for index, size in nextBuffer start count do
                Array.blit value source buffer index size
                source <- source + size

            // Advance head by the full number of written slots. Moving it by
            // a single slot would make the next enqueue overwrite live data.
            head <- (head + count) % bufferSize

            // Every element that did not fit pushed out one of the oldest.
            let overflow = length + count - bufferSize
            if overflow > 0 then
                tail <- (tail + overflow) % bufferSize
            length <- min (length + count) bufferSize

    /// Appends every element of `value`.
    member __.Enqueue(value: _[]) =
        __.Enqueue(value, 0, value.Length)

    /// Appends the elements of `value` from `offset` to the end of the array.
    member __.Enqueue(value: _[], offset) =
        __.Enqueue(value, offset, value.Length - offset)

    /// Appends the elements covered by the array segment.
    member __.Enqueue(value: ArraySegment<_>) =
        __.Enqueue(value.Array, value.Offset, value.Count)

    /// Appends a single element.
    member __.Enqueue(value) =
        __.Enqueue([|value|], 0, 1)

Usage

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
// Shared fixture: a five-slot buffer that the checks below fill, overflow and drain.
let queue = CircularBuffer<int>(5)

let stopwatch = Stopwatch.StartNew()

// NOTE: The checks use `failwith` instead of Debug.Assert. Debug.Assert is
// conditional on the DEBUG symbol; without it the whole call, including the
// Dequeue inside its argument, is skipped and nothing is verified.

// Printing from a queue 1..5
for i in 1 .. 5 do queue.Enqueue(i)
if queue.Dequeue(5) <> [|1;2;3;4;5|] then failwith "Expected 1..5 after enqueuing 1..5."

// Printing from a queue 1..8, twice (wraps the five-slot buffer three times)
for _ in 1 .. 2 do
    for i in 1 .. 8 do queue.Enqueue(i)
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 after enqueuing 1..8 twice."

// Printing from a queue 1..5
for i in 1 .. 5 do queue.Enqueue(i)
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 from a queue holding 1..5."

// Clear out the rest
queue.Dequeue(2) |> ignore

// Printing from a queue 1..3
for i in 1 .. 3 do queue.Enqueue(i)
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 after enqueuing 1..3."

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
for i in 1 .. 8 do queue.Enqueue(i)
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 after enqueuing 1..8."
for i in 1 .. 3 do queue.Enqueue(i)
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 after the wrap-around."

printfn "Enqueue(value) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

// Same scenarios as the single-value checks, but enqueuing whole arrays.
// The checks use `failwith` instead of Debug.Assert: Debug.Assert is
// conditional on the DEBUG symbol, and without it the Dequeue call inside
// its argument would never run.

// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
if queue.Dequeue(5) <> [|1;2;3;4;5|] then failwith "Expected 1..5 after enqueuing [|1..5|]."

// Printing from a queue 1..8, twice
queue.Enqueue([|1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8|])
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 after enqueuing 1..8 twice."

// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 from a queue holding 1..5."

// Clear out the rest
queue.Dequeue(2) |> ignore

// Printing from a queue 1..3
queue.Enqueue([|1;2;3|])
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 after enqueuing [|1..3|]."

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
queue.Enqueue([|1;2;3;4;5;6;7;8|])
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 after enqueuing [|1..8|]."
queue.Enqueue([|1;2;3|])
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 after the wrap-around."

printfn "Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

// Consider a large array with various, incoming array segments.
// Each row below is one segment that is enqueued in turn.
let source =
    [| 1;2;3;4;5
       1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8
       1;2;3;4;5
       1;2;3
       1;2;3;4;5;6;7;8
       1;2;3 |]

// Enumerator over the segments of `source`, given as (offset, count) pairs
// that match the rows above.
let incoming =
    let generator =
        seq { for offset, count in [ (0, 5); (5, 16); (21, 5); (26, 3); (29, 8); (37, 3) ] ->
                ArraySegment<_>(source, offset, count) }
    generator.GetEnumerator()

// Enqueues the next segment; fails loudly rather than re-enqueuing a stale
// `Current` when the segments have run out.
let enqueueNext() =
    if not (incoming.MoveNext()) then failwith "No more incoming segments."
    queue.Enqueue(incoming.Current)

// The checks use `failwith` instead of Debug.Assert: Debug.Assert is
// conditional on the DEBUG symbol, and without it the Dequeue call inside
// its argument would never run.

// Printing from a queue 1..5
enqueueNext()
if queue.Dequeue(5) <> [|1;2;3;4;5|] then failwith "Expected 1..5 from the first segment."

// Printing from a queue 1..8, twice
enqueueNext()
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 from the second segment."

// Printing from a queue 1..5
enqueueNext()
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 from the third segment."

// Clear out the rest
queue.Dequeue(2) |> ignore

// Printing from a queue 1..3
enqueueNext()
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 from the fourth segment."

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
enqueueNext()
if queue.Dequeue(5) <> [|4;5;6;7;8|] then failwith "Expected 4..8 from the fifth segment."
enqueueNext()
if queue.Dequeue(3) <> [|1;2;3|] then failwith "Expected 1..3 from the sixth segment."

printfn "Enqueue(ArraySegment) tests passed in %d ms" stopwatch.ElapsedMilliseconds
Multiple items
type CircularBuffer<'a> =
  new : bufferSize:int -> CircularBuffer<'a>
  member Dequeue : count:int -> 'a []
  member Enqueue : value:'a [] -> unit
  member Enqueue : value:ArraySegment<'a> -> unit
  member Enqueue : value:'a -> unit
  member Enqueue : value:'a [] * offset:int -> unit
  member Enqueue : value:'a [] * offset:int * count:int -> unit

Full name: Script.CircularBuffer<_>

--------------------
new : bufferSize:int -> CircularBuffer<'a>
val bufferSize : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val invalidArg : argumentName:string -> message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.invalidArg
val buffer : 'a []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
val mutable head : int
val mutable tail : int
val mutable length : int
val nextBuffer : (int -> int -> seq<int * int>)
val offset : int
val count : int
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val overflow : int
member CircularBuffer.Dequeue : count:int -> 'a []

Full name: Script.CircularBuffer`1.Dequeue
val invalidOp : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.invalidOp
val dequeued : 'a []
val concat : arrays:seq<'T []> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.concat
val o : int
val c : int
val __ : CircularBuffer<'a>
member CircularBuffer.Enqueue : value:'a [] * offset:int * count:int -> unit

Full name: Script.CircularBuffer`1.Enqueue
val value : 'a []
val mutable offset : int
val x : int
val y : int
val blit : source:'T [] -> sourceIndex:int -> target:'T [] -> targetIndex:int -> count:int -> unit

Full name: Microsoft.FSharp.Collections.Array.blit
val min : e1:'T -> e2:'T -> 'T (requires comparison)

Full name: Microsoft.FSharp.Core.Operators.min
member CircularBuffer.Enqueue : value:'a [] -> unit

Full name: Script.CircularBuffer`1.Enqueue
member CircularBuffer.Enqueue : value:'a [] -> unit
member CircularBuffer.Enqueue : value:ArraySegment<'a> -> unit
member CircularBuffer.Enqueue : value:'a -> unit
member CircularBuffer.Enqueue : value:'a [] * offset:int -> unit
member CircularBuffer.Enqueue : value:'a [] * offset:int * count:int -> unit
property Array.Length: int
member CircularBuffer.Enqueue : value:'a [] * offset:int -> unit

Full name: Script.CircularBuffer`1.Enqueue
member CircularBuffer.Enqueue : value:ArraySegment<'a> -> unit

Full name: Script.CircularBuffer`1.Enqueue
val value : ArraySegment<'a>
Multiple items
type ArraySegment<'T> =
  struct
    new : array:'T[] -> ArraySegment<'T> + 1 overload
    member Array : 'T[]
    member Count : int
    member Equals : obj:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Offset : int
  end

Full name: System.ArraySegment<_>

--------------------
ArraySegment()
ArraySegment(array: 'T []) : unit
ArraySegment(array: 'T [], offset: int, count: int) : unit
property ArraySegment.Array: 'a []
property ArraySegment.Offset: int
property ArraySegment.Count: int
member CircularBuffer.Enqueue : value:'a -> unit

Full name: Script.CircularBuffer`1.Enqueue
val value : 'a
val queue : CircularBuffer<int>

Full name: Script.queue
val stopwatch : Stopwatch

Full name: Script.stopwatch
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
Stopwatch.StartNew() : Stopwatch
type Debug =
  static member Assert : condition:bool -> unit + 3 overloads
  static member AutoFlush : bool with get, set
  static member Close : unit -> unit
  static member Fail : message:string -> unit + 1 overload
  static member Flush : unit -> unit
  static member Indent : unit -> unit
  static member IndentLevel : int with get, set
  static member IndentSize : int with get, set
  static member Listeners : TraceListenerCollection
  static member Print : message:string -> unit + 1 overload
  ...

Full name: System.Diagnostics.Debug
Debug.Assert(condition: bool) : unit
Debug.Assert(condition: bool, message: string) : unit
Debug.Assert(condition: bool, message: string, detailMessage: string) : unit
Debug.Assert(condition: bool, message: string, detailMessageFormat: string, [<ParamArray>] args: obj []) : unit
member CircularBuffer.Dequeue : count:int -> 'a []
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Stopwatch.ElapsedMilliseconds: int64
Stopwatch.Reset() : unit
Stopwatch.Start() : unit
val source : int []

Full name: Script.source
val incoming : Collections.Generic.IEnumerator<ArraySegment<int>>

Full name: Script.incoming
val generator : seq<ArraySegment<int>>
Collections.Generic.IEnumerable.GetEnumerator() : Collections.Generic.IEnumerator<ArraySegment<int>>
val enqueueNext : unit -> unit

Full name: Script.enqueueNext
Collections.IEnumerator.MoveNext() : bool
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
property Collections.Generic.IEnumerator.Current: ArraySegment<int>

More information

Link:http://fssnip.net/a8
Posted:12 years ago
Author:Ryan Riley
Tags: circular buffer , data structure , stream , pipeline