task

package
v0.0.0-...-d369a32
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2019 License: MIT Imports: 19 Imported by: 0

Documentation

Constants

const (
	StatusOK      types.Status = 1
	StatusProblem types.Status = 2
	StatusClose   types.Status = 3
)

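Built-in task statuses. iota is deliberately not used, so that the numeric values do not shift when statuses are added or removed in later iterations.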
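The reasoning for explicit values instead of iota can be sketched as follows. This is a minimal illustration, not this package's code: the Status type is a local stand-in for types.Status, and parseStatus together with the names it accepts ("ok", "problem", "close") are assumptions made only for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// Status is a hypothetical stand-in for types.Status.
type Status int

// Explicit values: adding or removing a status later never renumbers
// the others, so values that were already stored keep their meaning.
const (
	StatusOK      Status = 1
	StatusProblem Status = 2
	StatusClose   Status = 3
)

// parseStatus maps a name to a Status. The accepted names are assumptions
// of this sketch; unknown input yields the zero value.
func parseStatus(s string) Status {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "ok":
		return StatusOK
	case "problem":
		return StatusProblem
	case "close":
		return StatusClose
	default:
		return 0
	}
}

func main() {
	fmt.Println(parseStatus("OK"), parseStatus("problem"), parseStatus("unknown"))
}
```

With iota, deleting StatusProblem would silently turn StatusClose into 2; with explicit values it stays 3.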

const (
	MarkOpenThreshold types.Mark = 1 << iota
	MarkNoticeRecovery
)

Built-in event mark values.
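Because the marks are defined with 1 << iota, each one occupies its own bit and several can be combined in a single value. The sketch below shows the usual bit-flag operations under that reading. The Mark type is a local stand-in for types.Mark, and withMark, removeMark and marked are hypothetical helpers, not the Task methods of the same names.

```go
package main

import "fmt"

// Mark is a hypothetical stand-in for types.Mark.
type Mark uint

const (
	MarkOpenThreshold  Mark = 1 << iota // 1
	MarkNoticeRecovery                  // 2
)

// withMark returns m with the bits of flag set.
func withMark(m, flag Mark) Mark { return m | flag }

// removeMark returns m with the bits of flag cleared.
func removeMark(m, flag Mark) Mark { return m &^ flag }

// marked reports whether every bit of flag is set in m.
func marked(m, flag Mark) bool { return m&flag == flag }

func main() {
	var m Mark
	m = withMark(m, MarkOpenThreshold)
	m = withMark(m, MarkNoticeRecovery)
	m = removeMark(m, MarkOpenThreshold)
	fmt.Println(marked(m, MarkOpenThreshold), marked(m, MarkNoticeRecovery))
}
```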

Variables

This section is empty.

Functions

func FixTimeRule

func FixTimeRule(spec string) string

func NewTemplate

func NewTemplate(text string) (*template.Template, error)

NewTemplate creates a new template.

func ParseStatus

func ParseStatus(s string) types.Status

ParseStatus parses a string into the corresponding Status value.

Types

type AlertManager

type AlertManager interface {
	FireEvent(context.Context, string, string, ...events.EventSetting) (events.Event, error)
	CloseEvent(context.Context, string, string, ...events.EventSetting) error
}

AlertManager represents an alert event handler.

type Manager

type Manager struct {
	*sync.RWMutex
	// contains filtered or unexported fields
}

Manager is a common manager for tasks.

func NewManager

func NewManager(options ...Option) (*Manager, error)

NewManager returns a new task manager based on the given options, including the store.

func (*Manager) AddDatasource

func (manager *Manager) AddDatasource(ctx context.Context, dsrc datasource.Datasource) (datasource.Datasource, error)

AddDatasource creates a datasource.

func (*Manager) Count

func (manager *Manager) Count() (int64, int64)

Count returns the total number of running tasks and the total number of closed tasks, in that order.

func (*Manager) Create

func (manager *Manager) Create(ctx context.Context, spec Spec) (Spec, error)

Create creates a new task from spec. It returns an error if a task with the same name already exists.

func (*Manager) Delete

func (manager *Manager) Delete(ctx context.Context, id int64) error

Delete deletes an existing task and does nothing if the task does not exist. It returns an error if one occurs while deleting.

func (*Manager) DeleteDatasource

func (manager *Manager) DeleteDatasource(ctx context.Context, id int64) error

DeleteDatasource deletes a datasource.

func (*Manager) Get

func (manager *Manager) Get(id int64) (*Task, error)

Get returns a task by id.

func (*Manager) GetDatasource

func (manager *Manager) GetDatasource(ctx context.Context, id int64) (datasource.Datasource, error)

GetDatasource returns a datasource.

func (*Manager) GetDatasources

func (manager *Manager) GetDatasources(user string) []datasource.Datasource

GetDatasources returns all datasources of the given user. If user is empty, it returns all datasources.

func (*Manager) GetMyTasks

func (manager *Manager) GetMyTasks(user string, tag string) ([]Spec, error)

GetMyTasks returns the tasks of which the user is an owner.

func (*Manager) GetReadonlyTasks

func (manager *Manager) GetReadonlyTasks(user string, tag string) ([]Spec, error)

GetReadonlyTasks returns the tasks that are read-only for the user.

func (*Manager) Tasks

func (manager *Manager) Tasks(user string, tag string) ([]Spec, error)

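Tasks returns the list of tasks matching user and tag. If user is empty, the user condition is ignored. Otherwise it returns the tasks of which user is an owner or a receiver, as well as the tasks configured with dynamic receivers.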

func (*Manager) Update

func (manager *Manager) Update(ctx context.Context, id int64, spec Spec, fields []string) (Spec, error)

Update partially updates a task; fields names the fields to be updated.
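A partial update driven by a list of field names can be sketched as below. This is only an illustration of the idea: the spec type is a reduced local stand-in for Spec, applyFields is a hypothetical helper, and the assumption that fields carries the JSON field names (and that unknown names are rejected) is not confirmed by this page.

```go
package main

import "fmt"

// spec is a reduced, hypothetical stand-in for Spec.
type spec struct {
	Name        string
	Query       string
	Description string
}

// applyFields copies only the named fields from patch onto current.
// Every other field of patch is ignored; unknown names are an error.
func applyFields(current, patch spec, fields []string) (spec, error) {
	for _, f := range fields {
		switch f {
		case "name":
			current.Name = patch.Name
		case "query":
			current.Query = patch.Query
		case "description":
			current.Description = patch.Description
		default:
			return spec{}, fmt.Errorf("unknown field %q", f)
		}
	}
	return current, nil
}

func main() {
	cur := spec{Name: "cpu", Query: "select 1", Description: "old"}
	patch := spec{Name: "ignored", Description: "new"}
	out, err := applyFields(cur, patch, []string{"description"})
	fmt.Println(out.Name, out.Description, err)
}
```

Passing the field list explicitly lets a caller set a field to its zero value, which a "skip empty fields" rule could not express.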

func (*Manager) UpdateDatasource

func (manager *Manager) UpdateDatasource(ctx context.Context, id int64, dsrc datasource.Datasource) error

UpdateDatasource updates a datasource.

type Option

type Option func(*Manager)

Option represents an option used when creating a Manager.

func WithAlertManager

func WithAlertManager(am AlertManager) Option

WithAlertManager sets the AlertManager.

func WithContext

func WithContext(ctx context.Context) Option

WithContext configures the context.

func WithMySQL

func WithMySQL(mysql *store.MySQL) Option

WithMySQL configures the storage.
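Option, WithContext and WithMySQL follow the functional options pattern. The sketch below shows how such a constructor typically applies its options. The manager type, withContext, withStore and newManager are hypothetical stand-ins, a plain string takes the place of *store.MySQL, and the rule that a store is required is an assumption of the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// manager is a hypothetical stand-in for Manager.
type manager struct {
	ctx   context.Context
	store string // stands in for *store.MySQL
}

// option mirrors the shape of Option: a function that mutates the manager.
type option func(*manager)

func withContext(ctx context.Context) option {
	return func(m *manager) { m.ctx = ctx }
}

func withStore(dsn string) option {
	return func(m *manager) { m.store = dsn }
}

// newManager starts from defaults, applies the options in order, then
// validates the result. Requiring a store is an assumption of this sketch.
func newManager(opts ...option) (*manager, error) {
	m := &manager{ctx: context.Background()}
	for _, opt := range opts {
		opt(m)
	}
	if m.store == "" {
		return nil, errors.New("a store is required")
	}
	return m, nil
}

func main() {
	m, err := newManager(withStore("example-dsn"), withContext(context.Background()))
	fmt.Println(m != nil, err)

	_, err = newManager()
	fmt.Println(err)
}
```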

type ProblemPoints

type ProblemPoints struct {
	HealthyPoints []datasource.Point     `json:"-"` // healthy data points
	OutlierPoints []datasource.Point     `json:"-"` // outlier data points
	Sample        map[string]interface{} `json:"sample"`
	Threshold     TaskThreshold          `json:"threshold"`
	Labels        map[string]string      `json:"labels"`
	Metric        string                 `json:"metric"`
	IsProblem     bool                   `json:"is_problem"`
}

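ProblemPoints represents a group of abnormal data: data points that share the same labels are merged into one ProblemPoints. ProblemPoints is used to generate events, in a one-to-one relationship. Normally ProblemPoints.points should contain only one point, unless the influxdb statement uses Group By time(<interval>), which causes many points to be returned under a single series. Using group by time is discouraged. Another case is select * from, where the labels of all points are empty, which may lead to a ProblemPoints containing multiple points.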
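Merging points that share the same labels can be sketched as follows. This is a minimal illustration rather than the package's implementation: point is a local stand-in for datasource.Point, labelKey and groupByLabels are hypothetical helpers, and the key format is a simplification that does not escape "=" or "," inside label values.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// point is a hypothetical stand-in for datasource.Point.
type point struct {
	Labels map[string]string
	Value  float64
}

// labelKey builds a deterministic key from a label set by sorting the
// label names first, so that map iteration order does not matter.
func labelKey(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		b.WriteString(name)
		b.WriteByte('=')
		b.WriteString(labels[name])
		b.WriteByte(',')
	}
	return b.String()
}

// groupByLabels merges points with identical label sets into one group.
func groupByLabels(points []point) map[string][]point {
	groups := make(map[string][]point)
	for _, p := range points {
		key := labelKey(p.Labels)
		groups[key] = append(groups[key], p)
	}
	return groups
}

func main() {
	points := []point{
		{Labels: map[string]string{"host": "a"}, Value: 1},
		{Labels: map[string]string{"host": "b"}, Value: 2},
		{Labels: map[string]string{"host": "a"}, Value: 3}, // e.g. a second time bucket
		{Value: 4}, // no labels, as with "select * from"
		{Value: 5},
	}
	groups := groupByLabels(points)
	fmt.Println(len(groups), len(groups["host=a,"]), len(groups[""]))
}
```

The two unlabeled points end up in the same group, which is the multi-point case the description warns about.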

type ReceiverGroup

type ReceiverGroup struct {
	ID          int64            `json:"id" db:"id"`
	TaskID      int64            `json:"task_id" db:"task_id"`
	Receivers   types.StringList `json:"receivers" db:"receivers"`
	Policy      string           `json:"policy" db:"policy"`
	Channels    types.Chan       `json:"channels" db:"channels"`
	CreatedTime time.Time        `json:"created_time" db:"created_time"`
	UpdatedTime time.Time        `json:"updated_time" db:"updated_time"`
	// contains filtered or unexported fields
}

ReceiverGroup represents a group of alert receivers.

func (*ReceiverGroup) DynamicReceiver

func (group *ReceiverGroup) DynamicReceiver(data map[string]interface{}) ([]string, error)

DynamicReceiver uses the data argument to return the dynamic list of receivers.

func (*ReceiverGroup) Handler

func (group *ReceiverGroup) Handler(data map[string]interface{}) (events.Handler, error)

Handler generates an event handler from the receiver group.

func (*ReceiverGroup) HasDynamicReceiver

func (group *ReceiverGroup) HasDynamicReceiver() bool

HasDynamicReceiver reports whether the receiver group contains a dynamic receiver configuration.

type Spec

type Spec struct {
	ID          int64               `json:"id" db:"id"`
	Name        string              `json:"name" db:"name"` // required, name
	Datasource  int64               `json:"datasource" db:"datasource"`
	TimeRule    string              `json:"time_rule" db:"time_rule"` // optional, execution policy
	Query       string              `json:"query" db:"query"`
	Thresholds  *TaskThresholdGroup `json:"thresholds" db:"-"`
	Message     string              `json:"message" db:"message"` // optional, alert content; supports templates. May be empty, in which case it is generated automatically.
	Receivers   []ReceiverGroup     `json:"receivers" db:"-"`     // required, receivers
	Owners      types.StringList    `json:"owners" db:"owners"`
	Tags        types.StringList    `json:"tags" db:"tags"`
	Description string              `json:"description" db:"description"`

	Status types.Status `json:"status" db:"status"`
	Mark   types.Mark   `json:"mark" db:"mark"`

	CreatedTime time.Time `json:"created_time" db:"created_time"`
	UpdatedTime time.Time `json:"updated_time" db:"updated_time"`
}

Spec represents a task's specification

func (Spec) IsValid

func (spec Spec) IsValid() error

IsValid checks whether the Spec is valid and returns an error describing the problem if it is not.

type Task

type Task struct {
	Spec `json:",inline"`
	*sync.RWMutex

	// Time of the most recent run, used to prevent the task from being triggered too frequently.
	LatestRunTime time.Time `json:"latest_run_time"`

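	// Suggested changes discovered while running, e.g. the query frequently times out, the run frequency is too high, or the query returns too many metrics.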
	Proposal string `json:"proposal"`
	// contains filtered or unexported fields
}

Task is a real normal task instance

func NewTask

func NewTask(spec Spec, options ...TaskOption) (*Task, error)

NewTask creates a new Task from the given specification.

func (*Task) AddReceiverGroup

func (task *Task) AddReceiverGroup(ctx context.Context, receivers []string, channel types.Chan, policy string) (ReceiverGroup, error)

AddReceiverGroup adds a receiver group.

func (*Task) AddThreshold

func (task *Task) AddThreshold(ctx context.Context, when string, level types.Level, expr string) (TaskThreshold, error)

AddThreshold adds a threshold rule.

func (*Task) DelReceiverGroup

func (task *Task) DelReceiverGroup(ctx context.Context, id int64) error

DelReceiverGroup deletes a receiver group.

func (*Task) DelThreshold

func (task *Task) DelThreshold(ctx context.Context, id int64) error

DelThreshold deletes a threshold.

func (*Task) GetReceiverGroup

func (task *Task) GetReceiverGroup() []ReceiverGroup

GetReceiverGroup returns the list of all receiver groups.

func (*Task) GetThreshold

func (task *Task) GetThreshold() []TaskThreshold

GetThreshold returns all threshold settings of the rule.

func (*Task) HasOwner

func (task *Task) HasOwner(users ...string) bool

HasOwner reports whether all of the given users are owners of the rule.

func (*Task) HasReceiver

func (task *Task) HasReceiver(users ...string) bool

HasReceiver reports whether the rule's receivers include all of the given users.
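HasOwner and HasReceiver both describe an "all of the given users" check rather than "any of them". A minimal sketch of that semantics follows. containsAll is a hypothetical helper, and treating an empty users list as true is an assumption of the example, not documented behavior.

```go
package main

import "fmt"

// containsAll reports whether every element of want appears in have.
// With no wanted elements it returns true (an assumption of this sketch).
func containsAll(have []string, want ...string) bool {
	set := make(map[string]struct{}, len(have))
	for _, h := range have {
		set[h] = struct{}{}
	}
	for _, w := range want {
		if _, ok := set[w]; !ok {
			return false
		}
	}
	return true
}

func main() {
	owners := []string{"alice", "bob"}
	fmt.Println(containsAll(owners, "alice", "bob"))
	fmt.Println(containsAll(owners, "alice", "carol"))
}
```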

func (*Task) HasTag

func (task *Task) HasTag(tag string) bool

HasTag reports whether the task has the given tag.

func (*Task) Marked

func (task *Task) Marked(m types.Mark) bool

Marked reports whether the task carries the given mark.

func (*Task) QueryProblemPoints

func (task *Task) QueryProblemPoints(search string, limit int) []ProblemPoints

QueryProblemPoints queries the latest data monitored by the rule.

func (*Task) ReloadProblemPoints

func (task *Task) ReloadProblemPoints() ([]ProblemPoints, error)

ReloadProblemPoints immediately sends a Query request to the datasource to refresh the latest data.

func (*Task) RemoveMark

func (task *Task) RemoveMark(m types.Mark) error

RemoveMark removes the given mark from the task.

func (*Task) RemoveMarkContext

func (task *Task) RemoveMarkContext(ctx context.Context, m types.Mark) error

RemoveMarkContext removes the given mark from the task, using the given context.

func (*Task) Run

func (task *Task) Run(prev, current, next time.Time)

Run runs the task once.

func (*Task) RunForce

func (task *Task) RunForce(prev, current, next time.Time)

RunForce runs the task once, even if task.Status is DOWN.

func (*Task) ShiftThreshold

func (task *Task) ShiftThreshold(ctx context.Context, id int64, index int) error

ShiftThreshold moves a threshold.

func (*Task) UpdateReceiverGroup

func (task *Task) UpdateReceiverGroup(ctx context.Context, id int64, receivers []string, channel types.Chan, policy string) (ReceiverGroup, error)

UpdateReceiverGroup updates a receiver group.

func (*Task) UpdateThreshold

func (task *Task) UpdateThreshold(ctx context.Context, id int64, data TaskThreshold) (TaskThreshold, error)

UpdateThreshold updates a threshold setting.

func (*Task) WithMark

func (task *Task) WithMark(m types.Mark) error

WithMark adds the given mark to the task.

func (*Task) WithMarkContext

func (task *Task) WithMarkContext(ctx context.Context, m types.Mark) error

WithMarkContext adds the given mark to the task, using the given context.

type TaskOption

type TaskOption func(*Task)

TaskOption represents a configuration option for a Task.

func WithDatasource

func WithDatasource(dsrc datasource.Datasource) TaskOption

WithDatasource sets the datasource.

func WithTaskAlertManager

func WithTaskAlertManager(am AlertManager) TaskOption

WithTaskAlertManager sets the AlertManager.

func WithTaskContext

func WithTaskContext(ctx context.Context) TaskOption

WithTaskContext sets the context.

func WithTaskStorage

func WithTaskStorage(mysql *store.MySQL) TaskOption

WithTaskStorage sets the storage.

type TaskThreshold

type TaskThreshold struct {
	ID          int64       `json:"id" db:"id"`
	TaskID      int64       `json:"task_id" db:"task_id"`
	When        string      `json:"when" db:"when"`
	Assert      string      `json:"assert" db:"assert"`
	Level       types.Level `json:"level" db:"level"`
	Weight      int64       `json:"weight" db:"weight"`
	CreatedTime time.Time   `json:"created_time" db:"created_time"`
	UpdatedTime time.Time   `json:"updated_time" db:"updated_time"`
	// contains filtered or unexported fields
}

TaskThreshold represents a threshold rule.

func (TaskThreshold) Avail

func (tt TaskThreshold) Avail() error

Avail checks whether the threshold rule is valid.

func (TaskThreshold) IsZero

func (tt TaskThreshold) IsZero() bool

IsZero reports whether the threshold is empty.

func (TaskThreshold) String

func (tt TaskThreshold) String() string

type TaskThresholdGroup

type TaskThresholdGroup struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

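Before calling any function of TaskThresholdGroup (other than the three sort.Interface methods), make sure the TaskThresholdGroup is sorted. Most methods of this data structure take a coarse-grained lock, so frequent calls may cause performance problems. Because threshold changes are very infrequent, no optimization has been done here.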
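The combination of an embedded sync.RWMutex, the three sort.Interface methods and an always-sorted slice can be sketched like this. The threshold and group types, the add and ids methods, the descending-by-weight order, and the choice to leave Len, Less and Swap unlocked (so that they can run while the caller already holds the lock) are all assumptions of the example, not the package's code.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// threshold is a reduced, hypothetical stand-in for TaskThreshold.
type threshold struct {
	ID     int64
	Weight int64
}

// group is a hypothetical stand-in for TaskThresholdGroup.
type group struct {
	sync.RWMutex
	items []threshold
}

// Len, Less and Swap implement sort.Interface. They do not lock, because
// they are only called by sort.Sort while the write lock is already held.
func (g *group) Len() int           { return len(g.items) }
func (g *group) Less(i, j int) bool { return g.items[i].Weight > g.items[j].Weight }
func (g *group) Swap(i, j int)      { g.items[i], g.items[j] = g.items[j], g.items[i] }

// add appends a threshold and re-sorts under the write lock, so every
// other method can rely on the slice being sorted.
func (g *group) add(t threshold) {
	g.Lock()
	defer g.Unlock()
	g.items = append(g.items, t)
	sort.Sort(g)
}

// ids returns the IDs in their current (sorted) order under the read lock.
func (g *group) ids() []int64 {
	g.RLock()
	defer g.RUnlock()
	out := make([]int64, len(g.items))
	for i, t := range g.items {
		out[i] = t.ID
	}
	return out
}

func main() {
	g := &group{}
	g.add(threshold{ID: 1, Weight: 10})
	g.add(threshold{ID: 2, Weight: 30})
	g.add(threshold{ID: 3, Weight: 20})
	fmt.Println(g.ids())
}
```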

func MustNewThresholdGroupWithData

func MustNewThresholdGroupWithData(mysql *store.MySQL, data []TaskThreshold) *TaskThresholdGroup

MustNewThresholdGroupWithData is like NewThresholdGroupWithData, except that it panics if an error occurs.

func NewThresholdGroup

func NewThresholdGroup(mysql *store.MySQL, taskID int64) (*TaskThresholdGroup, error)

NewThresholdGroup creates a threshold group.

func NewThresholdGroupWithData

func NewThresholdGroupWithData(mysql *store.MySQL, data []TaskThreshold) (*TaskThresholdGroup, error)

NewThresholdGroupWithData creates a threshold group from existing data.

func (*TaskThresholdGroup) Add

Add adds data to the group and returns the new data and its index; the index will be used to update the threshold.

func (*TaskThresholdGroup) Data

func (ttg *TaskThresholdGroup) Data() []TaskThreshold

Data returns all threshold settings in the group.

func (*TaskThresholdGroup) Get

func (ttg *TaskThresholdGroup) Get(id int64) (TaskThreshold, error)

Get returns a threshold configuration by id.

func (*TaskThresholdGroup) Len

func (ttg *TaskThresholdGroup) Len() int

func (*TaskThresholdGroup) Less

func (ttg *TaskThresholdGroup) Less(i, j int) bool

func (TaskThresholdGroup) MarshalJSON

func (ttg TaskThresholdGroup) MarshalJSON() ([]byte, error)

MarshalJSON implements JSON serialization.

func (*TaskThresholdGroup) Remove

func (ttg *TaskThresholdGroup) Remove(ctx context.Context, id int64) error

Remove deletes a threshold rule.

func (*TaskThresholdGroup) ResetWeight

func (ttg *TaskThresholdGroup) ResetWeight(ctx context.Context) error

ResetWeight resets the weights. It is used when the thresholds have unsafe weights, that is, when drag-and-drop reordering is no longer possible.

func (*TaskThresholdGroup) Safe

func (ttg *TaskThresholdGroup) Safe() bool

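Safe checks whether the weights are safe: the difference between the weights of two thresholds must be greater than 2, otherwise (a+b)/2 would produce two thresholds with the same weight.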
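The weight arithmetic behind Safe and ResetWeight can be worked through in a short sketch. midpoint, safe and reset are hypothetical helpers, the weights are assumed to be integers sorted in descending order, and the step used when resetting is an arbitrary value chosen for the example.

```go
package main

import "fmt"

// midpoint returns the weight given to an item dropped between two
// neighbours with weights a and b. Integer division truncates.
func midpoint(a, b int64) int64 { return (a + b) / 2 }

// safe reports whether every pair of neighbouring weights differs by
// more than 2. weights must be sorted in descending order.
func safe(weights []int64) bool {
	for i := 1; i < len(weights); i++ {
		if weights[i-1]-weights[i] <= 2 {
			return false
		}
	}
	return true
}

// reset assigns evenly spaced weights while preserving the order.
// The step is a hypothetical parameter of this sketch.
func reset(n int, step int64) []int64 {
	out := make([]int64, n)
	for i := range out {
		out[i] = int64(n-i) * step
	}
	return out
}

func main() {
	fmt.Println(midpoint(10, 11))           // collides with the neighbour weighted 10
	fmt.Println(safe([]int64{30, 20, 10}))  // gaps of 10
	fmt.Println(safe([]int64{30, 11, 10}))  // gap of 1 between 11 and 10
	fmt.Println(safe(reset(3, 1000)))       // evenly spaced again
}
```

When neighbours are 10 and 11, the midpoint truncates to 10 and duplicates an existing weight, which is the situation a reset is meant to repair.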

func (*TaskThresholdGroup) Shift

func (ttg *TaskThresholdGroup) Shift(ctx context.Context, id int64, newIndex int) error

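Shift moves the threshold identified by id to position newIndex. Only the weight of that threshold is modified by the move, and the threshold data remains sorted after the function returns.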

func (*TaskThresholdGroup) Swap

func (ttg *TaskThresholdGroup) Swap(i, j int)

func (*TaskThresholdGroup) TriggerProblem

func (ttg *TaskThresholdGroup) TriggerProblem(point datasource.Point) (TaskThreshold, bool)

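TriggerProblem looks for a suitable threshold in descending order of weight and evaluates its assertion expression. If a problem is detected, it returns the matching threshold and true. If the threshold check finds no problem, it returns the threshold and false. In all other cases it returns the zero value and false.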
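The three outcomes described for TriggerProblem can be sketched with a first-match loop. This is an illustration under stated assumptions, not the package's implementation: point and threshold are local stand-ins, the When and Assert strings of TaskThreshold are replaced by plain Go predicates, and a true result from Assert is taken to mean "this point is a problem".

```go
package main

import "fmt"

// point is a hypothetical stand-in for datasource.Point.
type point struct {
	Labels map[string]string
	Value  float64
}

// threshold is a hypothetical stand-in for TaskThreshold, with the
// expression strings replaced by predicates.
type threshold struct {
	Name   string
	Weight int64
	When   func(point) bool // selects the points this threshold applies to
	Assert func(point) bool // true means the point is a problem
}

// triggerProblem expects thresholds sorted by descending weight. The first
// threshold whose When matches decides the result; if none matches, the
// zero value and false are returned.
func triggerProblem(thresholds []threshold, p point) (threshold, bool) {
	for _, t := range thresholds {
		if t.When(p) {
			return t, t.Assert(p)
		}
	}
	return threshold{}, false
}

func main() {
	thresholds := []threshold{
		{
			Name:   "db",
			Weight: 200,
			When:   func(p point) bool { return p.Labels["host"] == "db" },
			Assert: func(p point) bool { return p.Value > 90 },
		},
		{
			Name:   "default",
			Weight: 100,
			When:   func(point) bool { return true },
			Assert: func(p point) bool { return p.Value > 50 },
		},
	}

	t, problem := triggerProblem(thresholds, point{Labels: map[string]string{"host": "db"}, Value: 70})
	fmt.Println(t.Name, problem)

	t, problem = triggerProblem(thresholds, point{Labels: map[string]string{"host": "web"}, Value: 70})
	fmt.Println(t.Name, problem)
}
```

The same value of 70 is healthy for the "db" host and a problem for every other host, because the higher-weight threshold is consulted first.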

func (*TaskThresholdGroup) UnmarshalJSON

func (ttg *TaskThresholdGroup) UnmarshalJSON(content []byte) error

UnmarshalJSON implements JSON deserialization.
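A type whose data lives in unexported fields needs custom MarshalJSON and UnmarshalJSON methods, because encoding/json skips unexported fields. The sketch below shows that pairing. The threshold and group types and the roundTrip helper are hypothetical, and encoding the group as a plain JSON array is an assumption of the example; this page does not state the actual wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// threshold is a reduced, hypothetical stand-in for TaskThreshold.
type threshold struct {
	ID     int64  `json:"id"`
	Assert string `json:"assert"`
}

// group is a hypothetical stand-in for TaskThresholdGroup. Its data is
// unexported, so encoding/json would ignore it without custom methods.
type group struct {
	items []threshold
}

// MarshalJSON encodes the group as a JSON array of thresholds.
func (g group) MarshalJSON() ([]byte, error) {
	return json.Marshal(g.items)
}

// UnmarshalJSON decodes a JSON array of thresholds into the group.
func (g *group) UnmarshalJSON(content []byte) error {
	var items []threshold
	if err := json.Unmarshal(content, &items); err != nil {
		return err
	}
	g.items = items
	return nil
}

// roundTrip encodes g and decodes the result into a fresh group.
func roundTrip(g group) (group, error) {
	data, err := json.Marshal(g)
	if err != nil {
		return group{}, err
	}
	var out group
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	in := group{items: []threshold{{ID: 1, Assert: "value > 90"}}}
	out, err := roundTrip(in)
	fmt.Println(len(out.items), out.items[0].Assert, err)
}
```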

func (*TaskThresholdGroup) Update

func (ttg *TaskThresholdGroup) Update(ctx context.Context, id int64, threshold TaskThreshold) (TaskThreshold, error)

Update updates a threshold configuration.
