tasks

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JTAny             JobType = iota
	JTCustomizedStart         = 100

	JTInvalid = 10000
)

Variables

View Source
var (
	ErrDispatcherNotFound = moerr.NewInternalErrorNoCtx("tae sched: dispatcher not found")
	ErrSchedule           = moerr.NewInternalErrorNoCtx("tae sched: cannot schedule")
)
View Source
var (
	ErrBadTaskRequestPara    = moerr.NewInternalErrorNoCtx("tae scheduler: bad task request parameters")
	ErrScheduleScopeConflict = moerr.NewInternalErrorNoCtx("tae scheduler: scope conflict")
)
View Source
var DefaultScopeSharder = func(scope *common.ID) int {
	if scope == nil {
		return 0
	}
	hasher := fnv.New64a()
	hasher.Write(types.EncodeUint64(&scope.TableID))
	hasher.Write(types.EncodeUuid(scope.SegmentID()))
	return int(hasher.Sum64())
}
View Source
var (
	ErrDispatchWrongTask = moerr.NewInternalErrorNoCtx("tae: wrong task type")
)
View Source
var (
	ErrTaskHandleEnqueue = moerr.NewInternalErrorNoCtx("tae: task handle enqueue")
)
View Source
var SerialJobScheduler = new(simpleJobScheduler)
View Source
var WaitableCtx = &Context{Waitable: true}

Functions

func IsSameScope

func IsSameScope(left, right *common.ID) bool

func JobName added in v0.8.0

func JobName(jt JobType) string

func NewParallelJobScheduler added in v0.7.0

func NewParallelJobScheduler(parallism int) *parallelJobScheduler

func NewPoolHandler

func NewPoolHandler(ctx context.Context, num int) *poolHandler

func NewSingleWorkerHandler

func NewSingleWorkerHandler(ctx context.Context, name string) *singleWorkerHandler

func NextTaskId

func NextTaskId() uint64

func RegisterJobType added in v0.8.0

func RegisterJobType(jt JobType, jn string)

func RegisterType

func RegisterType(t TaskType, name string)

func TaskName

func TaskName(t TaskType) string

Types

type BaseDispatcher

type BaseDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseDispatcher

func NewBaseDispatcher() *BaseDispatcher

func (*BaseDispatcher) Close

func (d *BaseDispatcher) Close() error

func (*BaseDispatcher) Dispatch

func (d *BaseDispatcher) Dispatch(task Task)

func (*BaseDispatcher) RegisterHandler

func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler)

type BaseScheduler

type BaseScheduler struct {
	ops.OpWorker

	Dispatchers map[TaskType]Dispatcher
	// contains filtered or unexported fields
}

func NewBaseScheduler

func NewBaseScheduler(ctx context.Context, name string) *BaseScheduler

func (*BaseScheduler) RegisterDispatcher

func (s *BaseScheduler) RegisterDispatcher(t TaskType, dispatcher Dispatcher)

func (*BaseScheduler) Schedule

func (s *BaseScheduler) Schedule(task Task) error

func (*BaseScheduler) Stop

func (s *BaseScheduler) Stop()

type BaseScopedDispatcher

type BaseScopedDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseScopedDispatcher

func NewBaseScopedDispatcher(sharder ScopedTaskSharder) *BaseScopedDispatcher

func (*BaseScopedDispatcher) AddHandle

func (d *BaseScopedDispatcher) AddHandle(h TaskHandler)

func (*BaseScopedDispatcher) Close

func (d *BaseScopedDispatcher) Close() error

func (*BaseScopedDispatcher) Dispatch

func (d *BaseScopedDispatcher) Dispatch(task Task)

type BaseTask

type BaseTask struct {
	ops.Op
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(impl Task, taskType TaskType, ctx *Context) *BaseTask

func (*BaseTask) Cancel

func (task *BaseTask) Cancel() (err error)

func (*BaseTask) Execute

func (task *BaseTask) Execute() (err error)

func (*BaseTask) ID

func (task *BaseTask) ID() uint64

func (*BaseTask) Name

func (task *BaseTask) Name() string

func (*BaseTask) Type

func (task *BaseTask) Type() TaskType

type BaseTaskHandler

type BaseTaskHandler struct {
	ops.OpWorker
}

func NewBaseEventHandler

func NewBaseEventHandler(ctx context.Context, name string) *BaseTaskHandler

func (*BaseTaskHandler) Close

func (h *BaseTaskHandler) Close() error

func (*BaseTaskHandler) Enqueue

func (h *BaseTaskHandler) Enqueue(task Task)

func (*BaseTaskHandler) Execute

func (h *BaseTaskHandler) Execute(task Task)

type Context

type Context struct {
	DoneCB   ops.OpDoneCB
	Waitable bool
}

type Dispatcher

type Dispatcher interface {
	io.Closer
	Dispatch(Task)
}

type FnTask

type FnTask struct {
	*BaseTask
	Fn FuncT
}

func NewFnTask

func NewFnTask(ctx *Context, taskType TaskType, fn FuncT) *FnTask

func (*FnTask) Execute

func (task *FnTask) Execute(ctx context.Context) error

type FuncT

type FuncT = func() error

type Job added in v0.7.0

type Job struct {
	// contains filtered or unexported fields
}

func (*Job) Close added in v0.7.0

func (job *Job) Close()

func (*Job) DoneWithErr added in v0.8.0

func (job *Job) DoneWithErr(err error)

func (*Job) GetResult added in v0.7.0

func (job *Job) GetResult() *JobResult

func (*Job) ID added in v0.8.0

func (job *Job) ID() string

func (*Job) Init added in v0.8.0

func (job *Job) Init(
	ctx context.Context,
	id string,
	typ JobType,
	exec JobExecutor)

func (*Job) Reset added in v0.8.0

func (job *Job) Reset()

func (*Job) Run added in v0.7.0

func (job *Job) Run()

func (*Job) String added in v0.8.0

func (job *Job) String() string

func (*Job) Type added in v0.8.0

func (job *Job) Type() JobType

func (*Job) WaitDone added in v0.7.0

func (job *Job) WaitDone() *JobResult

type JobExecutor added in v0.7.0

type JobExecutor = func(context.Context) *JobResult

type JobResult added in v0.7.0

type JobResult struct {
	Err error
	Res any
}

type JobScheduler added in v0.7.0

type JobScheduler interface {
	Schedule(job *Job) error
	Stop()
}

type JobType added in v0.8.0

type JobType = uint16

type MScopedTask

type MScopedTask interface {
	Task
	Scopes() []common.ID
}

type MultiScopedFnTask

type MultiScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewMultiScopedFnTask

func NewMultiScopedFnTask(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) *MultiScopedFnTask

func (*MultiScopedFnTask) Scopes

func (task *MultiScopedFnTask) Scopes() []common.ID

type Scheduler

type Scheduler interface {
	Start()
	Stop()
	Schedule(Task) error
}

type ScopedFnTask

type ScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewScopedFnTask

func NewScopedFnTask(ctx *Context, taskType TaskType, scope *common.ID, fn FuncT) *ScopedFnTask

func (*ScopedFnTask) Scope

func (task *ScopedFnTask) Scope() *common.ID

type ScopedTask

type ScopedTask interface {
	Task
	Scope() *common.ID
}

type ScopedTaskSharder

type ScopedTaskSharder = func(scope *common.ID) int

type Task

type Task interface {
	base.IOp
	ID() uint64
	Type() TaskType
	Cancel() error
	Name() string
}

type TaskHandler

type TaskHandler interface {
	io.Closer
	Start()
	Enqueue(Task)
	Execute(Task)
}

type TaskScheduler

type TaskScheduler interface {
	Scheduler
	ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error)
	ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error)
	ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error)

	GetCheckpointedLSN() uint64
	GetPenddingLSNCnt() uint64
	GetCheckpointTS() types.TS
}

type TaskType

type TaskType uint16
const (
	NoopTask TaskType = iota
	MockTask
	CustomizedTask

	DataCompactionTask
	CheckpointTask
	GCTask
	IOTask
)

type TxnTaskFactory

type TxnTaskFactory = func(ctx *Context, txn txnif.AsyncTxn) (Task, error)

Directories

Path Synopsis
ops

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL