Documentation ¶
Index ¶
- Constants
- Variables
- func AllocVunitSafe(ctx context.Context, cli IAllocVunit, vuid comproto.Vuid, ...) (ret *client.AllocVunitInfo, err error)
- func DataMountFormat(dataMountBytes [counter.SLOT]int) string
- func FormatPrint(statsInfos []ErrorPercent) (res []string)
- func GenTaskID(prefix string, vid comproto.Vid) string
- func InsistOn(ctx context.Context, errMsg string, on func() error)
- func NewCounter(clusterID proto.ClusterID, taskType string, kind string) prometheus.Counter
- func ShouldAllocAndRedo(errCode int) bool
- func Subtraction(a, b []comproto.Vuid) (c []comproto.Vuid)
- type ClusterTopologyStatsMgr
- type ConsumeInfo
- type ErrorPercent
- type ErrorStats
- type IAllocVunit
- type IConsumer
- func NewKafkaPartitionConsumers(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) ([]IConsumer, error)
- func NewPriorityConsumer(taskType proto.TaskType, cfgs []PriorityConsumerConfig, ...) (IConsumer, error)
- func NewTopicConsumer(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) (IConsumer, error)
- type IConsumerOffset
- type IProducer
- type KafkaConfig
- type KafkaTopicMonitor
- type PartitionConsumer
- type PriorityConsumerConfig
- type Queue
- func (q *Queue) Get(id string) (interface{}, error)
- func (q *Queue) Pop() (string, interface{}, bool)
- func (q *Queue) Push(id string, msg interface{}) error
- func (q *Queue) Remove(id string) error
- func (q *Queue) Requeue(id string, delay time.Duration) error
- func (q *Queue) Stats() (todo, doing int)
- type TaskCntStats
- type TaskCommonConfig
- type TaskQueue
- func (q *TaskQueue) PopTask() (string, WorkerTask, bool)
- func (q *TaskQueue) PushTask(taskID string, task WorkerTask)
- func (q *TaskQueue) Query(taskID string) (WorkerTask, bool)
- func (q *TaskQueue) RemoveTask(taskID string) error
- func (q *TaskQueue) RetryTask(taskID string)
- func (q *TaskQueue) StatsTasks() (todo int, doing int)
- type TaskRunDetailInfo
- type TaskStatsMgr
- func (statsMgr *TaskStatsMgr) CancelTask()
- func (statsMgr *TaskStatsMgr) Counters() (increaseDataSize, increaseShardCnt [counter.SLOT]int)
- func (statsMgr *TaskStatsMgr) QueryTaskDetail(taskID string) (detail TaskRunDetailInfo, err error)
- func (statsMgr *TaskStatsMgr) ReclaimTask()
- func (statsMgr *TaskStatsMgr) ReportTaskCntLoop()
- func (statsMgr *TaskStatsMgr) ReportWorkerTaskStats(taskID string, s proto.TaskStatistics, increaseDataSize, increaseShardCnt int)
- type TopicConsumer
- type VolTaskLocker
- type WorkerTask
- type WorkerTaskQueue
- func (q *WorkerTaskQueue) Acquire(idc string) (taskID string, wtask WorkerTask, exist bool)
- func (q *WorkerTaskQueue) AddPreparedTask(idc, taskID string, wtask WorkerTask)
- func (q *WorkerTaskQueue) Cancel(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) error
- func (q *WorkerTaskQueue) Complete(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) (WorkerTask, error)
- func (q *WorkerTaskQueue) Query(idc, taskID string) (WorkerTask, error)
- func (q *WorkerTaskQueue) Reclaim(idc, taskID string, src []proto.VunitLocation, ...) error
- func (q *WorkerTaskQueue) Renewal(idc, taskID string) error
- func (q *WorkerTaskQueue) SetLeaseExpiredS(dura time.Duration)
- func (q *WorkerTaskQueue) StatsTasks() (todo int, doing int)
Constants ¶
const ( KindFailed = "failed" KindSuccess = "success" )
Kinds of statistics results (failed or success).
const ( // EmptyDiskID empty diskID EmptyDiskID = proto.DiskID(0) )
Variables ¶
var ( // ErrNoSuchMessageID no such message id ErrNoSuchMessageID = errors.New("no such message id") // ErrUnmatchedVuids unmatched task vuids ErrUnmatchedVuids = errors.New("unmatched task vuids") )
var ( ErrNoTaskInQueue = errors.New("no task in queue") ErrVolNotOnlyOneTask = errors.New("vol not only one task running") ErrUpdateVolumeCache = errors.New("update volume cache failed") )
Errors used for tasks.
var Buckets = []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000}
Buckets default buckets for stats
var ( // ErrVidTaskConflict vid task conflict ErrVidTaskConflict = errors.New("vid task conflict") )
Makes sure that only one task runs on the same volume in the cluster.
var NewVolTaskLockerOnce sync.Once
NewVolTaskLockerOnce singleton mode: makes sure there is only one global instance
Functions ¶
func AllocVunitSafe ¶
func AllocVunitSafe( ctx context.Context, cli IAllocVunit, vuid comproto.Vuid, volReplicas []comproto.VunitLocation) (ret *client.AllocVunitInfo, err error)
AllocVunitSafe allocates a volume unit safely
func DataMountFormat ¶
DataMountFormat formats data
func FormatPrint ¶
func FormatPrint(statsInfos []ErrorPercent) (res []string)
FormatPrint formats print messages
func NewCounter ¶
NewCounter returns statistics counter
func ShouldAllocAndRedo ¶
ShouldAllocAndRedo returns true if the task should alloc and redo
Types ¶
type ClusterTopologyStatsMgr ¶
type ClusterTopologyStatsMgr struct {
// contains filtered or unexported fields
}
ClusterTopologyStatsMgr cluster topology stats manager
func NewClusterTopologyStatisticsMgr ¶
func NewClusterTopologyStatisticsMgr(clusterID proto.ClusterID, buckets []float64) *ClusterTopologyStatsMgr
NewClusterTopologyStatisticsMgr returns cluster topology stats manager
func (*ClusterTopologyStatsMgr) ReportFreeChunk ¶
func (statsMgr *ClusterTopologyStatsMgr) ReportFreeChunk(disk *api.DiskInfoSimple)
ReportFreeChunk report free chunk
type ErrorPercent ¶
type ErrorPercent struct {
// contains filtered or unexported fields
}
ErrorPercent error percent
type ErrorStats ¶
type ErrorStats struct {
// contains filtered or unexported fields
}
ErrorStats error stats
func (*ErrorStats) Stats ¶
func (es *ErrorStats) Stats() (statsResult []ErrorPercent, totalErrCnt uint64)
Stats returns stats
type IAllocVunit ¶
type IAllocVunit interface {
AllocVolumeUnit(ctx context.Context, vuid comproto.Vuid) (ret *client.AllocVunitInfo, err error)
}
IAllocVunit defines the interface of clustermgr used for volume allocation
type IConsumer ¶
type IConsumer interface { ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage) CommitOffset(ctx context.Context) error }
IConsumer defines the interface of a consumer for message consumption
func NewKafkaPartitionConsumers ¶
func NewKafkaPartitionConsumers(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) ([]IConsumer, error)
NewKafkaPartitionConsumers returns kafka partition consumers
func NewPriorityConsumer ¶
func NewPriorityConsumer(taskType proto.TaskType, cfgs []PriorityConsumerConfig, offsetAccessor IConsumerOffset) (IConsumer, error)
NewPriorityConsumer returns priority consumer
func NewTopicConsumer ¶
func NewTopicConsumer(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) (IConsumer, error)
NewTopicConsumer returns topic round-robin partition consumer
type IConsumerOffset ¶
type IConsumerOffset interface { GetConsumeOffset(taskType proto.TaskType, topic string, partition int32) (offset int64, err error) SetConsumeOffset(taskType proto.TaskType, topic string, partition int32, offset int64) (err error) }
IConsumerOffset records consume offset
type IProducer ¶
type IProducer interface { SendMessage(msg []byte) (err error) SendMessages(msgs [][]byte) (err error) }
IProducer defines the interface of producer
func NewMsgSender ¶
func NewMsgSender(cfg *kafka.ProducerCfg) (IProducer, error)
NewMsgSender returns message sender
type KafkaConfig ¶
KafkaConfig kafka config
type KafkaTopicMonitor ¶
KafkaTopicMonitor kafka monitor
func NewKafkaTopicMonitor ¶
func NewKafkaTopicMonitor(taskType proto.TaskType, clusterID proto.ClusterID, cfg *KafkaConfig, offsetAccessor IConsumerOffset, monitorIntervalS int) (*KafkaTopicMonitor, error)
NewKafkaTopicMonitor returns kafka topic monitor
func (*KafkaTopicMonitor) Close ¶
func (m *KafkaTopicMonitor) Close()
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer partition consumer
func (*PartitionConsumer) CommitOffset ¶
func (c *PartitionConsumer) CommitOffset(ctx context.Context) error
CommitOffset commit offset
func (*PartitionConsumer) ConsumeMessages ¶
func (c *PartitionConsumer) ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage)
ConsumeMessages consume messages
type PriorityConsumerConfig ¶
type PriorityConsumerConfig struct { KafkaConfig Priority int `json:"priority"` }
PriorityConsumerConfig priority consumer config
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue task queue
type TaskCntStats ¶
type TaskCntStats interface {
StatQueueTaskCnt() (preparing, workerDoing, finishing int)
}
TaskCntStats information of task running on worker
type TaskCommonConfig ¶
type TaskCommonConfig struct { PrepareQueueRetryDelayS int `json:"prepare_queue_retry_delay_s"` FinishQueueRetryDelayS int `json:"finish_queue_retry_delay_s"` CancelPunishDurationS int `json:"cancel_punish_duration_s"` WorkQueueSize int `json:"work_queue_size"` CollectTaskIntervalS int `json:"collect_task_interval_s"` CheckTaskIntervalS int `json:"check_task_interval_s"` DiskConcurrency int `json:"disk_concurrency"` }
TaskCommonConfig task common config
func (*TaskCommonConfig) CheckAndFix ¶
func (conf *TaskCommonConfig) CheckAndFix()
CheckAndFix check and fix task common config
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue task queue
func NewTaskQueue ¶
NewTaskQueue returns task queue
func (*TaskQueue) PopTask ¶
func (q *TaskQueue) PopTask() (string, WorkerTask, bool)
PopTask returns the taskID, the task, and a flag indicating whether a task exists
func (*TaskQueue) PushTask ¶
func (q *TaskQueue) PushTask(taskID string, task WorkerTask)
PushTask push task to queue
func (*TaskQueue) Query ¶
func (q *TaskQueue) Query(taskID string) (WorkerTask, bool)
Query find task by taskID
func (*TaskQueue) RemoveTask ¶
RemoveTask remove task by taskID
func (*TaskQueue) StatsTasks ¶
StatsTasks returns task stats
type TaskRunDetailInfo ¶
type TaskRunDetailInfo struct { Statistics proto.TaskStatistics `json:"statistics"` StartTime time.Time `json:"start_time"` CompleteTime time.Time `json:"complete_time"` Completed bool `json:"completed"` }
TaskRunDetailInfo task run detail info
type TaskStatsMgr ¶
type TaskStatsMgr struct { TaskRunInfos map[string]TaskRunDetailInfo // contains filtered or unexported fields }
TaskStatsMgr task stats manager
func NewTaskStatsMgr ¶
func NewTaskStatsMgr(clusterID proto.ClusterID, taskType proto.TaskType) *TaskStatsMgr
NewTaskStatsMgr returns task stats manager
func NewTaskStatsMgrAndRun ¶
func NewTaskStatsMgrAndRun(clusterID proto.ClusterID, taskType proto.TaskType, taskCntStats TaskCntStats) *TaskStatsMgr
NewTaskStatsMgrAndRun returns a task stats manager and runs it
func (*TaskStatsMgr) Counters ¶
func (statsMgr *TaskStatsMgr) Counters() (increaseDataSize, increaseShardCnt [counter.SLOT]int)
Counters returns task stats counters
func (*TaskStatsMgr) QueryTaskDetail ¶
func (statsMgr *TaskStatsMgr) QueryTaskDetail(taskID string) (detail TaskRunDetailInfo, err error)
QueryTaskDetail find task detail info
func (*TaskStatsMgr) ReclaimTask ¶
func (statsMgr *TaskStatsMgr) ReclaimTask()
ReclaimTask reclaim task
func (*TaskStatsMgr) ReportTaskCntLoop ¶
func (statsMgr *TaskStatsMgr) ReportTaskCntLoop()
ReportTaskCntLoop report task count
func (*TaskStatsMgr) ReportWorkerTaskStats ¶
func (statsMgr *TaskStatsMgr) ReportWorkerTaskStats(taskID string, s proto.TaskStatistics, increaseDataSize, increaseShardCnt int)
ReportWorkerTaskStats report worker task stats
type TopicConsumer ¶
type TopicConsumer struct {
// contains filtered or unexported fields
}
TopicConsumer rotates message consumption among partition consumers
func (*TopicConsumer) CommitOffset ¶
func (c *TopicConsumer) CommitOffset(ctx context.Context) error
CommitOffset commit offset
func (*TopicConsumer) ConsumeMessages ¶
func (c *TopicConsumer) ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage)
ConsumeMessages consume messages
type VolTaskLocker ¶
type VolTaskLocker struct {
// contains filtered or unexported fields
}
VolTaskLocker volume task locker
func VolTaskLockerInst ¶
func VolTaskLockerInst() *VolTaskLocker
VolTaskLockerInst ensures that only one background task is executing on the same volume
type WorkerTask ¶
type WorkerTask interface { GetSources() []proto.VunitLocation GetDestination() proto.VunitLocation SetDestination(dest proto.VunitLocation) }
WorkerTask defines worker task interface
type WorkerTaskQueue ¶
type WorkerTaskQueue struct {
// contains filtered or unexported fields
}
WorkerTaskQueue task queue for worker
func NewWorkerTaskQueue ¶
func NewWorkerTaskQueue(cancelPunishDuration time.Duration) *WorkerTaskQueue
NewWorkerTaskQueue returns worker task queue
func (*WorkerTaskQueue) Acquire ¶
func (q *WorkerTaskQueue) Acquire(idc string) (taskID string, wtask WorkerTask, exist bool)
Acquire acquire task by idc
func (*WorkerTaskQueue) AddPreparedTask ¶
func (q *WorkerTaskQueue) AddPreparedTask(idc, taskID string, wtask WorkerTask)
AddPreparedTask add prepared task
func (*WorkerTaskQueue) Cancel ¶
func (q *WorkerTaskQueue) Cancel(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) error
Cancel cancel task
func (*WorkerTaskQueue) Complete ¶
func (q *WorkerTaskQueue) Complete(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) (WorkerTask, error)
Complete complete task
func (*WorkerTaskQueue) Query ¶
func (q *WorkerTaskQueue) Query(idc, taskID string) (WorkerTask, error)
Query find task by idc and taskID
func (*WorkerTaskQueue) Reclaim ¶
func (q *WorkerTaskQueue) Reclaim(idc, taskID string, src []proto.VunitLocation, oldDest, newDest proto.VunitLocation, newDiskID proto.DiskID) error
Reclaim reclaim task
func (*WorkerTaskQueue) Renewal ¶
func (q *WorkerTaskQueue) Renewal(idc, taskID string) error
Renewal renews task
func (*WorkerTaskQueue) SetLeaseExpiredS ¶
func (q *WorkerTaskQueue) SetLeaseExpiredS(dura time.Duration)
SetLeaseExpiredS set lease expired time
func (*WorkerTaskQueue) StatsTasks ¶
func (q *WorkerTaskQueue) StatsTasks() (todo int, doing int)
StatsTasks returns task stats