base

package v0.0.0-...-f132d4f

Published: Nov 2, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package base defines the data types used in log processing and the interfaces of log processing steps, for example the LogRecord struct and the LogTransform interface

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewSinkLogger

func NewSinkLogger(parentLogger logger.Logger, clientAddress string, clientNumber ClientNumber) logger.Logger

NewSinkLogger creates a derived logger for sinks created from MultiSinkMessageReceiver or MultiSinkBufferReceiver

Types

type BufferReceiverSink

type BufferReceiverSink interface {
	// Accept receives a buffer of log records from input
	//
	// The buffer is NOT usable after the function exits
	Accept(buffer []*LogRecord)

	// Tick is called periodically for custom operations
	//
	// Tick is guaranteed to be called periodically, though the interval can vary and be inaccurate
	Tick()

	// Close is called when the source feeding this sink is ended
	Close()
}

BufferReceiverSink under MultiSinkBufferReceiver receives buffered logs from a single source, e.g. a client TCP connection
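
A minimal sketch of an implementing sink (the type is illustrative, not part of the package; qualified names assume this package is imported as base). Because the buffer is invalid once Accept returns, all work happens inside the call:

type countingSink struct {
	records int
	bytes   int
}

func (s *countingSink) Accept(buffer []*base.LogRecord) {
	for _, rec := range buffer {
		s.records++
		s.bytes += rec.RawLength
	}
}

func (s *countingSink) Tick()  {} // could flush aggregated counts periodically
func (s *countingSink) Close() {}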

type ChunkBufferer

type ChunkBufferer interface {
	PipelineWorker

	// RegisterNewConsumer registers a new consumer to be launched and returns args for its construction
	RegisterNewConsumer() ChunkConsumerArgs

	// Accept accepts incoming log chunks
	Accept(chunk LogChunk)

	// Destroy destroys the buffer, saves all remaining chunks to channels, and waits for consumers to finish
	Destroy()
}

ChunkBufferer is a worker that buffers completed chunks in memory and/or persistent storage. Accept can be called concurrently by different goroutines. It is up to the bufferer to process any chunks left in input or output channels during shutdown.

type ChunkConsumer

type ChunkConsumer interface {
	PipelineWorker
}

ChunkConsumer is a worker to consume buffered chunks for forwarding or other purposes.

A consumer should be created with ChunkConsumerArgs as input.

It should initiate shutdown upon the end of InputChannel or the InputClosed signal, and never attempt to read any leftover chunk from InputChannel once it's closed.

type ChunkConsumerArgs

type ChunkConsumerArgs struct {
	InputChannel    <-chan LogChunk      // channel of fully loaded chunks to consume
	InputClosed     channels.Awaitable   // signal when input channel is closed, in case consumer is not waiting on input
	OnChunkConsumed func(chunk LogChunk) // to be called when a chunk is consumed / committed
	OnChunkLeftover func(chunk LogChunk) // to be called when a chunk is left unconsumed at the end
	OnFinished      func()               // to be called after the consumer ends
}

ChunkConsumerArgs holds the parameters to create a ChunkConsumer. For any chunk, either OnChunkConsumed or OnChunkLeftover must be called.
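
A minimal sketch of a consumer loop honoring this contract; deliver is a hypothetical forwarding step, not part of the package:

func deliver(chunk base.LogChunk) bool { return true } // hypothetical forwarding stub

func runConsumer(args base.ChunkConsumerArgs) {
	defer args.OnFinished()
	// A real consumer may also select on args.InputClosed when it waits on
	// external I/O instead of on the channel itself.
	for chunk := range args.InputChannel {
		if deliver(chunk) {
			args.OnChunkConsumed(chunk) // committed
		} else {
			args.OnChunkLeftover(chunk) // received but not consumed
		}
	}
}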

type ChunkConsumerOverrideCreator

type ChunkConsumerOverrideCreator func(parentLogger logger.Logger, name string, decoder ChunkDecoder, args ChunkConsumerArgs) ChunkConsumer

ChunkConsumerOverrideCreator defines a function to construct a ChunkConsumer override for testing purposes.

It takes the name of the specific output, a decoder for the output type, and consumer args.

An override is created per output (to support multi-output).

type ChunkDecoder

type ChunkDecoder interface {
	DecodeChunkToJSON(chunk LogChunk, separator []byte, indented bool, writer io.Writer) (LogChunkInfo, error)
}

ChunkDecoder provides an interface to verify and decode resulting chunks. Used for testing and internal verifications.

type ClientNumber

type ClientNumber uint

ClientNumber uniquely identifies a currently connected client from one of the inputs

It's currently set to the FD of incoming connections

TODO: use something other than FD in future for inputs not based on FD

const MaxClientNumber ClientNumber = 262144

MaxClientNumber represents the maximum client number that may be passed to MultiSinkMessageReceiver.NewSink

The value limits how many clients can be connected at the same time. It should be theoretically unreachable.

The value must be a constant so that fixed-size arrays can be declared.
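
A minimal sketch of the fixed-size lookup table this constant enables (the variable is illustrative):

// one slot per possible client number, indexable directly by ClientNumber
var sinksByClient [base.MaxClientNumber + 1]base.MessageReceiverSink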

type FieldSetExtractor

type FieldSetExtractor struct {
	// contains filtered or unexported fields
}

FieldSetExtractor extracts values from log records by a predefined set of fields

func NewFieldSetExtractor

func NewFieldSetExtractor(locators []LogFieldLocator) *FieldSetExtractor

NewFieldSetExtractor creates a FieldSetExtractor

func (*FieldSetExtractor) Extract

func (ex *FieldSetExtractor) Extract(record *LogRecord) []string

Extract extracts field set from the given log record and returns transient field values

Returned slices and values are only usable until the next call. They MUST be copied for storage.
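
A minimal sketch of copying the transient results before storage, using strings.Clone from the standard library:

func snapshotFields(ex *base.FieldSetExtractor, record *base.LogRecord) []string {
	transient := ex.Extract(record)
	stored := make([]string, len(transient))
	for i, v := range transient {
		stored[i] = strings.Clone(v) // deep-copy; values die on the next Extract
	}
	return stored
}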

type FilterResult

type FilterResult bool

FilterResult defines the result of filtering, pass (true) or drop (false)

const DROP FilterResult = false

DROP means transform aborts and the record is to be dropped

const PASS FilterResult = true

PASS means transform succeeds (whether there is change or not)

type LogAllocator

type LogAllocator struct {
	// contains filtered or unexported fields
}

LogAllocator allocates empty log records and backing buffers. A local cache with buffers of recycled logs has been tried and yielded only minimal improvement.

func NewLogAllocator

func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator

NewLogAllocator creates LogAllocator linked to the given schema

func (*LogAllocator) NewRecord

func (alloc *LogAllocator) NewRecord(input []byte) (*LogRecord, util.MutableString)

NewRecord creates a new record with empty field values

func (*LogAllocator) Release

func (alloc *LogAllocator) Release(record *LogRecord)

Release releases this log record for recycling
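
A minimal sketch of the allocate/process/release cycle (the processing step is elided):

func handleInput(alloc *base.LogAllocator, input []byte) {
	record, _ := alloc.NewRecord(input)
	// ... run transforms on record ...
	alloc.Release(record) // field values inside become invalid after this point
}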

type LogChunk

type LogChunk struct {
	ID    string // Unique ID of this chunk, may be used as filename
	Data  []byte // Actual data of this chunk, transformed from LogStream
	Saved bool   // true if saved on disk already
}

LogChunk represents a chunk of log records serialized and ready for storage or transport as its own unit

func (LogChunk) String

func (chunk LogChunk) String() string

type LogChunkAccepter

type LogChunkAccepter func(chunk LogChunk)

LogChunkAccepter is a function which accepts completed and loaded chunks for buffering or saving

type LogChunkInfo

type LogChunkInfo struct {
	Tag        string
	NumRecords int
}

LogChunkInfo contains information that can be extracted from a chunk binary for test purposes

type LogChunkMaker

type LogChunkMaker interface {
	// WriteStream packages the given log stream and optionally returns a completed chunk
	WriteStream(stream LogStream) *LogChunk

	// FlushBuffer packages buffered log stream(s) into a chunk if there is any
	FlushBuffer() *LogChunk
}

LogChunkMaker packages serialized log streams into a certain chunk format for storage or transport, e.g. fluentd forward messages. One chunk may come from more than one stream joined together.
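
A minimal sketch of driving a LogChunkMaker: collect any chunk completed while writing, then flush at the end:

func packAll(maker base.LogChunkMaker, streams []base.LogStream, emit base.LogChunkAccepter) {
	for _, stream := range streams {
		if chunk := maker.WriteStream(stream); chunk != nil {
			emit(*chunk) // a chunk was completed by this write
		}
	}
	if chunk := maker.FlushBuffer(); chunk != nil {
		emit(*chunk) // whatever remained buffered
	}
}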

type LogCustomCounterRegistry

type LogCustomCounterRegistry interface {
	RegisterCustomCounter(label string) func(length int)
}

LogCustomCounterRegistry allows registration of custom record counters by label

RegisterCustomCounter returns a function to be called to count record length

type LogFieldLocator

type LogFieldLocator int

LogFieldLocator is used to locate a named field in LogRecord, bound to a LogSchema

const MissingFieldLocator LogFieldLocator = -1

MissingFieldLocator represents non-existing index to a log field

func (LogFieldLocator) Del

func (loc LogFieldLocator) Del(fields []string)

Del resets the field value to empty string

func (LogFieldLocator) Get

func (loc LogFieldLocator) Get(fields []string) util.MutableString

Get returns the field value or empty string

func (LogFieldLocator) Name

func (loc LogFieldLocator) Name(schema LogSchema) string

Name returns the field name

func (LogFieldLocator) Set

func (loc LogFieldLocator) Set(fields []string, value util.MutableString)

Set assigns the field value

type LogFields

type LogFields []util.MutableString

LogFields represents named fields in LogRecord, to be used with LogSchema.

Fields are by default empty strings and empty fields are the same as missing fields, which should be excluded from output.

Some fields at the end of this slice may be reserved by schema.MaxFields and they shouldn't be processed.

type LogInput

type LogInput interface {
	PipelineWorker
	Address() string // Bound/assigned address if applicable, e.g. 127.0.0.1:65531
}

LogInput represents an input source at the beginning of the pipeline, e.g. a TCP/syslog input. It integrates the endpoint/listener, parser, and the steps necessary to construct a raw base.LogRecord.

type LogInputCounterSet

type LogInputCounterSet struct {
	// contains filtered or unexported fields
}

LogInputCounterSet tracks metrics for incoming logs

LogInputCounterSet must be accessed through a pointer. It's not concurrently usable. Counter-vectors and counters created here may duplicate those created elsewhere, as long as the labels match.

func NewLogInputCounter

func NewLogInputCounter(metricCreator promreg.MetricCreator) *LogInputCounterSet

NewLogInputCounter creates a LogInputCounterSet

func (*LogInputCounterSet) CountRecordDrop

func (icounter *LogInputCounterSet) CountRecordDrop(record *LogRecord)

CountRecordDrop updates counters for log record dropping

func (*LogInputCounterSet) CountRecordPass

func (icounter *LogInputCounterSet) CountRecordPass(record *LogRecord)

CountRecordPass updates counters for log record passing

func (*LogInputCounterSet) RegisterCustomCounter

func (host *LogInputCounterSet) RegisterCustomCounter(label string) func(length int)

RegisterCustomCounter registers a custom counter by label and returns a function to count record length

func (*LogInputCounterSet) UpdateMetrics

func (icounter *LogInputCounterSet) UpdateMetrics()

UpdateMetrics writes unwritten values in the counter to underlying Prometheus counters

type LogListener

type LogListener interface {
	PipelineWorker
}

LogListener represents an input endpoint for logs, e.g. a file listener or TCP listener

Due to data complexity, the output is to be received by a MultiSinkMessageReceiver passed during construction

LogListener always works in background as one or more goroutines

type LogParser

type LogParser interface {
	Parse(input []byte, timestamp time.Time) *LogRecord
}

LogParser parses incoming logs from LogListener to structured records one by one

type LogProcessCounterSet

type LogProcessCounterSet struct {
	// contains filtered or unexported fields
}

LogProcessCounterSet tracks metrics for log transform, serialization and chunk making.

LogProcessCounterSet must be accessed through a pointer. It's not concurrently usable. Counter-vectors and counters created here may duplicate those created elsewhere, as long as the labels match.

It tracks counters by metric keys (e.g. vhost+source) that are not part of orchestration keys (e.g. level), by creating a fixed-length array containing counters for each key-set. The positions of the final counters are decided during the registration process.

LogInputCounter's own custom counter registry is ignored here, as map access per counter update would be very inefficient.

func NewLogProcessCounter

func NewLogProcessCounter(factory promreg.MetricCreator, schema LogSchema, keyLocators []LogFieldLocator, outputNames []string) *LogProcessCounterSet

NewLogProcessCounter creates a LogProcessCounterSet

func (*LogProcessCounterSet) CountChunk

func (pcounter *LogProcessCounterSet) CountChunk(outputIndex int, chunk *LogChunk)

CountChunk updates counters for chunk generation

func (*LogProcessCounterSet) CountStream

func (pcounter *LogProcessCounterSet) CountStream(outputIndex int, stream LogStream)

CountStream updates counters for stream serialization

func (*LogProcessCounterSet) RegisterCustomCounter

func (pcounter *LogProcessCounterSet) RegisterCustomCounter(label string) func(length int)

RegisterCustomCounter registers a custom counter by label and returns a function to count record length

This method must not be called during the processing stage, when counters are already being selected and updated

func (*LogProcessCounterSet) SelectMetricKeySet

func (pcounter *LogProcessCounterSet) SelectMetricKeySet(record *LogRecord) *LogInputCounterSet

SelectMetricKeySet switches the current metric key set to that of the given record.

1. Subsequent transforms would write counter values to the correct key-set.

2. Returns an input counter for that key-set.
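
A minimal sketch of per-record counting during the processing stage:

func countRecord(pcounter *base.LogProcessCounterSet, record *base.LogRecord, passed bool) {
	icounter := pcounter.SelectMetricKeySet(record) // switch to this record's key-set
	if passed {
		icounter.CountRecordPass(record)
	} else {
		icounter.CountRecordDrop(record)
	}
}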

func (*LogProcessCounterSet) UpdateMetrics

func (pcounter *LogProcessCounterSet) UpdateMetrics()

UpdateMetrics writes unwritten values in the counter to underlying Prometheus counters

type LogRecord

type LogRecord struct {
	Fields    LogFields // Field values by index and empty string if unset. The string values inside are temporary and only valid until record is released.
	RawLength int       // Input length or approximated length of entire record, for statistics
	Timestamp time.Time // Timestamp, might be zero until processed by a LogTransform
	Unescaped bool      // Whether the main message field has been un-escaped. Multi-line logs start with true.
	// contains filtered or unexported fields
}

LogRecord defines the structure of log record before it's finalized for forwarding.

type LogRewriter

type LogRewriter interface {
	// MaxFieldLength returns the maximum possible length of the rewritten value
	MaxFieldLength(value string, record *LogRecord) int

	// WriteFieldBody writes field value to the beginning of given buffer and returns the end position
	WriteFieldBody(value string, record *LogRecord, buffer []byte) int
}

LogRewriter rewrites the value of specific field during log serialization.

It's a special form of LogTransform, per-field and without filtering, meant to reduce heap allocations. For example, inlining of component names at the beginning of log messages can be re-implemented here, writing directly to the final buffer for serialized log records without any costly string concatenation.

Each LogRewriter takes the field value, the record and the next LogRewriter.
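
A minimal sketch of a rewriter that prefixes a fixed component name, mirroring the inlining example above (the type is illustrative; chaining to the next rewriter is omitted):

type prefixRewriter struct {
	prefix string
}

func (r *prefixRewriter) MaxFieldLength(value string, record *base.LogRecord) int {
	return len(r.prefix) + len(value)
}

func (r *prefixRewriter) WriteFieldBody(value string, record *base.LogRecord, buffer []byte) int {
	n := copy(buffer, r.prefix)
	n += copy(buffer[n:], value)
	return n // end position within buffer
}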

type LogSchema

type LogSchema struct {
	OnLocated func(index int) // optional callback invoked after successful CreateFieldLocator calls
	// contains filtered or unexported fields
}

LogSchema defines the field names and marks fields which should belong to the "environment".

In case of runtime schema update, only new fields should be appended at the end.

func MustNewLogSchema

func MustNewLogSchema(fieldNames []string) LogSchema

MustNewLogSchema creates a new LogSchema or panics.

func NewLogSchema

func NewLogSchema(fieldNames []string, maxFields int) (LogSchema, error)

NewLogSchema creates a new LogSchema with the given field names and maximum number of fields.
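
A minimal sketch of schema construction and field access through locators, assuming util.MutableString is assignable from string:

func buildSchema() {
	schema := base.MustNewLogSchema([]string{"timestamp", "level", "message"})
	msgLoc := schema.MustCreateFieldLocator("message")

	fields := make(base.LogFields, schema.GetMaxFields())
	msgLoc.Set(fields, "connection established") // assign the message field
	_ = msgLoc.Get(fields)                       // reads it back
}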

func (*LogSchema) CopyTestRecord

func (s *LogSchema) CopyTestRecord(source *LogRecord) *LogRecord

CopyTestRecord makes a deep copy of given record

func (*LogSchema) CreateFieldLocator

func (s *LogSchema) CreateFieldLocator(name string) (LogFieldLocator, error)

CreateFieldLocator creates a LogFieldLocator by field name

func (*LogSchema) CreateFieldLocators

func (s *LogSchema) CreateFieldLocators(names []string) ([]LogFieldLocator, error)

CreateFieldLocators creates LogFieldLocator(s) for field names

func (*LogSchema) CreateTemplateVariableResolver

func (s *LogSchema) CreateTemplateVariableResolver(name string) (stringtemplate.PartProvider, error)

CreateTemplateVariableResolver creates a variable resolver by field name, to be used with stringtemplate.StringTemplate

func (*LogSchema) GetFieldNames

func (s *LogSchema) GetFieldNames() []string

GetFieldNames returns all the field names in the same order

func (*LogSchema) GetMaxFields

func (s *LogSchema) GetMaxFields() int

GetMaxFields returns the maximum number of fields

The maximum is equal to or greater than the number of actual fields, to allow reservation

func (*LogSchema) MustCreateFieldLocator

func (s *LogSchema) MustCreateFieldLocator(name string) LogFieldLocator

MustCreateFieldLocator creates a LogFieldLocator by field name or panics (if the field doesn't exist in the schema)

func (*LogSchema) MustCreateFieldLocators

func (s *LogSchema) MustCreateFieldLocators(names []string) []LogFieldLocator

MustCreateFieldLocators creates LogFieldLocators for field names or panics (if any field doesn't exist in the schema)

func (*LogSchema) NewTestRecord1

func (s *LogSchema) NewTestRecord1(fields LogFields) *LogRecord

NewTestRecord1 creates a new record with initial field values for testing

func (*LogSchema) NewTestRecord2

func (s *LogSchema) NewTestRecord2(tm time.Time, fields LogFields) *LogRecord

NewTestRecord2 creates a new record with an initial timestamp and field values for testing

type LogSerializer

type LogSerializer interface {
	// SerializeRecord serializes the given log record into a log stream
	// Input records should be released in the call
	// Output LogStream is transient and only usable before the next call
	SerializeRecord(record *LogRecord) LogStream
}

LogSerializer serializes log records into a stream of certain format, e.g. JSON or msgpack

type LogStream

type LogStream []byte

LogStream represents a serialized log record. Each LogStream should contain exactly one record. The data should be uncompressed and temporary.

type LogTransform

type LogTransform interface {
	// Transform transforms the given record and returns PASS or DROP in addition to the new log record
	// The returned record may share children with the input, modified in-place.
	// The input record shall not be used afterwards
	Transform(input *LogRecord) FilterResult
}

LogTransform filters and/or transforms logs one by one. It may have persistent state.
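
A minimal sketch of a LogTransform that drops records with an empty "level" field (the type is illustrative; locator creation is shown under LogSchema):

type levelFilter struct {
	levelLoc base.LogFieldLocator
}

func (f *levelFilter) Transform(input *base.LogRecord) base.FilterResult {
	if f.levelLoc.Get(input.Fields) == "" {
		return base.DROP // the caller discards the record
	}
	return base.PASS
}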

type LogTransformFunc

type LogTransformFunc func(input *LogRecord) FilterResult

LogTransformFunc defines a function to perform transformation on a single log record

type MessageReceiverSink

type MessageReceiverSink interface {
	// Accept receives a raw message from input
	//
	// The message slice is NOT usable after the function exits
	Accept(message []byte)

	// Flush is called periodically for custom operations
	//
	// Flush is guaranteed to be called periodically, though the interval can vary and be inaccurate
	//
	// Flush is also to be called right before Close(), after all Accept() calls
	Flush()

	// Close is called when the source feeding this sink is ended
	Close()
}

MessageReceiverSink receives raw log messages from a single source, e.g. a client TCP connection
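
A minimal sketch of a sink that copies each message before buffering, since the slice is reused by the caller after Accept returns (the type is illustrative):

type collectingSink struct {
	pending [][]byte
}

func (s *collectingSink) Accept(message []byte) {
	s.pending = append(s.pending, append([]byte(nil), message...)) // deep copy
}

func (s *collectingSink) Flush() {
	// hand s.pending to the next stage here, then reset for reuse
	s.pending = s.pending[:0]
}

func (s *collectingSink) Close() {}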

type MultiSinkBufferReceiver

type MultiSinkBufferReceiver interface {
	// NewSink creates a sink to receive logs from the input identified by the given id
	//
	// clientAddress is a descriptive string of address, e.g. "10.1.0.1:50001"
	//
	// clientNumber identifies a currently connected client from one of the inputs, e.g. socket FD.
	NewSink(clientAddress string, clientNumber ClientNumber) BufferReceiverSink
}

MultiSinkBufferReceiver receives buffered logs from a multi-source input, e.g. a TCP with different incoming connections

For an ordinary TCP input, there is a single MultiSinkBufferReceiver, and one BufferReceiverSink for each connection

The only production implementations are Orchestrator(s)

type MultiSinkMessageReceiver

type MultiSinkMessageReceiver interface {
	// NewSink creates a sink to receive raw messages from the input source identified by the given id or client number
	//
	// clientAddress is a descriptive string of address, e.g. "10.1.0.1:50001"
	//
	// clientNumber identifies a currently connected client from one of the inputs, e.g. socket FD.
	NewSink(clientAddress string, clientNumber ClientNumber) MessageReceiverSink
}

MultiSinkMessageReceiver receives raw log messages from a multi-source input, e.g. a TCP listener with different incoming connections

For an ordinary TCP input, there is a single MultiSinkMessageReceiver, and one MessageReceiverSink for each connection

The only production implementation is LogParsingReceiver

type Orchestrator

type Orchestrator interface {
	MultiSinkBufferReceiver

	// Shutdown performs cleanup; should be called after all inputs have been stopped
	Shutdown()
}

Orchestrator takes log records and distributes them to internal pipelines

type PipelineWorker

type PipelineWorker interface {
	Start()
	Stopped() channels.Awaitable
}

PipelineWorker represents a background worker in a stage of the processing pipeline, e.g. a parser or transformer

Directories

Path	Synopsis
bconfig	Package bconfig provides configuration interfaces for log processing units and factory mechanism
bmatch	Package bmatch provides log matchers used for filtering and transforms
bsupport	Package bsupport provides helpers for log processing and abstract types for implementations
btest	Package btest provides test utilities and stubs of interfaces inside the base package
