Documentation ¶
Index ¶
- Constants
- Variables
- func AlignStepTms(step, tms int64) int64
- func CleanLoop()
- func GetCachedAll() string
- func GetLatestTmsAndDelay(filepath string) (int64, int64, bool)
- func PostToCache(paramPoints []*FalconPoint)
- func PosterLoop()
- func PushToCount(Point *AnalysPoint) error
- func PusherLoop()
- func PusherStart()
- func ToPushQueue(strategy *scheme.Strategy, tms int64, pointMap map[string]*PointCounter) error
- func UpdateConfigsLoop()
- type AnalysPoint
- type ConfigInfo
- type FalconPoint
- type GlobalCounter
- type Job
- type PointCounter
- type PointsCounter
- type SortByTms
- type StrategyCounter
- type Worker
- type WorkerGroup
Constants ¶
const CachedDuration = 60
CachedDuration is the cache time period.
Variables ¶
var ManagerConfig map[int64]*ConfigInfo
ManagerConfig manages configs.
var ManagerJob map[string]*Job // manages jobs, keyed by file path
ManagerJob manages jobs.
var ManagerJobLock *sync.RWMutex
ManagerJobLock is a global lock.
Functions ¶
func PushToCount ¶
func PushToCount(Point *AnalysPoint) error
PushToCount pushes a point to the count module. It is provided for Workers to push computed information and must be thread-safe.
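The thread-safety requirement can be illustrated with a minimal sketch. The `point` and `countStore` types and the `push`, `total`, and `runWorkers` helpers below are hypothetical stand-ins, not the package's actual implementation; the sketch only shows one common way to let many workers push into a shared store safely.

```go
package main

import (
	"fmt"
	"sync"
)

// point is a hypothetical, simplified stand-in for AnalysPoint.
type point struct {
	StrategyID int64
	Value      float64
}

// countStore is a hypothetical shared store guarded by a mutex.
type countStore struct {
	mu     sync.Mutex
	counts map[int64]int64
}

func newCountStore() *countStore {
	return &countStore{counts: make(map[int64]int64)}
}

// push records one point; the lock makes it safe to call from many goroutines.
func (s *countStore) push(p *point) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[p.StrategyID]++
}

// total returns how many points were recorded for a strategy.
func (s *countStore) total(id int64) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.counts[id]
}

// runWorkers starts n goroutines that each push m points, then waits for them.
func runWorkers(s *countStore, n, m int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < m; j++ {
				s.push(&point{StrategyID: 1, Value: 1})
			}
		}()
	}
	wg.Wait()
}

func main() {
	s := newCountStore()
	runWorkers(s, 8, 100)
	fmt.Println(s.total(1)) // prints 800
}
```

Without the lock, concurrent writes to the map would be a data race in Go.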
func ToPushQueue ¶
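func ToPushQueue(strategy *scheme.Strategy, tms int64, pointMap map[string]*PointCounter) error
ToPushQueue pushes data to the pusher queue. The parameters are designed for maximum compatibility; the keys of pointMap are flattened tagkv strings.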
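As an illustration of what a flattened tagkv key might look like, here is a minimal sketch. The `flattenTags` helper, the `key=value` pair format, and the comma separator are all assumptions made for this example; the package's real key format is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// flattenTags is a hypothetical helper: it joins tag key/value pairs into
// one comma-separated string, sorted by key so the result is deterministic.
func flattenTags(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+tags[k])
	}
	return strings.Join(pairs, ",")
}

func main() {
	key := flattenTags(map[string]string{"host": "web01", "code": "500"})
	fmt.Println(key) // prints code=500,host=web01
}
```

Sorting matters because Go map iteration order is random; without it the same tag set could produce different keys.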
Types ¶
type AnalysPoint ¶
AnalysPoint is the point that a worker pushes to the calculation module.
type FalconPoint ¶
type FalconPoint struct {
    Endpoint    string  `json:"endpoint"`
    Metric      string  `json:"metric"`
    Timestamp   int64   `json:"timestamp"`
    Step        int64   `json:"step"`
    Value       float64 `json:"value"`
    CounterType string  `json:"counterType"`
    Tags        string  `json:"tags"`
}
FalconPoint is the point pushed to falcon-agent.
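The JSON tags on FalconPoint determine the wire format. The sketch below copies the struct as documented and encodes a slice of points with the standard library; the `encodePoints` helper and the sample values (including the `GAUGE` counter type) are illustrative assumptions, and the actual transport to falcon-agent is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FalconPoint mirrors the struct documented above.
type FalconPoint struct {
	Endpoint    string  `json:"endpoint"`
	Metric      string  `json:"metric"`
	Timestamp   int64   `json:"timestamp"`
	Step        int64   `json:"step"`
	Value       float64 `json:"value"`
	CounterType string  `json:"counterType"`
	Tags        string  `json:"tags"`
}

// encodePoints is a hypothetical helper that serializes points to JSON.
func encodePoints(points []*FalconPoint) (string, error) {
	b, err := json.Marshal(points)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Sample values are made up for illustration.
	p := &FalconPoint{
		Endpoint:    "web01",
		Metric:      "log.errors",
		Timestamp:   1700000000,
		Step:        10,
		Value:       3,
		CounterType: "GAUGE",
		Tags:        "code=500",
	}
	out, err := encodePoints([]*FalconPoint{p})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```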
type GlobalCounter ¶
type GlobalCounter struct {
    sync.RWMutex
    StrategyCounts map[int64]*StrategyCounter
}
GlobalCounter is the global counter store. It indexes the statistics of each strategy by key, where the key is the strategy ID.
var GlobalCount *GlobalCounter
GlobalCount is the global counter store instance.
func (*GlobalCounter) AddStrategyCount ¶
func (gc *GlobalCounter) AddStrategyCount(st *scheme.Strategy)
AddStrategyCount adds a strategy to the counter.
func (*GlobalCounter) GetIDs ¶
func (gc *GlobalCounter) GetIDs() []int64
GetIDs returns the strategy IDs held by the counter.
func (*GlobalCounter) GetStrategyCountByID ¶
func (gc *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error)
GetStrategyCountByID returns the counter for the given strategy ID.
func (*GlobalCounter) UpdateByStrategy ¶
func (gc *GlobalCounter) UpdateByStrategy(globalStras map[int64]*scheme.Strategy)
UpdateByStrategy updates the counter from the given strategies. It only updates and deletes entries; additions are driven by incoming data.
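The update-and-delete-only behavior can be sketched as follows. `Strategy` here is a simplified stand-in for `scheme.Strategy`, and `updateByStrategy` is a hypothetical reimplementation written for illustration, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// Strategy is a simplified stand-in for scheme.Strategy (assumption).
type Strategy struct {
	ID   int64
	Name string
}

// StrategyCounter here keeps only the strategy pointer, for brevity.
type StrategyCounter struct {
	Strategy *Strategy
}

type GlobalCounter struct {
	sync.RWMutex
	StrategyCounts map[int64]*StrategyCounter
}

// updateByStrategy refreshes counters whose strategy still exists and
// removes counters whose strategy is gone. It never adds new entries.
func (gc *GlobalCounter) updateByStrategy(latest map[int64]*Strategy) {
	gc.Lock()
	defer gc.Unlock()
	for id, sc := range gc.StrategyCounts {
		st, ok := latest[id]
		if !ok {
			delete(gc.StrategyCounts, id) // deleting during range is safe in Go
			continue
		}
		sc.Strategy = st
	}
}

func main() {
	gc := &GlobalCounter{StrategyCounts: map[int64]*StrategyCounter{
		1: {Strategy: &Strategy{ID: 1, Name: "old-1"}},
		2: {Strategy: &Strategy{ID: 2, Name: "old-2"}},
	}}
	gc.updateByStrategy(map[int64]*Strategy{
		2: {ID: 2, Name: "new-2"},
		3: {ID: 3, Name: "new-3"}, // not added: additions are data-driven
	})
	fmt.Println(len(gc.StrategyCounts), gc.StrategyCounts[2].Strategy.Name) // prints 1 new-2
}
```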
type PointCounter ¶
PointCounter is the statistics entity used for analysis.
func (*PointCounter) UpdateMaxMin ¶
func (pc *PointCounter) UpdateMaxMin(value float64)
UpdateMaxMin updates the maximum and minimum values.
func (*PointCounter) UpdateSum ¶
func (pc *PointCounter) UpdateSum(value float64)
UpdateSum updates the sum.
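PointCounter's fields are not listed in this documentation, so the following is only a sketch of how running max, min, and sum statistics are commonly maintained. The `pointStats` type, its fields, and the initialization with infinities are assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// pointStats is a hypothetical stand-in for PointCounter.
type pointStats struct {
	Sum float64
	Max float64
	Min float64
}

// newPointStats starts Max at -Inf and Min at +Inf so that the first
// observed value always replaces both.
func newPointStats() *pointStats {
	return &pointStats{Max: math.Inf(-1), Min: math.Inf(1)}
}

// updateMaxMin folds one value into the running maximum and minimum.
func (ps *pointStats) updateMaxMin(value float64) {
	ps.Max = math.Max(ps.Max, value)
	ps.Min = math.Min(ps.Min, value)
}

// updateSum adds one value to the running sum.
func (ps *pointStats) updateSum(value float64) {
	ps.Sum += value
}

func main() {
	ps := newPointStats()
	for _, v := range []float64{3, -1, 7} {
		ps.updateMaxMin(v)
		ps.updateSum(v)
	}
	fmt.Println(ps.Max, ps.Min, ps.Sum) // prints 7 -1 9
}
```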
type PointsCounter ¶
type PointsCounter struct {
    sync.RWMutex
    TagstringMap map[string]*PointCounter
}
PointsCounter indexes the data. It is the statistics object for a single step under a single strategy, indexed by the sorted tagkv string.
func (*PointsCounter) GetBytagstring ¶
func (pc *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error)
GetBytagstring returns the PointCounter for the given tagstring.
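A lookup with this signature might look like the sketch below. The `pointsIndex` and `pointCounter` types and the error text are hypothetical; the sketch only shows a read-locked map lookup that reports a missing key as an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pointCounter is a hypothetical stand-in for PointCounter.
type pointCounter struct {
	Count int64
}

// pointsIndex mirrors the documented shape of PointsCounter.
type pointsIndex struct {
	sync.RWMutex
	TagstringMap map[string]*pointCounter
}

// getByTagstring takes a read lock, so concurrent lookups do not block
// each other, and returns an error when the tagstring is unknown.
func (pi *pointsIndex) getByTagstring(tagstring string) (*pointCounter, error) {
	pi.RLock()
	defer pi.RUnlock()
	pc, ok := pi.TagstringMap[tagstring]
	if !ok {
		return nil, errors.New("no counter for tagstring: " + tagstring)
	}
	return pc, nil
}

func main() {
	pi := &pointsIndex{TagstringMap: map[string]*pointCounter{
		"code=500,host=web01": &pointCounter{Count: 3},
	}}
	if pc, err := pi.getByTagstring("code=500,host=web01"); err == nil {
		fmt.Println(pc.Count) // prints 3
	}
	if _, err := pi.getByTagstring("code=404"); err != nil {
		fmt.Println("lookup failed:", err)
	}
}
```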
type StrategyCounter ¶
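type StrategyCounter struct {
    sync.RWMutex
    Strategy  *scheme.Strategy         // the Strategy struct is kept here in case it is needed
    TmsPoints map[int64]*PointsCounter // counters grouped by timestamp
}
StrategyCounter is the object for a single strategy. It indexes the statistics of each step by step; a step's statistics are deleted once they have been pushed.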
func (*StrategyCounter) AddTms ¶
func (sc *StrategyCounter) AddTms(tms int64) error
AddTms adds a timestamp to the counter.
func (*StrategyCounter) DeleteTms ¶
func (sc *StrategyCounter) DeleteTms(tms int64)
DeleteTms deletes the entry for one timestamp.
func (*StrategyCounter) GetByTms ¶
func (sc *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error)
GetByTms returns the cached counter for the given timestamp.
func (*StrategyCounter) GetTmsList ¶
func (sc *StrategyCounter) GetTmsList() []int64
GetTmsList returns the list of cached timestamps.
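The per-timestamp lifecycle (add a bucket, list buckets, push, then delete) can be sketched as below. The `strategyBuckets` and `stepBucket` types, the `pushAndDelete` helper, and the cutoff rule for deciding which buckets are ready are all assumptions for illustration; the package's real push logic is not shown here.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// stepBucket is a hypothetical stand-in for PointsCounter.
type stepBucket struct {
	Hits int64
}

// strategyBuckets mirrors the documented TmsPoints map of StrategyCounter.
type strategyBuckets struct {
	sync.RWMutex
	TmsPoints map[int64]*stepBucket
}

// addTms creates an empty bucket for tms if none exists yet.
func (sb *strategyBuckets) addTms(tms int64) {
	sb.Lock()
	defer sb.Unlock()
	if _, ok := sb.TmsPoints[tms]; !ok {
		sb.TmsPoints[tms] = &stepBucket{}
	}
}

// tmsList returns the cached timestamps in ascending order.
func (sb *strategyBuckets) tmsList() []int64 {
	sb.RLock()
	defer sb.RUnlock()
	list := make([]int64, 0, len(sb.TmsPoints))
	for tms := range sb.TmsPoints {
		list = append(list, tms)
	}
	sort.Slice(list, func(i, j int) bool { return list[i] < list[j] })
	return list
}

// pushAndDelete removes every bucket older than cutoff and hands it to push.
func (sb *strategyBuckets) pushAndDelete(cutoff int64, push func(tms int64, b *stepBucket)) {
	for _, tms := range sb.tmsList() {
		if tms >= cutoff {
			break
		}
		sb.Lock()
		b := sb.TmsPoints[tms]
		delete(sb.TmsPoints, tms)
		sb.Unlock()
		if b != nil {
			push(tms, b)
		}
	}
}

func main() {
	sb := &strategyBuckets{TmsPoints: make(map[int64]*stepBucket)}
	for _, tms := range []int64{30, 10, 20} {
		sb.addTms(tms)
	}
	sb.pushAndDelete(25, func(tms int64, b *stepBucket) {
		fmt.Println("pushed", tms)
	})
	fmt.Println(sb.tmsList()) // prints [30]
}
```

Deleting a bucket as soon as it is pushed keeps the map bounded to the steps still being filled.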
type Worker ¶
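type Worker struct {
    FilePath  string
    Counter   int64
    LatestTms int64 // timestamp of the log line currently being processed
    Delay     int64 // out-of-order timestamp gap, updated independently by each worker
    Close     chan struct{}
    Stream    chan string
    Mark      string // label for this worker, used in logs and self-monitoring metrics to help trace problems
    Analyzing bool   // whether this Worker is currently analyzing or idle
    Callback  callbackHandler
}
Worker is a single analysis worker.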
type WorkerGroup ¶
type WorkerGroup struct {
    WorkerNum          int
    LatestTms          int64 // latest timestamp processed in the log file
    MaxDelay           int64 // maximum out-of-order timestamp gap seen in the log file
    ResetTms           int64 // time when maxDelay was last reset
    Workers            []*Worker
    TimeFormatStrategy string
}
WorkerGroup is a group of workers.
func NewWorkerGroup ¶
func NewWorkerGroup(filePath string, stream chan string, st *scheme.Strategy) *WorkerGroup
NewWorkerGroup creates a worker group. filePath and stream are supplied by the caller; everything else is created internally.
func (WorkerGroup) GetLatestTmsAndDelay ¶
func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64)
func (*WorkerGroup) ResetMaxDelay ¶
func (wg *WorkerGroup) ResetMaxDelay()
ResetMaxDelay resets the maxDelay record.
func (*WorkerGroup) SetLatestTmsAndDelay ¶
func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64)