Documentation ¶
Index ¶
- Constants
- func CreateConsumerGroupID(clusterID uint64) (string, error)
- func SetSchemaLockAttemptTimeout(timeout time.Duration)
- type CreateIndexCommand
- func (c *CreateIndexCommand) AfterPhase(phase int32) error
- func (c *CreateIndexCommand) Before() error
- func (c *CreateIndexCommand) Cancel()
- func (c *CreateIndexCommand) Cleanup()
- func (c *CreateIndexCommand) CommandType() DDLCommandType
- func (c *CreateIndexCommand) GetExtraData() []byte
- func (c *CreateIndexCommand) NumPhases() int
- func (c *CreateIndexCommand) OnPhase(phase int32) error
- func (c *CreateIndexCommand) SQL() string
- func (c *CreateIndexCommand) SchemaName() string
- func (c *CreateIndexCommand) TableSequences() []uint64
- type CreateMVCommand
- func (c *CreateMVCommand) AfterPhase(phase int32) error
- func (c *CreateMVCommand) Before() error
- func (c *CreateMVCommand) Cancel()
- func (c *CreateMVCommand) Cleanup()
- func (c *CreateMVCommand) CommandType() DDLCommandType
- func (c *CreateMVCommand) GetExtraData() []byte
- func (c *CreateMVCommand) NumPhases() int
- func (c *CreateMVCommand) OnPhase(phase int32) error
- func (c *CreateMVCommand) SQL() string
- func (c *CreateMVCommand) SchemaName() string
- func (c *CreateMVCommand) TableSequences() []uint64
- type CreateSourceCommand
- func (c *CreateSourceCommand) AfterPhase(phase int32) error
- func (c *CreateSourceCommand) Before() error
- func (c *CreateSourceCommand) Cancel()
- func (c *CreateSourceCommand) Cleanup()
- func (c *CreateSourceCommand) CommandType() DDLCommandType
- func (c *CreateSourceCommand) GetExtraData() []byte
- func (c *CreateSourceCommand) NumPhases() int
- func (c *CreateSourceCommand) OnPhase(phase int32) error
- func (c *CreateSourceCommand) SQL() string
- func (c *CreateSourceCommand) SchemaName() string
- func (c *CreateSourceCommand) TableSequences() []uint64
- type DDLCommand
- type DDLCommandRunner
- func (d *DDLCommandRunner) Cancel(schemaName string) error
- func (d *DDLCommandRunner) CancelHandler() remoting.ClusterMessageHandler
- func (d *DDLCommandRunner) DdlHandler() remoting.ClusterMessageHandler
- func (d *DDLCommandRunner) HandleCancelMessage(clusterMsg remoting.ClusterMessage) error
- func (d *DDLCommandRunner) HandleDdlMessage(ddlMsg remoting.ClusterMessage) error
- func (d *DDLCommandRunner) RunCommand(command DDLCommand) error
- func (d *DDLCommandRunner) RunWithLock(commandKey string, command DDLCommand, ddlInfo *clustermsgs.DDLStatementInfo) error
- type DDLCommandType
- type DropIndexCommand
- func (d *DropIndexCommand) AfterPhase(phase int32) error
- func (d *DropIndexCommand) Before() error
- func (d *DropIndexCommand) Cancel()
- func (d *DropIndexCommand) Cleanup()
- func (d *DropIndexCommand) CommandType() DDLCommandType
- func (d *DropIndexCommand) GetExtraData() []byte
- func (d *DropIndexCommand) NumPhases() int
- func (d *DropIndexCommand) OnPhase(phase int32) error
- func (d *DropIndexCommand) SQL() string
- func (d *DropIndexCommand) SchemaName() string
- func (d *DropIndexCommand) TableSequences() []uint64
- type DropMVCommand
- func (d *DropMVCommand) AfterPhase(phase int32) error
- func (d *DropMVCommand) Before() error
- func (d *DropMVCommand) Cancel()
- func (d *DropMVCommand) Cleanup()
- func (d *DropMVCommand) CommandType() DDLCommandType
- func (d *DropMVCommand) GetExtraData() []byte
- func (d *DropMVCommand) NumPhases() int
- func (d *DropMVCommand) OnPhase(phase int32) error
- func (d *DropMVCommand) SQL() string
- func (d *DropMVCommand) SchemaName() string
- func (d *DropMVCommand) TableSequences() []uint64
- type DropSourceCommand
- func (d *DropSourceCommand) AfterPhase(phase int32) error
- func (d *DropSourceCommand) Before() error
- func (d *DropSourceCommand) Cancel()
- func (d *DropSourceCommand) Cleanup()
- func (d *DropSourceCommand) CommandType() DDLCommandType
- func (d *DropSourceCommand) GetExtraData() []byte
- func (d *DropSourceCommand) NumPhases() int
- func (d *DropSourceCommand) OnPhase(phase int32) error
- func (d *DropSourceCommand) SQL() string
- func (d *DropSourceCommand) SchemaName() string
- func (d *DropSourceCommand) TableSequences() []uint64
- type Executor
- func (e *Executor) CreateExecutionContext(schema *common.Schema) *execctx.ExecutionContext
- func (e *Executor) DDlCommandRunner() *DDLCommandRunner
- func (e *Executor) Empty() bool
- func (e *Executor) ExecuteSQLStatement(execCtx *execctx.ExecutionContext, sql string, argTypes []common.ColumnType, ...) (exec.PullExecutor, error)
- func (e *Executor) FailureInjector() failinject.Injector
- func (e *Executor) GetPullEngine() *pull.Engine
- func (e *Executor) GetPushEngine() *push.Engine
- func (e *Executor) Start() error
- func (e *Executor) Stop() error
Constants ¶
View Source
const (
	DDLCommandTypeCreateSource = iota
	DDLCommandTypeDropSource
	DDLCommandTypeCreateMV
	DDLCommandTypeDropMV
	DDLCommandTypeCreateIndex
	DDLCommandTypeDropIndex
)
Variables ¶
This section is empty.
Functions ¶
func CreateConsumerGroupID ¶ added in v0.1.6
func SetSchemaLockAttemptTimeout ¶ added in v0.1.6
Types ¶
type CreateIndexCommand ¶
type CreateIndexCommand struct {
// contains filtered or unexported fields
}
func NewCreateIndexCommand ¶
func (*CreateIndexCommand) AfterPhase ¶
func (c *CreateIndexCommand) AfterPhase(phase int32) error
func (*CreateIndexCommand) Before ¶
func (c *CreateIndexCommand) Before() error
func (*CreateIndexCommand) Cancel ¶ added in v0.1.6
func (c *CreateIndexCommand) Cancel()
func (*CreateIndexCommand) Cleanup ¶ added in v0.1.6
func (c *CreateIndexCommand) Cleanup()
func (*CreateIndexCommand) CommandType ¶
func (c *CreateIndexCommand) CommandType() DDLCommandType
func (*CreateIndexCommand) GetExtraData ¶ added in v0.1.6
func (c *CreateIndexCommand) GetExtraData() []byte
func (*CreateIndexCommand) NumPhases ¶
func (c *CreateIndexCommand) NumPhases() int
func (*CreateIndexCommand) OnPhase ¶
func (c *CreateIndexCommand) OnPhase(phase int32) error
func (*CreateIndexCommand) SQL ¶
func (c *CreateIndexCommand) SQL() string
func (*CreateIndexCommand) SchemaName ¶
func (c *CreateIndexCommand) SchemaName() string
func (*CreateIndexCommand) TableSequences ¶
func (c *CreateIndexCommand) TableSequences() []uint64
type CreateMVCommand ¶
type CreateMVCommand struct {
// contains filtered or unexported fields
}
func NewCreateMVCommand ¶
func (*CreateMVCommand) AfterPhase ¶
func (c *CreateMVCommand) AfterPhase(phase int32) error
func (*CreateMVCommand) Before ¶
func (c *CreateMVCommand) Before() error
func (*CreateMVCommand) Cancel ¶ added in v0.1.6
func (c *CreateMVCommand) Cancel()
func (*CreateMVCommand) Cleanup ¶ added in v0.1.6
func (c *CreateMVCommand) Cleanup()
func (*CreateMVCommand) CommandType ¶
func (c *CreateMVCommand) CommandType() DDLCommandType
func (*CreateMVCommand) GetExtraData ¶ added in v0.1.6
func (c *CreateMVCommand) GetExtraData() []byte
func (*CreateMVCommand) NumPhases ¶
func (c *CreateMVCommand) NumPhases() int
func (*CreateMVCommand) OnPhase ¶
func (c *CreateMVCommand) OnPhase(phase int32) error
func (*CreateMVCommand) SQL ¶
func (c *CreateMVCommand) SQL() string
func (*CreateMVCommand) SchemaName ¶
func (c *CreateMVCommand) SchemaName() string
func (*CreateMVCommand) TableSequences ¶
func (c *CreateMVCommand) TableSequences() []uint64
type CreateSourceCommand ¶
type CreateSourceCommand struct {
// contains filtered or unexported fields
}
func NewCreateSourceCommand ¶
func NewOriginatingCreateSourceCommand ¶
func NewOriginatingCreateSourceCommand(e *Executor, schemaName string, sql string, tableSequences []uint64, ast *parser.CreateSource, consumerGroupID string) *CreateSourceCommand
func (*CreateSourceCommand) AfterPhase ¶
func (c *CreateSourceCommand) AfterPhase(phase int32) error
func (*CreateSourceCommand) Before ¶
func (c *CreateSourceCommand) Before() error
func (*CreateSourceCommand) Cancel ¶ added in v0.1.6
func (c *CreateSourceCommand) Cancel()
func (*CreateSourceCommand) Cleanup ¶ added in v0.1.6
func (c *CreateSourceCommand) Cleanup()
func (*CreateSourceCommand) CommandType ¶
func (c *CreateSourceCommand) CommandType() DDLCommandType
func (*CreateSourceCommand) GetExtraData ¶ added in v0.1.6
func (c *CreateSourceCommand) GetExtraData() []byte
func (*CreateSourceCommand) NumPhases ¶
func (c *CreateSourceCommand) NumPhases() int
func (*CreateSourceCommand) OnPhase ¶
func (c *CreateSourceCommand) OnPhase(phase int32) error
func (*CreateSourceCommand) SQL ¶
func (c *CreateSourceCommand) SQL() string
func (*CreateSourceCommand) SchemaName ¶
func (c *CreateSourceCommand) SchemaName() string
func (*CreateSourceCommand) TableSequences ¶
func (c *CreateSourceCommand) TableSequences() []uint64
type DDLCommand ¶
type DDLCommand interface {
	CommandType() DDLCommandType

	SchemaName() string

	SQL() string

	TableSequences() []uint64

	// Before is called on the originating node before the first phase
	Before() error

	// OnPhase is called on every node in the cluster passing in the phase
	OnPhase(phase int32) error

	// AfterPhase is called on the originating node once successful responses from the specified phase have been returned
	AfterPhase(phase int32) error

	// NumPhases returns the number of phases in the command
	NumPhases() int

	// Cancel cancels the command
	Cancel()

	// Cleanup will be called if an error occurs during execution of the command, it should perform any clean up logic
	// to leave the system in a clean state
	Cleanup()

	GetExtraData() []byte
}
func NewDDLCommand ¶
func NewDDLCommand(e *Executor, commandType DDLCommandType, schemaName string, sql string, tableSequences []uint64, extraData []byte) DDLCommand
type DDLCommandRunner ¶
type DDLCommandRunner struct {
// contains filtered or unexported fields
}
func NewDDLCommandRunner ¶
func NewDDLCommandRunner(ce *Executor) *DDLCommandRunner
func (*DDLCommandRunner) Cancel ¶ added in v0.1.6
func (d *DDLCommandRunner) Cancel(schemaName string) error
Cancel stops any running DDL and broadcasts to all nodes to do the same
func (*DDLCommandRunner) CancelHandler ¶ added in v0.1.6
func (d *DDLCommandRunner) CancelHandler() remoting.ClusterMessageHandler
func (*DDLCommandRunner) DdlHandler ¶ added in v0.1.6
func (d *DDLCommandRunner) DdlHandler() remoting.ClusterMessageHandler
func (*DDLCommandRunner) HandleCancelMessage ¶ added in v0.1.6
func (d *DDLCommandRunner) HandleCancelMessage(clusterMsg remoting.ClusterMessage) error
func (*DDLCommandRunner) HandleDdlMessage ¶ added in v0.1.6
func (d *DDLCommandRunner) HandleDdlMessage(ddlMsg remoting.ClusterMessage) error
func (*DDLCommandRunner) RunCommand ¶
func (d *DDLCommandRunner) RunCommand(command DDLCommand) error
func (*DDLCommandRunner) RunWithLock ¶
func (d *DDLCommandRunner) RunWithLock(commandKey string, command DDLCommand, ddlInfo *clustermsgs.DDLStatementInfo) error
type DDLCommandType ¶
type DDLCommandType int
type DropIndexCommand ¶
type DropIndexCommand struct {
// contains filtered or unexported fields
}
func NewDropIndexCommand ¶
func NewDropIndexCommand(e *Executor, schemaName string, sql string) *DropIndexCommand
func (*DropIndexCommand) AfterPhase ¶
func (d *DropIndexCommand) AfterPhase(phase int32) error
func (*DropIndexCommand) Before ¶
func (d *DropIndexCommand) Before() error
func (*DropIndexCommand) Cancel ¶ added in v0.1.6
func (d *DropIndexCommand) Cancel()
func (*DropIndexCommand) Cleanup ¶ added in v0.1.6
func (d *DropIndexCommand) Cleanup()
func (*DropIndexCommand) CommandType ¶
func (d *DropIndexCommand) CommandType() DDLCommandType
func (*DropIndexCommand) GetExtraData ¶ added in v0.1.6
func (d *DropIndexCommand) GetExtraData() []byte
func (*DropIndexCommand) NumPhases ¶
func (d *DropIndexCommand) NumPhases() int
func (*DropIndexCommand) OnPhase ¶
func (d *DropIndexCommand) OnPhase(phase int32) error
func (*DropIndexCommand) SQL ¶
func (d *DropIndexCommand) SQL() string
func (*DropIndexCommand) SchemaName ¶
func (d *DropIndexCommand) SchemaName() string
func (*DropIndexCommand) TableSequences ¶
func (d *DropIndexCommand) TableSequences() []uint64
type DropMVCommand ¶
type DropMVCommand struct {
// contains filtered or unexported fields
}
func NewDropMVCommand ¶
func NewDropMVCommand(e *Executor, schemaName string, sql string) *DropMVCommand
func NewOriginatingDropMVCommand ¶
func NewOriginatingDropMVCommand(e *Executor, schemaName string, sql string, mvName string) *DropMVCommand
func (*DropMVCommand) AfterPhase ¶
func (d *DropMVCommand) AfterPhase(phase int32) error
func (*DropMVCommand) Before ¶
func (d *DropMVCommand) Before() error
func (*DropMVCommand) Cancel ¶ added in v0.1.6
func (d *DropMVCommand) Cancel()
func (*DropMVCommand) Cleanup ¶ added in v0.1.6
func (d *DropMVCommand) Cleanup()
func (*DropMVCommand) CommandType ¶
func (d *DropMVCommand) CommandType() DDLCommandType
func (*DropMVCommand) GetExtraData ¶ added in v0.1.6
func (d *DropMVCommand) GetExtraData() []byte
func (*DropMVCommand) NumPhases ¶
func (d *DropMVCommand) NumPhases() int
func (*DropMVCommand) OnPhase ¶
func (d *DropMVCommand) OnPhase(phase int32) error
func (*DropMVCommand) SQL ¶
func (d *DropMVCommand) SQL() string
func (*DropMVCommand) SchemaName ¶
func (d *DropMVCommand) SchemaName() string
func (*DropMVCommand) TableSequences ¶
func (d *DropMVCommand) TableSequences() []uint64
type DropSourceCommand ¶
type DropSourceCommand struct {
// contains filtered or unexported fields
}
func NewDropSourceCommand ¶
func NewDropSourceCommand(e *Executor, schemaName string, sql string) *DropSourceCommand
func NewOriginatingDropSourceCommand ¶
func NewOriginatingDropSourceCommand(e *Executor, schemaName string, sql string, sourceName string) *DropSourceCommand
func (*DropSourceCommand) AfterPhase ¶
func (d *DropSourceCommand) AfterPhase(phase int32) error
func (*DropSourceCommand) Before ¶
func (d *DropSourceCommand) Before() error
func (*DropSourceCommand) Cancel ¶ added in v0.1.6
func (d *DropSourceCommand) Cancel()
func (*DropSourceCommand) Cleanup ¶ added in v0.1.6
func (d *DropSourceCommand) Cleanup()
func (*DropSourceCommand) CommandType ¶
func (d *DropSourceCommand) CommandType() DDLCommandType
func (*DropSourceCommand) GetExtraData ¶ added in v0.1.6
func (d *DropSourceCommand) GetExtraData() []byte
func (*DropSourceCommand) NumPhases ¶
func (d *DropSourceCommand) NumPhases() int
func (*DropSourceCommand) OnPhase ¶
func (d *DropSourceCommand) OnPhase(phase int32) error
func (*DropSourceCommand) SQL ¶
func (d *DropSourceCommand) SQL() string
func (*DropSourceCommand) SchemaName ¶
func (d *DropSourceCommand) SchemaName() string
func (*DropSourceCommand) TableSequences ¶
func (d *DropSourceCommand) TableSequences() []uint64
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewCommandExecutor ¶
func NewCommandExecutor(metaController *meta.Controller, pushEngine *push.Engine, pullEngine *pull.Engine, cluster cluster.Cluster, ddlClient remoting.Broadcaster, ddlResetClient remoting.Broadcaster, protoRegistry protolib.Resolver, failureInjector failinject.Injector, config *conf.Config) *Executor
func (*Executor) CreateExecutionContext ¶
func (e *Executor) CreateExecutionContext(schema *common.Schema) *execctx.ExecutionContext
func (*Executor) DDlCommandRunner ¶ added in v0.1.6
func (e *Executor) DDlCommandRunner() *DDLCommandRunner
func (*Executor) ExecuteSQLStatement ¶
func (e *Executor) ExecuteSQLStatement(execCtx *execctx.ExecutionContext, sql string, argTypes []common.ColumnType, args []interface{}) (exec.PullExecutor, error)
ExecuteSQLStatement executes a synchronous SQL statement.
func (*Executor) FailureInjector ¶
func (e *Executor) FailureInjector() failinject.Injector
func (*Executor) GetPullEngine ¶
func (*Executor) GetPushEngine ¶
GetPushEngine is only used in testing
Source Files ¶
Click to show internal directories.
Click to hide internal directories.