mgr

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2020 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusOK   = "ok"
	StatusBad  = "bad"
	StatusLost = "lost"
)
View Source
const (
	DefaultTryTimes = 3
	MetaTmp         = "meta_tmp/"

	DefaultRawDataBatchLen = 10
	RawDataMaxBatchLines   = 100
	DefaultRawDataSize     = 16 * 1024
)
View Source
const (
	StatsShell = "stats"
	PREFIX     = "/logkit"
)
View Source
const (
	SpeedUp     = "up"
	SpeedDown   = "down"
	SpeedStable = "stable"

	RunnerRunning = "running"
	RunnerStopped = "stopped"
)
View Source
const DefaultMyTag = "default"
View Source
const (
	KeyMetricType = "type"
)
View Source
const KeyRouterConfig = "router"
View Source
const KeySendConfig = "senders"

Variables

View Source
var DEFAULT_LOGKIT_REST_DIR = ".logkitconfs"
View Source
var DEFAULT_PORT = 3000
View Source
var DIR_NOT_EXIST_SLEEP_TIME = "300" //300 s
View Source
var KeySampleLog = "sampleLog"
View Source
var RawDataTimeOut = 30 * time.Second

Functions

func GetMySlaveUrl

func GetMySlaveUrl(address, schema string) (uri string, err error)

func MergeExtraInfoTags

func MergeExtraInfoTags(meta *reader.Meta, prefix string, tags map[string]interface{}) map[string]interface{}

func NewMetric

func NewMetric(tp string) (metric.Collector, error)

func ParseData

func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error)

ParseData 对 parser 模块中各种 type 的日志进行解析,并返回解析后的数据

func RawData

func RawData(readerConfig conf.MapConf) ([]string, error)

RawData 从 reader 模块中根据 type 获取多条字符串形式的样例日志

func Register

func Register(masters []string, myhost, tag string) error

func RespError

func RespError(c echo.Context, respCode int, errCode, errMsg string) error

func RespSuccess

func RespSuccess(c echo.Context, data interface{}) error

func SendData

func SendData(senderConfig map[string]interface{}) error

func TransformData

func TransformData(transformerConfig map[string]interface{}) ([]Data, error)

Types

type CleanInfo

type CleanInfo struct {
	// contains filtered or unexported fields
}

type Cluster

type Cluster struct {
	ClusterConfig
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(cc *ClusterConfig) *Cluster

func (*Cluster) AddSlave

func (cc *Cluster) AddSlave(url, tag string)

func (*Cluster) RunRegisterLoop

func (cc *Cluster) RunRegisterLoop() error

func (*Cluster) UpdateSlaveStatus

func (cc *Cluster) UpdateSlaveStatus()

type ClusterConfig

type ClusterConfig struct {
	MasterUrl []string `json:"master_url"`
	IsMaster  bool     `json:"is_master"`
	Enable    bool     `json:"enable"`
	Address   string   `json:"address"`
	Tag       string   `json:"tag"`
}

type ClusterStatus

type ClusterStatus struct {
	Status map[string]RunnerStatus `json:"status"`
	Tag    string                  `json:"tag"`
	Err    string                  `json:"error"`
}

type CollectLog

type CollectLog struct {
	CollectLogPath   string `json:"collect_log_path"`
	CollectLogEnable bool   `json:"collect_log_enable"`
	ReadFrom         string `json:"read_from"`
	EnvTag           string `json:"-"`
	Pandora
}

type CompatibleErrorResult

type CompatibleErrorResult struct {
	ReadErrors      *ErrorStatistic            `json:"read_errors"`
	ParseErrors     *ErrorStatistic            `json:"parse_errors"`
	TransformErrors map[string]*ErrorStatistic `json:"transform_errors"`
	SendErrors      map[string]*ErrorStatistic `json:"send_errors"`
}

CompatibleErrorResult 用于兼容之前以 errorqueue 结构进行消息传递的方式

type ErrorsList

type ErrorsList struct {
	ReadErrors      *equeue.ErrorQueue            `json:"read_errors"`
	ParseErrors     *equeue.ErrorQueue            `json:"parse_errors"`
	TransformErrors map[string]*equeue.ErrorQueue `json:"transform_errors"`
	SendErrors      map[string]*equeue.ErrorQueue `json:"send_errors"`
}

func NewErrorsList

func NewErrorsList() *ErrorsList

func (*ErrorsList) Clone

func (list *ErrorsList) Clone() *ErrorsList

Clone 返回当前 ErrorsList 的完整拷贝,若无数据则会返回 nil

func (*ErrorsList) Empty

func (list *ErrorsList) Empty() bool

Empty 检查列表是否为空

func (*ErrorsList) HasParseErr

func (list *ErrorsList) HasParseErr() bool

func (*ErrorsList) HasReadErr

func (list *ErrorsList) HasReadErr() bool

func (*ErrorsList) HasSendErr

func (list *ErrorsList) HasSendErr() bool

func (*ErrorsList) HasTransformErr

func (list *ErrorsList) HasTransformErr() bool

func (*ErrorsList) List

func (list *ErrorsList) List() (dst ErrorsResult)

List 复制出一个顺序的 Errors

func (*ErrorsList) Reset

func (list *ErrorsList) Reset()

Reset 清空列表

type ErrorsResult

type ErrorsResult struct {
	ReadErrors      []equeue.ErrorInfo            `json:"read_errors"`
	ParseErrors     []equeue.ErrorInfo            `json:"parse_errors"`
	TransformErrors map[string][]equeue.ErrorInfo `json:"transform_errors"`
	SendErrors      map[string][]equeue.ErrorInfo `json:"send_errors"`
}

type LogExportRunner

type LogExportRunner struct {
	RunnerInfo
	// contains filtered or unexported fields
}

func NewLogExportRunner

func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner *LogExportRunner, err error)

func NewLogExportRunnerWithService

func NewLogExportRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser,
	transformers []transforms.Transformer, senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner *LogExportRunner, err error)

func (*LogExportRunner) Cleaner

func (r *LogExportRunner) Cleaner() CleanInfo

func (*LogExportRunner) Delete

func (r *LogExportRunner) Delete() (err error)

func (*LogExportRunner) GetErrors

func (r *LogExportRunner) GetErrors() ErrorsResult

func (*LogExportRunner) LagStats

func (r *LogExportRunner) LagStats() (rl *LagInfo, err error)

func (*LogExportRunner) Name

func (r *LogExportRunner) Name() string

func (*LogExportRunner) Reset

func (r *LogExportRunner) Reset() (err error)

func (*LogExportRunner) Run

func (r *LogExportRunner) Run()

func (*LogExportRunner) Status

func (r *LogExportRunner) Status() (rs RunnerStatus)

func (*LogExportRunner) StatusBackup

func (r *LogExportRunner) StatusBackup()

StatusBackup 除了备份Status的数据之外,还会备份historyError数据,因为重构前混到一起,导致备份写到同一个statistics.meta文件中

func (*LogExportRunner) StatusRestore

func (r *LogExportRunner) StatusRestore()

StatusRestore 除了恢复Status的数据之外,还会恢复historyError数据,因为重构前混到一起,导致备份写到同一个statistics.meta文件中

func (*LogExportRunner) Stop

func (r *LogExportRunner) Stop()

Stop 清理所有使用到的资源,等待10秒尝试读取完毕。先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。Parser 无状态,无需stop。

func (*LogExportRunner) TokenRefresh

func (r *LogExportRunner) TokenRefresh(tokens AuthTokens) error

type Manager

type Manager struct {
	ManagerConfig
	DefaultDir string

	Version    string
	SystemInfo string

	CollectLogRunner *self.LogRunner
	// contains filtered or unexported fields
}

func NewCustomManager

func NewCustomManager(conf ManagerConfig, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (*Manager, error)

func NewManager

func NewManager(conf ManagerConfig) (*Manager, error)

func (*Manager) Add

func (m *Manager) Add(confPath string)

func (*Manager) AddRunner

func (m *Manager) AddRunner(name string, conf RunnerConfig, createTime time.Time) (err error)

func (*Manager) Configs

func (m *Manager) Configs() (rss map[string]RunnerConfig)

func (*Manager) DeleteRunner

func (m *Manager) DeleteRunner(name string) (err error)

func (*Manager) Error

func (m *Manager) Error(name string) (rss ErrorsResult, err error)

func (*Manager) Errors

func (m *Manager) Errors() (es map[string]ErrorsResult)

func (*Manager) ForkRunner

func (m *Manager) ForkRunner(confPath string, config RunnerConfig, returnOnErr bool) error

func (*Manager) GetRunnerNames

func (m *Manager) GetRunnerNames() []string

func (*Manager) GetRunnerPath

func (m *Manager) GetRunnerPath(name string) (string, bool)

func (*Manager) IsRunning

func (m *Manager) IsRunning(confPath string) bool

func (*Manager) Remove

func (m *Manager) Remove(confPath string) (err error)

func (*Manager) RemoveWithConfig

func (m *Manager) RemoveWithConfig(confPath string, isDelete bool) (err error)

func (*Manager) ResetRunner

func (m *Manager) ResetRunner(name string) (err error)

ResetRunner 必须在runner实例存在下才可以reset, reset是调用runner本身的方法, 而runner stop实际上是销毁实例,所以先要启动runner

func (*Manager) RestoreWebDir

func (m *Manager) RestoreWebDir()

func (*Manager) StartRunner

func (m *Manager) StartRunner(name string) (err error)

func (*Manager) StartRunnerWithFilename

func (m *Manager) StartRunnerWithFilename(filename string) (err error)

func (*Manager) Status

func (m *Manager) Status() (rss map[string]RunnerStatus)

func (*Manager) StatusAndConfig

func (m *Manager) StatusAndConfig() (rs map[string]RunnerStatus, rc map[string]RunnerConfig)

func (*Manager) Stop

func (m *Manager) Stop() error

func (*Manager) StopRunner

func (m *Manager) StopRunner(name string) (err error)

func (*Manager) StopRunnerWithFilename

func (m *Manager) StopRunnerWithFilename(filename string) error

func (*Manager) UpdateReaderRegister

func (m *Manager) UpdateReaderRegister()

func (*Manager) UpdateRunner

func (m *Manager) UpdateRunner(name string, conf RunnerConfig) (err error)

func (*Manager) UpdateSenderRegister

func (m *Manager) UpdateSenderRegister()

func (*Manager) UpdateToken

func (m *Manager) UpdateToken(tokens []AuthTokens) (err error)

func (*Manager) Watch

func (m *Manager) Watch(confsPath []string) (err error)

type ManagerConfig

type ManagerConfig struct {
	BindHost string `json:"bind_host"`

	Idc          string        `json:"idc"`
	Zone         string        `json:"zone"`
	RestDir      string        `json:"rest_dir"`
	Cluster      ClusterConfig `json:"cluster"`
	DisableWeb   bool          `json:"disable_web"`
	ServerBackup bool          `json:"-"`
	AuditDir     string        `json:"audit_dir"`

	CollectLog
}

type MetricConfig

type MetricConfig struct {
	MetricType string                 `json:"type"`
	Attributes map[string]bool        `json:"attributes"`
	Config     map[string]interface{} `json:"config"`
}

type MetricRunner

type MetricRunner struct {
	RunnerName string `json:"name"`
	// contains filtered or unexported fields
}

func NewMetricRunner

func NewMetricRunner(rc RunnerConfig, sr *sender.Registry) (runner *MetricRunner, err error)

func (*MetricRunner) Cleaner

func (*MetricRunner) Cleaner() CleanInfo

func (*MetricRunner) Delete

func (mr *MetricRunner) Delete() error

func (*MetricRunner) Name

func (mr *MetricRunner) Name() string

func (*MetricRunner) Reset

func (mr *MetricRunner) Reset() (err error)

func (*MetricRunner) Run

func (r *MetricRunner) Run()

func (*MetricRunner) Status

func (mr *MetricRunner) Status() (rs RunnerStatus)

func (*MetricRunner) StatusBackup

func (mr *MetricRunner) StatusBackup()

func (*MetricRunner) StatusRestore

func (mr *MetricRunner) StatusRestore()

func (*MetricRunner) Stop

func (mr *MetricRunner) Stop()

func (*MetricRunner) TokenRefresh

func (mr *MetricRunner) TokenRefresh(tokens AuthTokens) error

type PostParseRet

type PostParseRet struct {
	SamplePoints []Data `json:"SamplePoints"`
}

PostParseRet 返回值

type RegisterReq

type RegisterReq struct {
	Url string `json:"url"`
	Tag string `json:"tag"`
}

type RestService

type RestService struct {
	// contains filtered or unexported fields
}

func NewRestService

func NewRestService(mgr *Manager, router *echo.Echo) *RestService

func (*RestService) ClusterStatus

func (rs *RestService) ClusterStatus() echo.HandlerFunc

master API GET /logkit/cluster/status?tag=tagValue&url=urlValue

func (*RestService) DeleteClusterConfig

func (rs *RestService) DeleteClusterConfig() echo.HandlerFunc

DELETE /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) DeleteConfig

func (rs *RestService) DeleteConfig() echo.HandlerFunc

Delete /logkit/configs/<name>

func (*RestService) DeleteSlaves

func (rs *RestService) DeleteSlaves() echo.HandlerFunc

DELETE /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) GetCleanerKeyOptions

func (rs *RestService) GetCleanerKeyOptions() echo.HandlerFunc

get /logkit/cleaner/options 获取 cleaner 配置选项

func (*RestService) GetClusterConfig

func (rs *RestService) GetClusterConfig() echo.HandlerFunc

master API Get /logkit/cluster/configs/:name?tag=tagValue&url=urlValue

func (*RestService) GetClusterConfigs

func (rs *RestService) GetClusterConfigs() echo.HandlerFunc

master API Get /logkit/cluster/configs?tag=tagValue&url=urlValue

func (*RestService) GetClusterRunners

func (rs *RestService) GetClusterRunners() echo.HandlerFunc

master API GET /logkit/cluster/runners?tag=tagValue&url=urlValue

func (*RestService) GetConfig

func (rs *RestService) GetConfig() echo.HandlerFunc

get /logkit/configs/:name

func (*RestService) GetConfigs

func (rs *RestService) GetConfigs() echo.HandlerFunc

get /logkit/configs

func (*RestService) GetError

func (rs *RestService) GetError() echo.HandlerFunc

get /logkit/errors/<name>

func (*RestService) GetErrorCodeHumanize

func (rs *RestService) GetErrorCodeHumanize() echo.HandlerFunc

get /logkit/errorcode

func (*RestService) GetErrors

func (rs *RestService) GetErrors() echo.HandlerFunc

get /logkit/errors

func (*RestService) GetMetricKeys

func (rs *RestService) GetMetricKeys() echo.HandlerFunc

GET /logkit/metric/keys

func (*RestService) GetMetricOptions

func (rs *RestService) GetMetricOptions() echo.HandlerFunc

GET /logkit/metric/options

func (*RestService) GetMetricUsages

func (rs *RestService) GetMetricUsages() echo.HandlerFunc

GET /logkit/metric/usages

func (*RestService) GetParserKeyOptions

func (rs *RestService) GetParserKeyOptions() echo.HandlerFunc

get /logkit/parser/options 获取解析选项

func (*RestService) GetParserSampleLogs

func (rs *RestService) GetParserSampleLogs() echo.HandlerFunc

get /logkit/parser/samplelogs 获取样例日志

func (*RestService) GetParserTooltips

func (rs *RestService) GetParserTooltips() echo.HandlerFunc

get /logkit/parser/tooltips 获取解析用途提示

func (*RestService) GetParserUsages

func (rs *RestService) GetParserUsages() echo.HandlerFunc

get /logkit/parser/usages 获得解析用途说明

func (*RestService) GetReaderKeyOptions

func (rs *RestService) GetReaderKeyOptions() echo.HandlerFunc

get /logkit/reader/options 获取Reader参数配置

func (*RestService) GetReaderTooltips

func (rs *RestService) GetReaderTooltips() echo.HandlerFunc

get /logkit/reader/tooltips 获取Reader用途提示

func (*RestService) GetReaderUsages

func (rs *RestService) GetReaderUsages() echo.HandlerFunc

get /logkit/reader/usages 获取Reader用途

func (*RestService) GetRunners

func (rs *RestService) GetRunners() echo.HandlerFunc

get /logkit/runners

func (*RestService) GetSenderKeyOptions

func (rs *RestService) GetSenderKeyOptions() echo.HandlerFunc

get /logkit/sender/options 获取sender配置参数

func (*RestService) GetSenderRouterOption

func (rs *RestService) GetSenderRouterOption() echo.HandlerFunc

get /logkit/sender/router/option 获取所有sender router的配置项

func (*RestService) GetSenderRouterUsage

func (rs *RestService) GetSenderRouterUsage() echo.HandlerFunc

get /logkit/sender/router/usage 获取所有sender router匹配方式的名字和作用

func (*RestService) GetSenderUsages

func (rs *RestService) GetSenderUsages() echo.HandlerFunc

get /logkit/sender/usages 获取sender用途说明

func (*RestService) GetTransformerOptions

func (rs *RestService) GetTransformerOptions() echo.HandlerFunc

GET /logkit/transformer/options

func (*RestService) GetTransformerSampleConfigs

func (rs *RestService) GetTransformerSampleConfigs() echo.HandlerFunc

GET /logkit/transformer/sampleconfigs

func (*RestService) GetTransformerUsages

func (rs *RestService) GetTransformerUsages() echo.HandlerFunc

GET /logkit/transformer/usages

func (*RestService) GetVersion

func (rs *RestService) GetVersion() echo.HandlerFunc

func (*RestService) IsMaster

func (rs *RestService) IsMaster() echo.HandlerFunc

master API GET /logkit/cluster/ismaster

func (*RestService) Ping

func (rs *RestService) Ping() echo.HandlerFunc

master API GET /logkit/cluster/ping

func (*RestService) PostClusterConfig

func (rs *RestService) PostClusterConfig() echo.HandlerFunc

POST /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigReset

func (rs *RestService) PostClusterConfigReset() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/reset?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStart

func (rs *RestService) PostClusterConfigStart() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/start?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStop

func (rs *RestService) PostClusterConfigStop() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/stop?tag=tagValue&url=urlValue

func (*RestService) PostConfig

func (rs *RestService) PostConfig() echo.HandlerFunc

post /logkit/configs/<name>

func (*RestService) PostConfigReset

func (rs *RestService) PostConfigReset() echo.HandlerFunc

POST /logkit/configs/<name>/reset

func (*RestService) PostConfigStart

func (rs *RestService) PostConfigStart() echo.HandlerFunc

POST /logkit/configs/<name>/start

func (*RestService) PostConfigStop

func (rs *RestService) PostConfigStop() echo.HandlerFunc

POST /logkit/configs/<name>/stop

func (*RestService) PostParse

func (rs *RestService) PostParse() echo.HandlerFunc

post /logkit/parser/parse 接受解析请求

func (*RestService) PostParserCheck

func (rs *RestService) PostParserCheck() echo.HandlerFunc

POST /logkit/parser/check

func (*RestService) PostRead

func (rs *RestService) PostRead() echo.HandlerFunc

POST /logkit/reader/read 请求校验reader配置

func (*RestService) PostReaderCheck

func (rs *RestService) PostReaderCheck() echo.HandlerFunc

POST /logkit/reader/check 请求校验reader配置

func (*RestService) PostRegister

func (rs *RestService) PostRegister() echo.HandlerFunc

master API POST /logkit/cluster/register

func (*RestService) PostSend

func (rs *RestService) PostSend() echo.HandlerFunc

POST /logkit/sender/send 请求校验sender配置

func (*RestService) PostSenderCheck

func (rs *RestService) PostSenderCheck() echo.HandlerFunc

POST /logkit/sender/check 请求校验sender配置

func (*RestService) PostSlaveTag

func (rs *RestService) PostSlaveTag() echo.HandlerFunc

POST /logkit/cluster/slaves/tag?tag=tagValue&url=urlValue

func (*RestService) PostTag

func (rs *RestService) PostTag() echo.HandlerFunc

slave API POST /logkit/cluster/tag

func (*RestService) PostTransform

func (rs *RestService) PostTransform() echo.HandlerFunc

POST /logkit/transformer/transform. Transform (multiple logs/single log) in (json array/json object) format with registered transformers. Return result string in json array format.

func (*RestService) PostTransformerCheck

func (rs *RestService) PostTransformerCheck() echo.HandlerFunc

POST /logkit/transformer/check

func (*RestService) PutClusterConfig

func (rs *RestService) PutClusterConfig() echo.HandlerFunc

PUT /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PutConfig

func (rs *RestService) PutConfig() echo.HandlerFunc

put /logkit/configs/<name>

func (*RestService) Register

func (rs *RestService) Register() error

func (*RestService) Slaves

func (rs *RestService) Slaves() echo.HandlerFunc

master API GET /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) Status

func (rs *RestService) Status() echo.HandlerFunc

get /logkit/status

func (*RestService) Stop

func (rs *RestService) Stop()

Stop will stop RestService

type Runner

type Runner interface {
	Name() string
	Run()
	Stop()
	Cleaner() CleanInfo
	Status() RunnerStatus
}

func NewCustomRunner

func NewCustomRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner Runner, err error)

func NewRunner

func NewRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal) (runner Runner, err error)

NewRunner 创建Runner

func NewRunnerWithService

func NewRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser, transformers []transforms.Transformer,
	senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner Runner, err error)

type RunnerConfig

type RunnerConfig struct {
	RunnerInfo
	SourceData    string                   `json:"sourceData,omitempty"`
	MetricConfig  []MetricConfig           `json:"metric,omitempty"`
	ReaderConfig  conf.MapConf             `json:"reader"`
	CleanerConfig conf.MapConf             `json:"cleaner,omitempty"`
	ParserConf    conf.MapConf             `json:"parser"`
	Transforms    []map[string]interface{} `json:"transforms,omitempty"`
	SendersConfig []conf.MapConf           `json:"senders"`
	Router        router.RouterConfig      `json:"router,omitempty"`
	IsInWebFolder bool                     `json:"web_folder,omitempty"`
	IsStopped     bool                     `json:"is_stopped,omitempty"`
	IsFromServer  bool                     `json:"from_server,omitempty"` // 判断是否是从服务器拉取的配置
	AuditChan     chan<- audit.Message     `json:"-"`
}

RunnerConfig 从多数据源读取,经过解析后,发往多个数据目的地

func Compatible

func Compatible(rc RunnerConfig) RunnerConfig

Compatible 用于新老配置的兼容

func TrimSecretInfo

func TrimSecretInfo(conf RunnerConfig, trimSk bool) RunnerConfig

TrimSecretInfo 将配置文件中的 token 等鉴权相关信息去掉

type RunnerErrors

type RunnerErrors interface {
	GetErrors() ErrorsResult
}

type RunnerInfo

type RunnerInfo struct {
	RunnerName             string `json:"name"`
	Note                   string `json:"note,omitempty"`
	CollectInterval        int    `json:"collect_interval,omitempty"`           // metric runner收集的频率
	MaxBatchLen            int    `json:"batch_len,omitempty"`                  // 每个read batch的行数
	MaxBatchSize           int    `json:"batch_size,omitempty"`                 // 每个read batch的字节数
	MaxBatchInterval       int    `json:"batch_interval,omitempty"`             // 最大发送时间间隔
	MaxBatchTryTimes       int    `json:"batch_try_times,omitempty"`            // 最大发送次数,小于等于0代表无限重试
	MaxReaderCloseWaitTime int    `json:"max_reader_close_wait_time,omitempty"` // runner 等待reader close时间,
	ErrorsListCap          int    `json:"errors_list_cap"`                      // 记录错误信息的最大条数
	SyncEvery              int    `json:"sync_every,omitempty"`                 // 每多少次sync一下,填小于0的数字表示stop时sync,正整数表示发送成功多少次以后同步,填0或1就是每次发送成功都同步,兼容原来不配置的逻辑
	CreateTime             string `json:"createtime"`
	EnvTag                 string `json:"env_tag,omitempty"` // 用这个字段的值来获取环境变量, 作为 tag 添加到数据中
	ExtraInfo              bool   `json:"extra_info"`
	LogAudit               bool   `json:"log_audit"`
	SendRaw                bool   `json:"send_raw"`            //使用发送原始字符串的接口,而不是Data
	ReadTime               bool   `json:"read_time"`           // 读取时间
	InternalKeyPrefix      string `json:"internal_key_prefix"` // 内置字段名前缀
	MaxLineLen             int64  `json:"max_line_len"`        // 限制单条数据/每个key对应的value大小,防止读取数据/发送数据出错时内存/磁盘占用过大
	IsBlock                bool   `json:"is_block"`            // 阻塞式发送
}

type RunnerStatus

type RunnerStatus struct {
	Name           string               `json:"name"`
	Logpath        string               `json:"logpath"`
	ReadDataSize   int64                `json:"readDataSize"`
	ReadDataCount  int64                `json:"readDataCount"`
	Elaspedtime    float64              `json:"elaspedtime"`
	Lag            LagInfo              `json:"lag"`
	ReaderStats    StatsInfo            `json:"readerStats"`
	ParserStats    StatsInfo            `json:"parserStats"`
	SenderStats    map[string]StatsInfo `json:"senderStats"`
	TransformStats map[string]StatsInfo `json:"transformStats"`
	Error          string               `json:"error,omitempty"`

	ReadSpeedKB      int64  `json:"readspeed_kb"`
	ReadSpeed        int64  `json:"readspeed"`
	ReadSpeedTrendKb string `json:"readspeedtrend_kb"`
	ReadSpeedTrend   string `json:"readspeedtrend"`
	RunningStatus    string `json:"runningStatus"`
	Tag              string `json:"tag,omitempty"`
	Url              string `json:"url,omitempty"`

	//仅作为将history error同步上传到服务端时使用
	HistorySyncErrors CompatibleErrorResult `json:"history_errors"`
	// contains filtered or unexported fields
}

RunnerStatus runner运行状态,添加字段请在clone函数中相应添加

func (*RunnerStatus) Clone

func (src *RunnerStatus) Clone() RunnerStatus

Clone 复制出一个完整的RunnerStatus

type Service

type Service struct {
	Prefix string
}

type Slave

type Slave struct {
	Url       string    `json:"url"`
	Tag       string    `json:"tag"`
	Status    string    `json:"status"`
	LastTouch time.Time `json:"last_touch"`
}

type SlaveConfig

type SlaveConfig struct {
	Configs map[string]RunnerConfig `json:"configs"`
	Tag     string                  `json:"tag"`
	Err     string                  `json:"error"`
}

type StatusPersistable

type StatusPersistable interface {
	StatusBackup()
	StatusRestore()
}

type TagReq

type TagReq struct {
	Tag string `json:"tag"`
}

type TokenRefreshable

type TokenRefreshable interface {
	TokenRefresh(AuthTokens) error
}

type Version

type Version struct {
	Version string `json:"version"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL