log-shuttle: github.com/heroku/log-shuttle

package shuttle

import "github.com/heroku/log-shuttle"



Package Files

batch.go batcher.go cleanurl.go cloudwatchlogs_formatter.go config.go counter.go formatter.go gzip_formatter.go http_outlet.go kinesis_formatter.go kinesis_types.go logline.go logplex_formatter.go metrics_reporter.go reader.go shuttle.go


const (
    InputFormatRaw                   = iota // default, just text
    InputFormatRFC5424                      // newline terminated RFC5424 formatted lines
    InputFormatLengthPrefixedRFC5424        // length prefixed RFC5424 formatted lines
)

Input format constants. TODO: ensure these are really used properly

const (
    DefaultMaxLineLength = 10000 // Logplex max is 10000 bytes, so default to that
    DefaultInputFormat   = InputFormatRaw
    DefaultBackBuff      = 50
    DefaultTimeout       = 5 * time.Second
    DefaultWaitDuration  = 250 * time.Millisecond
    DefaultMaxAttempts   = 3
    DefaultStatsInterval = 0 * time.Second
    DefaultStatsSource   = ""
    DefaultVerbose       = false
    DefaultSkipVerify    = false
    DefaultPriVal        = "190"
    DefaultVersion       = "1"
    DefaultProcID        = "shuttle"
    DefaultAppName       = "token"
    DefaultHostname      = "shuttle"
    DefaultMsgID         = "- -"
    DefaultLogsURL       = ""
    DefaultNumOutlets    = 4
    DefaultBatchSize     = 500
    DefaultID            = ""
    DefaultDrop          = true
    DefaultUseGzip       = false
    DefaultKinesisShards = 1
)

Default option values

const (
    // EOFRetrySleep is the amount of time to sleep between retries caused by an io.EOF, in ms.
    EOFRetrySleep = 100
    // OtherRetrySleep is the time to sleep between retries for any other error, in ms.
    OtherRetrySleep = 1000
    // DepthHighWatermark is the high watermark, beyond which the outlet loses batches instead of retrying.
    DepthHighWatermark = 0.6
    // RetryWithTypeFormat is the format string for retries that also have a type
    RetryWithTypeFormat = "at=post retry=%t msgcount=%d inbox.length=%d request_id=%q attempts=%d error=%q errtype=\"%T\"\n"
)

const (
    // LogplexBatchTimeFormat is the format of timestamps as expected by Logplex
    LogplexBatchTimeFormat = "2006-01-02T15:04:05.000000+00:00"
    // LogplexContentType is the content type logplex expects
    LogplexContentType = "application/logplex-1"
)


var (
    DefaultFormatterFunc = NewLogplexBatchFormatter
)

Defaults that can't be constants

type Batch

type Batch struct {
    UUID string
    // contains filtered or unexported fields
}

Batch holds incoming log lines and provides some helpers for dealing with their grouping

func NewBatch

func NewBatch(capacity int) Batch

NewBatch returns a new batch with its capacity pre-set.

func (*Batch) Add

func (b *Batch) Add(ll LogLine) bool

Add adds a log line to the batch and returns a boolean indicating whether the batch is full.

func (*Batch) MsgCount

func (b *Batch) MsgCount() int

MsgCount returns the number of messages in the batch.
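
The fill-until-full contract described for Add can be sketched with stand-in types. Everything below (line, batch, newBatch, add, msgCount) is hypothetical and only models the documented behaviour; LogLine and Batch have unexported fields, so this is not the library's implementation.

```go
package main

import "fmt"

// line is a hypothetical stand-in for LogLine.
type line struct{ raw []byte }

// batch is a hypothetical stand-in for Batch.
type batch struct {
	capacity int
	lines    []line
}

// newBatch pre-allocates room for capacity lines.
func newBatch(capacity int) batch {
	return batch{capacity: capacity, lines: make([]line, 0, capacity)}
}

// add appends l and reports whether the batch has reached its capacity.
func (b *batch) add(l line) bool {
	b.lines = append(b.lines, l)
	return len(b.lines) >= b.capacity
}

// msgCount returns how many lines the batch currently holds.
func (b *batch) msgCount() int { return len(b.lines) }

func main() {
	b := newBatch(3)
	for _, s := range []string{"one\n", "two\n", "three\n"} {
		full := b.add(line{raw: []byte(s)})
		fmt.Println(b.msgCount(), full)
	}
	// Prints:
	// 1 false
	// 2 false
	// 3 true
}
```

A caller would typically stop filling and hand the batch off as soon as add reports true.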

type Batcher

type Batcher struct {
    // contains filtered or unexported fields
}

Batcher coalesces logs coming in via inLogs into batches, which are sent out via outBatches.

func NewBatcher

func NewBatcher(s *Shuttle) Batcher

NewBatcher creates an empty Batcher for the provided shuttle.

func (Batcher) Batch

func (b Batcher) Batch()

Batch loops, getting an empty batch and filling it. Filled batches are sent via the outBatches channel. If outBatches is full, the batch is dropped and the drops counter is incremented by the number of messages dropped.
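
The drop-when-full behaviour described for Batch corresponds to a non-blocking channel send in Go. The following is a minimal sketch of that pattern only; the deliver function, the slice-of-strings batch model, and the plain int counter are all assumptions for illustration and do not reflect the library's internals.

```go
package main

import "fmt"

// deliver tries to hand a batch (modelled here as a slice of lines) to out
// without blocking. If out has no free slot, the batch is discarded and the
// number of messages it held is added to *drops.
func deliver(out chan<- []string, b []string, drops *int) bool {
	select {
	case out <- b:
		return true
	default:
		*drops += len(b)
		return false
	}
}

func main() {
	out := make(chan []string, 1) // room for exactly one pending batch
	drops := 0

	fmt.Println(deliver(out, []string{"a", "b"}, &drops))      // true
	fmt.Println(deliver(out, []string{"c", "d", "e"}, &drops)) // false
	fmt.Println(drops)                                         // 3
}
```

The select with a default case is what keeps the producer from stalling when downstream delivery is slow.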

type CloudWatchLogsFormatter

type CloudWatchLogsFormatter struct {
    // contains filtered or unexported fields
}

CloudWatchLogsFormatter formats a batch of logs for the Amazon CloudWatch Logs service. NewCloudWatchLogsFormatterFunc should be used to create a NewHTTPFormatterFunc tied to a specific region/logGroupName/logStreamName.

func (*CloudWatchLogsFormatter) HandleResponse

func (f *CloudWatchLogsFormatter) HandleResponse(resp *http.Response) error

HandleResponse handles the response to the request that was generated. See ResponseHandler for more info.

func (*CloudWatchLogsFormatter) MsgCount

func (f *CloudWatchLogsFormatter) MsgCount() int

MsgCount returns the message count of the request. See SubFormatter for more info.

func (*CloudWatchLogsFormatter) Request

func (f *CloudWatchLogsFormatter) Request() (*http.Request, error)

Request returns the request to submit for the batch. See HTTPFormatter for more info.

type Config

type Config struct {
    MaxLineLength int
    BackBuff      int
    BatchSize     int
    NumOutlets    int
    InputFormat   int
    MaxAttempts   int
    KinesisShards int
    LogsURL       string
    Prival        string
    Version       string
    Procid        string
    Hostname      string
    Appname       string
    Msgid         string
    StatsSource   string
    SkipVerify    bool
    Verbose       bool
    UseGzip       bool
    Drop          bool
    WaitDuration  time.Duration
    Timeout       time.Duration
    StatsInterval time.Duration

    ID            string
    FormatterFunc NewHTTPFormatterFunc

    // Loggers
    Logger    *log.Logger
    ErrLogger *log.Logger
    // contains filtered or unexported fields
}

Config holds the various config options for a shuttle

func NewConfig

func NewConfig() Config

NewConfig returns a newly created Config, filled in with defaults.

func (*Config) ComputeHeader

func (c *Config) ComputeHeader()

ComputeHeader computes the syslogFrameHeaderFormat once so that it does not have to be recomputed on every formatter iteration. It should be called after setting up the rest of the config, or whenever the config changes.

type Counter

type Counter struct {
    // contains filtered or unexported fields
}

Counter is used to track two values for a given metric. The first is the "all time" metric counter and the second is the value accumulated since the metric was last ReadAndReset. Counters are safe for concurrent use.

func NewCounter

func NewCounter(initial int) *Counter

NewCounter returns a new Counter initialized to the initial value.

func (*Counter) Add

func (c *Counter) Add(u int) int

Add increments the counter (alltime and current), returning the new value.

func (*Counter) AllTime

func (c *Counter) AllTime() int

AllTime returns the current alltime value of the Counter.

func (*Counter) Read

func (c *Counter) Read() int

Read returns the current value of the Counter.

func (*Counter) ReadAndReset

func (c *Counter) ReadAndReset() (int, time.Time)

ReadAndReset returns the current value and the last time it was reset, then resets the value and the last reset time to time.Now().

type GzipFormatter

type GzipFormatter struct {
    // contains filtered or unexported fields
}

GzipFormatter is an HTTPFormatter that is built with a delegate HTTPFormatter but which compresses the request body.

func NewGzipFormatter

func NewGzipFormatter(delegate HTTPFormatter) *GzipFormatter

NewGzipFormatter builds a new GzipFormatter with the supplied delegate.

func (*GzipFormatter) Close

func (g *GzipFormatter) Close() error

Close closes the stream.

func (*GzipFormatter) MsgCount

func (g *GzipFormatter) MsgCount() int

MsgCount returns the number of messages contained in the formatted batch.

func (*GzipFormatter) Read

func (g *GzipFormatter) Read(p []byte) (int, error)

Read reads bytes from the formatter stream.

func (*GzipFormatter) Request

func (g *GzipFormatter) Request() (*http.Request, error)

Request returns an http.Request to be used with an http.Client. The request has its body and headers set as necessary.

type HTTPFormatter

type HTTPFormatter interface {
    Request() (*http.Request, error) // Request() returns a *http.Request ready to be handled by an outlet
}

HTTPFormatter is the interface that HTTP outlets use to format an HTTP request.

func NewKinesisFormatter

func NewKinesisFormatter(b Batch, eData []errData, config *Config) HTTPFormatter

NewKinesisFormatter constructs a proper HTTPFormatter for Kinesis HTTP targets.

func NewLogplexBatchFormatter

func NewLogplexBatchFormatter(b Batch, eData []errData, config *Config) HTTPFormatter

NewLogplexBatchFormatter returns a new LogplexBatchFormatter wrapping the provided batch.

type HTTPOutlet

type HTTPOutlet struct {

    // User supplied loggers
    Logger *log.Logger
    // contains filtered or unexported fields
}

HTTPOutlet handles delivery of batches to HTTP endpoints by creating formatters for each request. HTTPOutlets handle retries, response parsing, and lost counters.

func NewHTTPOutlet

func NewHTTPOutlet(s *Shuttle) *HTTPOutlet

NewHTTPOutlet returns a properly constructed HTTPOutlet for the given shuttle.

func (*HTTPOutlet) Outlet

func (h *HTTPOutlet) Outlet()

Outlet receives batches from the inbox and submits them to logplex via HTTP.

type KinesisFormatter

type KinesisFormatter struct {
    // contains filtered or unexported fields
}

KinesisFormatter formats batches destined for AWS Kinesis HTTP endpoints. Kinesis has a very small payload size, so setting config.BatchSize in the 1-3 range is recommended so as not to lose logs by going over the batch size. KinesisFormatter formats the Data using the LogplexLineFormatter, and the result is additionally base64 encoded.

func (*KinesisFormatter) MsgCount

func (kf *KinesisFormatter) MsgCount() int

MsgCount returns the number of records that the formatter is formatting.

func (*KinesisFormatter) Request

func (kf *KinesisFormatter) Request() (*http.Request, error)

Request constructs a request for this formatter. See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

type KinesisRecord

type KinesisRecord struct {
    // contains filtered or unexported fields
}

KinesisRecord is used to marshal LogplexLineFormatters to Kinesis Records for the PutRecords API call.

func (KinesisRecord) WriteTo

func (r KinesisRecord) WriteTo(w io.Writer) (int64, error)

WriteTo writes the LogplexLineFormatter to the provided writer in Kinesis' PutRecords format. Conforms to the io.WriterTo interface.

type LogLine

type LogLine struct {
    // contains filtered or unexported fields
}

LogLine holds a newline-terminated log message and the time shuttle received it.

func (LogLine) Length

func (ll LogLine) Length() int

Length returns the length of the raw bytes of the LogLine.

type LogLineReader

type LogLineReader struct {
    // contains filtered or unexported fields
}

LogLineReader performs the reading of lines from an io.ReadCloser, encapsulating lines into a LogLine and emitting them on outbox.

func NewLogLineReader

func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader

NewLogLineReader constructs a new reader with its own Outbox.

func (*LogLineReader) Close

func (rdr *LogLineReader) Close() error

Close closes the reader for input.

func (*LogLineReader) ReadLines

func (rdr *LogLineReader) ReadLines() error

ReadLines reads lines from the input the reader was created for and returns any errors. It blocks until the underlying reader is closed.

type LogplexBatchFormatter

type LogplexBatchFormatter struct {
    // contains filtered or unexported fields
}

LogplexBatchFormatter implements an io.Reader that returns Logplex formatted log lines. It wraps log lines in length prefixed RFC5424 formatting, splitting them as necessary to config.MaxLineLength.

func (*LogplexBatchFormatter) MsgCount

func (bf *LogplexBatchFormatter) MsgCount() int

MsgCount returns the message count of the wrapped batch.

func (*LogplexBatchFormatter) Request

func (bf *LogplexBatchFormatter) Request() (*http.Request, error)

Request returns a properly constructed *http.Request, complete with headers and ContentLength set.

type LogplexLineFormatter

type LogplexLineFormatter struct {
    // contains filtered or unexported fields
}

LogplexLineFormatter formats individual log lines into length prefixed RFC5424 messages via an io.Reader interface.

func NewLogplexErrorFormatter

func NewLogplexErrorFormatter(err errData, config *Config) *LogplexLineFormatter

NewLogplexErrorFormatter returns a LogplexLineFormatter for the error data. These can be used to inject error data into the log stream.

func NewLogplexLineFormatter

func NewLogplexLineFormatter(ll LogLine, config *Config) *LogplexLineFormatter

NewLogplexLineFormatter returns a new LogplexLineFormatter wrapping the provided LogLine.

func (*LogplexLineFormatter) AppName

func (llf *LogplexLineFormatter) AppName() string

AppName returns the value of the app name field, based on the inputFormat, for use in syslog framing.

func (*LogplexLineFormatter) MsgCount

func (llf *LogplexLineFormatter) MsgCount() int

MsgCount is always 1 for a line.

func (*LogplexLineFormatter) Read

func (llf *LogplexLineFormatter) Read(p []byte) (n int, err error)

Read implements the io.Reader interface. It tries to fill p as full as possible before returning.

func (*LogplexLineFormatter) Reset

func (llf *LogplexLineFormatter) Reset()

Reset resets the reader so that the log line can be re-read.

type MetricsReporter

type MetricsReporter struct {
    // contains filtered or unexported fields
}

MetricsReporter handles reporting of metrics to a specified source at a given duration.

func NewMetricsReporter

func NewMetricsReporter(r metrics.Registry, source string, l *log.Logger) *MetricsReporter

NewMetricsReporter returns a properly constructed MetricsReporter.

func (MetricsReporter) Emit

func (e MetricsReporter) Emit(d time.Duration)

Emit emits log-shuttle metrics in logfmt compatible formats every d duration using the MetricsReporter logger. source is added to the line as log_shuttle_stats_source if not empty. It waits for a stop signal and will stop emitting metrics when received.

Example output:

    space=<space-id> instance=<instance-id> runc-shuttle2019/06/10 12:48:25 batch.fill.count=0 batch.fill.max=0.000000 batch.fill.mean=0.000000 batch.fill.min=0.000000 batch.fill.p75=0.000000 batch.fill.p95=0.000000 batch.fill.p99=0.000000 batch.fill.rate.15min=0.000 batch.fill.rate.1min=0.000 batch.fill.rate.5min=0.000 batch.fill.rate.mean=0.000 batch.fill.stddev=0.000000 jw-debug-metrics=true lines.batched.count=0 lines.dropped.count=0 lines.read.count=0 msg.lost.count=0 outlet.inbox.length=0 outlet.post.failure.count=0 outlet.post.failure.max=0.000000 outlet.post.failure.mean=0.000000 outlet.post.failure.min=0.000000 outlet.post.failure.p75=0.000000 outlet.post.failure.p95=0.000000 outlet.post.failure.p99=0.000000 outlet.post.failure.rate.15min=0.000 outlet.post.failure.rate.1min=0.000 outlet.post.failure.rate.5min=0.000 outlet.post.failure.rate.mean=0.000 outlet.post.failure.stddev=0.000000 outlet.post.success.count=0 outlet.post.success.max=0.000000 outlet.post.success.mean=0.000000 outlet.post.success.min=0.000000 outlet.post.success.p75=0.000000 outlet.post.success.p95=0.000000 outlet.post.success.p99=0.000000 outlet.post.success.rate.15min=0.000 outlet.post.success.rate.1min=0.000 outlet.post.success.rate.5min=0.000 outlet.post.success.rate.mean=0.000 outlet.post.success.stddev=0.000000

func (MetricsReporter) Stop

func (e MetricsReporter) Stop()

Stop stops a MetricsReporter from emitting logs to its logger.

type NewHTTPFormatterFunc

type NewHTTPFormatterFunc func(b Batch, eData []errData, config *Config) HTTPFormatter

NewHTTPFormatterFunc defines the function type for creating and returning a new HTTPFormatter.

func NewCloudWatchLogsFormatterFunc

func NewCloudWatchLogsFormatterFunc(region, host, logGroupName, logStreamName string) (NewHTTPFormatterFunc, error)

NewCloudWatchLogsFormatterFunc creates a NewHTTPFormatterFunc for formatting batches into CloudWatch Logs requests tied to a specific region/host/log group/log stream.

type ResponseHandler

type ResponseHandler interface {
    HandleResponse(*http.Response) error
}

ResponseHandler needs to handle responses to the requests an outlet submits. If an HTTPFormatter is also a ResponseHandler, and the request didn't error, an outlet must call HandleResponse with the response to the generated request.

type Shuttle

type Shuttle struct {

    Batches chan Batch

    MetricsRegistry metrics.Registry

    Drops, Lost      *Counter
    NewFormatterFunc NewHTTPFormatterFunc
    Logger           *log.Logger
    ErrLogger        *log.Logger
    // contains filtered or unexported fields
}

Shuttle is the main entry point into the library


config := NewConfig()
// Modulate the config as needed before creating a new shuttle
s := NewShuttle(config)
s.Launch() // Start up the batching/delivering go routines
s.Land()   // Spin down the batching/delivering go routines

func NewShuttle

func NewShuttle(config Config) *Shuttle

NewShuttle returns a properly constructed Shuttle with a given config.

func (*Shuttle) CloseReaders

func (s *Shuttle) CloseReaders() []error

CloseReaders closes all tracked readers and returns any errors returned by Close()ing the readers.

func (*Shuttle) DockReaders

func (s *Shuttle) DockReaders() []error

DockReaders closes all tracked readers and waits for all reading go routines to finish.

func (*Shuttle) Land

func (s *Shuttle) Land()

Land gracefully terminates the shuttle instance, ensuring that anything read is batched and delivered. A panic is likely if Land() is called before all readers passed to ReadLogLines() calls have been closed.

func (*Shuttle) Launch

func (s *Shuttle) Launch()

Launch launches a shuttle by spawning its outlets and batchers (in that order), which is the reverse of shutdown.

func (*Shuttle) LoadReader

func (s *Shuttle) LoadReader(rdr io.ReadCloser)

LoadReader loads a reader into the shuttle for processing its lines. Use this if you want log-shuttle to track the readers for you. The errors returned by ReadLogLines are discarded.

func (*Shuttle) WaitForReadersToFinish

func (s *Shuttle) WaitForReadersToFinish()

WaitForReadersToFinish waits for all readers to finish reading.

type SubFormatter

type SubFormatter interface {
    MsgCount() int // MsgCount is the number of messages after formatting
}

SubFormatter formats a complete batch or a subsection of a batch. It may split lines in the batch as needed by the destination, making the MsgCount() of the formatter different from the MsgCount of the source batch. A formatter may emit more (likely) or fewer bytes for a given LogLine than the actual LogLine.
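
Why a formatter's MsgCount can exceed the batch's MsgCount is easiest to see with a splitting example. The splitLine function below is a hypothetical sketch of cutting an over-long line into chunks of at most maxLen bytes; the library's actual splitting rules (for example, how headers count toward MaxLineLength) are not described in this reference.

```go
package main

import "fmt"

// splitLine cuts raw into consecutive chunks of at most maxLen bytes.
// A line that already fits is returned as a single chunk.
func splitLine(raw []byte, maxLen int) [][]byte {
	if maxLen <= 0 || len(raw) <= maxLen {
		return [][]byte{raw}
	}
	var chunks [][]byte
	for len(raw) > maxLen {
		chunks = append(chunks, raw[:maxLen])
		raw = raw[maxLen:]
	}
	if len(raw) > 0 {
		chunks = append(chunks, raw)
	}
	return chunks
}

func main() {
	batch := [][]byte{
		[]byte("short\n"),
		[]byte("a line that is longer than ten bytes\n"),
	}
	formatted := 0
	for _, l := range batch {
		formatted += len(splitLine(l, 10))
	}
	// Two source lines become five formatted messages.
	fmt.Println(len(batch), formatted) // 2 5
}
```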

Package shuttle imports 25 packages and is imported by 4 packages. Updated 2020-06-17.