worker

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: Apache-2.0 Imports: 63 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// GCBatchSize is the batch size used by the GC process.
	// NOTE(review): presumably the number of task log entries handled per
	// batch — confirm against Logger.GC.
	GCBatchSize = 1024
	// GCInterval is the interval between GC runs.
	GCInterval = time.Hour
)
View Source
// Backoff-related default values. They are declared as variables, not
// constants.
// NOTE(review): the names mirror the CheckerConfig fields (CheckInterval,
// BackoffRollback, BackoffMin, BackoffMax, BackoffJitter, BackoffFactor), so
// these are presumably the defaults for those fields — confirm.
var (
	// DefaultCheckInterval is the default value for the check interval.
	DefaultCheckInterval           = 5 * time.Second
	// DefaultBackoffRollback is the default value for the backoff rollback duration.
	DefaultBackoffRollback         = 5 * time.Minute
	// DefaultBackoffMin is the default lower bound of the backoff duration.
	DefaultBackoffMin              = 1 * time.Second
	// DefaultBackoffMax is the default upper bound of the backoff duration.
	DefaultBackoffMax              = 5 * time.Minute
	// DefaultBackoffJitter is the default value for the backoff jitter switch.
	DefaultBackoffJitter           = true
	// DefaultBackoffFactor is the default backoff factor.
	// NOTE(review): presumably the multiplier applied on each backoff step — confirm.
	DefaultBackoffFactor   float64 = 2
)

Backoff-related default values. Note that these are declared as variables, not constants.

View Source
var HandledPointerKey = []byte("!DM!handledPointer")

HandledPointerKey is the key of the HandledPointer, which points to the last handled log.

View Source
var NewRelayHolder = NewRealRelayHolder

NewRelayHolder is the relay holder initializer; it can be overridden for testing.

View Source
var NewTaskStatusChecker = NewRealTaskStatusChecker

NewTaskStatusChecker is a TaskStatusChecker initializer

View Source
var SampleConfigFile string

SampleConfigFile is the sample config file of dm-worker. Later we can read it from dm/worker/dm-worker.toml and assign it to SampleConfigFile while we build dm-worker.

View Source
var (

	// TaskLogPrefix is  prefix of task log key
	TaskLogPrefix = []byte("!DM!TaskLog")
)
View Source
var TaskMetaPrefix = []byte("!DM!TaskMeta")

TaskMetaPrefix is prefix of task meta key

Functions

func ClearHandledPointer

func ClearHandledPointer(tctx *tcontext.Context, h dbOperator) error

ClearHandledPointer clears the handled pointer in kv DB.

func ClearOperationLog

func ClearOperationLog(tctx *tcontext.Context, h dbOperator) error

ClearOperationLog clears the task operation log.

func ClearTaskMeta

func ClearTaskMeta(tctx *tcontext.Context, h dbOperator) error

ClearTaskMeta clears all task meta in kv DB.

func CloneTaskLog

func CloneTaskLog(log *pb.TaskLog) *pb.TaskLog

CloneTaskLog returns a task log copy

func CloneTaskMeta

func CloneTaskMeta(task *pb.TaskMeta) *pb.TaskMeta

CloneTaskMeta returns a task meta copy

func DecodeTaskLogKey

func DecodeTaskLogKey(key []byte) (int64, error)

DecodeTaskLogKey decodes task log key and returns its log ID

func DecodeTaskMetaKey

func DecodeTaskMetaKey(key []byte) string

DecodeTaskMetaKey decodes task meta key and returns task name

func DeleteTaskMeta

func DeleteTaskMeta(h dbOperator, name string) error

DeleteTaskMeta deletes task meta from kv DB

func EncodeTaskLogKey

func EncodeTaskLogKey(id int64) []byte

EncodeTaskLogKey encodes log ID into a task log key

func EncodeTaskMetaKey

func EncodeTaskMetaKey(name string) []byte

EncodeTaskMetaKey encodes a task name into a task meta key

func GetTaskMeta

func GetTaskMeta(h dbOperator, name string) (*pb.TaskMeta, error)

GetTaskMeta returns task meta by given name

func InitConditionHub

func InitConditionHub(w *Worker)

InitConditionHub inits the singleton instance of ConditionHub

func InitStatus

func InitStatus(lis net.Listener)

InitStatus initializes the HTTP status server

func LoadTaskMetas

func LoadTaskMetas(h dbOperator) (map[string]*pb.TaskMeta, error)

LoadTaskMetas loads all task metas from kv db

func RegistryMetrics added in v1.0.4

func RegistryMetrics()

RegistryMetrics registers metrics for the worker

func SetTaskMeta

func SetTaskMeta(h dbOperator, task *pb.TaskMeta) error

SetTaskMeta saves task meta into kv db

func VerifyTaskMeta

func VerifyTaskMeta(task *pb.TaskMeta) error

VerifyTaskMeta verifies the legality of task meta

Types

type CheckerConfig

type CheckerConfig struct {
	// Settings exposed through both TOML ("check-enable", "backoff-rollback",
	// "backoff-max") and JSON.
	CheckEnable     bool     `toml:"check-enable" json:"check-enable"`
	BackoffRollback duration `toml:"backoff-rollback" json:"backoff-rollback"`
	BackoffMax      duration `toml:"backoff-max" json:"backoff-max"`
	// Unexposed config: the fields below carry no toml tag and are omitted
	// from JSON output (`json:"-"`).
	CheckInterval duration `json:"-"`
	BackoffMin    duration `json:"-"`
	BackoffJitter bool     `json:"-"`
	BackoffFactor float64  `json:"-"`
}

CheckerConfig is configuration used for TaskStatusChecker

type ConditionHub

type ConditionHub struct {
	// contains filtered or unexported fields
}

ConditionHub holds a DM-worker and it is used for wait condition detection

func GetConditionHub

func GetConditionHub() *ConditionHub

GetConditionHub returns singleton instance of ConditionHub

type Config

type Config struct {
	// logging settings
	LogLevel  string `toml:"log-level" json:"log-level"`
	LogFile   string `toml:"log-file" json:"log-file"`
	LogRotate string `toml:"log-rotate" json:"log-rotate"`

	// NOTE(review): presumably the address the worker's RPC server listens on — confirm.
	WorkerAddr string `toml:"worker-addr" json:"worker-addr"`

	// NOTE(review): the fields below look like relay / upstream replication
	// settings (grouping inferred from the field names) — confirm.
	EnableGTID  bool   `toml:"enable-gtid" json:"enable-gtid"`
	AutoFixGTID bool   `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
	RelayDir    string `toml:"relay-dir" json:"relay-dir"`
	MetaDir     string `toml:"meta-dir" json:"meta-dir"`
	ServerID    uint32 `toml:"server-id" json:"server-id"`
	Flavor      string `toml:"flavor" json:"flavor"`
	Charset     string `toml:"charset" json:"charset"`

	// relay synchronization starting point (if specified)
	RelayBinLogName string `toml:"relay-binlog-name" json:"relay-binlog-name"`
	RelayBinlogGTID string `toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`

	SourceID string          `toml:"source-id" json:"source-id"`
	From     config.DBConfig `toml:"from" json:"from"`

	// config items for purger
	Purge purger.Config `toml:"purge" json:"purge"`

	// config items for task status checker
	Checker CheckerConfig `toml:"checker" json:"checker"`

	// config items for tracer
	Tracer tracing.Config `toml:"tracer" json:"tracer"`

	// ConfigFile has a JSON tag only (no toml tag).
	// NOTE(review): presumably the path of the config file that Reload reads
	// from — confirm.
	ConfigFile string `json:"config-file"`
	// contains filtered or unexported fields
}

Config is the configuration.

func NewConfig

func NewConfig() *Config

NewConfig creates a new base config for worker.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone clones a config

func (*Config) DecryptPassword

func (c *Config) DecryptPassword() *Config

DecryptPassword returns a replica of the config with the password decrypted

func (*Config) Parse

func (c *Config) Parse(arguments []string) error

Parse parses flag definitions from the argument list.

func (*Config) Reload

func (c *Config) Reload() error

Reload reloads the configuration from ConfigFile

func (*Config) String

func (c *Config) String() string

func (*Config) Toml

func (c *Config) Toml() (string, error)

Toml returns TOML format representation of config

func (*Config) UpdateConfigFile

func (c *Config) UpdateConfigFile(content string) error

UpdateConfigFile writes the configuration to a local file

type KVConfig

type KVConfig struct {
	// Every field is exposed under the same kebab-case key in both TOML and JSON.
	// NOTE(review): the field names appear to mirror goleveldb's option names
	// (block cache, compaction, write buffer, level-0 triggers) — confirm
	// units and defaults against the goleveldb options documentation.
	BlockCacheCapacity            int     `toml:"block-cache-capacity" json:"block-cache-capacity"`
	BlockRestartInterval          int     `toml:"block-restart-interval" json:"block-restart-interval"`
	BlockSize                     int     `toml:"block-size" json:"block-size"`
	CompactionL0Trigger           int     `toml:"compaction-L0-trigger" json:"compaction-L0-trigger"`
	CompactionTableSize           int     `toml:"compaction-table-size" json:"compaction-table-size"`
	CompactionTotalSize           int     `toml:"compaction-total-size" json:"compaction-total-size"`
	CompactionTotalSizeMultiplier float64 `toml:"compaction-total-size-multiplier" json:"compaction-total-size-multiplier"`
	WriteBuffer                   int     `toml:"write-buffer" json:"write-buffer"`
	WriteL0PauseTrigger           int     `toml:"write-L0-pause-trigger" json:"write-L0-pause-trigger"`
	WriteL0SlowdownTrigger        int     `toml:"write-L0-slowdown-trigger" json:"write-L0-slowdown-trigger"`
}

KVConfig is the configuration of goleveldb

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger manage task operation logs

func (*Logger) Append

func (logger *Logger) Append(h dbOperator, opLog *pb.TaskLog) error

Append appends a task log

func (*Logger) ForwardTo

func (logger *Logger) ForwardTo(h dbOperator, ID int64) error

ForwardTo forwards the handled pointer to the specified ID location. It is not thread safe.

func (*Logger) GC

func (logger *Logger) GC(ctx context.Context, h dbOperator)

GC deletes useless log

func (*Logger) GetTaskLog

func (logger *Logger) GetTaskLog(h dbOperator, id int64) (*pb.TaskLog, error)

GetTaskLog returns task log by given log ID

func (*Logger) Initial

func (logger *Logger) Initial(h dbOperator) ([]*pb.TaskLog, error)

Initial initializes the Logger

func (*Logger) MarkAndForwardLog

func (logger *Logger) MarkAndForwardLog(h dbOperator, opLog *pb.TaskLog) error

MarkAndForwardLog marks result success or not in log, and forwards handledPointer

type Meta

type Meta struct {
	SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks" toml:"sub-tasks"`
}

Meta information (deprecated; proto.WorkMeta is used instead). It contains:

  * sub-task

func (*Meta) Decode

func (m *Meta) Decode(data string) error

Decode loads config from file data

func (*Meta) DecodeFile

func (m *Meta) DecodeFile(fpath string) error

DecodeFile loads and decodes config from file

func (*Meta) Toml

func (m *Meta) Toml() (string, error)

Toml returns TOML format representation of config

type Metadata

type Metadata struct {
	sync.RWMutex // we need to ensure that only one thread can access `metaDB` at a time
	// contains filtered or unexported fields
}

Metadata stores the metadata and log of tasks. It also provides logger features:

  * append log
  * forward to specified log location

func NewMetadata

func NewMetadata(dir string, db *leveldb.DB) (*Metadata, error)

NewMetadata returns a metadata object

func (*Metadata) AppendOperation

func (meta *Metadata) AppendOperation(subTask *pb.TaskMeta) (int64, error)

AppendOperation appends operation into task log

func (*Metadata) Close

func (meta *Metadata) Close()

Close closes meta DB

func (*Metadata) GetTask

func (meta *Metadata) GetTask(name string) (task *pb.TaskMeta)

GetTask returns task meta by given name

func (*Metadata) GetTaskLog

func (meta *Metadata) GetTaskLog(opLogID int64) (*pb.TaskLog, error)

GetTaskLog returns task log by given log ID

func (*Metadata) LoadTaskMeta

func (meta *Metadata) LoadTaskMeta() map[string]*pb.TaskMeta

LoadTaskMeta returns meta of all tasks

func (*Metadata) MarkOperation

func (meta *Metadata) MarkOperation(log *pb.TaskLog) error

MarkOperation marks operation result

func (*Metadata) PeekLog

func (meta *Metadata) PeekLog() (log *pb.TaskLog)

PeekLog returns the first task log that needs to be handled

type Pointer

type Pointer struct {
	// Location is the position in the log that this pointer refers to.
	// NOTE(review): presumably a task log ID (Logger.ForwardTo takes an
	// int64 ID as the location) — confirm.
	Location int64
}

Pointer is a logic pointer that point to a location of log

func LoadHandledPointer

func LoadHandledPointer(h dbOperator) (Pointer, error)

LoadHandledPointer loads handled pointer value from kv DB

func (*Pointer) MarshalBinary

func (p *Pointer) MarshalBinary() ([]byte, error)

MarshalBinary currently never returns a non-nil error

func (*Pointer) UnmarshalBinary

func (p *Pointer) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type RelayHolder

type RelayHolder interface {
	// Init initializes the holder
	Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error)
	// Start starts running the relay
	Start()
	// Close closes the holder
	Close()
	// Status returns relay unit's status
	Status() *pb.RelayStatus
	// Stage returns the stage of the relay
	Stage() pb.Stage
	// Error returns relay unit's error information
	Error() *pb.RelayError
	// SwitchMaster requests relay unit to switch master server
	SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
	// Operate operates relay unit
	Operate(ctx context.Context, req *pb.OperateRelayRequest) error
	// Result returns the result of the relay
	Result() *pb.ProcessResult
	// Update updates relay config online
	Update(ctx context.Context, cfg *Config) error
	// Migrate resets binlog name and binlog position for relay unit
	Migrate(ctx context.Context, binlogName string, binlogPos uint32) error
}

RelayHolder for relay unit

func NewDummyRelayHolder

func NewDummyRelayHolder(cfg *Config) RelayHolder

NewDummyRelayHolder creates a new RelayHolder

func NewDummyRelayHolderWithInitError

func NewDummyRelayHolderWithInitError(cfg *Config) RelayHolder

NewDummyRelayHolderWithInitError creates a new RelayHolder with init error

func NewDummyRelayHolderWithRelayBinlog added in v1.0.5

func NewDummyRelayHolderWithRelayBinlog(cfg *Config, relayBinlog string) RelayHolder

NewDummyRelayHolderWithRelayBinlog creates a new RelayHolder with relayBinlog in relayStatus

func NewRealRelayHolder

func NewRealRelayHolder(cfg *Config) RelayHolder

NewRealRelayHolder creates a new RelayHolder

type ResumeStrategy

type ResumeStrategy int

ResumeStrategy represents what we can do when we meet a paused task in task status checker

// Values start at 1 (iota + 1), so the zero value of ResumeStrategy does not
// correspond to any of the named strategies below.
const (
	// When a task is not in the paused state, or was paused manually, or we can't get enough information from
	// the worker to determine whether this task is paused because of some error, we will apply the ResumeIgnore
	// strategy, and do nothing with this task in this check round.
	ResumeIgnore ResumeStrategy = iota + 1
	// When the checker detects that a paused task can recover synchronization by resuming, but the time since
	// its last auto resume is less than the backoff waiting time, we will apply the ResumeSkip strategy, and
	// skip auto resume for this task in this check round.
	ResumeSkip
	// When the checker detects that a task is paused because of some un-resumable error, such as being paused
	// because of executing an incompatible DDL to downstream, we will apply the ResumeNoSense strategy.
	ResumeNoSense
	// ResumeDispatch means we will dispatch an auto resume operation in this check round for the paused task.
	ResumeDispatch
)

Resume strategies: in each round of `check`, the checker applies one of the following strategies to a given task, based on its `state` and `result` from `SubTaskStatus` and the backoff information recorded in the task status checker. Operations of the different strategies:

ResumeIgnore:

  1. check duration since latestPausedTime, if larger than backoff rollback, rollback backoff once

ResumeNoSense:

  1. update latestPausedTime
  2. update latestBlockTime

ResumeSkip:

  1. update latestPausedTime

ResumeDispatch:

  1. update latestPausedTime
  2. dispatch auto resume task
  3. if step 2 succeeds, update latestResumeTime, forward backoff

func (ResumeStrategy) String

func (bs ResumeStrategy) String() string

String implements fmt.Stringer interface

type Server

type Server struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Server accepts RPC requests, dispatches requests to the worker, and sends responses to the RPC client.

func NewServer

func NewServer(cfg *Config) *Server

NewServer creates a new Server

func (*Server) BreakDDLLock

func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)

BreakDDLLock implements WorkerServer.BreakDDLLock

func (*Server) Close

func (s *Server) Close()

Close closes the RPC server. This function can be called multiple times.

func (*Server) ExecuteDDL

func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)

ExecuteDDL implements WorkerServer.ExecuteDDL

func (*Server) FetchDDLInfo

func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error

FetchDDLInfo implements WorkerServer.FetchDDLInfo. We do ping-pong send-receive on the stream for DDL (lock) info; if an error occurs in Send / Recv, the client just retries.

func (*Server) HandleSQLs

HandleSQLs implements WorkerServer.HandleSQLs

func (*Server) MigrateRelay

func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)

MigrateRelay migrates relay to the original binlog pos

func (*Server) OperateRelay

func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)

OperateRelay implements WorkerServer.OperateRelay

func (*Server) OperateSubTask

func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)

OperateSubTask implements WorkerServer.OperateSubTask

func (*Server) PurgeRelay

func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)

PurgeRelay implements WorkerServer.PurgeRelay

func (*Server) QueryError

func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)

QueryError implements WorkerServer.QueryError

func (*Server) QueryStatus

func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)

QueryStatus implements WorkerServer.QueryStatus

func (*Server) QueryTaskOperation

QueryTaskOperation implements WorkerServer.QueryTaskOperation

func (*Server) QueryWorkerConfig

func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)

QueryWorkerConfig returns the worker config. The worker config is defined in the worker directory now; to avoid a circular import, we only return the db config.

func (*Server) Start

func (s *Server) Start() error

Start starts serving

func (*Server) StartSubTask

func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.OperateSubTaskResponse, error)

StartSubTask implements WorkerServer.StartSubTask

func (*Server) SwitchRelayMaster

func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)

SwitchRelayMaster implements WorkerServer.SwitchRelayMaster

func (*Server) UpdateRelayConfig

func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)

UpdateRelayConfig updates the config for the relay unit and dm-worker

func (*Server) UpdateSubTask

func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.OperateSubTaskResponse, error)

UpdateSubTask implements WorkerServer.UpdateSubTask

type SubTask

type SubTask struct {
	sync.RWMutex

	// only supports syncing one DDL lock at a time; refine if needed
	DDLInfo chan *pb.DDLInfo // DDL info pending to sync
	// contains filtered or unexported fields
}

SubTask represents a sub task of data migration

func NewSubTask

func NewSubTask(cfg *config.SubTaskConfig) *SubTask

NewSubTask creates a new SubTask

func NewSubTaskWithStage

func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage) *SubTask

NewSubTaskWithStage creates a new SubTask with stage

func (*SubTask) CheckUnit

func (st *SubTask) CheckUnit() bool

CheckUnit checks whether current unit is sync unit

func (*SubTask) ClearDDLInfo

func (st *SubTask) ClearDDLInfo()

ClearDDLInfo clears current CacheDDLInfo.

func (*SubTask) ClearDDLLockInfo

func (st *SubTask) ClearDDLLockInfo()

ClearDDLLockInfo clears current DDLLockInfo

func (*SubTask) Close

func (st *SubTask) Close()

Close stops the sub task

func (*SubTask) CurrUnit

func (st *SubTask) CurrUnit() unit.Unit

CurrUnit returns current dm unit

func (*SubTask) DDLLockInfo

func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo

DDLLockInfo returns current DDLLockInfo, maybe nil

func (*SubTask) Error

func (st *SubTask) Error() interface{}

Error returns the error of the current sub task

func (*SubTask) ExecuteDDL

func (st *SubTask) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error

ExecuteDDL requests current unit to execute a DDL

func (*SubTask) GetDDLInfo

func (st *SubTask) GetDDLInfo() *pb.DDLInfo

GetDDLInfo returns current CacheDDLInfo.

func (*SubTask) Init

func (st *SubTask) Init() error

Init initializes the sub task processing units

func (*SubTask) Pause

func (st *SubTask) Pause() error

Pause pauses the running sub task

func (*SubTask) PrevUnit

func (st *SubTask) PrevUnit() unit.Unit

PrevUnit returns dm previous unit

func (*SubTask) Result

func (st *SubTask) Result() *pb.ProcessResult

Result returns the result of the sub task

func (*SubTask) Resume

func (st *SubTask) Resume() error

Resume resumes the paused sub task; it is similar to Run

func (*SubTask) Run

func (st *SubTask) Run()

Run runs the sub task

func (*SubTask) SaveDDLInfo

func (st *SubTask) SaveDDLInfo(info *pb.DDLInfo) error

SaveDDLInfo saves a CacheDDLInfo.

func (*SubTask) SaveDDLLockInfo

func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error

SaveDDLLockInfo saves a DDLLockInfo

func (*SubTask) SetSyncerSQLOperator

func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error

SetSyncerSQLOperator sets an operator to syncer.

func (*SubTask) Stage

func (st *SubTask) Stage() pb.Stage

Stage returns the stage of the sub task

func (*SubTask) Status

func (st *SubTask) Status() interface{}

Status returns the status of the current sub task

func (*SubTask) StatusJSON

func (st *SubTask) StatusJSON() string

StatusJSON returns the status of the current sub task as json string

func (*SubTask) Update

func (st *SubTask) Update(cfg *config.SubTaskConfig) error

Update updates the sub task's config

func (*SubTask) UpdateFromConfig

func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error

UpdateFromConfig updates config for `From`

type TaskStatusChecker

type TaskStatusChecker interface {
	// Init initializes the checker
	Init() error
	// Start starts the checker
	Start()
	// Close closes the checker
	Close()
}

TaskStatusChecker is an interface that defines how we manage task status

func NewRealTaskStatusChecker

func NewRealTaskStatusChecker(cfg CheckerConfig, w *Worker) TaskStatusChecker

NewRealTaskStatusChecker creates a new realTaskStatusChecker instance

type Worker

type Worker struct {
	// ensure no other operation can be done when closing (we can use `WaitGroup`/`Context` to achieve this)
	sync.RWMutex
	// contains filtered or unexported fields
}

Worker manages sub tasks and process units for data migration

func NewWorker

func NewWorker(cfg *Config) (w *Worker, err error)

NewWorker creates a new Worker

func (*Worker) BreakDDLLock

func (w *Worker) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) error

BreakDDLLock breaks current blocking DDL lock and/or remove current DDLLockInfo

func (*Worker) Close

func (w *Worker) Close()

Close stops working and releases resources

func (*Worker) Error

func (w *Worker) Error(stName string) []*pb.SubTaskError

Error returns the error information of the worker (and sub tasks). If stName is empty, all sub tasks' error information will be returned.

func (*Worker) ExecuteDDL

func (w *Worker) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error

ExecuteDDL executes (or ignores) DDL (in sharding DDL lock, requested by dm-master)

func (*Worker) FetchDDLInfo

func (w *Worker) FetchDDLInfo(ctx context.Context) *pb.DDLInfo

FetchDDLInfo fetches all sub tasks' DDL info that is pending to sync

func (*Worker) ForbidPurge

func (w *Worker) ForbidPurge() (bool, string)

ForbidPurge implements PurgeInterceptor.ForbidPurge

func (*Worker) HandleSQLs

func (w *Worker) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error

HandleSQLs implements Handler.HandleSQLs.

func (*Worker) MigrateRelay

func (w *Worker) MigrateRelay(ctx context.Context, binlogName string, binlogPos uint32) error

MigrateRelay migrates the relay unit

func (*Worker) OperateRelay

func (w *Worker) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) error

OperateRelay operates relay unit

func (*Worker) OperateSubTask

func (w *Worker) OperateSubTask(name string, op pb.TaskOp) (int64, error)

OperateSubTask stops, resumes, or pauses a sub task

func (*Worker) PurgeRelay

func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error

PurgeRelay purges relay log files

func (*Worker) QueryConfig

func (w *Worker) QueryConfig(ctx context.Context) (*Config, error)

QueryConfig returns worker's config

func (*Worker) QueryError

func (w *Worker) QueryError(name string) []*pb.SubTaskError

QueryError queries the errors of the worker's sub tasks

func (*Worker) QueryStatus

func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus

QueryStatus queries the status of the worker's sub tasks

func (*Worker) RecordDDLLockInfo

func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error

RecordDDLLockInfo records the current DDL lock info that is pending to sync

func (*Worker) Start

func (w *Worker) Start()

Start starts working

func (*Worker) StartSubTask

func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) (int64, error)

StartSubTask creates a sub task and runs it

func (*Worker) Status

func (w *Worker) Status(stName string) []*pb.SubTaskStatus

Status returns the status of the worker (and sub tasks). If stName is empty, all sub tasks' status will be returned.

func (*Worker) StatusJSON

func (w *Worker) StatusJSON(stName string) string

StatusJSON returns the status of the worker as json string

func (*Worker) SwitchRelayMaster

func (w *Worker) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error

SwitchRelayMaster switches relay unit's master server

func (*Worker) UpdateRelayConfig

func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error

UpdateRelayConfig updates the sub task and relay unit configuration online

func (*Worker) UpdateSubTask

func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) (int64, error)

UpdateSubTask updates the config for a sub task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL