import "github.com/heroku/log-shuttle"
batch.go batcher.go cleanurl.go config.go counter.go formatter.go gzip_formatter.go http_outlet.go kinesis_formatter.go kinesis_types.go logline.go logplex_formatter.go reader.go shuttle.go
const ( InputFormatRaw = iota // default, just text InputFormatRFC5424 // newline termined RFC5424 formated lines InputFormatLengthPrefixedRFC5424 // length prefixed RFC5424 formatted lines )
Input format constants. TODO: ensure these are really used properly
const ( DefaultMaxLineLength = 10000 // Logplex max is 10000 bytes, so default to that DefaultInputFormat = InputFormatRaw DefaultBackBuff = 50 DefaultTimeout = 5 * time.Second DefaultWaitDuration = 250 * time.Millisecond DefaultMaxAttempts = 3 DefaultStatsInterval = 0 * time.Second DefaultStatsSource = "" DefaultVerbose = false DefaultSkipVerify = false DefaultPriVal = "190" DefaultVersion = "1" DefaultProcID = "shuttle" DefaultAppName = "token" DefaultHostname = "shuttle" DefaultMsgID = "- -" DefaultLogsURL = "" DefaultNumOutlets = 4 DefaultBatchSize = 500 DefaultID = "" DefaultDrop = true DefaultUseGzip = false DefaultKinesisShards = 1 )
Default option values
const ( // EOFRetrySleep is the amount of time to sleep between retries caused by an io.EOF, in ms. EOFRetrySleep = 100 // OtherRetrySleep is the time to sleep between retries for any other error, in ms. OtherRetrySleep = 1000 // DepthHighWatermark is the high watermark, beyond which the outlet looses batches instead of retrying. DepthHighWatermark = 0.6 // RetryWithTypeFormat if the format string for retries that also have a type RetryWithTypeFormat = "at=post retry=%t msgcount=%d inbox.length=%d request_id=%q attempts=%d error=%q errtype=\"%T\"\n" )
const ( // LogplexBatchTimeFormat is the format of timestamps as expected by Logplex LogplexBatchTimeFormat = "2006-01-02T15:04:05.000000+00:00" // LogplexContentType is the content type logplex expects LogplexContentType = "application/logplex-1" )
var ( DefaultFormatterFunc = NewLogplexBatchFormatter )
Defaults that can't be constants
Batch holds incoming log lines and provides some helpers for dealing with their grouping
NewBatch returns a new batch with a capacity pre-set
Add a logline to the batch and return a boolean indicating if the batch is full or not
MsgCount returns the number of msgs in the batch
type Batcher struct {
// contains filtered or unexported fields
}
Batcher coalesces logs coming via inLogs into batches, which are sent out via outBatches
NewBatcher created an empty Batcher for the provided shuttle
Batch loops getting an empty batch and filling it. Filled batcches are sent via the outBatches channel. If outBatches is full, then the batch is dropped and the drops counters is incremented by the number of messages dropped.
type Config struct { MaxLineLength int BackBuff int BatchSize int NumOutlets int InputFormat int MaxAttempts int KinesisShards int LogsURL string Prival string Version string Procid string Hostname string Appname string Msgid string StatsSource string SkipVerify bool Verbose bool UseGzip bool Drop bool WaitDuration time.Duration Timeout time.Duration StatsInterval time.Duration ID string FormatterFunc NewHTTPFormatterFunc // Loggers Logger *log.Logger ErrLogger *log.Logger // contains filtered or unexported fields }
Config holds the various config options for a shuttle
NewConfig returns a newly created Config, filled in with defaults
ComputeHeader computes the syslogFrameHeaderFormat once so we don't have to do that for every formatter itteration Should be called after setting up the rest of the config or if the config changes
Counter is used to track 2 values for a given metric. The first item is the "all time" metric counterand the second is the last value since the metric was ReadAndReset. Counters are safe for concurrent use.
NewCounter returns a new Counter initialized to the initial value
Add increments the counter (alltime and current), returning the new value
AllTime returns the current alltime value of the Counter
Read returns the current value of the Counter
ReadAndReset returns the current value and the last time it was reset, then resets the value and the last reset time to time.Now()
type GzipFormatter struct {
// contains filtered or unexported fields
}
GzipFormatter is an HTTPFormatter that is built with a delegate HTTPFormatter but which compresses the request body
func NewGzipFormatter(delegate HTTPFormatter) *GzipFormatter
NewGzipFormatter builds a new GzipFormatter with the supplied delegate
func (g *GzipFormatter) Close() error
Close the stream
func (g *GzipFormatter) MsgCount() int
MsgCount return the number of messages contained in the formatted batch
func (g *GzipFormatter) Read(p []byte) (int, error)
Read bytes from the formatter stream
func (g *GzipFormatter) Request() (*http.Request, error)
Request returns a http.Request to be used with a http.Client The request has it's body and headers set as necessary
type HTTPFormatter interface { Request() (*http.Request, error) // Request() returns a *http.Request ready to be handled by an outlet SubFormatter }
HTTPFormatter is the interface that http outlets use to format a HTTP request.
func NewKinesisFormatter(b Batch, eData []errData, config *Config) HTTPFormatter
NewKinesisFormatter constructs a proper HTTPFormatter for Kinesis http targets
func NewLogplexBatchFormatter(b Batch, eData []errData, config *Config) HTTPFormatter
NewLogplexBatchFormatter returns a new LogplexBatchFormatter wrapping the provided batch
type HTTPOutlet struct { // User supplied loggers Logger *log.Logger // contains filtered or unexported fields }
HTTPOutlet handles delivery of batches to HTTP endpoints by creating formatters for each request. HTTPOutlets handle retries, response parsing and lost counters
func NewHTTPOutlet(s *Shuttle) *HTTPOutlet
NewHTTPOutlet returns a properly constructed HTTPOutlet for the given shuttle
func (h *HTTPOutlet) Outlet()
Outlet receives batches from the inbox and submits them to logplex via HTTP.
KinesisFormatter formats batches destined for AWS Kinesis HTTP endpoints Kinesis has a very small payload side, so recommend setting config.BatchSize in the 1-3 range so as to not loose logs because we go over the batch size. Kinesis formats the Data using the LogplexLineFormatter, which is additionally base64 encoded.
func (kf *KinesisFormatter) MsgCount() int
MsgCount returns the number of records that the formatter is formatting
func (kf *KinesisFormatter) Request() (*http.Request, error)
Request constructs a request for this formatter See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
type KinesisRecord struct {
// contains filtered or unexported fields
}
KinesisRecord is used to marshal LoglexLineFormatters to Kinesis Records for the PutRecords API Call
WriteTo writes the LogplexLineFormatter to the provided writer in Kinesis' PutRecordsFormat. Conforms to the WriterTo interface.
type LogLine struct {
// contains filtered or unexported fields
}
LogLine holds the new line terminated log messages and when shuttle received them.
Length returns the length of the raw byte of the LogLine
type LogLineReader struct {
// contains filtered or unexported fields
}
LogLineReader performs the reading of lines from an io.ReadCloser, encapsulating lines into a LogLine and emitting them on outbox
func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader
NewLogLineReader constructs a new reader with it's own Outbox.
func (rdr *LogLineReader) Close() error
Close the reader for input
func (rdr *LogLineReader) ReadLines() error
ReadLines from the input created for. Return any errors blocks until the underlying reader is closed
LogplexBatchFormatter implements on io.Reader that returns Logplex formatted log lines. Wraps log lines in length prefixed rfc5424 formatting, splitting them as necessary to config.MaxLineLength
func (bf *LogplexBatchFormatter) MsgCount() int
MsgCount of the wrapped batch.
func (bf *LogplexBatchFormatter) Request() (*http.Request, error)
Request returns a properly constructed *http.Request, complete with headers and ContentLength set.
type LogplexLineFormatter struct {
// contains filtered or unexported fields
}
LogplexLineFormatter formats individual loglines into length prefixed rfc5424 messages via an io.Reader interface
func NewLogplexErrorFormatter(err errData, config *Config) *LogplexLineFormatter
NewLogplexErrorFormatter returns a LogplexLineFormatter for the error data. These can be used to inject error data into the log stream
func NewLogplexLineFormatter(ll LogLine, config *Config) *LogplexLineFormatter
NewLogplexLineFormatter returns a new LogplexLineFormatter wrapping the provided LogLine
func (llf *LogplexLineFormatter) AppName() string
AppName returns the name of app name field based on the inputFormat For use in syslog framing
func (llf *LogplexLineFormatter) MsgCount() int
MsgCount is always 1 for a Line
func (llf *LogplexLineFormatter) Read(p []byte) (n int, err error)
Implements the io.Reader interface tries to fill p as full as possible before returning
func (llf *LogplexLineFormatter) Reset()
Reset the reader so that the log line can be re-read
type NewHTTPFormatterFunc func(b Batch, eData []errData, config *Config) HTTPFormatter
NewHTTPFormatterFunc defines the function type for defining creating and returning a new Formatter
type Shuttle struct { LogLineReader Batches chan Batch MetricsRegistry metrics.Registry Drops, Lost *Counter NewFormatterFunc NewHTTPFormatterFunc Logger *log.Logger ErrLogger *log.Logger // contains filtered or unexported fields }
Shuttle is the main entry point into the library
Code:
config := NewConfig() // Modulate the config as needed before creating a new shuttle s := NewShuttle(config) s.LoadReader(os.Stdin) s.Launch() // Start up the batching/delivering go routines s.Land() // Spin down the batching/delivering go routines
NewShuttle returns a properly constructed Shuttle with a given config
CloseReaders closes all tracked readers and returns any errors returned by Close()ing the readers
DockReaders closes all tracked readers and waits for all reading go routines to finish.
Land gracefully terminates the shuttle instance, ensuring that anything read is batched and delivered. A panic is likely to happen if Land() is called before any readers passed to any ReadLogLines() calls aren't closed.
Launch a shuttle by spawing it's outlets and batchers (in that order), which is the reverse of shutdown.
func (s *Shuttle) LoadReader(rdr io.ReadCloser)
LoadReader into the shuttle for processing it's lines. Use this if you want log-shuttle to track the readers for you. The errors returned by ReadLogLines are discarded.
WaitForReadersToFinish to finish reading
type SubFormatter interface { MsgCount() int // MsgCount is the number of messages after formatting io.Reader }
SubFormatter formats a complete batch or a subsection of a batch. It may split lines in the batch as needed by the destination, making the MsgCount() of the formatter different from the MsgCount of the source batch. A formatter may emitt more (likely) or less bytes for a given LogLine than the actual Logline.
Package shuttle imports 19 packages (graph) and is imported by 2 packages. Updated 2018-03-06. Refresh now. Tools for package owners.