backend

package
v4.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
	DefaultSendQueueMaxSize      = 102400                //10.24w
	MaxSendRetry                 = 10
)

send

Variables

View Source
var (
	MinStep int //最小上报周期,单位sec
)
View Source
var (
	StraPath string
)

Functions

func Init

func Init(cfg BackendSection) error

func RegisterDataSource

func RegisterDataSource(pluginId string, datasource DataSource)

func RegisterPushEndpoint

func RegisterPushEndpoint(pluginId string, push PushEndpoint)

Types

type BackendSection

type BackendSection struct {
	DataSource string `yaml:"datasource"`
	StraPath   string `yaml:"straPath"`

	M3db     m3db.M3dbSection         `yaml:"m3db"`
	Tsdb     tsdb.TsdbSection         `yaml:"tsdb"`
	Influxdb influxdb.InfluxdbSection `yaml:"influxdb"`
	OpenTsdb OpenTsdbSection          `yaml:"opentsdb"`
	Kafka    KafkaSection             `yaml:"kafka"`
}

type DataSource

type DataSource interface {
	PushEndpoint

	// query data for judge
	QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
	// query data for ui
	QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

	// query metrics & tags
	QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
	QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
	QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
	QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) ([]dataobj.IndexByFullTagsResp, int)

	// tsdb instance
	GetInstance(metric, endpoint string, tags map[string]string) []string
}

func GetDataSourceFor

func GetDataSourceFor(pluginId string) (DataSource, error)

get backend datasource (pluginId == "" for default datasource)

type KafkaData

type KafkaData map[string]interface{}

type KafkaPushEndpoint

type KafkaPushEndpoint struct {
	// config
	Section KafkaSection

	// 发送缓存队列 node -> queue_of_data
	KafkaQueue chan KafkaData
}

func (*KafkaPushEndpoint) Init

func (kafka *KafkaPushEndpoint) Init()

func (*KafkaPushEndpoint) Push2Queue

func (kafka *KafkaPushEndpoint) Push2Queue(items []*dataobj.MetricValue)

type KafkaSection

type KafkaSection struct {
	Enabled      bool   `yaml:"enabled"`
	Name         string `yaml:"name"`
	Topic        string `yaml:"topic"`
	BrokersPeers string `yaml:"brokersPeers"`
	ConnTimeout  int    `yaml:"connTimeout"`
	CallTimeout  int    `yaml:"callTimeout"`
	MaxRetry     int    `yaml:"maxRetry"`
	KeepAlive    int64  `yaml:"keepAlive"`
	SaslUser     string `yaml:"saslUser"`
	SaslPasswd   string `yaml:"saslPasswd"`
}

type KfClient

type KfClient struct {
	Topic        string
	BrokersPeers []string
	// contains filtered or unexported fields
}

func NewKfClient

func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error)

func (*KfClient) Close

func (kf *KfClient) Close() error

func (*KfClient) Send

func (kf *KfClient) Send(data KafkaData) error

type OpenTsdbPushEndpoint

type OpenTsdbPushEndpoint struct {
	// config
	Section OpenTsdbSection

	OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper

	// 发送缓存队列 node -> queue_of_data
	OpenTsdbQueue *list.SafeListLimited
}

func (*OpenTsdbPushEndpoint) Init

func (opentsdb *OpenTsdbPushEndpoint) Init()

func (*OpenTsdbPushEndpoint) Push2Queue

func (opentsdb *OpenTsdbPushEndpoint) Push2Queue(items []*dataobj.MetricValue)

将原始数据入到tsdb发送缓存队列

type OpenTsdbSection

type OpenTsdbSection struct {
	Enabled     bool   `yaml:"enabled"`
	Name        string `yaml:"name"`
	Batch       int    `yaml:"batch"`
	ConnTimeout int    `yaml:"connTimeout"`
	CallTimeout int    `yaml:"callTimeout"`
	WorkerNum   int    `yaml:"workerNum"`
	MaxConns    int    `yaml:"maxConns"`
	MaxIdle     int    `yaml:"maxIdle"`
	MaxRetry    int    `yaml:"maxRetry"`
	Address     string `yaml:"address"`
}

type PushEndpoint

type PushEndpoint interface {
	// push data
	Push2Queue(items []*dataobj.MetricValue)
}

func GetPushEndpoints

func GetPushEndpoints() ([]PushEndpoint, error)

get all push endpoints

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL