base

package
v0.0.0-...-303e327 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KindFailed  = "failed"
	KindSuccess = "success"
)

Result kinds used for statistics.

View Source
const (
	// EmptyDiskID empty diskID
	EmptyDiskID = proto.DiskID(0)
)

Variables

View Source
var (
	// ErrNoSuchMessageID no such message id
	ErrNoSuchMessageID = errors.New("no such message id")
	// ErrUnmatchedVuids unmatched task vuids
	ErrUnmatchedVuids = errors.New("unmatched task vuids")
)
View Source
var (
	ErrNoTaskInQueue     = errors.New("no task in queue")
	ErrVolNotOnlyOneTask = errors.New("vol not only one task running")
	ErrUpdateVolumeCache = errors.New("update volume cache failed")
)

Errors used for tasks.

View Source
var Buckets = []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000}

Buckets default buckets for stats

View Source
var (
	// ErrVidTaskConflict vid task conflict
	ErrVidTaskConflict = errors.New("vid task conflict")
)

Make sure that only one task on the same volume runs in the cluster.

View Source
var NewVolTaskLockerOnce sync.Once

NewVolTaskLockerOnce singleton mode: makes sure there is only one global instance.

Functions

func AllocVunitSafe

func AllocVunitSafe(
	ctx context.Context,
	cli IAllocVunit,
	vuid comproto.Vuid,
	volReplicas []comproto.VunitLocation) (ret *client.AllocVunitInfo, err error)

AllocVunitSafe alloc volume unit safe

func DataMountFormat

func DataMountFormat(dataMountBytes [counter.SLOT]int) string

DataMountFormat format data

func FormatPrint

func FormatPrint(statsInfos []ErrorPercent) (res []string)

FormatPrint format print message

func GenTaskID

func GenTaskID(prefix string, vid comproto.Vid) string

GenTaskID return task id

func InsistOn

func InsistOn(ctx context.Context, errMsg string, on func() error)

func NewCounter

func NewCounter(clusterID proto.ClusterID, taskType string, kind string) prometheus.Counter

NewCounter returns statistics counter

func ShouldAllocAndRedo

func ShouldAllocAndRedo(errCode int) bool

ShouldAllocAndRedo return true if should alloc and redo task

func Subtraction

func Subtraction(a, b []comproto.Vuid) (c []comproto.Vuid)

Subtraction c = a - b

Types

type ClusterTopologyStatsMgr

type ClusterTopologyStatsMgr struct {
	// contains filtered or unexported fields
}

ClusterTopologyStatsMgr cluster topology stats manager

func NewClusterTopologyStatisticsMgr

func NewClusterTopologyStatisticsMgr(clusterID proto.ClusterID, buckets []float64) *ClusterTopologyStatsMgr

NewClusterTopologyStatisticsMgr returns cluster topology stats manager

func (*ClusterTopologyStatsMgr) ReportFreeChunk

func (statsMgr *ClusterTopologyStatsMgr) ReportFreeChunk(disk *api.DiskInfoSimple)

ReportFreeChunk report free chunk

type ConsumeInfo

type ConsumeInfo struct {
	Offset int64
	Commit int64
}

ConsumeInfo consume info

type ErrorPercent

type ErrorPercent struct {
	// contains filtered or unexported fields
}

ErrorPercent error percent

type ErrorStats

type ErrorStats struct {
	// contains filtered or unexported fields
}

ErrorStats error stats

func NewErrorStats

func NewErrorStats() *ErrorStats

NewErrorStats returns error stats

func (*ErrorStats) AddFail

func (es *ErrorStats) AddFail(err error)

AddFail add fail statistics

func (*ErrorStats) Stats

func (es *ErrorStats) Stats() (statsResult []ErrorPercent, totalErrCnt uint64)

Stats returns stats

type IAllocVunit

type IAllocVunit interface {
	AllocVolumeUnit(ctx context.Context, vuid comproto.Vuid) (ret *client.AllocVunitInfo, err error)
}

IAllocVunit define the interface of clustermgr used for volume alloc

type IConsumer

type IConsumer interface {
	ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage)
	CommitOffset(ctx context.Context) error
}

IConsumer define the interface of consumer for message consume

func NewKafkaPartitionConsumers

func NewKafkaPartitionConsumers(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) ([]IConsumer, error)

NewKafkaPartitionConsumers returns kafka partition consumers

func NewPriorityConsumer

func NewPriorityConsumer(taskType proto.TaskType, cfgs []PriorityConsumerConfig, offsetAccessor IConsumerOffset) (IConsumer, error)

NewPriorityConsumer returns a priority consumer.

func NewTopicConsumer

func NewTopicConsumer(taskType proto.TaskType, cfg *KafkaConfig, offsetAccessor IConsumerOffset) (IConsumer, error)

NewTopicConsumer returns topic round-robin partition consumer

type IConsumerOffset

type IConsumerOffset interface {
	GetConsumeOffset(taskType proto.TaskType, topic string, partition int32) (offset int64, err error)
	SetConsumeOffset(taskType proto.TaskType, topic string, partition int32, offset int64) (err error)
}

IConsumerOffset records consume offset

type IProducer

type IProducer interface {
	SendMessage(msg []byte) (err error)
	SendMessages(msgs [][]byte) (err error)
}

IProducer define the interface of producer

func NewMsgSender

func NewMsgSender(cfg *kafka.ProducerCfg) (IProducer, error)

NewMsgSender returns message sender

type KafkaConfig

type KafkaConfig struct {
	Topic      string
	Partitions []int32
	BrokerList []string
}

KafkaConfig kafka config

type KafkaTopicMonitor

type KafkaTopicMonitor struct {
	closer.Closer
	// contains filtered or unexported fields
}

KafkaTopicMonitor kafka monitor

func NewKafkaTopicMonitor

func NewKafkaTopicMonitor(taskType proto.TaskType, clusterID proto.ClusterID, cfg *KafkaConfig, offsetAccessor IConsumerOffset, monitorIntervalS int) (*KafkaTopicMonitor, error)

NewKafkaTopicMonitor returns kafka topic monitor

func (*KafkaTopicMonitor) Close

func (m *KafkaTopicMonitor) Close()

func (*KafkaTopicMonitor) Run

func (m *KafkaTopicMonitor) Run()

Run run kafka monitor

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer partition consumer

func (*PartitionConsumer) CommitOffset

func (c *PartitionConsumer) CommitOffset(ctx context.Context) error

CommitOffset commit offset

func (*PartitionConsumer) ConsumeMessages

func (c *PartitionConsumer) ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage)

ConsumeMessages consume messages

type PriorityConsumerConfig

type PriorityConsumerConfig struct {
	KafkaConfig
	Priority int `json:"priority"`
}

PriorityConsumerConfig priority consumer

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue task queue

func NewQueue

func NewQueue(msgTimeout time.Duration) *Queue

NewQueue returns a task queue.

func (*Queue) Get

func (q *Queue) Get(id string) (interface{}, error)

Get returns message by id

func (*Queue) Pop

func (q *Queue) Pop() (string, interface{}, bool)

Pop fetches a message from the queue.

func (*Queue) Push

func (q *Queue) Push(id string, msg interface{}) error

Push pushes a message to the queue; id uniquely identifies the message.

func (*Queue) Remove

func (q *Queue) Remove(id string) error

Remove remove message by id

func (*Queue) Requeue

func (q *Queue) Requeue(id string, delay time.Duration) error

Requeue requeues the message so that it can be fetched again after delay.

func (*Queue) Stats

func (q *Queue) Stats() (todo, doing int)

Stats returns queue stats

type TaskCntStats

type TaskCntStats interface {
	StatQueueTaskCnt() (preparing, workerDoing, finishing int)
}

TaskCntStats information of task running on worker

type TaskCommonConfig

type TaskCommonConfig struct {
	PrepareQueueRetryDelayS int `json:"prepare_queue_retry_delay_s"`
	FinishQueueRetryDelayS  int `json:"finish_queue_retry_delay_s"`
	CancelPunishDurationS   int `json:"cancel_punish_duration_s"`
	WorkQueueSize           int `json:"work_queue_size"`
	CollectTaskIntervalS    int `json:"collect_task_interval_s"`
	CheckTaskIntervalS      int `json:"check_task_interval_s"`
	DiskConcurrency         int `json:"disk_concurrency"`
}

TaskCommonConfig task common config

func (*TaskCommonConfig) CheckAndFix

func (conf *TaskCommonConfig) CheckAndFix()

CheckAndFix check and fix task common config

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue task queue

func NewTaskQueue

func NewTaskQueue(retryDelay time.Duration) *TaskQueue

NewTaskQueue returns task queue

func (*TaskQueue) PopTask

func (q *TaskQueue) PopTask() (string, WorkerTask, bool)

PopTask returns the taskID, the task, and a flag indicating whether a task exists.

func (*TaskQueue) PushTask

func (q *TaskQueue) PushTask(taskID string, task WorkerTask)

PushTask push task to queue

func (*TaskQueue) Query

func (q *TaskQueue) Query(taskID string) (WorkerTask, bool)

Query find task by taskID

func (*TaskQueue) RemoveTask

func (q *TaskQueue) RemoveTask(taskID string) error

RemoveTask remove task by taskID

func (*TaskQueue) RetryTask

func (q *TaskQueue) RetryTask(taskID string)

RetryTask retry task by taskID

func (*TaskQueue) StatsTasks

func (q *TaskQueue) StatsTasks() (todo int, doing int)

StatsTasks returns task stats

type TaskRunDetailInfo

type TaskRunDetailInfo struct {
	Statistics   proto.TaskStatistics `json:"statistics"`
	StartTime    time.Time            `json:"start_time"`
	CompleteTime time.Time            `json:"complete_time"`
	Completed    bool                 `json:"completed"`
}

TaskRunDetailInfo task run detail info

type TaskStatsMgr

type TaskStatsMgr struct {
	TaskRunInfos map[string]TaskRunDetailInfo
	// contains filtered or unexported fields
}

TaskStatsMgr task stats manager

func NewTaskStatsMgr

func NewTaskStatsMgr(clusterID proto.ClusterID, taskType proto.TaskType) *TaskStatsMgr

NewTaskStatsMgr returns task stats manager

func NewTaskStatsMgrAndRun

func NewTaskStatsMgrAndRun(clusterID proto.ClusterID, taskType proto.TaskType, taskCntStats TaskCntStats) *TaskStatsMgr

NewTaskStatsMgrAndRun run task stats manager

func (*TaskStatsMgr) CancelTask

func (statsMgr *TaskStatsMgr) CancelTask()

CancelTask cancel task

func (*TaskStatsMgr) Counters

func (statsMgr *TaskStatsMgr) Counters() (increaseDataSize, increaseShardCnt [counter.SLOT]int)

Counters returns task stats counters

func (*TaskStatsMgr) QueryTaskDetail

func (statsMgr *TaskStatsMgr) QueryTaskDetail(taskID string) (detail TaskRunDetailInfo, err error)

QueryTaskDetail find task detail info

func (*TaskStatsMgr) ReclaimTask

func (statsMgr *TaskStatsMgr) ReclaimTask()

ReclaimTask reclaim task

func (*TaskStatsMgr) ReportTaskCntLoop

func (statsMgr *TaskStatsMgr) ReportTaskCntLoop()

ReportTaskCntLoop report task count

func (*TaskStatsMgr) ReportWorkerTaskStats

func (statsMgr *TaskStatsMgr) ReportWorkerTaskStats(taskID string, s proto.TaskStatistics, increaseDataSize, increaseShardCnt int)

ReportWorkerTaskStats report worker task stats

type TopicConsumer

type TopicConsumer struct {
	// contains filtered or unexported fields
}

TopicConsumer rotate consume msg among partition consumers

func (*TopicConsumer) CommitOffset

func (c *TopicConsumer) CommitOffset(ctx context.Context) error

CommitOffset commit offset

func (*TopicConsumer) ConsumeMessages

func (c *TopicConsumer) ConsumeMessages(ctx context.Context, msgCnt int) (msgs []*sarama.ConsumerMessage)

ConsumeMessages consumes messages

type VolTaskLocker

type VolTaskLocker struct {
	// contains filtered or unexported fields
}

VolTaskLocker volume task locker

func VolTaskLockerInst

func VolTaskLockerInst() *VolTaskLocker

VolTaskLockerInst ensure that only one background task is executing on the same volume

func (*VolTaskLocker) TryLock

func (m *VolTaskLocker) TryLock(ctx context.Context, vid proto.Vid) error

TryLock tries to lock the task volume and returns an error if a task is already running on it

func (*VolTaskLocker) Unlock

func (m *VolTaskLocker) Unlock(ctx context.Context, vid proto.Vid)

Unlock unlock task volume

type WorkerTask

type WorkerTask interface {
	GetSources() []proto.VunitLocation
	GetDestination() proto.VunitLocation
	SetDestination(dest proto.VunitLocation)
}

WorkerTask define worker task interface

type WorkerTaskQueue

type WorkerTaskQueue struct {
	// contains filtered or unexported fields
}

WorkerTaskQueue task queue for worker

func NewWorkerTaskQueue

func NewWorkerTaskQueue(cancelPunishDuration time.Duration) *WorkerTaskQueue

NewWorkerTaskQueue returns a worker task queue

func (*WorkerTaskQueue) Acquire

func (q *WorkerTaskQueue) Acquire(idc string) (taskID string, wtask WorkerTask, exist bool)

Acquire acquire task by idc

func (*WorkerTaskQueue) AddPreparedTask

func (q *WorkerTaskQueue) AddPreparedTask(idc, taskID string, wtask WorkerTask)

AddPreparedTask add prepared task

func (*WorkerTaskQueue) Cancel

func (q *WorkerTaskQueue) Cancel(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) error

Cancel cancel task

func (*WorkerTaskQueue) Complete

func (q *WorkerTaskQueue) Complete(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) (WorkerTask, error)

Complete complete task

func (*WorkerTaskQueue) Query

func (q *WorkerTaskQueue) Query(idc, taskID string) (WorkerTask, error)

Query find task by idc and taskID

func (*WorkerTaskQueue) Reclaim

func (q *WorkerTaskQueue) Reclaim(idc, taskID string, src []proto.VunitLocation, oldDest, newDest proto.VunitLocation, newDiskID proto.DiskID) error

Reclaim reclaim task

func (*WorkerTaskQueue) Renewal

func (q *WorkerTaskQueue) Renewal(idc, taskID string) error

Renewal renewal task

func (*WorkerTaskQueue) SetLeaseExpiredS

func (q *WorkerTaskQueue) SetLeaseExpiredS(dura time.Duration)

SetLeaseExpiredS set lease expired time

func (*WorkerTaskQueue) StatsTasks

func (q *WorkerTaskQueue) StatsTasks() (todo int, doing int)

StatsTasks returns task stats

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL