writer

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2020 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package writer contains useful types for buffering, batching and periodically syncing writes onto a provided metric writing client.

The following example demonstrate the usage of a *writer.PointWriter. This is designed to buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize). It is also designed to periodically flush the buffer if a configurable duration ellapses between calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their production.

Example Usage

 import (
     "github.com/influxdata/influxdb-client-go"
     "github.com/influxdata/influxdb-client-go/writer"
 )

 func main() {
     var (
         cli, _ = influxdb.New("http://localhost:9999", "some-token")
         bucket = "default"
         org    = "influx"
     )

     wr := writer.New(cli, bucket, org, writer.WithBufferSize(10))

     wr.Write(influxdb.NewRowMetric(
         map[string]interface{}{
			    "value": 16,
		    },
		    "temp_celsius",
		    map[string]string{
			    "room": "living_room",
		    },
		    time.Now(),
     ),
     influxdb.NewRowMetric(
         map[string]interface{}{
			    "value": 17,
		    },
		    "temp_celsius",
		    map[string]string{
			    "room": "living_room",
		    },
		    time.Now(),
     ))

     wr.Close()
 }

writer.New(...) return a PointerWriter which is composed of multiple other types available in this package.

It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics are called on the underyling client with a specific organisation and bucket. This is not safe for concurrent use.

It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type implements the buffering of metrics and exposes a flush method. Once the buffer size is exceed flush is called automatically. However, Flush() can be called manually on this type. This is also not safe for concurrent use.

Finally, it wraps the buffered writer in a *PointsWriter which takes care of ensuring Flush is called automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use.

Automatic Retries

The writer package offers automatic retry capabilities during known transient failures This is when the API being consumed reports "unavailable" or "too many requests" error conditions

import (

"time"

"github.com/influxdata/influxdb-client-go"
"github.com/influxdata/influxdb-client-go/writer"

)

func main() {
	var (
		cli, _ = influxdb.New("http://localhost:9999", "some-token")
		bucket = "default"
		org    = "influx"
	)

	// construct a writer with 3 maximum attempts per call to Write and linear backoff derived from number of attempts
	// i.e. a first attempt is followed by a 1 second delay before the second attempt
	// a second attempt is followed by a 2 second delay before the third attempt
	var (
		retryOpts = []writer.RetryOption{writer.WithMaxAttempts(3), writer.WithBackoff(writer.LinearBackoff(time.Second))}
		wr        = writer.New(cli, bucket, org, writer.WithRetries(retryOpts...))
	)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackoffFunc

type BackoffFunc func(attempt int) time.Duration

BackoffFunc is a function which when called with an attempt number returns a duration which should be waited for until a subsequent attempt is made

func LinearBackoff

func LinearBackoff(scale time.Duration) BackoffFunc

LinearBackoff returns a BackoffFunc which when called returns attempt * scale. e.g. LinearBackoff(time.Second)(5) returns 5 seconds

type BucketMetricWriter

type BucketMetricWriter interface {
	Write(ctx context.Context, bucket string, org string, m ...influxdb.Metric) (int, error)
}

BucketMetricWriter is a type which Metrics can be written to a particular bucket in a particular organisation

type BucketWriter

type BucketWriter struct {
	// contains filtered or unexported fields
}

BucketWriter writes metrics to a particular bucket within a particular organisation

func NewBucketWriter

func NewBucketWriter(w BucketMetricWriter, bucket, org string) *BucketWriter

NewBucketWriter allocates, configures and returned a new BucketWriter for writing metrics to a specific organisations bucket

func (*BucketWriter) Write

func (b *BucketWriter) Write(m ...influxdb.Metric) (int, error)

Write writes the provided metrics to the underlying metrics writer using the org and bucket configured on the bucket writer

type BufferedWriter

type BufferedWriter struct {
	// contains filtered or unexported fields
}

BufferedWriter is a buffered implementation of the MetricsWriter interface It is unashamedly derived from the bufio pkg https://golang.org/pkg/bufio Metrics are buffered up until the buffer size is met and then flushed to an underlying MetricsWriter The writer can also be flushed manually by calling Flush BufferedWriter is not safe to be called concurrently and therefore concurrency should be managed by the caller

func NewBufferedWriter

func NewBufferedWriter(w MetricsWriter) *BufferedWriter

NewBufferedWriter returns a new *BufferedWriter with the default buffer size

func NewBufferedWriterSize

func NewBufferedWriterSize(w MetricsWriter, size int) *BufferedWriter

NewBufferedWriterSize returns a new *BufferedWriter with a buffer allocated with the provided size

func (*BufferedWriter) Available

func (b *BufferedWriter) Available() int

Available returns how many bytes are unused in the buffer.

func (*BufferedWriter) Buffered

func (b *BufferedWriter) Buffered() int

Buffered returns the number of bytes that have been written into the current buffer.

func (*BufferedWriter) Flush

func (b *BufferedWriter) Flush() error

Flush writes any buffered data to the underlying MetricsWriter

func (*BufferedWriter) Write

func (b *BufferedWriter) Write(m ...influxdb.Metric) (nn int, err error)

Write writes the provided metrics to the underlying buffer if there is available capacity. Otherwise it flushes the buffer and attempts to assign the remain metrics to the buffer. This process repeats until all the metrics are either flushed or in the buffer

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config is a structure used to configure a point writer

type MetricsWriteFlusher

type MetricsWriteFlusher interface {
	Write(m ...influxdb.Metric) (int, error)
	Available() int
	Flush() error
}

MetricsWriteFlush is a type of metrics writer which is buffered and metrics can be flushed to

type MetricsWriter

type MetricsWriter interface {
	Write(...influxdb.Metric) (int, error)
}

MetricsWriter is a type which metrics can be written to

type Option

type Option func(*Config)

Option is a functional option for Configuring point writers

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the size of the underlying buffer on the point writer

func WithContext added in v0.1.3

func WithContext(ctxt context.Context) Option

WithContext sets the context.Context used for each flush

func WithFlushInterval

func WithFlushInterval(interval time.Duration) Option

WithFlushInterval sets the flush interval on the writer The point writer will wait at least this long between flushes of the undeyling buffered writer

func WithRetries

func WithRetries(options ...RetryOption) Option

WithRetries configures automatic retry behavior on specific transient error conditions when attempting to Write metrics to a client

type Options

type Options []Option

Options is a slice of Option

func (Options) Apply

func (o Options) Apply(c *Config)

Apply calls each option in the slice on options on the provided Config

func (Options) Config

func (o Options) Config() Config

Config constructs a default configuration and then applies the callee options and returns the config

type PointWriter

type PointWriter struct {
	// contains filtered or unexported fields
}

PointWriter delegates calls to Write to an underlying flushing writer implementation. It also periodically calls flush on the underlying writer and is safe to be called concurrently. As the flushing writer can also flush on calls to Write when the number of metrics being written exceeds the buffer capacity, it also ensures to reset its timer in this scenario as to avoid calling flush multiple times

func New

func New(writer BucketMetricWriter, bkt, org string, opts ...Option) *PointWriter

New constructs a point writer with an underlying buffer from the provided BucketMetricWriter The writer will flushed metrics to the underlying BucketMetricWriter when the buffer is full or the configured flush interval ellapses without a flush occuring

func NewPointWriter

func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter

NewPointWriter configures and returns a *PointWriter writer type The new writer will automatically begin scheduling periodic flushes based on the provided duration

func (*PointWriter) Close

func (p *PointWriter) Close() error

Close signals to stop flushing metrics and causes subsequent calls to Write to return a closed pipe error Close returns once scheduledge flushing has stopped Close does a final flush on return and returns any error from the final flush if it occurs

func (*PointWriter) Write

func (p *PointWriter) Write(m ...influxdb.Metric) (int, error)

Write delegates to an underlying metrics writer If the delegating call is going to cause a flush, it signals to the schduled periodic flush to reset its timer

type RetryOption

type RetryOption func(*RetryWriter)

RetryOption is a functional option for the RetryWriters type

func WithBackoff

func WithBackoff(fn BackoffFunc) RetryOption

WithBackoff sets of the BackoffFunc on the RetryWriter

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) RetryOption

WithMaxAttempts sets the maximum number of attempts for a Write operation attempt

func WithRetrySleepLimit added in v0.2.0

func WithRetrySleepLimit(retrySleepLimit int) RetryOption

WithRetrySleepLimit sets the retry sleep limit. This optiona allows us to abort retry sleeps past some number of seconds.

type RetryWriter

type RetryWriter struct {
	MetricsWriter
	// contains filtered or unexported fields
}

RetryWriter is a metrics writers which decorates other metrics writer implementations and automatically retries attempts to write metrics under certain error conditions

func NewRetryWriter

func NewRetryWriter(w MetricsWriter, opts ...RetryOption) *RetryWriter

NewRetryWriter returns a configured *RetryWriter which decorates the supplied MetricsWriter

func (*RetryWriter) Write

func (r *RetryWriter) Write(m ...influxdb.Metric) (n int, err error)

Write delegates to underlying MetricsWriter and then automatically retries when errors occur

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL