dagr

v1.1.2
Published: May 1, 2018 License: BSD-2-Clause Imports: 12 Imported by: 0

README

Dagr

$ go get go.spiff.io/dagr

Dagr is a simple library for tracking measurements and writing them out in an InfluxDB line-protocol-friendly format.

Note: the Dagr and outflux APIs are currently unstable and may change during v0 development. I'll try to make these breaks infrequent, but keep in mind that these are being developed while being used, so I'll inevitably discover quirks in the API that I just have to fix and break something in the process. When this happens, the commit should make a note of the break.

Usage

As I see it, there are roughly two categories of measurements when dealing with a program: long-term stats and events. The next two sections attempt to elaborate on this, but they're roughly the same and just use different types and send models.

Stats

A long-term stat is something like the number of requests sent to an endpoint over its lifetime. Typically, you don't need to record each individual request; you just need to know how many requests have come in since the last time a measurement was sent. So, you keep a counter:

// Setup
counter := new(dagr.Int)
measurement := dagr.NewPoint("http_request",
	dagr.Tags{"host": "example.local", "method": "GET", "path": "/v1/parrots"},
	dagr.Fields{"count": counter},
)

// Handler of some kind
func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Atomically increment the counter
	counter.Add(1)
}

// Every now and then, write it to a stream that somehow gets over to InfluxDB:
go func() {
	for range time.Tick(time.Second) {
		// Ignoring any error that could be returned by WriteMeasurement
		dagr.WriteMeasurement(os.Stdout, measurement) // Replace os.Stdout with any io.Writer
		// Output:
		// http_request,host=example.local,method=GET,path=/v1/parrots count=123i 1136214245000000000
	}
}()

The above will keep a count for the number of times /v1/parrots is accessed, assuming the handler only handles that particular path. As a long-term thing, this is fairly useful, because we'd like to know how many people want a list of parrots. The important thing is that updates to the regular Dagr types, Int, Float, String, and Bool, are atomic: you can increment from multiple goroutines and the increment will only block for a minimal amount of time. This also means that you can update these fields mid-write without interrupting the write or causing a data race (but you may occasionally end up with slightly out of sync fields between two writes).
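To illustrate why atomic updates matter here, the following is a minimal sketch of a counter in the spirit of dagr.Int, written against only the standard library. The atomicCounter type and the countConcurrently helper are hypothetical names made up for this illustration; they are not part of dagr, and dagr's actual implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCounter is a hypothetical stand-in for a dagr.Int-style field.
type atomicCounter int64

// Add atomically adds incr to the counter.
func (c *atomicCounter) Add(incr int64) {
	atomic.AddInt64((*int64)(c), incr)
}

// Load atomically reads the counter's current value.
func (c *atomicCounter) Load() int64 {
	return atomic.LoadInt64((*int64)(c))
}

// countConcurrently increments a fresh counter from several goroutines
// and returns the final total.
func countConcurrently(goroutines, perGoroutine int) int64 {
	var c atomicCounter
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				c.Add(1)
			}
		}()
	}
	wg.Wait()
	return c.Load()
}

func main() {
	fmt.Println(countConcurrently(8, 1000))
}
```

Because every increment goes through sync/atomic, no increments are lost even though the goroutines never take a lock.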

While the Int and Float types are useful as accumulators, you can also use Bool and String to keep global process state up to date in a Point, allowing you to periodically send whether the process is sleeping or what its current stage is (e.g., started vs. listener started vs. stopped).

Of course, you could also have short-time stats and allocate and expire them as needed. Dagr doesn't really enforce this model, it's just one way I think of things.

Events

An event, on the other hand, is something short-lived but worth recording, like a really important error. For example:

// Setup
event := dagr.RawPoint{
	Key:    "recv_error",
	Tags:   dagr.Tags{"host": "example.local"},
	Fields: dagr.Fields{
		"fatal":   dagr.RawBool(true),
		"message": dagr.RawString("Parrot has been scritched"),
	},
	Time:   time.Now(), // if omitted, WriteMeasurement will use time.Now() anyway
}

// And just send it once (again, ignoring errors)
dagr.WriteMeasurement(os.Stdout, event)
// Output:
// recv_error,host=example.local fatal=T,message="Parrot has been scritched" 1136214245000000000
//
// Note that RawPoint does not guarantee tag or field order, unlike Point, so the above may not be
// exactly the same every time.

A RawPoint is just a simple Measurement implementation that satisfies the bare minimum needed to write a point in line protocol format. It also serves as a decent example for how to begin writing your own points, if necessary. More advanced usage can be seen in both Point and PointSet.

Naturally, you may want to alert off of such errors, because a hole in the space time continuum is known to create bunny people and generally leads to all sorts of chaos. Point is, these are two different use-cases Dagr was built to accommodate, and there are likely more it could handle.

Contributing

If you want to contribute changes, create an issue to discuss what you want to do ahead of time.

Anything from bug fixes to documentation to tests to just correcting typos is welcome. Removing code is accepted with justification since the API is currently in breakable-when-good mode. Adding features requires the most justification just because dagr already contains code that could be removed.

Changes are reviewed on https://git.spiff.io, so you'll need to be given access to it before you push any changes. If you're not familiar with Gerrit, its user guide (at https://gerrit-documentation.storage.googleapis.com/Documentation/2.12.2/intro-user.html) is a good resource to check out first. If you need help with it, we'll work through it. You're welcome to submit a pull request while working on something if you're looking for cursory feedback before squashing it for review on git.spiff.io.

The only documentation requirement to contribute is that you must add your real name (i.e., the one you'd introduce yourself with, not the one on a birth certificate), an email address you can be contacted at, and optionally a handle or alias to AUTHORS.md. If this places an undue burden on you, please send me an email at ncower@gmail.com or create a new issue so we can talk about it. This should be part of your first changeset if you're not already credited.

License

Dagr is licensed under the 2-clause BSD license. You should have received a copy of this license with Dagr in the LICENSE.txt file.

Documentation

Overview

Package dagr is an InfluxDB measurement library.

You can use dagr to keep track of and write primitive types understood by InfluxDB. Measurements are written in line protocol format, per InfluxDB 0.9.x docs:

- Line Protocol: https://influxdb.com/docs/v0.9/write_protocols/line.html
- Line Protocol Syntax: https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html

dagr has a handful of types, Int, Float, Bool, and String, that allow atomic updates to their values from concurrent goroutines, and a few provisions for writing measurements in line protocol format.

Constants

const (
	ErrNoFields    = Error(1 + iota) // Returned by WriteMeasurement(s) when a measurement has no fields
	ErrEmptyKey                      // Used to panic when attempting to allocate a point with an empty key
	ErrNoAllocator                   // Used to panic when attempting to allocate a PointSet with a nil allocator
)
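
As a rough illustration of how integer error codes like these can be declared and matched, here is a self-contained sketch. The Error type and constant names mirror the declarations above, but the message strings and the write helper are invented for this example and are not dagr's.

```go
package main

import (
	"errors"
	"fmt"
)

// Error mirrors the shape of dagr's Error type: an integer error code.
type Error int

const (
	ErrNoFields = Error(1 + iota)
	ErrEmptyKey
	ErrNoAllocator
)

// Error implements the error interface. The messages are placeholders
// invented for this sketch, not dagr's actual messages.
func (e Error) Error() string {
	switch e {
	case ErrNoFields:
		return "measurement has no fields"
	case ErrEmptyKey:
		return "point key is empty"
	case ErrNoAllocator:
		return "point set has no allocator"
	}
	return fmt.Sprintf("unknown error code %d", int(e))
}

// write is a hypothetical function that fails the way WriteMeasurement is
// documented to fail when a measurement has no fields.
func write(fieldCount int) error {
	if fieldCount == 0 {
		return ErrNoFields
	}
	return nil
}

// isNoFields reports whether err is (or wraps) ErrNoFields.
func isNoFields(err error) bool {
	return errors.Is(err, ErrNoFields)
}

func main() {
	fmt.Println(isNoFields(write(0)), isNoFields(write(1)))
}
```

Since the codes are plain comparable values, callers can match them with == or errors.Is without type assertions.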

Variables

var (
	DiscardLogger        = discardLogger{}
	StdLogger            = defaultLogger{}
	Log           Logger = DiscardLogger
)

Functions

func AddFloat

func AddFloat(field Field, incr float64) bool

AddFloat adds incr to any field that implements either FloatAdder or IntAdder (by converting incr to an int64) and returns true. If the field doesn't implement either interface, it returns false. If field is nil, it returns false.

func AddInt

func AddInt(field Field, incr int64) bool

AddInt adds incr to any field that implements either IntAdder or FloatAdder (by converting incr to a float64) and returns true. If the field doesn't implement either interface, it returns false. This can be used to interact with elements of Fields without requiring you to perform type assertions. If field is nil, it returns false.
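
The dispatch described for AddInt and AddFloat could be sketched with a type switch, as below. This is only an illustration of the idea under stated assumptions: IntAdder and FloatAdder mirror the interfaces documented by dagr, while addInt, intField, and floatField are hypothetical names, and dagr's real implementation may be written differently.

```go
package main

import "fmt"

// IntAdder and FloatAdder mirror the interfaces documented by dagr.
type IntAdder interface{ Add(int64) }
type FloatAdder interface{ Add(float64) }

// intField and floatField are hypothetical, non-atomic fields for this sketch.
type intField struct{ v int64 }

func (f *intField) Add(incr int64) { f.v += incr }

type floatField struct{ v float64 }

func (f *floatField) Add(incr float64) { f.v += incr }

// addInt adds incr to field if it implements IntAdder or FloatAdder and
// reports whether it did so. A nil field falls through to the default case.
func addInt(field interface{}, incr int64) bool {
	switch f := field.(type) {
	case IntAdder:
		f.Add(incr)
	case FloatAdder:
		f.Add(float64(incr))
	default:
		return false
	}
	return true
}

func main() {
	n, x := &intField{}, &floatField{}
	fmt.Println(addInt(n, 2), n.v)
	fmt.Println(addInt(x, 2), x.v)
	fmt.Println(addInt("text", 2), addInt(nil, 2))
}
```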

func WriteMeasurement

func WriteMeasurement(w io.Writer, m Measurement) (n int64, err error)

WriteMeasurement writes a single measurement, m, to w. It returns the number of bytes written and any error that occurred when writing the measurement.

When writing tags and fields, both are sorted by name in ascending order. So, a tag named "pid" will precede a tag named "version", and a field named "depth" will precede a field named "value".

If the measurement has no fields, it returns 0 and ErrNoFields.

If the measurement implements io.WriterTo, this simply calls that instead of WriteMeasurement.
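
To make the sorted output format concrete, here is a minimal sketch that builds one line from a key, tags, and already-encoded field values using only the standard library. The formatLine and sortedPairs functions are hypothetical helpers for this illustration, not dagr functions, and the sketch does no escaping of names or values.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedPairs renders m as comma-separated name=value pairs, sorted by name.
func sortedPairs(m map[string]string) string {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	pairs := make([]string, 0, len(names))
	for _, name := range names {
		pairs = append(pairs, name+"="+m[name])
	}
	return strings.Join(pairs, ",")
}

// formatLine builds one line: key[,tags] fields timestamp.
// Field values must already be encoded (e.g., "123i" or "T").
func formatLine(key string, tags, fields map[string]string, unixNano int64) string {
	var b strings.Builder
	b.WriteString(key)
	if len(tags) > 0 {
		b.WriteByte(',')
		b.WriteString(sortedPairs(tags))
	}
	fmt.Fprintf(&b, " %s %d", sortedPairs(fields), unixNano)
	return b.String()
}

func main() {
	fmt.Println(formatLine(
		"service.some_event",
		map[string]string{"pid": "1234", "host": "example.local"},
		map[string]string{"value": "123i", "depth": "123.456", "on": "T"},
		1136214245000000000,
	))
}
```

Sorting the names before writing is what makes the output stable, since Go map iteration order is not.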

Example
integer := new(Int)
boolean := new(Bool)
float := new(Float)
str := new(String)

integer.Set(123)
boolean.Set(true)
float.Set(123.456)
str.Set(`a "string" of sorts`)

m := NewPoint(
	"service.some_event",
	Tags{"pid": fmt.Sprint(1234), "host": "example.local"},
	Fields{"value": integer, "depth": float, "on": boolean, "msg": str},
)

WriteMeasurement(os.Stdout, m)
Output:

service.some_event,host=example.local,pid=1234 depth=123.456,msg="a \"string\" of sorts",on=T,value=123i 1136214245000000000
Example (Compiled)

Compiled measurements can be used to speed up encoding if you're frequently writing a large number of measurements. In most cases, this is entirely unnecessary. Compiled measurements only update their field values and are otherwise immutable.

integer := new(Int)
boolean := new(Bool)
float := new(Float)
str := new(String)

integer.Set(123)
boolean.Set(true)
float.Set(123.456)
str.Set(`a "string" of sorts`)

m := NewPoint(
	"service.some_event",
	Tags{"pid": fmt.Sprint(1234), "host": "example.local"},
	Fields{"value": integer, "depth": float, "on": boolean, "msg": str},
).Compiled()

WriteMeasurement(os.Stdout, m)
Output:

service.some_event,host=example.local,pid=1234 depth=123.456,msg="a \"string\" of sorts",on=T,value=123i 1136214245000000000

func WriteMeasurements

func WriteMeasurements(w io.Writer, ms ...Measurement) (n int64, err error)

WriteMeasurements writes all measurements with fields to w. Like WriteMeasurement, this will buffer the measurements before writing them in their entirety to w. This is effectively the same as iterating over ms and writing each measurement to a temporary buffer before writing to w.

Unlike WriteMeasurement, this will not return an error if a measurement has no fields. Measurements without fields are silently ignored. If no measurements are written, WriteMeasurements returns 0 and nil.
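
The buffer-then-write behavior described above can be sketched roughly as follows. The record type is a deliberately simplified, hypothetical stand-in for a measurement (a key plus pre-encoded field text), and writeAll and render are names invented for this example; none of this is dagr's actual code.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// record is a hypothetical, simplified measurement.
type record struct {
	key    string
	fields string // pre-encoded, e.g. "count=1i"; empty means no fields
}

// writeAll buffers every record that has fields, then writes the whole
// buffer to w. Records without fields are silently skipped.
func writeAll(w io.Writer, records ...record) (int64, error) {
	var buf bytes.Buffer
	for _, r := range records {
		if r.fields == "" {
			continue
		}
		fmt.Fprintf(&buf, "%s %s\n", r.key, r.fields)
	}
	if buf.Len() == 0 {
		return 0, nil // nothing to write
	}
	return buf.WriteTo(w)
}

// render runs writeAll against an in-memory buffer so the result can be inspected.
func render(records ...record) (string, int64, error) {
	var out bytes.Buffer
	n, err := writeAll(&out, records...)
	return out.String(), n, err
}

func main() {
	n, err := writeAll(os.Stdout,
		record{key: "http_request", fields: "count=3i"},
		record{key: "empty"}, // skipped: no fields
	)
	fmt.Println(n, err)
}
```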

Types

type Bool

type Bool uint32

Bool is a Field that stores an InfluxDB boolean value. When written, it is encoded as either T or F, depending on its value. Its zero value is false.
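
Since Bool is declared as a uint32, one plausible way to get atomic updates is to store 0 or 1 through sync/atomic. The sketch below assumes that approach; the flag type and its Encode method are hypothetical names, and dagr's internals may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flag is a hypothetical stand-in for a Bool-style field backed by a uint32.
type flag uint32

// Set atomically stores b as 1 (true) or 0 (false).
func (f *flag) Set(b bool) {
	var v uint32
	if b {
		v = 1
	}
	atomic.StoreUint32((*uint32)(f), v)
}

// Encode returns the line-protocol form of the flag: "T" or "F".
func (f *flag) Encode() string {
	if atomic.LoadUint32((*uint32)(f)) != 0 {
		return "T"
	}
	return "F"
}

func main() {
	var on flag
	fmt.Println(on.Encode()) // the zero value is false
	on.Set(true)
	fmt.Println(on.Encode())
}
```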

func (*Bool) Dup

func (b *Bool) Dup() Field

func (*Bool) MarshalJSON

func (b *Bool) MarshalJSON() ([]byte, error)

func (*Bool) Set

func (b *Bool) Set(new bool)

Set sets the Bool's value to new.

func (*Bool) Snapshot

func (b *Bool) Snapshot() Field

func (*Bool) UnmarshalJSON

func (b *Bool) UnmarshalJSON(js []byte) error

func (*Bool) WriteTo

func (b *Bool) WriteTo(w io.Writer) (int64, error)

type Error

type Error int

Error is any error code that is returned by a dagr function or method or might be worth catching a panic on.

func (Error) Error

func (e Error) Error() string

type Field

type Field interface {
	Dup() Field
	io.WriterTo
}

Field is any field value an InfluxDB measurement may hold. Fields must be duplicate-able (e.g., for snapshotting and such). Methods that modify field state must be safe for use across multiple goroutines. If the type implementing Field does not have mutable state, it is considered read-only.

In order to keep fields and their names separate, Field is not responsible for writing its name, and should not attempt to write a name, only its value.

type Fields

type Fields map[string]Field

func (Fields) Dup

func (fs Fields) Dup(deep bool) Fields

Dup clones the Fields map. If deep is true, it will also duplicate the fields held, creating new field instances, otherwise it retains its references to the Fields held by fs. If fs is nil, it returns nil.
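
The difference between a shallow and a deep duplicate can be shown with a small stand-alone sketch. The field and fields types below are hypothetical simplifications (a map of pointers to a mutable value), not dagr's types.

```go
package main

import "fmt"

// field is a hypothetical mutable field holding a single value.
type field struct{ v int64 }

// dup returns a new field with the same value.
func (f *field) dup() *field { return &field{v: f.v} }

// fields is a hypothetical stand-in for a Fields-style map.
type fields map[string]*field

// dup clones the map. If deep is true, each field is duplicated too;
// otherwise the clone shares its fields with the original.
func (fs fields) dup(deep bool) fields {
	if fs == nil {
		return nil
	}
	out := make(fields, len(fs))
	for name, f := range fs {
		if deep {
			f = f.dup()
		}
		out[name] = f
	}
	return out
}

func main() {
	orig := fields{"count": {v: 1}}
	shallow, deep := orig.dup(false), orig.dup(true)
	orig["count"].v = 2
	// The shallow clone sees the update; the deep clone does not.
	fmt.Println(shallow["count"].v, deep["count"].v)
}
```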

type Float

type Float uint64

Float is a Field that stores an InfluxDB float value. When written, it's encoded as a float64 using as few digits as possible (i.e., its precision is -1 when passed to FormatFloat). Different behavior may be desirable, in which case it's necessary to implement your own float field. Updates to Float are atomic.
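
Because Float is declared as a uint64, a plausible way to update it atomically is to store the float's IEEE 754 bits and add with a compare-and-swap loop. The sketch below assumes that technique; atomicFloat, encode, and sumConcurrently are hypothetical names, and the 'f' format verb is an assumption (the text above only states that the precision is -1).

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"sync"
	"sync/atomic"
)

// atomicFloat is a hypothetical float64 accumulator stored as its bit pattern.
type atomicFloat uint64

// Add adds incr using a compare-and-swap loop, retrying whenever another
// goroutine changed the value in between the load and the swap.
func (f *atomicFloat) Add(incr float64) {
	for {
		old := atomic.LoadUint64((*uint64)(f))
		next := math.Float64bits(math.Float64frombits(old) + incr)
		if atomic.CompareAndSwapUint64((*uint64)(f), old, next) {
			return
		}
	}
}

// Value atomically reads the current value.
func (f *atomicFloat) Value() float64 {
	return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

// encode formats v with the fewest digits needed to represent it (precision -1).
func encode(v float64) string {
	return strconv.FormatFloat(v, 'f', -1, 64)
}

// sumConcurrently adds incr once from each of the given number of goroutines.
func sumConcurrently(goroutines int, incr float64) float64 {
	var f atomicFloat
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.Add(incr)
		}()
	}
	wg.Wait()
	return f.Value()
}

func main() {
	fmt.Println(encode(sumConcurrently(100, 0.5)), encode(123.456))
}
```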

func (*Float) Add

func (f *Float) Add(incr float64)

Add adds incr to the value held by Float.

func (*Float) Dup

func (f *Float) Dup() Field

func (*Float) MarshalJSON

func (f *Float) MarshalJSON() ([]byte, error)

func (*Float) Set

func (f *Float) Set(new float64)

Set sets the Float's value to new.

func (*Float) Snapshot

func (f *Float) Snapshot() Field

func (*Float) UnmarshalJSON

func (f *Float) UnmarshalJSON(in []byte) error

func (*Float) WriteTo

func (f *Float) WriteTo(w io.Writer) (int64, error)

type FloatAdder

type FloatAdder interface {
	Add(float64)
}

FloatAdder defines anything that can have a float64 added to itself (e.g., a Float).

type Int

type Int int64

Int is a Field that stores an InfluxDB integer value. When written, it's encoded as a 64-bit integer with the 'i' suffix, per InfluxDB documentation (e.g., "123456i" sans quotes).

func (*Int) Add

func (n *Int) Add(incr int64)

Add adds incr to the value held by the Int.

func (*Int) Dup

func (n *Int) Dup() Field

func (*Int) MarshalJSON

func (n *Int) MarshalJSON() ([]byte, error)

func (*Int) Set

func (n *Int) Set(new int64)

Set sets the value held by the Int.

func (*Int) Snapshot

func (n *Int) Snapshot() Field

func (*Int) UnmarshalJSON

func (n *Int) UnmarshalJSON(in []byte) error

func (*Int) WriteTo

func (n *Int) WriteTo(w io.Writer) (int64, error)

type IntAdder

type IntAdder interface {
	Add(int64)
}

IntAdder defines anything that can have an int64 added to itself (e.g., an Int).

type Logger

type Logger interface {
	Printf(string, ...interface{})
}
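
The Logger interface is small enough that *log.Logger from the standard library already satisfies it. The sketch below shows that, alongside a no-op logger; the Logger declaration mirrors the one above, while discard, logTo, and capture are hypothetical names for this illustration and are not dagr's DiscardLogger or StdLogger.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above.
type Logger interface {
	Printf(string, ...interface{})
}

// discard is a hypothetical logger that drops every message.
type discard struct{}

func (discard) Printf(string, ...interface{}) {}

// logTo reports a byte count through whichever Logger it is given.
func logTo(l Logger, n int) {
	l.Printf("wrote %d bytes", n)
}

// capture logs through a *log.Logger into a buffer and returns the text.
func capture(n int) string {
	var buf bytes.Buffer
	logTo(log.New(&buf, "dagr: ", 0), n)
	return buf.String()
}

func main() {
	logTo(discard{}, 42) // produces no output
	fmt.Print(capture(42))
}
```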

type Measurement

type Measurement interface {
	GetKey() string
	GetTags() Tags
	GetFields() Fields
}

Measurement defines the minimum interface for a measurement that could be passed to InfluxDB. All measurements must have a key and a minimum of one field. Tags are optional.

Unless stated otherwise, the results of Tags and Fields are considered immutable.

type Point

type Point struct {
	// contains filtered or unexported fields
}

Point is a single Measurement as understood by InfluxDB.

func NewPoint

func NewPoint(key string, tags Tags, fields Fields) *Point

NewPoint allocates a new Point with the given key, tags, and fields. If key is empty, NewPoint panics. If fields is empty, the point cannot be written until it has at least one field.

func (*Point) Compiled

func (p *Point) Compiled() Measurement

Compiled returns a compiled form of the point. The resulting Measurement is immutable except for its field values. New fields may not be added and it will not return a key, tags, or values. The compiled form of a point is only useful to improve write times on points when necessary. If the point has no fields, it returns nil, as the point is not valid to write.

func (*Point) GetFields

func (p *Point) GetFields() Fields

GetFields returns a map of the point's fields. This map is a copy of the point's state and may be modified without affecting the point. The fields themselves are those held by the point, however, and modifying them will modify the point's state.

func (*Point) GetKey

func (p *Point) GetKey() string

GetKey returns the point's key.

func (*Point) GetTags

func (p *Point) GetTags() Tags

GetTags returns a copy of the point's tags as a map of names to values. Names and values are not escaped. It is safe to copy and modify the result of this method.

func (*Point) MarshalJSON

func (p *Point) MarshalJSON() ([]byte, error)
Example
integer := new(Int)
boolean := new(Bool)
float := new(Float)
str := new(String)

integer.Set(123)
boolean.Set(true)
float.Set(123.456)
str.Set(`a "string" of sorts`)

m := NewPoint(
	"service.some_event",
	Tags{"pid": fmt.Sprint(1234), "host": "example.local"},
	Fields{"value": integer, "depth": float, "on": boolean, "msg": str},
)

bs, err := json.MarshalIndent(m, "", "  ")
if err != nil {
	panic(err)
}

fmt.Printf("%s", bs)
Output:

{
  "Key": "service.some_event",
  "Timestamp": "1136214245000000000",
  "Tags": {
    "depth": 123.456,
    "msg": "a \"string\" of sorts",
    "on": true,
    "value": 123
  },
  "Fields": {
    "host": "example.local",
    "pid": "1234"
  }
}

func (*Point) RemoveField

func (p *Point) RemoveField(name string)

RemoveField removes a field with the given name. If name is empty, the call is a no-op. It is safe to call RemoveField from concurrent goroutines.

func (*Point) RemoveTag

func (p *Point) RemoveTag(name string)

RemoveTag removes the tag with the given name from the point. If name is empty, the call is a no-op. It is safe to call from concurrent goroutines.

func (*Point) SetField

func (p *Point) SetField(name string, value Field)

SetField sets a field with the given name and value. If name is empty, the call is a no-op. If value is nil, it removes the field.

func (*Point) SetKey

func (p *Point) SetKey(key string)

SetKey sets the point's key.

func (*Point) SetTag

func (p *Point) SetTag(name, value string)

SetTag sets a tag on the point with the given name and value. If the value is empty, the tag is removed. If the name is empty, the call is a no-op. It is safe to call SetTag from concurrent goroutines.

func (*Point) WriteTo

func (p *Point) WriteTo(w io.Writer) (int64, error)

WriteTo writes the point to the given writer, w. If an error occurs while building the point, it writes nothing and returns an error. If the point has no fields, it returns the error ErrNoFields.

type PointAllocFunc

type PointAllocFunc func(identifier string, opaque interface{}) (key string, tags Tags, fields Fields)

func (PointAllocFunc) AllocatePoint

func (fn PointAllocFunc) AllocatePoint(identifier string, opaque interface{}) (key string, tags Tags, fields Fields)

type PointAllocator

type PointAllocator interface {
	AllocatePoint(identifier string, opaque interface{}) (key string, tags Tags, fields Fields)
}

PointAllocator is used to prepare a point from an identifier. Given the identifier, the PointAllocator must return a key, tags, and fields for a new Point to use. The PointAllocator can signal that an identifier should not receive a point by returning an empty key or no fields.

The maps of tags and fields are copied, so you may return prefabricated maps of tags or fields (though usually you'll want at least one of a unique tag or field per point).

The opaque value has no effect on the identifier, but can be used to pass specific information to affect the result of the allocator (e.g., an *http.Request that might contain a path to include a tag).
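
As an illustration of the allocator contract, the sketch below adapts a plain function to the PointAllocator interface, in the same way PointAllocFunc does. The interface and func type mirror the declarations above, but Fields is simplified to a map of strings, and byPath is a hypothetical allocator that treats the identifier as a request path and the opaque value as an optional method string.

```go
package main

import "fmt"

// Tags mirrors dagr's Tags; Fields is simplified to strings for this sketch.
type Tags map[string]string
type Fields map[string]string

// PointAllocator mirrors the interface documented above.
type PointAllocator interface {
	AllocatePoint(identifier string, opaque interface{}) (key string, tags Tags, fields Fields)
}

// PointAllocFunc lets a plain function act as a PointAllocator.
type PointAllocFunc func(identifier string, opaque interface{}) (string, Tags, Fields)

// AllocatePoint calls fn itself.
func (fn PointAllocFunc) AllocatePoint(identifier string, opaque interface{}) (string, Tags, Fields) {
	return fn(identifier, opaque)
}

// byPath is a hypothetical allocator. It returns an empty key for an empty
// identifier to signal that no point should be allocated.
func byPath(identifier string, opaque interface{}) (string, Tags, Fields) {
	if identifier == "" {
		return "", nil, nil
	}
	tags := Tags{"path": identifier}
	if method, ok := opaque.(string); ok {
		tags["method"] = method
	}
	return "http_request", tags, Fields{"count": "0i"}
}

func main() {
	var alloc PointAllocator = PointAllocFunc(byPath)
	key, tags, fields := alloc.AllocatePoint("/v1/parrots", "GET")
	fmt.Println(key, tags["path"], tags["method"], fields["count"])
}
```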

type PointSet

type PointSet struct {
	// contains filtered or unexported fields
}

PointSet is a simple collection of (compiled) points that supports dynamic allocation and revocation of points. It is intended for situations where you have a single point form varying across a tag or field, such as a request path or some other varying data.

func NewPointSet

func NewPointSet(allocator PointAllocator) *PointSet

NewPointSet allocates a new PointSet with the given allocator. If allocator is nil, the function panics with ErrNoAllocator. You shouldn't bother recovering from this because there is no recovering from a PointSet without an allocator.

func (*PointSet) Clear

func (p *PointSet) Clear()

Clear erases all points held by the PointSet.

func (*PointSet) FieldsForID

func (p *PointSet) FieldsForID(identifier string, opaque interface{}) Fields

FieldsForID returns the fields for a particular identifier. If no point is found and no point can be allocated for the identifier, it returns nil. The identifier may be an empty string.

func (*PointSet) GetFields

func (p *PointSet) GetFields() Fields

func (*PointSet) GetKey

func (p *PointSet) GetKey() string

GetKey returns an empty string, as a PointSet is a collection of points and relies on its WriterTo implementation for encoding its output.

func (*PointSet) GetTags

func (p *PointSet) GetTags() Tags

func (*PointSet) Remove

func (p *PointSet) Remove(identifier string)

func (*PointSet) WriteTo

func (p *PointSet) WriteTo(w io.Writer) (int64, error)

type RawBool

type RawBool bool

func (RawBool) Dup

func (f RawBool) Dup() Field

func (RawBool) MarshalJSON

func (f RawBool) MarshalJSON() ([]byte, error)

func (RawBool) WriteTo

func (f RawBool) WriteTo(w io.Writer) (n int64, err error)

type RawFloat

type RawFloat float64

func (RawFloat) Dup

func (f RawFloat) Dup() Field

func (RawFloat) MarshalJSON

func (f RawFloat) MarshalJSON() ([]byte, error)

func (RawFloat) WriteTo

func (f RawFloat) WriteTo(w io.Writer) (int64, error)

type RawInt

type RawInt int64

func (RawInt) Dup

func (f RawInt) Dup() Field

func (RawInt) MarshalJSON

func (f RawInt) MarshalJSON() ([]byte, error)

func (RawInt) WriteTo

func (f RawInt) WriteTo(w io.Writer) (int64, error)

type RawPoint

type RawPoint struct {
	Key    string
	Tags   Tags
	Fields Fields
	Time   time.Time
}

RawPoint is a simple Measurement implementation that has no read/write concurrency guarantees. It's useful as a read-only point that can be written out and quickly discarded. It's a good idea to make use of the raw field types for this as well to avoid the overhead of atomics.

func (RawPoint) GetFields

func (p RawPoint) GetFields() Fields

func (RawPoint) GetKey

func (p RawPoint) GetKey() string

func (RawPoint) GetTags

func (p RawPoint) GetTags() Tags

func (RawPoint) GetTime

func (p RawPoint) GetTime() time.Time

type RawString

type RawString string

func (RawString) Dup

func (s RawString) Dup() Field

func (RawString) Snapshot

func (s RawString) Snapshot() Field

func (RawString) WriteTo

func (s RawString) WriteTo(w io.Writer) (int64, error)

type SnapshotField

type SnapshotField interface {
	Field
	Snapshot() Field
}

SnapshotField is an interface that any Field may implement that returns a more-or-less frozen instance of a field.

type SnapshotMeasurement

type SnapshotMeasurement interface {
	Measurement
	Snapshot() TimeMeasurement
}

type StaticPointAllocator

type StaticPointAllocator struct {
	Key    string
	Tags   Tags
	Fields Fields // All fields will be duplicated to ensure that they're unique among points

	// If either IdentifierTag or IdentifierField is set, the tag/field with that name will be assigned the
	// identifier passed to the StaticPointAllocator.
	IdentifierTag   string
	IdentifierField string
}

StaticPointAllocator is a simple PointAllocator that reuses a single key and set of tags and fields.

If fields is nil, IdentifierField must be non-empty, otherwise it will never produce a valid point.
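
To show how an identifier tag might be applied, here is a reduced sketch that handles only the key, tags, and IdentifierTag. The staticAllocator type and its allocate method are hypothetical and cover just this one behavior; dagr's StaticPointAllocator also handles fields and IdentifierField, which are left out here.

```go
package main

import "fmt"

// Tags mirrors dagr's Tags type.
type Tags map[string]string

// staticAllocator is a hypothetical, reduced version of a static allocator.
type staticAllocator struct {
	Key           string
	Tags          Tags
	IdentifierTag string
}

// allocate copies the shared tags so each point gets its own map and, if
// IdentifierTag is set, stores the identifier under that tag name.
func (s staticAllocator) allocate(identifier string) (string, Tags) {
	tags := make(Tags, len(s.Tags)+1)
	for name, value := range s.Tags {
		tags[name] = value
	}
	if s.IdentifierTag != "" {
		tags[s.IdentifierTag] = identifier
	}
	return s.Key, tags
}

func main() {
	s := staticAllocator{
		Key:           "http_request",
		Tags:          Tags{"host": "example.local"},
		IdentifierTag: "path",
	}
	_, a := s.allocate("/v1/parrots")
	_, b := s.allocate("/v1/finches")
	fmt.Println(a["path"], b["path"], len(s.Tags))
}
```

Copying the tag map per allocation keeps the shared template untouched, so points for different identifiers never overwrite each other's tags.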

func (StaticPointAllocator) AllocatePoint

func (s StaticPointAllocator) AllocatePoint(identifier string, _ interface{}) (key string, tags Tags, fields Fields)

type String

type String struct {
	// contains filtered or unexported fields
}

String is a Field that stores an InfluxDB string value.

func (*String) Dup

func (s *String) Dup() Field

func (*String) MarshalJSON

func (s *String) MarshalJSON() ([]byte, error)

func (*String) Set

func (s *String) Set(new string)

Set sets the String's value to new.

func (*String) Snapshot

func (s *String) Snapshot() Field

func (*String) UnmarshalJSON

func (s *String) UnmarshalJSON(in []byte) error

func (*String) WriteTo

func (s *String) WriteTo(w io.Writer) (int64, error)

type Tags

type Tags map[string]string

func (Tags) Dup

func (t Tags) Dup() Tags

Dup clones the Tags map, t. If t is nil, it returns nil.

type TimeMeasurement

type TimeMeasurement interface {
	GetTime() time.Time

	Measurement
}

TimeMeasurement is a measurement that has a known, fixed time. It can be considered a measurement that is fixed to a specific point in time. TimeMeasurements are not necessarily read-only and may, for example, return time.Now(). In general, it is better to not implement TimeMeasurement if your measurement is returning time.Now().

func Snapshot

func Snapshot(m Measurement) TimeMeasurement

Snapshot creates and returns a new measurement that implements TimeMeasurement. This Measurement is detached from its original source and is intended to be fixed in time. This is roughly the same as duplicating a point and its fields.

If either the measurement's key or fields are empty, Snapshot returns nil.
