whisper

Published: Dec 27, 2023 License: BSD-3-Clause Imports: 18 Imported by: 0

README

Go Whisper


Go Whisper is a Go implementation of the Whisper database, which is part of the Graphite Project.

To create a new whisper database you must define its retention levels (see: storage schemas), aggregation method and xFilesFactor. The xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur.
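For example, the propagation threshold follows directly from the xFilesFactor and the number of source points per destination interval. A minimal sketch (the helper function is hypothetical, not part of the library's API):

```go
package main

import (
	"fmt"
	"math"
)

// requiredKnownPoints returns how many of the source archive's points in one
// destination interval must be known before a value is propagated.
// (Hypothetical helper for illustration; not part of go-whisper's API.)
func requiredKnownPoints(xFilesFactor float64, pointsPerInterval int) int {
	return int(math.Ceil(xFilesFactor * float64(pointsPerInterval)))
}

func main() {
	// With 1s source points aggregated into 1h intervals (3600 points)
	// and an xFilesFactor of 0.5, at least 1800 points must be known.
	fmt.Println(requiredKnownPoints(0.5, 3600)) // 1800
}
```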

Examples

Create a new whisper database in "/tmp/test.wsp" with two retention levels (1 second for 1 day and 1 hour for 5 weeks). It sums values when propagating them to the next retention level, and requires half the values of the first retention level to be set before they are propagated.

retentions, err := whisper.ParseRetentionDefs("1s:1d,1h:5w")
if err != nil {
  // handle
}
wsp, err := whisper.Create("/tmp/test.wsp", retentions, whisper.Sum, 0.5)
if err != nil {
  // handle
}

Alternatively you can open an existing whisper database.

wsp, err := whisper.Open("/tmp/test.wsp")

Once you have a whisper database you can set values at given time points. This sets the time point 1 hour ago to 12345.678.

wsp.Update(12345.678, int(time.Now().Add(-time.Hour).Unix()))

And you can retrieve time series from it. This example fetches a time series for the last 1 hour and then iterates through its points.

series, err := wsp.Fetch(int(time.Now().Add(-time.Hour).Unix()), int(time.Now().Unix()))
if err != nil {
  // handle
}
for _, point := range series.Points() {
  fmt.Println(point.Time, point.Value)
}

Thread Safety

This implementation is not thread safe. Concurrent writes to a database will corrupt it. It is up to the user to serialize access in their application as they need to.

Compressed Format

The go-whisper library supports a compressed format, which offers the same functionality as the standard whisper file format while keeping data in a much smaller size. This compressed format is called cwhisper.

Compression algorithm source: 4.1 Time series compression in Gorilla: A Fast, Scalable, In-Memory Time Series Database.

A data point in cwhisper takes 2 to 14 bytes (versus 12 bytes in the standard format). In theory, a cwhisper file can therefore be 16.67% to 116.67% of the standard file size, a compression ratio between 6 and 0.86.

In random data point testing, compressed/uncompressed ratio is between 18.88% and 113.25%.

In real production payload, we are seeing 50%+ less disk space usage.

Read/Write Performance between standard and compressed formats:

BenchmarkWriteStandard-8           	   50000	     33824 ns/op
BenchmarkWriteCompressed-8         	 1000000	      1630 ns/op

BenchmarkReadStandard-8            	     500	   2270392 ns/op
BenchmarkReadCompressed-8          	   10000	    260862 ns/op
Drawbacks
  • cwhisper is faster and smaller, but unlike the standard format, you can't easily backfill/update/rewrite old data points because it is not data-point addressable.
  • File size can grow if data points arrive at irregular intervals.
Suitable Application
  • cwhisper is most suitable for metrics that are mostly regular and are unlikely to need backfilling or rewriting of old data, like system metrics. cwhisper also works nicely for sparse metrics.
How it works in a nutshell

An example format: https://github.com/go-graphite/go-whisper/blob/master/doc/compressed.md

In cwhisper, archives are broken down into multiple blocks (by default 7200 data points per block, as recommended by the gorilla paper), and data points are compressed into those blocks. cwhisper assumes 2 bytes as the default data point size, but when it detects that the default size is too small, it grows the file.

cwhisper still uses one file per metric and still does round-robin updates, but instead of rotating individual data points, the block is the rotation unit for archives.
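As a back-of-the-envelope illustration of that layout: an archive's total point count divided by the points-per-block gives the number of rotation units. The helper below is a sketch for illustration; the real layout logic lives inside cwhisper.

```go
package main

import "fmt"

// blocksForArchive returns how many blocks an archive needs, given its total
// number of points and the points stored per block (7200 by default).
// (Hypothetical helper; not part of go-whisper's API.)
func blocksForArchive(totalPoints, pointsPerBlock int) int {
	return (totalPoints + pointsPerBlock - 1) / pointsPerBlock // ceiling division
}

func main() {
	// A 1s:1d archive holds 86400 points; with 7200 points per block
	// that is 12 blocks rotated round-robin.
	fmt.Println(blocksForArchive(86400, 7200)) // 12
}
```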

Licence

Go Whisper is licenced under a BSD Licence.

Documentation

Overview

Package whisper implements Graphite's Whisper database format

Index

Constants

const (
	// size constants
	ByteSize        = 1
	IntSize         = 4
	FloatSize       = 4
	Float64Size     = 8
	PointSize       = 12
	MetadataSize    = 16
	ArchiveInfoSize = 12
)
const (
	Seconds = 1
	Minutes = 60
	Hours   = 3600
	Days    = 86400
	Weeks   = 86400 * 7
	Years   = 86400 * 365
)
const MaxCompressedPointSize = PointSize + 2

In the worst-case scenario, a data point requires 2 bytes more space after compression; this buffer size makes sure it is always big enough to contain the compressed result.

Variables

var (
	CompressedMetadataSize     = 28 + FreeCompressedMetadataSize
	FreeCompressedMetadataSize = 16

	VersionSize = 1

	CompressedArchiveInfoSize     = 92 + FreeCompressedArchiveInfoSize
	FreeCompressedArchiveInfoSize = 36

	BlockRangeSize = 16

	// One can see that blocks that extend longer than two
	// hours provide diminishing returns for compressed size. A
	// two-hour block allows us to achieve a compression ratio of
	// 1.37 bytes per data point.
	//                                      4.1.2 Compressing values
	//     Gorilla: A Fast, Scalable, In-Memory Time Series Database
	DefaultPointsPerBlock = 7200 // recommended by the gorilla paper algorithm

)
var Now = time.Now

Functions

func Compare

func Compare(
	file1 string,
	file2 string,
	now int,
	ignoreBuffer bool,
	quarantinesRaw string,
	verbose bool,
	strict bool,
	muteThreshold int,
) (msg string, err error)

skipcq: RVV-A0005

func Debug

func Debug(compress, bitsWrite bool)

func GenDataPointSlice

func GenDataPointSlice() []dataPoint

func GenTestArchive

func GenTestArchive(buf []byte, ret Retention) *archiveInfo

Types

type AggregationMethod

type AggregationMethod int

Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header

const (
	Average AggregationMethod = iota + 1
	Sum
	Last
	Max
	Min
	First

	Mix        // only used in whisper header
	Percentile // only used in archive header
)
const Unknown AggregationMethod = -1

func ParseAggregationMethod

func ParseAggregationMethod(am string) AggregationMethod

func (AggregationMethod) String

func (am AggregationMethod) String() string

type MixAggregationSpec

type MixAggregationSpec struct {
	Method     AggregationMethod
	Percentile float32
}

func (*MixAggregationSpec) String

func (mas *MixAggregationSpec) String() string

type Options

type Options struct {
	Sparse     bool
	FLock      bool
	FlockType  int
	Compressed bool
	// It's a hint, used if the retention is big enough, more in
	// Retention.calculateSuitablePointsPerBlock
	PointsPerBlock  int
	PointSize       float32
	InMemory        bool
	InMemoryContent []byte
	OpenFileFlag    *int

	MixAggregationSpecs        []MixAggregationSpec
	MixAvgCompressedPointSizes map[int][]float32

	SIMV bool // single interval multiple values

	IgnoreNowOnWrite bool
}

type Retention

type Retention struct {
	// contains filtered or unexported fields
}

A retention level.

Retention levels describe a given archive in the database. How detailed it is and how far back it records.

func NewRetention

func NewRetention(secondsPerPoint, numberOfPoints int) Retention

func ParseRetentionDef

func ParseRetentionDef(retentionDef string) (*Retention, error)

Parse a retention definition as you would find in the storage-schemas.conf of a Carbon install. Note that this only parses a single retention definition; if you have multiple definitions (separated by commas) you will have to split them yourself.

ParseRetentionDef("10s:14d") returns Retention{10, 120960}

See: http://graphite.readthedocs.org/en/1.0/config-carbon.html#storage-schemas-conf
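The arithmetic behind that example can be sketched in standalone form. This is a simplified re-implementation for illustration only; the library's actual parser also accepts bare integer seconds and handles more edge cases.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// unitSeconds mirrors the package's duration constants.
var unitSeconds = map[byte]int{
	's': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 86400 * 7, 'y': 86400 * 365,
}

// parseDuration converts e.g. "14d" into seconds. Simplified sketch.
func parseDuration(s string) (int, error) {
	if len(s) < 2 {
		return 0, fmt.Errorf("invalid duration %q", s)
	}
	n, err := strconv.Atoi(s[:len(s)-1])
	if err != nil {
		return 0, err
	}
	mul, ok := unitSeconds[s[len(s)-1]]
	if !ok {
		return 0, fmt.Errorf("unknown unit in %q", s)
	}
	return n * mul, nil
}

// parseRetentionDef turns "10s:14d" into (secondsPerPoint, numberOfPoints).
func parseRetentionDef(def string) (int, int, error) {
	parts := strings.SplitN(def, ":", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("invalid definition %q", def)
	}
	step, err := parseDuration(parts[0]) // e.g. "10s" -> 10
	if err != nil {
		return 0, 0, err
	}
	span, err := parseDuration(parts[1]) // e.g. "14d" -> 1209600
	if err != nil {
		return 0, 0, err
	}
	return step, span / step, nil
}

func main() {
	step, points, _ := parseRetentionDef("10s:14d")
	fmt.Println(step, points) // 10 120960
}
```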

func (*Retention) MaxRetention

func (r *Retention) MaxRetention() int

func (*Retention) NumberOfPoints

func (r *Retention) NumberOfPoints() int

func (*Retention) SecondsPerPoint

func (r *Retention) SecondsPerPoint() int

func (*Retention) SetAvgCompressedPointSize

func (r *Retention) SetAvgCompressedPointSize(size float32)

func (*Retention) Size

func (r *Retention) Size() int

func (Retention) String

func (r Retention) String() string

type Retentions

type Retentions []*Retention

TODO: maybe we should make it an array of structs, rather than an array of struct pointers.

func MustParseRetentionDefs

func MustParseRetentionDefs(retentionDefs string) Retentions

func NewRetentionsNoPointer

func NewRetentionsNoPointer(r2 []Retention) Retentions

func ParseRetentionDefs

func ParseRetentionDefs(retentionDefs string) (Retentions, error)

func (Retentions) Equal

func (r1 Retentions) Equal(r2 Retentions) bool

func (Retentions) Len

func (r Retentions) Len() int

func (Retentions) Swap

func (r Retentions) Swap(i, j int)

type TimeSeries

type TimeSeries struct {
	// contains filtered or unexported fields
}

func (*TimeSeries) FromTime

func (ts *TimeSeries) FromTime() int

func (*TimeSeries) PointPointers

func (ts *TimeSeries) PointPointers() []*TimeSeriesPoint

func (*TimeSeries) Points

func (ts *TimeSeries) Points() []TimeSeriesPoint

func (*TimeSeries) Step

func (ts *TimeSeries) Step() int

func (*TimeSeries) String

func (ts *TimeSeries) String() string

func (*TimeSeries) UntilTime

func (ts *TimeSeries) UntilTime() int

func (*TimeSeries) Values

func (ts *TimeSeries) Values() []float64

type TimeSeriesPoint

type TimeSeriesPoint struct {
	Time  int
	Value float64
}

type Whisper

type Whisper struct {
	Extended bool

	// TODO: improve
	NonFatalErrors []error
	// contains filtered or unexported fields
}

Represents a Whisper database file.

func Create

func Create(path string, retentions Retentions, aggregationMethod AggregationMethod, xFilesFactor float32) (whisper *Whisper, err error)

Create a new Whisper database file and write its header.

func CreateWithOptions

func CreateWithOptions(path string, retentions Retentions, aggregationMethod AggregationMethod, xFilesFactor float32, options *Options) (whisper *Whisper, err error)

CreateWithOptions is a more customizable create function.

avgCompressedPointSize specification order:

Options.PointSize < Retention.avgCompressedPointSize < Options.MixAggregationSpecs.AvgCompressedPointSize

func Open

func Open(path string) (whisper *Whisper, err error)

Open an existing Whisper database and read its header.

func OpenWithOptions

func OpenWithOptions(path string, options *Options) (whisper *Whisper, err error)

func (*Whisper) AggregationMethod

func (whisper *Whisper) AggregationMethod() AggregationMethod

Return raw aggregation method

func (*Whisper) CheckEmpty

func (whisper *Whisper) CheckEmpty(fromTime, untilTime int) (exist bool, err error)

Check whether a TimeSeries has points for a given time span in the file.

func (*Whisper) CheckIntegrity

func (whisper *Whisper) CheckIntegrity() error

func (*Whisper) Close

func (whisper *Whisper) Close() error

Close the whisper file

func (*Whisper) CompressTo

func (whisper *Whisper) CompressTo(dstPath string) error

For archive.Buffer handling, CompressTo assumes a simple archive layout in which a higher archive propagates to a lower archive. [wrong]

CompressTo should stop compression/return errors when it runs into any issues (if feasible).

func (*Whisper) Dump

func (whisper *Whisper) Dump(all, showDecompressionInfo bool)

skipcq: RVV-A0005

func (*Whisper) Fetch

func (whisper *Whisper) Fetch(fromTime, untilTime int) (timeSeries *TimeSeries, err error)

Fetch a TimeSeries for a given time span from the file.

func (*Whisper) FetchByAggregation

func (whisper *Whisper) FetchByAggregation(fromTime, untilTime int, spec *MixAggregationSpec) (timeSeries *TimeSeries, err error)

func (*Whisper) File

func (whisper *Whisper) File() *os.File

func (*Whisper) FillClassic

func (dstw *Whisper) FillClassic(srcw *Whisper) error

func (*Whisper) FillCompressed

func (dstw *Whisper) FillCompressed(srcw *Whisper) error

FillCompressed backfills a cwhisper file from srcw. The old and new whisper files should have the same retention policies.

func (*Whisper) GetDiscardedPointsSinceOpen

func (whisper *Whisper) GetDiscardedPointsSinceOpen() uint32

Returns the number of out-of-order points discarded since the whisper file was opened.

func (*Whisper) HasMatchingConfigs

func (whisper *Whisper) HasMatchingConfigs(rets Retentions, aggr AggregationMethod, xff float32, options *Options) bool

func (*Whisper) IsCompressed

func (whisper *Whisper) IsCompressed() bool

func (*Whisper) MaxRetention

func (whisper *Whisper) MaxRetention() int

Return max retention in seconds

func (*Whisper) MetadataSize

func (whisper *Whisper) MetadataSize() int

Calculate the number of bytes the metadata section will be.

func (*Whisper) Retentions

func (whisper *Whisper) Retentions() []Retention

Return retentions

func (*Whisper) Size

func (whisper *Whisper) Size() int

Calculate the total number of bytes the Whisper file should be according to the metadata.

func (*Whisper) StartTime

func (whisper *Whisper) StartTime() int

Calculate the starting time for a whisper db.

func (*Whisper) Update

func (whisper *Whisper) Update(value float64, timestamp int) (err error)

Update a value in the database.

If the timestamp is in the future or outside of the maximum retention it will fail immediately.

func (*Whisper) UpdateConfig

func (whisper *Whisper) UpdateConfig(rets Retentions, aggr AggregationMethod, xff float32, options *Options) (err error)

func (*Whisper) UpdateMany

func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) (err error)

func (*Whisper) UpdateManyForArchive

func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRetention int) (err error)

Note: for the compressed format, file extension is triggered after an update completes, so applying the same data set in a single UpdateManyForArchive call may produce a different file than spreading the updates over many UpdateManyForArchive calls.

func (*Whisper) WriteHeaderCompressed

func (whisper *Whisper) WriteHeaderCompressed() (err error)

func (*Whisper) XFilesFactor

func (whisper *Whisper) XFilesFactor() float32

Return xFilesFactor

Directories

Path Synopsis
internal
