worker

package
v0.0.0-...-6c58b8e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2021 License: MIT Imports: 20 Imported by: 2

Documentation

Index

Constants

View Source
const CachedDuration = 60

CachedDuration cached时间周期

Variables

View Source
var ManagerConfig map[int64]*ConfigInfo

ManagerConfig to manage configs

View Source
var ManagerJob map[string]*Job //管理job,文件路径为key

ManagerJob to manage jobs

View Source
var ManagerJobLock *sync.RWMutex

ManagerJobLock is a global lock

Functions

func AlignStepTms

func AlignStepTms(step, tms int64) int64

AlignStepTms to align the step 时间戳向前对齐

func CleanLoop

func CleanLoop()

CleanLoop to Loop & clean old cache

func GetCachedAll

func GetCachedAll() string

GetCachedAll to get all cache

func GetLatestTmsAndDelay

func GetLatestTmsAndDelay(filepath string) (int64, int64, bool)

func PostToCache

func PostToCache(paramPoints []*FalconPoint)

PostToCache to post points to cache

func PosterLoop

func PosterLoop()

PosterLoop to start post loop 循环推送,10s一次

func PushToCount

func PushToCount(Point *AnalysPoint) error

PushToCount to push to count module 提供给Worker用来Push计算后的信息 需保证线程安全

func PusherLoop

func PusherLoop()

PusherLoop to start push loop

func PusherStart

func PusherStart()

PusherStart to start push loop

func ToPushQueue

func ToPushQueue(strategy *scheme.Strategy, tms int64, pointMap map[string]*PointCounter) error

ToPushQueue to push data to pusher queue 这个参数是为了最大限度的对接 pointMap的key,是打平了的tagkv

func UpdateConfigsLoop

func UpdateConfigsLoop()

UpdateConfigsLoop to update strategys

Types

type AnalysPoint

type AnalysPoint struct {
	StrategyID int64
	Value      float64
	Tms        int64
	Tags       map[string]string
}

AnalysPoint to push to Calculate module 从worker往计算部分推的Point

type ConfigInfo

type ConfigInfo struct {
	ID       int64
	FilePath string
}

ConfigInfo to control config

type FalconPoint

type FalconPoint struct {
	Endpoint    string  `json:"endpoint"`
	Metric      string  `json:"metric"`
	Timestamp   int64   `json:"timestamp"`
	Step        int64   `json:"step"`
	Value       float64 `json:"value"`
	CounterType string  `json:"counterType"`
	Tags        string  `json:"tags"`
}

FalconPoint to push to falcon-agent

type GlobalCounter

type GlobalCounter struct {
	sync.RWMutex
	StrategyCounts map[int64]*StrategyCounter
}

GlobalCounter to be as a global counter store 全局counter对象, 以key为索引,索引每个策略的统计 key : Strategy ID

var GlobalCount *GlobalCounter

GlobalCount to be as a global counter store

func (*GlobalCounter) AddStrategyCount

func (gc *GlobalCounter) AddStrategyCount(st *scheme.Strategy)

AddStrategyCount to add strategy to counter

func (*GlobalCounter) GetIDs

func (gc *GlobalCounter) GetIDs() []int64

GetIDs get ids from counter

func (*GlobalCounter) GetStrategyCountByID

func (gc *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error)

GetStrategyCountByID get count by strategy id

func (*GlobalCounter) UpdateByStrategy

func (gc *GlobalCounter) UpdateByStrategy(globalStras map[int64]*scheme.Strategy)

UpdateByStrategy to update counter by strategy 只做更新和删除,添加 由数据驱动

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job to control job

type PointCounter

type PointCounter struct {
	sync.RWMutex
	Count int64
	Sum   float64
	Max   float64
	Min   float64
}

PointCounter to analysis 统计的实体

func (*PointCounter) UpdateCnt

func (pc *PointCounter) UpdateCnt()

UpdateCnt to update count

func (*PointCounter) UpdateMaxMin

func (pc *PointCounter) UpdateMaxMin(value float64)

UpdateMaxMin to update max & min

func (*PointCounter) UpdateSum

func (pc *PointCounter) UpdateSum(value float64)

UpdateSum to update sum

type PointsCounter

type PointsCounter struct {
	sync.RWMutex
	TagstringMap map[string]*PointCounter
}

PointsCounter to index the data 单策略下,单step的统计对象 以Sorted的tagkv的字符串来做索引

func (*PointsCounter) GetBytagstring

func (pc *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error)

GetBytagstring to get Counter structure by tagstring

func (*PointsCounter) Update

func (pc *PointsCounter) Update(tagstring string, value float64) error

Update to update value

type SortByTms

type SortByTms []*FalconPoint

SortByTms to be used by sort

func (SortByTms) Len

func (p SortByTms) Len() int

func (SortByTms) Less

func (p SortByTms) Less(i, j int) bool

func (SortByTms) Swap

func (p SortByTms) Swap(i, j int)

type StrategyCounter

type StrategyCounter struct {
	sync.RWMutex
	Strategy  *scheme.Strategy         //Strategy结构体扔这里,以备不时之需
	TmsPoints map[int64]*PointsCounter //按照时间戳分类的分别的counter
}

StrategyCounter to 单策略下的对象, 以step为索引, 索引每一个Step的统计 单step统计, 推送完则删

func (*StrategyCounter) AddTms

func (sc *StrategyCounter) AddTms(tms int64) error

AddTms to add Tms to counter

func (*StrategyCounter) DeleteTms

func (sc *StrategyCounter) DeleteTms(tms int64)

DeleteTms to delete one tms

func (*StrategyCounter) GetByTms

func (sc *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error)

GetByTms get cached counter by tms

func (*StrategyCounter) GetTmsList

func (sc *StrategyCounter) GetTmsList() []int64

GetTmsList to get tmslist cached

type Worker

type Worker struct {
	FilePath  string
	Counter   int64
	LatestTms int64 //正在处理的单条日志时间
	Delay     int64 //时间戳乱序差值, 每个worker独立更新
	Close     chan struct{}
	Stream    chan string
	Mark      string //标记该worker信息,方便打log及上报自监控指标, 追查问题
	Analyzing bool   //标记当前Worker状态是否在分析中,还是空闲状态
	Callback  callbackHandler
}

Worker to analysis 单个worker对象

func (*Worker) Start

func (w *Worker) Start()

Start to start a worker

func (*Worker) Stop

func (w *Worker) Stop()

Stop to stop a worker

func (*Worker) Work

func (w *Worker) Work()

Work to analysis logs

type WorkerGroup

type WorkerGroup struct {
	WorkerNum          int
	LatestTms          int64 //日志文件最新处理的时间戳
	MaxDelay           int64 //日志文件存在的时间戳乱序最大差值
	ResetTms           int64 //maxDelay上次重置的时间
	Workers            []*Worker
	TimeFormatStrategy string
}

WorkerGroup is group of workers worker组

func NewWorkerGroup

func NewWorkerGroup(filePath string, stream chan string, st *scheme.Strategy) *WorkerGroup

NewWorkerGroup to new a worker group filepath和stream依赖外部,其他的都自己创建

func (WorkerGroup) GetLatestTmsAndDelay

func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64)

func (*WorkerGroup) ResetMaxDelay

func (wg *WorkerGroup) ResetMaxDelay()

ResetMaxDelay reset maxDelay record

func (*WorkerGroup) SetLatestTmsAndDelay

func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64)

func (*WorkerGroup) Start

func (wg *WorkerGroup) Start()

Start to start a workergroup

func (*WorkerGroup) Stop

func (wg *WorkerGroup) Stop()

Stop to stop a workergroup

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL