rolling

v2.0.4+incompatible
Warning: This package is not in the latest version of its module.
Published: Jun 10, 2019 License: Apache-2.0 Imports: 4 Imported by: 11

README

rolling

A rolling/sliding window implementation for Go.

Usage

Point Window
var p = rolling.NewPointPolicy(rolling.NewWindow(5))

for x := 0; x < 5; x = x + 1 {
  p.Append(float64(x)) // Append takes a float64
}
p.Reduce(func(w rolling.Window) float64 {
  fmt.Println(w) // [ [0] [1] [2] [3] [4] ]
  return 0
})
p.Append(5)
p.Reduce(func(w rolling.Window) float64 {
  fmt.Println(w) // [ [5] [1] [2] [3] [4] ]
  return 0
})
p.Append(6)
p.Reduce(func(w rolling.Window) float64 {
  fmt.Println(w) // [ [5] [6] [2] [3] [4] ]
  return 0
})

The above creates a window that always contains 5 data points and then fills it with the values 0 through 4. When the next value is appended, it overwrites the first value. The window continuously overwrites the oldest value with the latest to preserve the specified value count. This type of window is useful for collecting data that are captured at a known interval or for tracking data where time is not a factor.
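The overwrite behavior described above can be pictured as a ring buffer. The following is a minimal sketch only, not the package's implementation: pointWindow, newPointWindow, and the next field are hypothetical names, Window is redeclared locally so the example runs without the package, and a positive window size is assumed.

```go
package main

import "fmt"

// Window mirrors the package's Window type ([][]float64).
type Window [][]float64

// pointWindow is a hypothetical stand-in for a point policy. It keeps the
// last len(w) values by overwriting the oldest slot on every Append.
type pointWindow struct {
	w    Window
	next int // index of the slot the next Append overwrites
}

// newPointWindow assumes size > 0.
func newPointWindow(size int) *pointWindow {
	return &pointWindow{w: make(Window, size)}
}

// Append stores v in the oldest slot and advances the write position.
func (p *pointWindow) Append(v float64) {
	p.w[p.next] = []float64{v}
	p.next = (p.next + 1) % len(p.w)
}

func main() {
	p := newPointWindow(5)
	for x := 0; x < 7; x++ {
		p.Append(float64(x))
	}
	fmt.Println(p.w) // [[5] [6] [2] [3] [4]]
}
```

The write position wraps with a modulo, so memory use stays fixed no matter how many values are appended.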

Time Window
var p = rolling.NewTimePolicy(rolling.NewWindow(3000), time.Millisecond)
var start = time.Now()
for range time.Tick(time.Millisecond) {
  if time.Since(start) > 3*time.Second {
    break
  }
  p.Append(1)
}

The above creates a time window that contains 3,000 buckets where each bucket contains, at most, 1ms of recorded data. The subsequent loop populates each bucket with exactly one measure (the value 1) and stops when the window is full. As time progresses, the oldest values will be removed such that if the above code performed a time.Sleep(3*time.Second) then the window would be empty again.

The choice of bucket size depends on the frequency with which data are expected to be recorded. On each increment of time equal to the given duration, the window expires one bucket and purges the values collected in it. The smaller the bucket duration, the less data are lost when a bucket expires.

This type of window is most useful for collecting real-time values such as request rates, error rates, and latencies of operations.
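Bucket expiry can be illustrated with a simplified model that takes explicit timestamps instead of reading the clock. This is a sketch under stated assumptions, not the package's code: timeWindow, newTimeWindow, and at are hypothetical names, Window is redeclared locally, and buckets are only expired lazily when a value is appended.

```go
package main

import (
	"fmt"
	"time"
)

// Window mirrors the package's Window type ([][]float64).
type Window [][]float64

// timeWindow is a hypothetical, simplified time-bucketed window.
type timeWindow struct {
	w       Window
	d       time.Duration // span of time covered by one bucket
	last    int64         // absolute number of the most recently written bucket
	started bool
}

func newTimeWindow(buckets int, d time.Duration) *timeWindow {
	return &timeWindow{w: make(Window, buckets), d: d}
}

// Append records v at time now, first emptying every bucket whose time
// slot has passed since the previous write.
func (t *timeWindow) Append(now time.Time, v float64) {
	size := int64(len(t.w))
	n := now.UnixNano() / int64(t.d) // absolute bucket number for now
	if !t.started {
		t.last, t.started = n, true
	}
	gap := n - t.last
	if gap > size {
		gap = size // the whole window expired; clear each bucket once
	}
	for i := int64(1); i <= gap; i++ {
		idx := (t.last + i) % size
		t.w[idx] = t.w[idx][:0]
	}
	if gap > 0 {
		t.last = n
	}
	t.w[n%size] = append(t.w[n%size], v)
}

// at returns a fixed timestamp ms milliseconds after the Unix epoch.
func at(ms int64) time.Time {
	return time.Unix(0, ms*int64(time.Millisecond))
}

func main() {
	t := newTimeWindow(3, time.Millisecond)
	t.Append(at(0), 1)
	t.Append(at(1), 2)
	t.Append(at(2), 3)
	fmt.Println(t.w) // [[1] [2] [3]]
	t.Append(at(3), 4) // slot 0 has expired and is reused
	fmt.Println(t.w) // [[4] [2] [3]]
}
```

With 1ms buckets, each expiry discards at most 1ms of data. With 1s buckets over the same total span, a single expiry would discard up to a full second, which is the tradeoff the bucket duration controls.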

Aggregating Windows

Each window exposes a Reduce(func(w Window) float64) float64 method that can be used to aggregate the data stored within. The method takes a function that reduces the contents of the Window to a single value. For convenience, this package provides some common reductions:

fmt.Println(p.Reduce(rolling.Count))
fmt.Println(p.Reduce(rolling.Avg))
fmt.Println(p.Reduce(rolling.Min))
fmt.Println(p.Reduce(rolling.Max))
fmt.Println(p.Reduce(rolling.Sum))
fmt.Println(p.Reduce(rolling.Percentile(99.9)))
fmt.Println(p.Reduce(rolling.FastPercentile(99.9)))

Count, Avg, Min, Max, and Sum each perform their expected computation. Percentile first takes the target percentile and returns an aggregating function that is passed to Reduce in the same way as Sum and the others.

For cases of very large datasets, the FastPercentile can be used as a replacement for the standard percentile calculation. This alternative version uses the p-squared algorithm for estimating the percentile by processing only one value at a time, in any order. The results are quite accurate but can vary from the actual percentile by a small amount. It's a tradeoff of accuracy for speed when calculating percentiles from large data sets. For more on the p-squared algorithm see: http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf.
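For comparison, an exact percentile has to gather and sort every value in the window, which is the cost that the streaming estimate avoids. The sketch below assumes the nearest-rank definition of a percentile; the package's Percentile may use a different method, and the lowercase percentile function and the local Window type are illustrative only.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Window mirrors the package's Window type ([][]float64).
type Window [][]float64

// percentile returns a reduction that computes the given percentile
// (0 < perc <= 100) using the nearest-rank method. This is an assumed
// definition chosen for illustration.
func percentile(perc float64) func(Window) float64 {
	return func(w Window) float64 {
		var values []float64
		for _, bucket := range w {
			values = append(values, bucket...)
		}
		if len(values) == 0 {
			return 0
		}
		sort.Float64s(values) // O(n log n) over every value in the window
		rank := int(math.Ceil(perc / 100 * float64(len(values))))
		if rank < 1 {
			rank = 1
		}
		if rank > len(values) {
			rank = len(values)
		}
		return values[rank-1]
	}
}

func main() {
	w := Window{{15, 20}, {35}, {40, 50}}
	fmt.Println(percentile(50)(w))  // 35
	fmt.Println(percentile(100)(w)) // 50
}
```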

Custom Aggregations

Any function that matches the form func(rolling.Window) float64 may be given to the Reduce method of any window policy. The Window type is a named version of [][]float64. Calling len(window) will return the number of buckets. Each bucket is, itself, a slice of floats where len(bucket) is the number of values measured within that bucket. Most aggregates will take the form of:

func MyAggregate(w rolling.Window) float64 {
  var result float64
  for _, bucket := range w {
    for _, value := range bucket {
      // aggregate something, for example a running total
      result += value
    }
  }
  return result
}
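As a concrete illustration of that shape, here is a sketch of a custom aggregate that returns the spread (largest value minus smallest) of a window. Spread is a hypothetical name that the package does not provide, and Window is redeclared locally so the example runs on its own; with the real package the parameter type would be rolling.Window.

```go
package main

import "fmt"

// Window mirrors the package's Window type ([][]float64).
type Window [][]float64

// Spread returns the difference between the largest and smallest value
// in the window, or 0 when the window holds no values.
func Spread(w Window) float64 {
	var lo, hi float64
	seen := false
	for _, bucket := range w {
		for _, value := range bucket {
			if !seen {
				lo, hi, seen = value, value, true
				continue
			}
			if value < lo {
				lo = value
			}
			if value > hi {
				hi = value
			}
		}
	}
	return hi - lo
}

func main() {
	w := Window{{3, 9}, {}, {4}}
	fmt.Println(Spread(w)) // 6
}
```

Empty buckets need no special handling because ranging over an empty slice does nothing.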

Contributors

Pull requests, issues and comments welcome. For pull requests:

  • Add tests for new features and bug fixes
  • Follow the existing style
  • Separate unrelated changes into multiple pull requests

See the existing issues for things to start contributing.

For bigger changes, make sure you start a discussion first by creating an issue and explaining the intended change.

Atlassian requires contributors to sign a Contributor License Agreement, known as a CLA. This serves as a record stating that the contributor is entitled to contribute the code/documentation/translation to the project and is willing to have it used in distributions and derivative works (or is willing to transfer ownership).

Prior to accepting your contributions we ask that you please follow the appropriate link below to digitally sign the CLA. The Corporate CLA is for those who are contributing as a member of an organization and the individual CLA is for those contributing as an individual.

License

Copyright (c) 2017 Atlassian and others. Apache 2.0 licensed, see LICENSE.txt file.

Documentation

Functions

func Avg

func Avg(w Window) float64

Avg returns the average of the values within the window.

func Count

func Count(w Window) float64

Count returns the number of elements in a window.

func FastPercentile

func FastPercentile(perc float64) func(w Window) float64

FastPercentile implements the pSquare percentile estimation algorithm for calculating percentiles from streams of data using fixed memory allocations.

func Max

func Max(w Window) float64

Max returns the largest of the values within the window.

func Min

func Min(w Window) float64

Min returns the smallest of the values within the window.

func Percentile

func Percentile(perc float64) func(w Window) float64

Percentile returns an aggregating function that computes the given percentile calculation for a window.

func Sum

func Sum(w Window) float64

Sum returns the sum of the values within the window.

Types

type PointPolicy

type PointPolicy struct {
	// contains filtered or unexported fields
}

PointPolicy is a rolling window policy that tracks the last N values inserted regardless of insertion time.

func NewPointPolicy

func NewPointPolicy(window Window) *PointPolicy

NewPointPolicy generates a Policy that operates on a rolling set of input points. The number of points is determined by the size of the given window. Each bucket will contain, at most, one data point when the window is full.

func (*PointPolicy) Append

func (w *PointPolicy) Append(value float64)

Append a value to the window.

func (*PointPolicy) Reduce

func (w *PointPolicy) Reduce(f func(Window) float64) float64

Reduce the window to a single value using a reduction function.

type TimePolicy

type TimePolicy struct {
	// contains filtered or unexported fields
}

TimePolicy is a window Accumulator implementation that uses some duration of time to determine the content of the window.

func NewTimePolicy

func NewTimePolicy(window Window, bucketDuration time.Duration) *TimePolicy

NewTimePolicy manages a window with rolling time durations. The given duration will be used to bucket data within the window. If data points are received entire windows apart then the window will only contain a single data point. If one or more durations of the window are missed then they are zeroed out to keep the window consistent.

func (*TimePolicy) Append

func (w *TimePolicy) Append(value float64)

Append a value to the window using a time bucketing strategy.

func (*TimePolicy) Reduce

func (w *TimePolicy) Reduce(f func(Window) float64) float64

Reduce the window to a single value using a reduction function.

type Window

type Window [][]float64

Window represents a bucketed set of data. It should be used in conjunction with a Policy to populate it with data using some windowing policy.

func NewPreallocatedWindow

func NewPreallocatedWindow(buckets int, bucketSize int) Window

NewPreallocatedWindow creates a Window both with the given number of buckets and with a preallocated bucket size. This constructor may be used when the number of data points per-bucket can be estimated and/or when the desire is to allocate a large slice so that allocations do not happen as the Window is populated by a Policy.
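The effect of preallocation can be sketched as follows. This is an assumption about what such a constructor does, not the package's source: newPreallocated is a hypothetical name and Window is redeclared locally.

```go
package main

import "fmt"

// Window mirrors the package's Window type ([][]float64).
type Window [][]float64

// newPreallocated is a hypothetical equivalent of a preallocating
// constructor: every bucket starts empty but has capacity for bucketSize
// values, so appends up to that capacity do not allocate.
func newPreallocated(buckets, bucketSize int) Window {
	w := make(Window, buckets)
	for i := range w {
		w[i] = make([]float64, 0, bucketSize)
	}
	return w
}

func main() {
	w := newPreallocated(3, 8)
	fmt.Println(len(w), len(w[0]), cap(w[0])) // 3 0 8
}
```

Appending more than bucketSize values to one bucket still works; the slice simply grows and allocates at that point.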

func NewWindow

func NewWindow(buckets int) Window

NewWindow creates a Window with the given number of buckets. The number of buckets is meaningful to each Policy. The Policy implementations will describe their use of buckets.
