pluginmanager

package
v1.8.8

Warning: This package is not in the latest version of its module.
Published: Mar 27, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
var CheckPointManager checkPointManager
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
var DisabledLogtailConfigLock sync.Mutex

Configs that were disabled because they were slow or hanging.

var ErrCheckPointNotInit = errors.New("checkpoint db not init")
var FetchAllInterval = time.Second * time.Duration(12*60*60)

12h

var LastLogtailConfig map[string]*LogstoreConfig
var LogtailConfig map[string]*LogstoreConfig

The following variables are exported so that tests in the main package can reference them.

var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
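
Three of the variables above are defined through the standard flag package, so their values can be overridden on the command line. The sketch below is only an illustration: it registers flags with the same names, defaults, and usage strings on a private flag.FlagSet (the helper name parseCheckpointFlags and the set name "demo" are made up here) rather than touching the package's own variables.

```go
package main

import (
	"flag"
	"fmt"
)

// parseCheckpointFlags is a hypothetical helper. It registers flags with the
// same names and defaults as the package variables listed above on a private
// FlagSet, parses args, and returns the resulting values.
func parseCheckpointFlags(args []string) (interval int, file string, maxItems int, err error) {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	intervalFlag := fs.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
	fileFlag := fs.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
	maxFlag := fs.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
	if err = fs.Parse(args); err != nil {
		return 0, "", 0, err
	}
	return *intervalFlag, *fileFlag, *maxFlag, nil
}

func main() {
	// Override one flag and keep the defaults for the other two.
	interval, file, maxItems, err := parseCheckpointFlags([]string{"-CheckPointCleanInterval=60"})
	fmt.Println(interval, file, maxItems, err)
}
```

Flags that are not passed keep their defaults, which is why only the clean interval changes in this example.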

Functions

func CollectConfigResult added in v1.4.0

func CollectConfigResult(logGroup *protocol.LogGroup)

func CollectContainers added in v1.4.0

func CollectContainers(logGroup *protocol.LogGroup)

func FindPort added in v1.7.1

func FindPort(res http.ResponseWriter, req *http.Request)

func GetConfigFlushers added in v1.8.0

func GetConfigFlushers(runner PluginRunner) []pipeline.Flusher

func GetFlushCancelToken added in v1.4.0

func GetFlushCancelToken(runner PluginRunner) <-chan struct{}

func GetFlushStoreLen added in v1.4.0

func GetFlushStoreLen(runner PluginRunner) int

func GetMetrics added in v1.8.8

func GetMetrics() []map[string]string

func GetPluginPriority

func GetPluginPriority(pluginName string) int

func HoldOn

func HoldOn(exitFlag bool) error

HoldOn stops all config instances and the checkpoint manager so that the plugin manager is ready to load new configs or quit. For user-defined configs, timeoutStop is used to avoid hanging.
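
The idea of stopping with a timeout can be sketched as follows. This is not the package's timeoutStop, whose implementation is not shown on this page; stopWithTimeout and demo are hypothetical names, and the approach (run the stop call in a goroutine and race it against a timer) is an assumption about how such a guard is commonly written.

```go
package main

import (
	"fmt"
	"time"
)

// stopWithTimeout runs stop in its own goroutine and waits at most d for it.
// It reports whether stop finished in time, plus stop's error if it did.
func stopWithTimeout(stop func() error, d time.Duration) (bool, error) {
	done := make(chan error, 1) // buffered so a late stop never blocks on send
	go func() { done <- stop() }()

	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case err := <-done:
		return true, err
	case <-timer.C:
		return false, nil
	}
}

// demo stops one well-behaved config and one that hangs.
func demo() (fastOK, hungOK bool) {
	fastOK, _ = stopWithTimeout(func() error { return nil }, time.Second)

	release := make(chan struct{})
	hungOK, _ = stopWithTimeout(func() error {
		<-release // simulates a plugin that does not return from Stop
		return nil
	}, 20*time.Millisecond)
	close(release) // let the stuck goroutine exit so the demo does not leak it
	return fastOK, hungOK
}

func main() {
	fastOK, hungOK := demo()
	fmt.Println(fastOK, hungOK)
}
```

The caller gets control back after the timeout even if the stop call never returns, which is the property that keeps a reload from hanging on one bad config.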

func Init

func Init() (err error)

Init initializes plugin manager.

func LoadLogstoreConfig

func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) error

func Resume

func Resume() error

Resume starts all configs.

Types

type AggregatorWrapperV1 added in v1.8.8

type AggregatorWrapperV1 struct {
	pipeline.PluginContext

	Aggregator    pipeline.AggregatorV1
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
	// contains filtered or unexported fields
}

AggregatorWrapperV1 wraps an Aggregator. It implements the LogGroupQueue interface and is passed to the associated Aggregator. The Aggregator calls Add to pass log groups to the wrapper, and the wrapper then passes them to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.

func (*AggregatorWrapperV1) Add added in v1.8.8

func (p *AggregatorWrapperV1) Add(loggroup *protocol.LogGroup) error

Add inserts @loggroup into LogGroupsChan if @loggroup is not empty. It is called by the associated Aggregator. It returns errAggAdd when the queue is full.

func (*AggregatorWrapperV1) AddWithWait added in v1.8.8

func (p *AggregatorWrapperV1) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error

AddWithWait inserts @loggroup into LogGroupsChan, waiting at most @duration. It works like Add but adds a timeout policy for when the log group queue is full. It returns errAggAdd when the queue is still full after the timeout. NOTE: nobody calls it now.
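
A minimal sketch of the two enqueue policies described for Add and AddWithWait, assuming the queue is a buffered channel. The types logGroup and queue and the error errQueueFull are hypothetical stand-ins for protocol.LogGroup, the wrapper, and the unexported errAggAdd; the real implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// logGroup is a stand-in for protocol.LogGroup (illustration only).
type logGroup struct {
	Logs []string
}

// errQueueFull plays the role of the unexported errAggAdd.
var errQueueFull = errors.New("log group queue is full")

// queue is a hypothetical wrapper around a bounded channel of log groups.
type queue struct {
	ch chan *logGroup
}

// Add enqueues lg without blocking and skips empty log groups.
func (q *queue) Add(lg *logGroup) error {
	if len(lg.Logs) == 0 {
		return nil
	}
	select {
	case q.ch <- lg:
		return nil
	default:
		return errQueueFull
	}
}

// AddWithWait behaves like Add but waits up to d for free space.
func (q *queue) AddWithWait(lg *logGroup, d time.Duration) error {
	if len(lg.Logs) == 0 {
		return nil
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case q.ch <- lg:
		return nil
	case <-timer.C:
		return errQueueFull
	}
}

// demo fills a one-slot queue and reports the outcome of each call.
func demo() (first, second, waited error) {
	q := &queue{ch: make(chan *logGroup, 1)}
	lg := &logGroup{Logs: []string{"hello"}}
	first = q.Add(lg)                               // fits in the empty slot
	second = q.Add(lg)                              // queue is full, fails immediately
	waited = q.AddWithWait(lg, 10*time.Millisecond) // nobody drains, fails after the wait
	return first, second, waited
}

func main() {
	first, second, waited := demo()
	fmt.Println(first, second, waited)
}
```

The only difference between the two methods in this sketch is the select statement: a default branch makes the send non-blocking, while a timer branch bounds how long the caller waits.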

func (*AggregatorWrapperV1) Init added in v1.8.8

func (p *AggregatorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error

func (*AggregatorWrapperV1) Run added in v1.8.8

func (p *AggregatorWrapperV1) Run(control *pipeline.AsyncControl)

Run periodically calls Aggregator.Flush to get log groups from the associated aggregator and passes them to LogstoreConfig through LogGroupsChan.
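
The periodic flush loop can be pictured with a ticker, as in the sketch below. Everything here is hypothetical: aggregator stands in for pipeline.AggregatorV1, the stop channel stands in for whatever pipeline.AsyncControl provides, and run is not the package's Run method.

```go
package main

import (
	"fmt"
	"time"
)

// logGroup is a stand-in for protocol.LogGroup (illustration only).
type logGroup struct {
	Logs []string
}

// aggregator is a hypothetical stand-in for an aggregator plugin.
type aggregator struct {
	pending []*logGroup
}

// Flush returns everything buffered so far and clears the buffer.
func (a *aggregator) Flush() []*logGroup {
	out := a.pending
	a.pending = nil
	return out
}

// run flushes agg every interval and forwards the results to out
// until stop is closed.
func run(agg *aggregator, interval time.Duration, out chan<- *logGroup, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for _, lg := range agg.Flush() {
				out <- lg
			}
		}
	}
}

// demo starts the loop, waits for two log groups, then stops it.
func demo() int {
	agg := &aggregator{pending: []*logGroup{{Logs: []string{"a"}}, {Logs: []string{"b"}}}}
	out := make(chan *logGroup, 4)
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		run(agg, 5*time.Millisecond, out, stop)
		close(done)
	}()

	received := 0
	for received < 2 {
		<-out
		received++
	}
	close(stop)
	<-done
	return received + len(out) // len(out) counts anything delivered beyond the two
}

func main() {
	fmt.Println(demo())
}
```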

type AggregatorWrapperV2 added in v1.8.8

type AggregatorWrapperV2 struct {
	pipeline.PluginContext

	Aggregator    pipeline.AggregatorV2
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
	// contains filtered or unexported fields
}

AggregatorWrapperV2 wraps an Aggregator. It implements the LogGroupQueue interface and is passed to the associated Aggregator. The Aggregator calls Add to pass log groups to the wrapper, and the wrapper then passes them to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.

func (*AggregatorWrapperV2) Add added in v1.8.8

func (p *AggregatorWrapperV2) Add(loggroup *protocol.LogGroup) error

func (*AggregatorWrapperV2) AddWithWait added in v1.8.8

func (p *AggregatorWrapperV2) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error

func (*AggregatorWrapperV2) GetResult added in v1.8.8

func (p *AggregatorWrapperV2) GetResult(context pipeline.PipelineContext) error

func (*AggregatorWrapperV2) Init added in v1.8.8

func (p *AggregatorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error

func (*AggregatorWrapperV2) Record added in v1.8.8

type AlwaysOnlineManager

type AlwaysOnlineManager struct {
	// contains filtered or unexported fields
}

AlwaysOnlineManager manages plugins that should not stop while configs are reloading.

func GetAlwaysOnlineManager

func GetAlwaysOnlineManager() *AlwaysOnlineManager

GetAlwaysOnlineManager returns an AlwaysOnlineManager instance.

func (*AlwaysOnlineManager) AddCachedConfig

func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)

AddCachedConfig adds a cached config to the manager. The manager stops and deletes the config when the timeout expires.

func (*AlwaysOnlineManager) GetCachedConfig

func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)

GetCachedConfig returns the cached config from the manager and deletes the item, so the manager will not close this config.

func (*AlwaysOnlineManager) GetDeletedConfigs

func (aom *AlwaysOnlineManager) GetDeletedConfigs(
	existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig

GetDeletedConfigs returns cached configs not in @existConfigs.
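
The three methods above describe a cache whose entries expire. A minimal sketch of that behavior, assuming one timer per entry, might look like this. The types config, entry, and manager are hypothetical stand-ins and do not reflect the package's internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// config is a hypothetical stand-in for LogstoreConfig.
type config struct {
	Name    string
	stopped chan struct{} // closed by Stop
}

func newConfig(name string) *config { return &config{Name: name, stopped: make(chan struct{})} }
func (c *config) Stop()             { close(c.stopped) }

type entry struct {
	cfg   *config
	timer *time.Timer
}

type manager struct {
	mu    sync.Mutex
	cache map[string]*entry
}

// AddCachedConfig caches cfg; on timeout the entry is deleted and cfg stopped.
func (m *manager) AddCachedConfig(cfg *config, timeout time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e := &entry{cfg: cfg}
	e.timer = time.AfterFunc(timeout, func() {
		m.mu.Lock()
		expired := m.cache[cfg.Name] == e // still the cached entry?
		if expired {
			delete(m.cache, cfg.Name)
		}
		m.mu.Unlock()
		if expired {
			cfg.Stop()
		}
	})
	m.cache[cfg.Name] = e
}

// GetCachedConfig hands the config back and removes it, cancelling the timer.
func (m *manager) GetCachedConfig(name string) (*config, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.cache[name]
	if !ok {
		return nil, false
	}
	e.timer.Stop()
	delete(m.cache, name)
	return e.cfg, true
}

// GetDeletedConfigs returns cached configs whose names are not in exist.
func (m *manager) GetDeletedConfigs(exist map[string]*config) map[string]*config {
	m.mu.Lock()
	defer m.mu.Unlock()
	deleted := make(map[string]*config)
	for name, e := range m.cache {
		if _, ok := exist[name]; !ok {
			deleted[name] = e.cfg
		}
	}
	return deleted
}

func demo() (firstOK, secondOK, expiredGone bool, deleted []string) {
	m := &manager{cache: make(map[string]*entry)}
	m.AddCachedConfig(newConfig("a"), time.Hour)
	m.AddCachedConfig(newConfig("b"), time.Hour)
	short := newConfig("c")
	m.AddCachedConfig(short, 10*time.Millisecond)

	_, firstOK = m.GetCachedConfig("a")  // taken back by the caller
	_, secondOK = m.GetCachedConfig("a") // already removed
	<-short.stopped                      // wait until "c" expires and is stopped
	_, stillCached := m.GetCachedConfig("c")
	for name := range m.GetDeletedConfigs(map[string]*config{}) {
		deleted = append(deleted, name)
	}
	m.GetCachedConfig("b") // cancel the remaining timer
	return firstOK, secondOK, !stillCached, deleted
}

func main() {
	fmt.Println(demo())
}
```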

type CommonWrapper added in v1.8.8

type CommonWrapper struct {
	pipeline.PluginContext
	Config *LogstoreConfig

	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
	// contains filtered or unexported fields
}

type ConfigVersion added in v1.4.0

type ConfigVersion string

type ContextImp

type ContextImp struct {
	MetricsRecords []*pipeline.MetricsRecord
	// contains filtered or unexported fields
}

func (*ContextImp) AddPlugin

func (p *ContextImp) AddPlugin(name string)

func (*ContextImp) ExportMetricRecords added in v1.8.8

func (p *ContextImp) ExportMetricRecords() (results []map[string]string)

func (*ContextImp) GetCheckPoint

func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)

func (*ContextImp) GetCheckPointObject

func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)

func (*ContextImp) GetConfigName

func (p *ContextImp) GetConfigName() string

func (*ContextImp) GetExtension added in v1.5.0

func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, error)

func (*ContextImp) GetLogstore

func (p *ContextImp) GetLogstore() string

func (*ContextImp) GetLogstoreConfigMetricRecord added in v1.8.8

func (p *ContextImp) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord

func (*ContextImp) GetMetricRecord added in v1.8.8

func (p *ContextImp) GetMetricRecord() *pipeline.MetricsRecord

func (*ContextImp) GetProject

func (p *ContextImp) GetProject() string

func (*ContextImp) GetRuntimeContext

func (p *ContextImp) GetRuntimeContext() context.Context

func (*ContextImp) InitContext

func (p *ContextImp) InitContext(project, logstore, configName string)

func (*ContextImp) RegisterCounterMetric

func (p *ContextImp) RegisterCounterMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.CounterMetric)

func (*ContextImp) RegisterLatencyMetric

func (p *ContextImp) RegisterLatencyMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.LatencyMetric)

func (*ContextImp) RegisterLogstoreConfigMetricRecord added in v1.8.8

func (p *ContextImp) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord

func (*ContextImp) RegisterMetricRecord added in v1.8.8

func (p *ContextImp) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord

func (*ContextImp) RegisterStringMetric

func (p *ContextImp) RegisterStringMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.StringMetric)

func (*ContextImp) SaveCheckPoint

func (p *ContextImp) SaveCheckPoint(key string, value []byte) error

func (*ContextImp) SaveCheckPointObject

func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error

type FlushData added in v1.4.0

type FlushData interface {
	protocol.LogGroup | models.PipelineGroupEvents
}

type FlushOutStore added in v1.4.0

type FlushOutStore[T FlushData] struct {
	// contains filtered or unexported fields
}

func NewFlushOutStore added in v1.4.0

func NewFlushOutStore[T FlushData]() *FlushOutStore[T]

func (*FlushOutStore[T]) Add added in v1.4.0

func (s *FlushOutStore[T]) Add(data ...*T)

func (*FlushOutStore[T]) Get added in v1.4.0

func (s *FlushOutStore[T]) Get() []*T

func (*FlushOutStore[T]) Len added in v1.4.0

func (s *FlushOutStore[T]) Len() int

func (*FlushOutStore[T]) Merge added in v1.4.0

func (s *FlushOutStore[T]) Merge(in *FlushOutStore[T])

func (*FlushOutStore[T]) Reset added in v1.4.0

func (s *FlushOutStore[T]) Reset()

func (*FlushOutStore[T]) Write added in v1.4.0

func (s *FlushOutStore[T]) Write(ch chan *T)
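
The signatures above suggest a small generic buffer constrained to two payload types. The sketch below shows one way such a store could behave; it is an assumption based on the method names only, with logGroup and groupEvents as hypothetical stand-ins for protocol.LogGroup and models.PipelineGroupEvents. In particular, whether the real Write clears the store is not stated on this page; this sketch leaves the data in place.

```go
package main

import "fmt"

// Hypothetical stand-ins for protocol.LogGroup and models.PipelineGroupEvents.
type logGroup struct{ Topic string }
type groupEvents struct{ Count int }

// flushData mirrors the union-style type constraint shown above.
type flushData interface {
	logGroup | groupEvents
}

// store is a hypothetical generic buffer of pointers to T.
type store[T flushData] struct {
	data []*T
}

func newStore[T flushData]() *store[T] { return &store[T]{} }

// Add appends items to the buffer.
func (s *store[T]) Add(data ...*T) { s.data = append(s.data, data...) }

// Get returns the buffered items.
func (s *store[T]) Get() []*T { return s.data }

// Len reports how many items are buffered.
func (s *store[T]) Len() int { return len(s.data) }

// Merge appends the items of in to s.
func (s *store[T]) Merge(in *store[T]) { s.data = append(s.data, in.data...) }

// Reset drops all buffered items.
func (s *store[T]) Reset() { s.data = nil }

// Write sends every buffered item to ch (assumed not to clear the buffer).
func (s *store[T]) Write(ch chan *T) {
	for _, d := range s.data {
		ch <- d
	}
}

func demo() (merged, written, afterReset int) {
	a := newStore[logGroup]()
	a.Add(&logGroup{Topic: "t1"}, &logGroup{Topic: "t2"})
	b := newStore[logGroup]()
	b.Add(&logGroup{Topic: "t3"})

	a.Merge(b)
	merged = a.Len()

	ch := make(chan *logGroup, a.Len()) // large enough that Write cannot block
	a.Write(ch)
	written = len(ch)

	a.Reset()
	return merged, written, a.Len()
}

func main() {
	fmt.Println(demo())
}
```

The type constraint means the store can be instantiated for either payload type but nothing else, so a V1 store and a V2 store cannot be merged by accident.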

type FlusherWrapper

type FlusherWrapper interface {
	Init(pluginMeta *pipeline.PluginMeta) error
	IsReady(projectName string, logstoreName string, logstoreKey int64) bool
}

type FlusherWrapperV1 added in v1.8.8

type FlusherWrapperV1 struct {
	CommonWrapper
	Flusher pipeline.FlusherV1
}

func (*FlusherWrapperV1) Flush added in v1.8.8

func (wrapper *FlusherWrapperV1) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

func (*FlusherWrapperV1) Init added in v1.8.8

func (wrapper *FlusherWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error

func (*FlusherWrapperV1) IsReady added in v1.8.8

func (wrapper *FlusherWrapperV1) IsReady(projectName string, logstoreName string, logstoreKey int64) bool

type FlusherWrapperV2 added in v1.8.8

type FlusherWrapperV2 struct {
	CommonWrapper
	Flusher pipeline.FlusherV2
}

func (*FlusherWrapperV2) Export added in v1.8.8

func (wrapper *FlusherWrapperV2) Export(pipelineGroupEvents []*models.PipelineGroupEvents, pipelineContext pipeline.PipelineContext) error

func (*FlusherWrapperV2) Init added in v1.8.8

func (wrapper *FlusherWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error

func (*FlusherWrapperV2) IsReady added in v1.8.8

func (wrapper *FlusherWrapperV2) IsReady(projectName string, logstoreName string, logstoreKey int64) bool

type InputAlarm

type InputAlarm struct {
	// contains filtered or unexported fields
}

func (*InputAlarm) Collect

func (r *InputAlarm) Collect(collector pipeline.Collector) error

func (*InputAlarm) Description

func (r *InputAlarm) Description() string

func (*InputAlarm) Init

func (r *InputAlarm) Init(context pipeline.Context) (int, error)

type InputContainer added in v1.4.0

type InputContainer struct {
	// contains filtered or unexported fields
}

func (*InputContainer) Collect added in v1.4.0

func (r *InputContainer) Collect(collector pipeline.Collector) error

func (*InputContainer) Description added in v1.4.0

func (r *InputContainer) Description() string

func (*InputContainer) Init added in v1.4.0

func (r *InputContainer) Init(context pipeline.Context) (int, error)

type InputStatistics

type InputStatistics struct {
	// contains filtered or unexported fields
}

func (*InputStatistics) Collect

func (r *InputStatistics) Collect(collector pipeline.Collector) error

func (*InputStatistics) Description

func (r *InputStatistics) Description() string

func (*InputStatistics) Init

func (r *InputStatistics) Init(context pipeline.Context) (int, error)

type LogstoreConfig

type LogstoreConfig struct {
	// common fields
	ProjectName  string
	LogstoreName string
	ConfigName   string
	LogstoreKey  int64
	FlushOutFlag bool
	// Each LogstoreConfig can have its independent GlobalConfig if the "global" field
	//   is offered in configuration, see built-in StatisticsConfig and AlarmConfig.
	GlobalConfig *config.GlobalConfig

	Version      ConfigVersion
	Context      pipeline.Context
	Statistics   LogstoreStatistics
	PluginRunner PluginRunner

	K8sLabelSet           map[string]struct{}
	ContainerLabelSet     map[string]struct{}
	EnvSet                map[string]struct{}
	CollectContainersFlag bool
	// contains filtered or unexported fields
}
var AlarmConfig *LogstoreConfig
var ContainerConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig

Built-in logtail configs that report statistics and alarms (from the system and other logtail configs).

func (*LogstoreConfig) ProcessLog added in v1.1.1

func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string, tags []byte) int

func (*LogstoreConfig) ProcessLogGroup added in v1.8.0

func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int

func (*LogstoreConfig) ProcessRawLog

func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int

func (*LogstoreConfig) ProcessRawLogV2

func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int

ProcessRawLogV2 ... V1 -> V2: enables the topic field and uses the tags field to pass more tags. Unsafe parameters: rawLog, packID, and tags. Safe parameter: topic.

func (*LogstoreConfig) Start

func (lc *LogstoreConfig) Start()

Start initializes plugin instances in config and starts them. Procedures:

  1. Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
  2. Start aggregators, allocate new goroutine for each one.
  3. Start processor goroutine to process logs from LogsChan.
  4. Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.
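
The startup order listed for Start (downstream stages first, inputs last) can be sketched with plain channels and goroutines. This is a simplified model, not the package's code: the channel names echo LogsChan and LogGroupsChan, but the stages, the batch size of two, and the close-based shutdown are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// demo wires a toy pipeline in the same order as the procedure above and
// returns what the flusher stage received.
func demo() []string {
	logsChan := make(chan string, 8)        // inputs -> processor
	processedChan := make(chan string, 8)   // processor -> aggregator
	logGroupsChan := make(chan []string, 8) // aggregator -> flusher
	var flushed []string
	var wg sync.WaitGroup

	// 1. Start the flusher goroutine first, then push log groups left over
	//    from a previous config instance so they are flushed too.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for group := range logGroupsChan {
			flushed = append(flushed, strings.Join(group, ","))
		}
	}()
	logGroupsChan <- []string{"LEFTOVER"}

	// 2. Start the aggregator in its own goroutine; it batches logs in pairs.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(logGroupsChan)
		var batch []string
		for log := range processedChan {
			batch = append(batch, log)
			if len(batch) == 2 {
				logGroupsChan <- batch
				batch = nil
			}
		}
		if len(batch) > 0 {
			logGroupsChan <- batch
		}
	}()

	// 3. Start the processor goroutine, which consumes logsChan.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(processedChan)
		for log := range logsChan {
			processedChan <- strings.ToUpper(log)
		}
	}()

	// 4. Start the input last, in its own goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(logsChan)
		for _, log := range []string{"a", "b", "c"} {
			logsChan <- log
		}
	}()

	wg.Wait()
	return flushed
}

func main() {
	fmt.Println(demo())
}
```

Starting consumers before producers means no stage ever writes into a channel that nobody is reading yet, and the leftover log group is flushed ahead of any newly collected data.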

func (*LogstoreConfig) Stop

func (lc *LogstoreConfig) Stop(exitFlag bool) error

Stop stops the plugin instances and corresponding goroutines of the config. @exitFlag, passed from Logtail, indicates whether Logtail will quit after this. Procedures:

  1. Call SetUrgent on all flushers to inform them of the current state.
  2. Stop all input plugins so that no more logs are generated.
  3. Stop the processor goroutine, passing all existing logs to the aggregator.
  4. Stop all aggregator plugins, turning all logs into LogGroups.
  5. Set the stopping flag and stop the flusher goroutine.
  6. If Logtail is exiting and there is remaining data, try to flush once.
  7. Stop flusher plugins.
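
The shutdown order described for Stop (upstream stages first, then one last flush when exiting) can be sketched as below. The pipeline type and its fields are hypothetical, the processor and aggregator are merged into one stage for brevity, and the SetUrgent and flusher-plugin steps are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline is a toy model: input -> processor/aggregator -> flusher.
type pipeline struct {
	logsChan    chan string
	groupsChan  chan []string
	stopInput   chan struct{}
	stopFlusher chan struct{}
	inputWG     sync.WaitGroup
	procWG      sync.WaitGroup
	flushWG     sync.WaitGroup
	flushed     []string
}

func start() *pipeline {
	p := &pipeline{
		logsChan:    make(chan string, 16),
		groupsChan:  make(chan []string, 16),
		stopInput:   make(chan struct{}),
		stopFlusher: make(chan struct{}),
	}
	p.flushWG.Add(1)
	go func() { // flusher: consumes groups until told to stop
		defer p.flushWG.Done()
		for {
			select {
			case <-p.stopFlusher:
				return
			case g := <-p.groupsChan:
				p.flushed = append(p.flushed, g...)
			}
		}
	}()
	p.procWG.Add(1)
	go func() { // processor + aggregator: one log becomes one group
		defer p.procWG.Done()
		for l := range p.logsChan {
			p.groupsChan <- []string{l}
		}
	}()
	p.inputWG.Add(1)
	go func() { // input: emits three logs, then idles until stopped
		defer p.inputWG.Done()
		for _, l := range []string{"a", "b", "c"} {
			p.logsChan <- l
		}
		<-p.stopInput
	}()
	return p
}

// Stop shuts the stages down upstream-first and returns how many logs were
// flushed and how many groups are still queued.
func (p *pipeline) Stop(exitFlag bool) (flushed, remaining int) {
	close(p.stopInput) // stop inputs so no more logs are generated
	p.inputWG.Wait()
	close(p.logsChan) // let the processor/aggregator drain and exit
	p.procWG.Wait()
	close(p.stopFlusher) // stop the flusher goroutine
	p.flushWG.Wait()
	if exitFlag { // exiting: flush whatever is still queued, once
		for len(p.groupsChan) > 0 {
			p.flushed = append(p.flushed, <-p.groupsChan...)
		}
	}
	return len(p.flushed), len(p.groupsChan)
}

func main() {
	flushed, remaining := start().Stop(true)
	fmt.Println(flushed, remaining)
}
```

When exitFlag is false, anything still queued stays in the channel instead of being dropped, which matches the idea of a later config instance inheriting unflushed log groups.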

type LogstoreStatistics

type LogstoreStatistics struct {
	CollecLatencytMetric pipeline.LatencyMetric
	RawLogMetric         pipeline.CounterMetric
	SplitLogMetric       pipeline.CounterMetric
	FlushLogMetric       pipeline.CounterMetric
	FlushLogGroupMetric  pipeline.CounterMetric
	FlushReadyMetric     pipeline.CounterMetric
	FlushLatencyMetric   pipeline.LatencyMetric
}

func (*LogstoreStatistics) Init

func (p *LogstoreStatistics) Init(context pipeline.Context)

type MetricWrapperV1 added in v1.8.8

type MetricWrapperV1 struct {
	pipeline.PluginContext

	Input    pipeline.MetricInputV1
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan      chan *pipeline.LogWithContext
	LatencyMetric pipeline.LatencyMetric
	// contains filtered or unexported fields
}

func (*MetricWrapperV1) AddData added in v1.8.8

func (p *MetricWrapperV1) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*MetricWrapperV1) AddDataArray added in v1.8.8

func (p *MetricWrapperV1) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*MetricWrapperV1) AddDataArrayWithContext added in v1.8.8

func (p *MetricWrapperV1) AddDataArrayWithContext(tags map[string]string,
	columns []string,
	values []string,
	ctx map[string]interface{},
	t ...time.Time)

func (*MetricWrapperV1) AddDataWithContext added in v1.8.8

func (p *MetricWrapperV1) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time)

func (*MetricWrapperV1) AddRawLog added in v1.8.8

func (p *MetricWrapperV1) AddRawLog(log *protocol.Log)

func (*MetricWrapperV1) AddRawLogWithContext added in v1.8.8

func (p *MetricWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})

func (*MetricWrapperV1) Init added in v1.8.8

func (p *MetricWrapperV1) Init(pluginMeta *pipeline.PluginMeta, inputInterval int) error

func (*MetricWrapperV1) Run added in v1.8.8

func (p *MetricWrapperV1) Run(control *pipeline.AsyncControl)

type MetricWrapperV2 added in v1.8.8

type MetricWrapperV2 struct {
	pipeline.PluginContext

	Input    pipeline.MetricInputV2
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan      chan *pipeline.LogWithContext
	LatencyMetric pipeline.LatencyMetric
}

func (*MetricWrapperV2) Init added in v1.8.8

func (p *MetricWrapperV2) Init(pluginMeta *pipeline.PluginMeta, inputInterval int) error

func (*MetricWrapperV2) Read added in v1.8.8

func (p *MetricWrapperV2) Read(pipelineContext pipeline.PipelineContext) error

type PluginRunner added in v1.4.0

type PluginRunner interface {
	Init(inputQueueSize int, aggrQueueSize int) error

	AddDefaultAggregatorIfEmpty() error

	AddDefaultFlusherIfEmpty() error

	ReceiveRawLog(log *pipeline.LogWithContext)

	AddPlugin(pluginMeta *pipeline.PluginMeta, category pluginCategory, plugin interface{}, config map[string]interface{}) error

	GetExtension(name string) (pipeline.Extension, bool)

	Run()

	RunPlugins(category pluginCategory, control *pipeline.AsyncControl)

	Merge(p PluginRunner)

	Stop(exit bool) error
}

type ProcessorWrapper

type ProcessorWrapper struct {
	pipeline.PluginContext
	Config *LogstoreConfig

	LogsChan chan *pipeline.LogWithContext
	// contains filtered or unexported fields
}

type ProcessorWrapperV1 added in v1.8.8

type ProcessorWrapperV1 struct {
	ProcessorWrapper
	Processor pipeline.ProcessorV1
}

func (*ProcessorWrapperV1) Init added in v1.8.8

func (wrapper *ProcessorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error

func (*ProcessorWrapperV1) Process added in v1.8.8

func (wrapper *ProcessorWrapperV1) Process(logArray []*protocol.Log) []*protocol.Log

type ProcessorWrapperV2 added in v1.8.8

type ProcessorWrapperV2 struct {
	ProcessorWrapper
	Processor pipeline.ProcessorV2
}

func (*ProcessorWrapperV2) Init added in v1.8.8

func (wrapper *ProcessorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error

func (*ProcessorWrapperV2) Process added in v1.8.8

func (wrapper *ProcessorWrapperV2) Process(in *models.PipelineGroupEvents, context pipeline.PipelineContext)

type ServiceWrapperV1 added in v1.8.8

type ServiceWrapperV1 struct {
	pipeline.PluginContext
	Input    pipeline.ServiceInputV1
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan chan *pipeline.LogWithContext
	// contains filtered or unexported fields
}

func (*ServiceWrapperV1) AddData added in v1.8.8

func (p *ServiceWrapperV1) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*ServiceWrapperV1) AddDataArray added in v1.8.8

func (p *ServiceWrapperV1) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*ServiceWrapperV1) AddDataArrayWithContext added in v1.8.8

func (p *ServiceWrapperV1) AddDataArrayWithContext(tags map[string]string,
	columns []string,
	values []string,
	ctx map[string]interface{},
	t ...time.Time)

func (*ServiceWrapperV1) AddDataWithContext added in v1.8.8

func (p *ServiceWrapperV1) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time)

func (*ServiceWrapperV1) AddRawLog added in v1.8.8

func (p *ServiceWrapperV1) AddRawLog(log *protocol.Log)

func (*ServiceWrapperV1) AddRawLogWithContext added in v1.8.8

func (p *ServiceWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})

func (*ServiceWrapperV1) Init added in v1.8.8

func (p *ServiceWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error

func (*ServiceWrapperV1) Run added in v1.8.8

func (*ServiceWrapperV1) Stop added in v1.8.8

func (p *ServiceWrapperV1) Stop() error

type ServiceWrapperV2 added in v1.8.8

type ServiceWrapperV2 struct {
	pipeline.PluginContext
	Input    pipeline.ServiceInputV2
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan chan *pipeline.LogWithContext
}

func (*ServiceWrapperV2) Init added in v1.8.8

func (p *ServiceWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error

func (*ServiceWrapperV2) StartService added in v1.8.8

func (p *ServiceWrapperV2) StartService(pipelineContext pipeline.PipelineContext) error
