Documentation ¶
Index ¶
- Constants
- Variables
- func AddAggregatorCreator(name string, creator AggregatorCreator)
- func AddExtensionCreator(name string, creator ExtensionCreator)
- func AddFlusherCreator(name string, creator FlusherCreator)
- func AddMetricCreator(name string, creator MetricCreator)
- func AddProcessorCreator(name string, creator ProcessorCreator)
- func AddServiceCreator(name string, creator ServiceCreator)
- type Aggregator
- type AggregatorCreator
- type AggregatorV1
- type AggregatorV2
- type AsyncControl
- type Collector
- type CommonContext
- type Context
- type CounterMetric
- type Extension
- type ExtensionCreator
- type Flusher
- type FlusherCreator
- type FlusherV1
- type FlusherV2
- type LatencyMetric
- type LogGroupQueue
- type LogGroupWithContext
- type LogWithContext
- type MetricCreator
- type MetricInput
- type MetricInputV1
- type MetricInputV2
- type PipelineCollector
- type PipelineContext
- type Processor
- type ProcessorCreator
- type ProcessorV1
- type ProcessorV2
- type ServiceCreator
- type ServiceInput
- type ServiceInputV1
- type ServiceInputV2
- type StringMetric
Constants ¶
const ( MetricInputType = iota ServiceInputType = iota FilterType = iota ProcessorType = iota AggregatorType = iota )
logtail plugin type define
Variables ¶
var Aggregators = map[string]AggregatorCreator{}
var Extensions = map[string]ExtensionCreator{}
var Flushers = map[string]FlusherCreator{}
var MetricInputs = map[string]MetricCreator{}
var Processors = map[string]ProcessorCreator{}
var ServiceInputs = map[string]ServiceCreator{}
Functions ¶
func AddAggregatorCreator ¶
func AddAggregatorCreator(name string, creator AggregatorCreator)
func AddExtensionCreator ¶
func AddExtensionCreator(name string, creator ExtensionCreator)
func AddFlusherCreator ¶
func AddFlusherCreator(name string, creator FlusherCreator)
func AddMetricCreator ¶
func AddMetricCreator(name string, creator MetricCreator)
func AddProcessorCreator ¶
func AddProcessorCreator(name string, creator ProcessorCreator)
func AddServiceCreator ¶
func AddServiceCreator(name string, creator ServiceCreator)
Types ¶
type Aggregator ¶
type Aggregator interface { // Init called for init some system resources, like socket, mutex... // return flush interval(ms) and error flag, if interval is 0, use default interval Init(Context, LogGroupQueue) (int, error) // Description returns a one-sentence description on the Input. Description() string // Reset resets the aggregators caches and aggregates. Reset() }
Aggregator is an interface for implementing an Aggregator plugin. the RunningAggregator wraps this interface and guarantees that
type AggregatorCreator ¶
type AggregatorCreator func() Aggregator
type AggregatorV1 ¶
type AggregatorV1 interface { Aggregator // Add the metric to the aggregator. Add(log *protocol.Log, ctx map[string]interface{}) error // Flush pushes the current aggregates to the accumulator. Flush() []*protocol.LogGroup }
AggregatorV1 Add, Flush, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.
type AggregatorV2 ¶
type AggregatorV2 interface { Aggregator // Add the metric to the aggregator. Record(*models.PipelineGroupEvents, PipelineContext) error // GetResult the current aggregates to the accumulator. GetResult(PipelineContext) error }
AggregatorV2 Apply, Push, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.
type AsyncControl ¶
type AsyncControl struct {
// contains filtered or unexported fields
}
AsyncControl is an asynchronous execution control that can be canceled.
func NewAsyncControl ¶
func NewAsyncControl() *AsyncControl
func (*AsyncControl) CancelToken ¶
func (p *AsyncControl) CancelToken() <-chan struct{}
CancelToken returns a readonly channel that can be subscribed to as a cancel token
func (*AsyncControl) Notify ¶
func (p *AsyncControl) Notify()
func (*AsyncControl) Run ¶
func (p *AsyncControl) Run(task func(*AsyncControl))
Run function as a Task
func (*AsyncControl) WaitCancel ¶
func (p *AsyncControl) WaitCancel()
Waiting for executing task to be canceled
type Collector ¶
type Collector interface { AddData(tags map[string]string, fields map[string]string, t ...time.Time) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time) AddRawLog(log *protocol.Log) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ctx map[string]interface{}, t ...time.Time) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{}) }
Collector ...
type CommonContext ¶
type Context ¶
type Context interface { InitContext(project, logstore, configName string) GetConfigName() string GetProject() string GetLogstore() string GetRuntimeContext() context.Context GetExtension(name string, cfg any) (Extension, error) RegisterCounterMetric(metric CounterMetric) RegisterStringMetric(metric StringMetric) RegisterLatencyMetric(metric LatencyMetric) MetricSerializeToPB(log *protocol.Log) SaveCheckPoint(key string, value []byte) error GetCheckPoint(key string) (value []byte, exist bool) SaveCheckPointObject(key string, obj interface{}) error GetCheckPointObject(key string, obj interface{}) (exist bool) }
Context for plugin
type CounterMetric ¶
type Extension ¶
type Extension interface { // Description returns a one-sentence description on the Extension Description() string // Init called for init some system resources, like socket, mutex... Init(Context) error // Stop stops the services and release resources Stop() error }
Extension ...
type ExtensionCreator ¶
type ExtensionCreator func() Extension
type Flusher ¶
type Flusher interface { // Init called for init some system resources, like socket, mutex... Init(Context) error // Description returns a one-sentence description on the Input. Description() string // IsReady checks if flusher is ready to accept more data. // @projectName, @logstoreName, @logstoreKey: meta of the corresponding data. // Note: If SetUrgent is called, please make some adjustment so that IsReady // can return true to accept more data in time and config instance can be // stopped gracefully. IsReady(projectName string, logstoreName string, logstoreKey int64) bool // SetUrgent indicates the flusher that it will be destroyed soon. // @flag indicates if main program (Logtail mostly) will exit after calling this. // // Note: there might be more data to flush after SetUrgent is called, and if flag // is true, these data will be passed to flusher through IsReady/Export before // program exits. // // Recommendation: set some state flags in this method to guide the behavior // of other methods. SetUrgent(flag bool) // Stop stops flusher and release resources. // It is time for flusher to do cleaning jobs, includes: // 1. Export cached but not flushed data. For flushers that contain some kinds of // aggregation or buffering, it is important to flush cached out now, otherwise // data will lost. // 2. Release opened resources: goroutines, file handles, connections, etc. // 3. Maybe more, it depends. // In a word, flusher should only have things that can be recycled by GC after this. Stop() error }
Flusher ... Sample flusher implementation: see plugin_manager/flusher_sls.gox.
type FlusherCreator ¶
type FlusherCreator func() Flusher
type FlusherV1 ¶
type FlusherV1 interface { Flusher // Flush flushes data to destination, such as SLS, console, file, etc. // It is expected to return no error at most time because IsReady will be called // before it to make sure there is space for next data. Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error }
type FlusherV2 ¶
type FlusherV2 interface { Flusher // Export data to destination, such as gRPC, console, file, etc. // It is expected to return no error at most time because IsReady will be called // before it to make sure there is space for next data. Export([]*models.PipelineGroupEvents, PipelineContext) error }
type LatencyMetric ¶
type LogGroupQueue ¶
type LogGroupQueue interface { // no blocking Add(loggroup *protocol.LogGroup) error AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error }
LogGroupQueue for aggregator, Non blocked if aggregator's buffer is full, aggregator can add LogGroup to this queue return error if LogGroupQueue is full
type LogGroupWithContext ¶
type LogWithContext ¶
type MetricCreator ¶
type MetricCreator func() MetricInput
type MetricInput ¶
type MetricInput interface { // Init called for init some system resources, like socket, mutex... // return call interval(ms) and error flag, if interval is 0, use default interval Init(Context) (int, error) // Description returns a one-sentence description on the Input Description() string }
MetricInput ...
type MetricInputV1 ¶
type MetricInputV1 interface { MetricInput // Collect takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" Collect(Collector) error }
type MetricInputV2 ¶
type MetricInputV2 interface { MetricInput // Collect takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" Read(PipelineContext) error }
type PipelineCollector ¶
type PipelineCollector interface { // Collect single group and events data belonging to this group Collect(groupInfo *models.GroupInfo, eventList ...models.PipelineEvent) // CollectList collect GroupEvents list that have been grouped CollectList(groupEventsList ...*models.PipelineGroupEvents) // ToArray returns an array containing all of the PipelineGroupEvents in this collector. ToArray() []*models.PipelineGroupEvents // Observe returns a chan that can consume PipelineGroupEvents from this collector. Observe() chan *models.PipelineGroupEvents Close() }
PipelineCollector collect data in the plugin and send the data to the next operator
type PipelineContext ¶
type PipelineContext interface {
Collector() PipelineCollector
}
PipelineContext which may include collector interface、checkpoint interface、config read and many more..
func NewGroupedPipelineConext ¶
func NewGroupedPipelineConext() PipelineContext
func NewNoopPipelineConext ¶
func NewNoopPipelineConext() PipelineContext
func NewObservePipelineConext ¶
func NewObservePipelineConext(queueSize int) PipelineContext
type Processor ¶
type Processor interface { // Init called for init some system resources, like socket, mutex... Init(Context) error // Description returns a one-sentence description on the Input Description() string }
Processor also can be a filter
type ProcessorCreator ¶
type ProcessorCreator func() Processor
type ProcessorV1 ¶
type ProcessorV2 ¶
type ProcessorV2 interface { Processor Process(in *models.PipelineGroupEvents, context PipelineContext) }
type ServiceCreator ¶
type ServiceCreator func() ServiceInput
type ServiceInput ¶
type ServiceInput interface { // Init called for init some system resources, like socket, mutex... // return interval(ms) and error flag, if interval is 0, use default interval Init(Context) (int, error) // Description returns a one-sentence description on the Input Description() string // Stop stops the services and closes any necessary channels and connections Stop() error }
ServiceInput ...
type ServiceInputV1 ¶
type ServiceInputV1 interface { ServiceInput // Start starts the ServiceInput's service, whatever that may be Start(Collector) error }
type ServiceInputV2 ¶
type ServiceInputV2 interface { ServiceInput // StartService starts the ServiceInput's service, whatever that may be StartService(PipelineContext) error }