Documentation ¶
Overview ¶
Package base defines the data types used in log processing and the interfaces of log processing steps, for example the LogRecord struct and the LogTransform interface
Index ¶
- func NewSinkLogger(parentLogger logger.Logger, clientAddress string, clientNumber ClientNumber) logger.Logger
- type BufferReceiverSink
- type ChunkBufferer
- type ChunkConsumer
- type ChunkConsumerArgs
- type ChunkConsumerOverrideCreator
- type ChunkDecoder
- type ClientNumber
- type FieldSetExtractor
- type FilterResult
- type LogAllocator
- type LogChunk
- type LogChunkAccepter
- type LogChunkInfo
- type LogChunkMaker
- type LogCustomCounterRegistry
- type LogFieldLocator
- type LogFields
- type LogInput
- type LogInputCounterSet
- type LogListener
- type LogParser
- type LogProcessCounterSet
- func (pcounter *LogProcessCounterSet) CountChunk(outputIndex int, chunk *LogChunk)
- func (pcounter *LogProcessCounterSet) CountStream(outputIndex int, stream LogStream)
- func (pcounter *LogProcessCounterSet) RegisterCustomCounter(label string) func(length int)
- func (pcounter *LogProcessCounterSet) SelectMetricKeySet(record *LogRecord) *LogInputCounterSet
- func (pcounter *LogProcessCounterSet) UpdateMetrics()
- type LogRecord
- type LogRewriter
- type LogSchema
- func (s *LogSchema) CopyTestRecord(source *LogRecord) *LogRecord
- func (s *LogSchema) CreateFieldLocator(name string) (LogFieldLocator, error)
- func (s *LogSchema) CreateFieldLocators(names []string) ([]LogFieldLocator, error)
- func (s *LogSchema) CreateTemplateVariableResolver(name string) (stringtemplate.PartProvider, error)
- func (s *LogSchema) GetFieldNames() []string
- func (s *LogSchema) GetMaxFields() int
- func (s *LogSchema) MustCreateFieldLocator(name string) LogFieldLocator
- func (s *LogSchema) MustCreateFieldLocators(names []string) []LogFieldLocator
- func (s *LogSchema) NewTestRecord1(fields LogFields) *LogRecord
- func (s *LogSchema) NewTestRecord2(tm time.Time, fields LogFields) *LogRecord
- type LogSerializer
- type LogStream
- type LogTransform
- type LogTransformFunc
- type MessageReceiverSink
- type MultiSinkBufferReceiver
- type MultiSinkMessageReceiver
- type Orchestrator
- type PipelineWorker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSinkLogger ¶
func NewSinkLogger(parentLogger logger.Logger, clientAddress string, clientNumber ClientNumber) logger.Logger
NewSinkLogger creates a derived logger for sinks created from MultiSinkMessageReceiver or MultiSinkBufferReceiver
Types ¶
type BufferReceiverSink ¶
type BufferReceiverSink interface {
    // Accept receives a buffer of log records from input
    //
    // The buffer is NOT usable after the function exits
    Accept(buffer []*LogRecord)

    // Tick is called periodically for custom operations
    //
    // Tick is guaranteed to be called periodically, though the interval can vary and be inaccurate
    Tick()

    // Close is called when the source feeding this sink is ended
    Close()
}
BufferReceiverSink under MultiSinkBufferReceiver receives buffered logs from a single source, e.g. a client TCP connection
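As a minimal sketch (the countingSink type is hypothetical, written as if inside this package):

    // countingSink is a hypothetical BufferReceiverSink counting records
    // received from a single source, e.g. one TCP connection.
    type countingSink struct {
        total int
    }

    // Accept counts incoming records; the buffer is only valid during this
    // call, so anything kept longer would have to be copied first.
    func (s *countingSink) Accept(buffer []*LogRecord) {
        s.total += len(buffer)
    }

    // Tick performs periodic housekeeping; the interval may vary.
    func (s *countingSink) Tick() {}

    // Close runs once when the feeding source has ended.
    func (s *countingSink) Close() {}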
type ChunkBufferer ¶
type ChunkBufferer interface {
    PipelineWorker

    // RegisterNewConsumer registers a new consumer to be launched and returns args for its construction
    RegisterNewConsumer() ChunkConsumerArgs

    // Accept accepts incoming log chunks
    Accept(chunk LogChunk)

    // Destroy destroys the bufferer, saving all remaining chunks in channels and waiting for consumers to finish
    Destroy()
}
ChunkBufferer is a worker that buffers completed chunks in memory and/or persistent storage. Accept can be called concurrently by different goroutines. It is up to the bufferer to process any chunks left in input or output channels during shutdown.
type ChunkConsumer ¶
type ChunkConsumer interface { PipelineWorker }
ChunkConsumer is a worker to consume buffered chunks, e.g. for forwarding.
A consumer should be created with ChunkConsumerArgs as input.
It should initiate shutdown upon the closing of InputChannel or the InputClosed signal, and never attempt to read leftover chunks from InputChannel once it is closed.
type ChunkConsumerArgs ¶
type ChunkConsumerArgs struct {
    InputChannel    <-chan LogChunk      // channel of fully loaded chunks to consume
    InputClosed     channels.Awaitable   // signal when input channel is closed, in case consumer is not waiting on input
    OnChunkConsumed func(chunk LogChunk) // to be called when a chunk is consumed / committed
    OnChunkLeftover func(chunk LogChunk) // to be called when a chunk is left unconsumed at the end
    OnFinished      func()               // to be called after the consumer ends
}
ChunkConsumerArgs contains the parameters to create a ChunkConsumer. For any chunk, either OnChunkConsumed or OnChunkLeftover must be called.
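A hypothetical consumption loop honoring this contract might look as follows; forward stands in for the actual delivery logic, and a real consumer would additionally watch InputClosed while blocked on other work:

    // runConsumer reports each chunk via OnChunkConsumed on success or
    // OnChunkLeftover on failure, and calls OnFinished exactly once at the end.
    func runConsumer(args ChunkConsumerArgs, forward func(LogChunk) error) {
        defer args.OnFinished()
        for chunk := range args.InputChannel { // loop ends when the channel is closed
            if err := forward(chunk); err != nil {
                args.OnChunkLeftover(chunk) // unconsumed; stop without reading further
                return
            }
            args.OnChunkConsumed(chunk)
        }
    }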
type ChunkConsumerOverrideCreator ¶
type ChunkConsumerOverrideCreator func(parentLogger logger.Logger, name string, decoder ChunkDecoder, args ChunkConsumerArgs) ChunkConsumer
ChunkConsumerOverrideCreator defines a function to construct a ChunkConsumer override for testing purposes.
It takes the name of the specific output, a decoder for the output type, and consumer args.
An override is created per output (to support multi-output).
type ChunkDecoder ¶
type ChunkDecoder interface {
DecodeChunkToJSON(chunk LogChunk, separator []byte, indented bool, writer io.Writer) (LogChunkInfo, error)
}
ChunkDecoder provides an interface to verify and decode resulting chunks. Used for testing and internal verifications.
type ClientNumber ¶
type ClientNumber uint
ClientNumber uniquely identifies a currently connected client from one of the inputs
It's currently set to the FD of incoming connections.
TODO: use something other than FD in the future for inputs not based on FD
const MaxClientNumber ClientNumber = 262144
MaxClientNumber represents the max value of client number passed to MultiSinkMessageReceiver.NewSink
The value affects how many clients we can have at the same time. It should be theoretically unreachable.
The value must be a constant so that fixed-size arrays can be declared.
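For instance, per-client state can then be held in a constant-size array indexed by ClientNumber instead of a map; the sinksByClient variable below is hypothetical:

    // sinksByClient holds per-client state in a fixed-size array, which the
    // constant MaxClientNumber makes possible to declare.
    var sinksByClient [MaxClientNumber]BufferReceiverSink

    func sinkFor(number ClientNumber) BufferReceiverSink {
        return sinksByClient[number]
    }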
type FieldSetExtractor ¶
type FieldSetExtractor struct {
// contains filtered or unexported fields
}
FieldSetExtractor extracts values from log records by a predefined set of fields
func NewFieldSetExtractor ¶
func NewFieldSetExtractor(locators []LogFieldLocator) *FieldSetExtractor
NewFieldSetExtractor creates a FieldSetExtractor
func (*FieldSetExtractor) Extract ¶
func (ex *FieldSetExtractor) Extract(record *LogRecord) []string
Extract extracts the field set from the given log record and returns transient field values
Returned slices and values are only usable until next call. They MUST be copied for storing.
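For example, a caller that needs to keep extracted values could deep-copy them first; snapshotFields is a hypothetical helper:

    import "strings"

    // snapshotFields deep-copies the transient values returned by Extract so
    // they remain valid after the next Extract call.
    func snapshotFields(ex *FieldSetExtractor, record *LogRecord) []string {
        values := ex.Extract(record)
        saved := make([]string, len(values))
        for i, v := range values {
            saved[i] = strings.Clone(v) // copy: the backing bytes are reused on the next call
        }
        return saved
    }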
type FilterResult ¶
type FilterResult bool
FilterResult defines the result of filtering, pass (true) or drop (false)
const DROP FilterResult = false
DROP means the transform aborts and the record is to be dropped
const PASS FilterResult = true
PASS means the transform succeeds (whether there is a change or not)
type LogAllocator ¶
type LogAllocator struct {
// contains filtered or unexported fields
}
LogAllocator allocates empty log records and backing buffers. A local cache of recycled log buffers has been tried and yielded minimal improvement.
func NewLogAllocator ¶
func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator
NewLogAllocator creates LogAllocator linked to the given schema
func (*LogAllocator) NewRecord ¶
func (alloc *LogAllocator) NewRecord(input []byte) (*LogRecord, util.MutableString)
NewRecord creates a new record with empty values
func (*LogAllocator) Release ¶
func (alloc *LogAllocator) Release(record *LogRecord)
Release releases this log record for recycling
type LogChunk ¶
type LogChunk struct {
    ID    string // Unique ID of this chunk, may be used as filename
    Data  []byte // Actual data of this chunk, transformed from LogStream
    Saved bool   // true if saved on disk already
}
LogChunk represents a chunk of log records serialized and ready for storage or transport as its own unit
type LogChunkAccepter ¶
type LogChunkAccepter func(chunk LogChunk)
LogChunkAccepter is a function which accepts completed and loaded chunks for buffering or saving
type LogChunkInfo ¶
LogChunkInfo contains information that can be extracted from a chunk binary for test purposes
type LogChunkMaker ¶
type LogChunkMaker interface {
    // WriteStream packages the given log stream and optionally returns a completed chunk
    WriteStream(stream LogStream) *LogChunk

    // FlushBuffer packages buffered log stream(s) into a chunk if there is any
    FlushBuffer() *LogChunk
}
LogChunkMaker packages serialized log streams into a certain chunk format for storage or transport, e.g. fluentd forward messages. One chunk may be made from more than one stream joined together.
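A sketch of the intended call pattern, assuming the streams have already been serialized (flushAll is hypothetical):

    // flushAll feeds serialized streams to a LogChunkMaker: WriteStream may or
    // may not complete a chunk, and FlushBuffer drains whatever remains buffered.
    func flushAll(maker LogChunkMaker, streams []LogStream, accept LogChunkAccepter) {
        for _, stream := range streams {
            if chunk := maker.WriteStream(stream); chunk != nil {
                accept(*chunk)
            }
        }
        if chunk := maker.FlushBuffer(); chunk != nil {
            accept(*chunk)
        }
    }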
type LogCustomCounterRegistry ¶
LogCustomCounterRegistry allows registration of custom record counters by label
RegisterCustomCounter returns a function to be called to count record length
type LogFieldLocator ¶
type LogFieldLocator int
LogFieldLocator is used to locate a named field in LogRecord, bound to a LogSchema
const MissingFieldLocator LogFieldLocator = -1
MissingFieldLocator represents a non-existent index to a log field
func (LogFieldLocator) Del ¶
func (loc LogFieldLocator) Del(fields []string)
Del resets the field value to empty string
func (LogFieldLocator) Get ¶
func (loc LogFieldLocator) Get(fields []string) util.MutableString
Get returns the field value or empty string
func (LogFieldLocator) Name ¶
func (loc LogFieldLocator) Name(schema LogSchema) string
Name returns the field name
func (LogFieldLocator) Set ¶
func (loc LogFieldLocator) Set(fields []string, value util.MutableString)
Set assigns the field value
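Putting the methods together, a sketch assuming a three-field schema whose second field is "level" and that NewTestRecord1 takes field values in schema order (demoLocator is hypothetical):

    // demoLocator shows Get/Set/Del through a locator bound to the schema.
    func demoLocator(schema LogSchema) {
        levelLoc := schema.MustCreateFieldLocator("level")
        record := schema.NewTestRecord1(LogFields{"", "info", "hello"})

        _ = levelLoc.Get(record.Fields)        // "info"
        levelLoc.Set(record.Fields, "warning") // overwrite the value
        levelLoc.Del(record.Fields)            // reset to empty string
    }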
type LogFields ¶
type LogFields []util.MutableString
LogFields represents named fields in LogRecord, to be used with LogSchema.
Fields are by default empty strings and empty fields are the same as missing fields, which should be excluded from output.
Some fields at the end of this slice may be reserved by schema.MaxFields and they shouldn't be processed.
type LogInput ¶
type LogInput interface {
    PipelineWorker

    Address() string // Bound/assigned address if applicable, e.g. 127.0.0.1:65531
}
LogInput represents an input source at the beginning of the pipeline, e.g. a TCP/syslog input. It integrates the endpoint/listener, parser and necessary steps to construct raw base.LogRecord(s).
type LogInputCounterSet ¶
type LogInputCounterSet struct {
// contains filtered or unexported fields
}
LogInputCounterSet tracks metrics for incoming logs
LogInputCounterSet must be accessed through a pointer. It's not concurrently usable. Counter-vectors and counters created here may duplicate others, as long as the labels match.
func NewLogInputCounter ¶
func NewLogInputCounter(metricCreator promreg.MetricCreator) *LogInputCounterSet
NewLogInputCounter creates a LogInputCounterSet
func (*LogInputCounterSet) CountRecordDrop ¶
func (icounter *LogInputCounterSet) CountRecordDrop(record *LogRecord)
CountRecordDrop updates counters for log record dropping
func (*LogInputCounterSet) CountRecordPass ¶
func (icounter *LogInputCounterSet) CountRecordPass(record *LogRecord)
CountRecordPass updates counters for log record passing
func (*LogInputCounterSet) RegisterCustomCounter ¶
RegisterCustomCounter registers a counter by label and count/length pointers
func (*LogInputCounterSet) UpdateMetrics ¶
func (icounter *LogInputCounterSet) UpdateMetrics()
UpdateMetrics writes unwritten values in the counter to underlying Prometheus counters
type LogListener ¶
type LogListener interface { PipelineWorker }
LogListener represents an input endpoint for logs, e.g. a file listener or TCP listener
Due to data complexity, the output is to be received by a MultiSinkMessageReceiver passed during construction.
LogListener always works in the background as one or more goroutines
type LogProcessCounterSet ¶
type LogProcessCounterSet struct {
// contains filtered or unexported fields
}
LogProcessCounterSet tracks metrics for log transform, serialization and chunk making.
LogProcessCounterSet must be accessed through a pointer. It's not concurrently usable. Counter-vectors and counters created here may duplicate others, as long as the labels match.
It tracks counters by metric keys (e.g. vhost+source) that are not part of orchestration keys (e.g. level), by creating a fixed-length array containing counters for each key-set. The positions of the final counters are decided during the registration process.
LogInputCounter's own custom counter registry is ignored here, as map access per counter update would be very inefficient.
func NewLogProcessCounter ¶
func NewLogProcessCounter(factory promreg.MetricCreator, schema LogSchema, keyLocators []LogFieldLocator, outputNames []string) *LogProcessCounterSet
NewLogProcessCounter creates a LogProcessCounterSet
func (*LogProcessCounterSet) CountChunk ¶
func (pcounter *LogProcessCounterSet) CountChunk(outputIndex int, chunk *LogChunk)
CountChunk updates counters for chunk generation
func (*LogProcessCounterSet) CountStream ¶
func (pcounter *LogProcessCounterSet) CountStream(outputIndex int, stream LogStream)
CountStream updates counters for stream serialization
func (*LogProcessCounterSet) RegisterCustomCounter ¶
func (pcounter *LogProcessCounterSet) RegisterCustomCounter(label string) func(length int)
RegisterCustomCounter registers a custom counter by label and count/length pointers
This method must not be called in the processing stage, when counters are already being selected and updated
func (*LogProcessCounterSet) SelectMetricKeySet ¶
func (pcounter *LogProcessCounterSet) SelectMetricKeySet(record *LogRecord) *LogInputCounterSet
SelectMetricKeySet switches the current metric key set to that of the given record.
1. Subsequent transforms would write counter values to the correct key-set.
2. Returns an input counter for that key-set.
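A sketch of where the call sits in a processing loop; processOne and serialize are hypothetical:

    // processOne selects the key-set before transforming, so counter updates
    // from the transform and the serialization land under the right keys.
    func processOne(pcounter *LogProcessCounterSet, record *LogRecord,
        transform LogTransform, serialize func(*LogRecord) LogStream) {
        icounter := pcounter.SelectMetricKeySet(record)
        if transform.Transform(record) == DROP {
            icounter.CountRecordDrop(record)
            return
        }
        icounter.CountRecordPass(record)
        pcounter.CountStream(0, serialize(record)) // output index 0
    }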
func (*LogProcessCounterSet) UpdateMetrics ¶
func (pcounter *LogProcessCounterSet) UpdateMetrics()
UpdateMetrics writes unwritten values in the counter to underlying Prometheus counters
type LogRecord ¶
type LogRecord struct {
    Fields    LogFields // Field values by index and empty string if unset. The string values inside are temporary and only valid until record is released.
    RawLength int       // Input length or approximated length of entire record, for statistics
    Timestamp time.Time // Timestamp, might be zero until processed by a LogTransform
    Unescaped bool      // Whether the main message field has been un-escaped. Multi-line logs start with true.
    // contains filtered or unexported fields
}
LogRecord defines the structure of log record before it's finalized for forwarding.
type LogRewriter ¶
type LogRewriter interface {
    // MaxFieldLength returns the maximum possible length of the rewritten value
    MaxFieldLength(value string, record *LogRecord) int

    // WriteFieldBody writes field value to the beginning of given buffer and returns the end position
    WriteFieldBody(value string, record *LogRecord, buffer []byte) int
}
LogRewriter rewrites the value of specific field during log serialization.
It's a special form of LogTransform, per-field and without filtering, meant to reduce heap allocations. For example, inlining component names at the beginning of log messages can be re-implemented here, writing directly to the final buffer for serialized log records without any costly string concatenation.
Each LogRewriter takes the field value, the record and the next LogRewriter.
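For illustration, a hypothetical rewriter that prepends a fixed prefix to the field value:

    // prefixRewriter prepends a constant prefix during serialization without
    // building an intermediate string.
    type prefixRewriter struct {
        prefix string
    }

    // MaxFieldLength reports the worst-case length of the rewritten value.
    func (r *prefixRewriter) MaxFieldLength(value string, record *LogRecord) int {
        return len(r.prefix) + len(value)
    }

    // WriteFieldBody writes prefix+value to the buffer and returns the end position.
    func (r *prefixRewriter) WriteFieldBody(value string, record *LogRecord, buffer []byte) int {
        n := copy(buffer, r.prefix)
        n += copy(buffer[n:], value)
        return n
    }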
type LogSchema ¶
type LogSchema struct {
    OnLocated func(index int) // optional callback invoked after successful CreateFieldLocator calls
    // contains filtered or unexported fields
}
LogSchema defines the field names and marks the fields which should belong to the "environment".
In case of runtime schema update, only new fields should be appended at the end.
func MustNewLogSchema ¶
MustNewLogSchema creates a new LogSchema or panics.
func NewLogSchema ¶
NewLogSchema creates a new LogSchema with field names and environment field names.
func (*LogSchema) CopyTestRecord ¶
func (s *LogSchema) CopyTestRecord(source *LogRecord) *LogRecord
CopyTestRecord makes a deep copy of the given record
func (*LogSchema) CreateFieldLocator ¶
func (s *LogSchema) CreateFieldLocator(name string) (LogFieldLocator, error)
CreateFieldLocator creates a LogFieldLocator by field name
func (*LogSchema) CreateFieldLocators ¶
func (s *LogSchema) CreateFieldLocators(names []string) ([]LogFieldLocator, error)
CreateFieldLocators creates LogFieldLocator(s) for field names
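For example, locators are typically resolved once up front so that unknown field names fail fast; newExtractor is a hypothetical helper:

    // newExtractor resolves locators up front and fails on unknown names.
    func newExtractor(schema LogSchema, names []string) (*FieldSetExtractor, error) {
        locators, err := schema.CreateFieldLocators(names)
        if err != nil {
            return nil, err // a listed field doesn't exist in the schema
        }
        return NewFieldSetExtractor(locators), nil
    }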
func (*LogSchema) CreateTemplateVariableResolver ¶
func (s *LogSchema) CreateTemplateVariableResolver(name string) (stringtemplate.PartProvider, error)
CreateTemplateVariableResolver creates a variable resolver by field name, to be used with stringtemplate.StringTemplate
func (*LogSchema) GetFieldNames ¶
func (s *LogSchema) GetFieldNames() []string
GetFieldNames returns all the field names in the same order
func (*LogSchema) GetMaxFields ¶
func (s *LogSchema) GetMaxFields() int
GetMaxFields returns the maximum number of fields
The maximum is equal to or greater than the number of actual fields, to allow for reserved fields
func (*LogSchema) MustCreateFieldLocator ¶
func (s *LogSchema) MustCreateFieldLocator(name string) LogFieldLocator
MustCreateFieldLocator creates a LogFieldLocator by field name or panics (if the field doesn't exist in the schema)
func (*LogSchema) MustCreateFieldLocators ¶
func (s *LogSchema) MustCreateFieldLocators(names []string) []LogFieldLocator
MustCreateFieldLocators creates LogFieldLocators for field names or panics (if a field doesn't exist in the schema)
func (*LogSchema) NewTestRecord1 ¶
func (s *LogSchema) NewTestRecord1(fields LogFields) *LogRecord
NewTestRecord1 creates a new record with initial field values for testing
func (*LogSchema) NewTestRecord2 ¶
func (s *LogSchema) NewTestRecord2(tm time.Time, fields LogFields) *LogRecord
NewTestRecord2 creates a new record with the given timestamp and initial field values for testing
type LogSerializer ¶
type LogSerializer interface {
    // SerializeRecord serializes the given log record into a log stream
    //
    // Input records should be released in the call
    //
    // Output LogStream is transient and only usable before the next call
    SerializeRecord(record *LogRecord) LogStream
}
LogSerializer serializes log records into a stream of certain format, e.g. JSON or msgpack
type LogStream ¶
type LogStream []byte
LogStream represents a serialized log record. Each LogStream should contain exactly one record. The data should be uncompressed and is temporary.
type LogTransform ¶
type LogTransform interface {
    // Transform transforms the given record and returns PASS or DROP in addition to the new log record
    //
    // The returned record may share children with the input, modified in-place.
    //
    // The input record shall not be used afterwards
    Transform(input *LogRecord) FilterResult
}
LogTransform filters and/or transforms logs one by one. It may have persistent state.
type LogTransformFunc ¶
type LogTransformFunc func(input *LogRecord) FilterResult
LogTransformFunc defines a function to perform transformation on a single log record
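For example, a hypothetical transform function that drops records whose message field is empty, given a locator resolved beforehand:

    // dropEmptyMessages builds a LogTransformFunc that filters out records
    // with an empty message field.
    func dropEmptyMessages(messageLoc LogFieldLocator) LogTransformFunc {
        return func(input *LogRecord) FilterResult {
            if messageLoc.Get(input.Fields) == "" {
                return DROP
            }
            return PASS
        }
    }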
type MessageReceiverSink ¶
type MessageReceiverSink interface {
    // Accept receives a raw message from input
    //
    // The message slice is NOT usable after the function exits
    Accept(message []byte)

    // Flush is called periodically for custom operations
    //
    // Flush is guaranteed to be called periodically, though the interval can vary and be inaccurate
    //
    // Flush is also to be called right before Close(), after all Accept() calls
    Flush()

    // Close is called when the source feeding this sink is ended
    Close()
}
MessageReceiverSink receives raw log messages from a single source, e.g. a client TCP connection
type MultiSinkBufferReceiver ¶
type MultiSinkBufferReceiver interface {
    // NewSink creates a sink to receive logs from the input identified by the given id
    //
    // clientAddress is a descriptive string of address, e.g. "10.1.0.1:50001"
    //
    // clientNumber identifies a currently connected client from one of the inputs, e.g. socket FD.
    NewSink(clientAddress string, clientNumber ClientNumber) BufferReceiverSink
}
MultiSinkBufferReceiver receives buffered logs from a multi-source input, e.g. a TCP with different incoming connections
For an ordinary TCP input, there is a single MultiSinkBufferReceiver, and one BufferReceiverSink for each connection.
The only production implementations are Orchestrator(s)
type MultiSinkMessageReceiver ¶
type MultiSinkMessageReceiver interface {
    // NewSink creates a sink to receive raw messages from the input source identified by the given id or client number
    //
    // clientAddress is a descriptive string of address, e.g. "10.1.0.1:50001"
    //
    // clientNumber identifies a currently connected client from one of the inputs, e.g. socket FD.
    NewSink(clientAddress string, clientNumber ClientNumber) MessageReceiverSink
}
MultiSinkMessageReceiver receives raw log messages from a multi-source input, e.g. a TCP listener with different incoming connections
For an ordinary TCP input, there is a single MultiSinkMessageReceiver, and one MessageReceiverSink for each connection.
The only production implementation is LogParsingReceiver
type Orchestrator ¶
type Orchestrator interface {
    MultiSinkBufferReceiver

    // Shutdown performs cleanup; should be called after all inputs have been stopped
    Shutdown()
}
Orchestrator takes log records and distributes them to internal pipelines
type PipelineWorker ¶
PipelineWorker represents a background worker in a stage of the processing pipeline, e.g. a parser or transformer
Source Files ¶
- base.go
- chunkbufferer.go
- chunkconsumer.go
- fieldsetextractor.go
- logallocator.go
- logchunk.go
- logchunkaccepter.go
- logchunkmaker.go
- logcustomcounterhost.go
- logcustomcounterinterface.go
- logfieldlocator.go
- loginput.go
- loginputcounterset.go
- loglistener.go
- logparser.go
- logprocesscounterset.go
- logrecord.go
- logrewriter.go
- logschema.go
- logserializer.go
- logstream.go
- logtransform.go
- multisinkbufferreceiver.go
- multisinkmessagereceiver.go
- orchestrator.go
- pipelineworker.go
Directories ¶
Path | Synopsis
---|---
bconfig | Package bconfig provides configuration interfaces for log processing units and factory mechanism
bmatch | Package bmatch provides log matchers used for filtering and transforms
bsupport | Package bsupport provides helpers for log processing and abstract types for implementations,
btest | Package btest provides test utilities and stubs of interfaces inside the base package