optimism

package
v0.0.0-...-be15534 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddDifferentFieldLenColumns

func AddDifferentFieldLenColumns(lockID, ddl string, oldJoined, newJoined schemacmp.Table) (string, error)

AddDifferentFieldLenColumns checks whether dm adds columns with different field lengths.

func CheckColumns

func CheckColumns(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error

CheckColumns try to check and fix all the schema and table names for delete columns infos.

func CheckDDLInfos

func CheckDDLInfos(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error

CheckDDLInfos try to check and fix all the schema and table names for DDL info.

func CheckOperations

func CheckOperations(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error

CheckOperations try to check and fix all the schema and table names for operation infos.

func CheckSourceTables

func CheckSourceTables(cli *clientv3.Client, source string, schemaMap map[string]string, talesMap map[string]map[string]string) error

CheckSourceTables try to check and fix all the source schemas and table names.

func ClearTestInfoOperationColumn

func ClearTestInfoOperationColumn(cli *clientv3.Client) error

ClearTestInfoOperationColumns is used to clear all shard DDL information in optimism mode. it only used for testing now.

func DeleteDroppedColumns

func DeleteDroppedColumns(cli *clientv3.Client, lockID string, columns ...string) (rev int64, deleted bool, err error)

DeleteDroppedColumns tries to delete the partially dropped columns for the specified lock ID. Only when this column is fully dropped in downstream database, in other words, **we receive all `Done` operation from dm-worker**, we can delete this column's name from the etcd.

func DeleteInfosOperationsColumns

func DeleteInfosOperationsColumns(cli *clientv3.Client, infos []Info, ops []Operation, lockID string) (int64, bool, error)

DeleteInfosOperationsColumns deletes the shard DDL infos, operations, and dropped columns in etcd. This function should often be called by DM-master when removing the lock. Only delete when all info's version are greater or equal to etcd's version, otherwise it means new info was putted into etcd before.

func DeleteInfosOperationsTablesByTable

func DeleteInfosOperationsTablesByTable(cli *clientv3.Client, task, source, upSchema, upTable, lockID string, dropCols []string) (int64, error)

DeleteInfosOperationsTablesByTable deletes the shard DDL infos and operations in etcd by table This function should often be called by DM-master when drop a table.

func DeleteInfosOperationsTablesByTask

func DeleteInfosOperationsTablesByTask(cli *clientv3.Client, task string, lockIDSet map[string]struct{}) (int64, error)

DeleteInfosOperationsTablesByTask deletes the shard DDL infos and operations in etcd. This function should often be called by DM-master when stop a task for all sources.

func DeleteInfosOperationsTablesByTaskAndSource

func DeleteInfosOperationsTablesByTaskAndSource(cli *clientv3.Client, task string, sources []string, dropColumns map[string][]string) (int64, error)

DeleteInfosOperationsTablesByTaskAndSource deletes the shard DDL infos and operations in etcd by task and source. This function should often be called by DM-master when stop a task for sources.

func DeleteSourceTables

func DeleteSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)

DeleteSourceTables deletes the source tables in etcd. This function should often be called by DM-worker.

func DiffSourceTables

func DiffSourceTables(oldST, newST SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})

func GetAllDroppedColumns

func GetAllDroppedColumns(cli *clientv3.Client) (map[string]map[string]map[string]map[string]map[string]DropColumnStage, int64, error)

GetAllDroppedColumns gets the all partially dropped columns. return lockID -> column-name -> source-id -> upstream-schema-name -> upstream-table-name.

func GetAllInfo

func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Info, int64, error)

GetAllInfo gets all shard DDL info in etcd currently. This function should often be called by DM-master. k/k/k/k/v: task-name -> source-ID -> upstream-schema-name -> upstream-table-name -> shard DDL info. ugly code, but have no better idea now.

func GetAllOperations

func GetAllOperations(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Operation, int64, error)

GetAllOperations gets all shard DDL operation in etcd currently. This function should often be called by DM-master. k/k/k/k/v: task-name -> source-ID -> upstream-schema-name -> upstream-table-name -> shard DDL operation.

func GetAllSourceTables

func GetAllSourceTables(cli *clientv3.Client) (map[string]map[string]SourceTables, int64, error)

GetAllSourceTables gets all source tables in etcd currently. This function should often be called by DM-master. k/k/v: task-name -> source-ID -> source tables.

func GetColumnName

func GetColumnName(lockID, ddl string, tp ast.AlterTableType) (string, error)

GetColumnName checks whether dm adds/drops a column, and return this column's name.

func GetInfosOperationsByTask

func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)

GetInfosOperationsByTask gets all shard DDL info and operation in etcd currently. This function should often be called by DM-master.

func PutDroppedColumns

func PutDroppedColumns(cli *clientv3.Client, lockID, source, upSchema, upTable string, cols []string, done DropColumnStage) (int64, bool, error)

PutDroppedColumn puts the partially dropped column names into ectd. When we drop a column, we save this column's name in etcd.

func PutInfo

func PutInfo(cli *clientv3.Client, info Info) (int64, error)

PutInfo puts the shard DDL info into etcd. NOTE:

In some cases before the lock resolved, the same DDL info may be PUT multiple times:
  1. start-task after stop-task.
  2. resume-task after paused manually or automatically.
  3. the task scheduled to another DM-worker instance (just like case-1).
Then we need to ensure re-PUT is safe:
  1. DM-master can construct the lock and do the coordination correctly.
  2. DM-worker can re-PUT and comply with the coordination correctly.

This function should often be called by DM-worker.

func PutOperation

func PutOperation(cli *clientv3.Client, skipDone bool, op Operation, infoModRev int64) (rev int64, putted bool, err error)

PutOperation puts the shard DDL operation into etcd.

func PutSourceTables

func PutSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)

PutSourceTables puts source tables into etcd. This function should often be called by DM-worker.

func WatchInfo

func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64,
	outCh chan<- Info, errCh chan<- error,
)

WatchInfo watches PUT & DELETE operations for info. This function should often be called by DM-master.

func WatchOperationPut

func WatchOperationPut(ctx context.Context, cli *clientv3.Client,
	task, source, upSchema, upTable string, revision int64,
	outCh chan<- Operation, errCh chan<- error,
)

WatchOperationPut watches PUT operations for DDL lock operation. If want to watch all operations matching, pass empty string for `task`, `source`, `upSchema` and `upTable`. This function can be called by DM-worker and DM-master.

func WatchSourceTables

func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64,
	outCh chan<- SourceTables, errCh chan<- error,
)

WatchSourceTables watches PUT & DELETE operations for source tables. This function should often be called by DM-master.

Types

type ConflictStage

type ConflictStage string

ConflictStage represents the current shard DDL conflict stage in the optimistic mode.

const (
	// ConflictNone indicates no conflict exists,
	// DM-worker can execute DDL/DML to the downstream normally.
	ConflictNone ConflictStage = "none"
	// ConflictDetected indicates a conflict will exist after applied the shard DDL.
	// in this stage, DM-worker should not execute/skip DDL/DML,
	// but it should still try to find the DDL which can resolve the conflict in the binlog stream.
	ConflictDetected ConflictStage = "detected"
	// ConflictResolved indicates a conflict DDL be resolved.
	// in this stage, DM-worker should redirect to the conflict DDL.
	ConflictResolved ConflictStage = "resolved"
	// ConflictUnlocked indicates a conflict will be unlocked after applied the shard DDL.
	// in this stage, DM-worker should directly execute/skip DDLs.
	ConflictUnlocked ConflictStage = "unlocked"
	// ConflictSkipWaitRedirect indicates a conflict happened and will be skipped and redirected until all tables has no conflict
	// in this stage, DM-worker should skip all DML and DDL for the conflict table until redirect.
	ConflictSkipWaitRedirect ConflictStage = "skip and wait for redirect" // #nosec
	// ConflictError indicates an error happened when we try to sync the DDLs
	// in this stage, DM-worker should retry and can skip ddls for this error.
	ConflictError ConflictStage = "error"
)

type DownstreamMeta

type DownstreamMeta struct {
	// contains filtered or unexported fields
}

DownstreamMeta used to fetch table info from downstream.

type DropColumnStage

type DropColumnStage int

DropColumnStage represents whether drop column done for a sharding table.

const (
	// DropNotDone represents master haven't received done for the col.
	DropNotDone DropColumnStage = iota
	// DropPartiallyDone represents master receive done for the col.
	DropPartiallyDone
	// DropDone represents master receive done and ddl for the col(executed in downstream).
	DropDone
)

type Info

type Info struct {
	Task       string   `json:"task"`        // data migration task name
	Source     string   `json:"source"`      // upstream source ID
	UpSchema   string   `json:"up-schema"`   // upstream/source schema name, different sources can have the same schema name
	UpTable    string   `json:"up-table"`    // upstream/source table name, different sources can have the same table name
	DownSchema string   `json:"down-schema"` // downstream/target schema name
	DownTable  string   `json:"down-table"`  // downstream/target table name
	DDLs       []string `json:"ddls"`        // DDL statements

	TableInfoBefore *model.TableInfo   `json:"table-info-before"` // the tracked table schema before applying the DDLs
	TableInfosAfter []*model.TableInfo `json:"table-info-after"`  // the tracked table schema after applying the DDLs

	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the Info has been deleted in etcd.
	IsDeleted bool `json:"-"`

	// only set it when get/watch from etcd
	Version int64 `json:"-"`

	// only set it when get from etcd
	// use for sort infos in recoverlock
	Revision int64 `json:"-"`

	// use to resolve conflict
	IgnoreConflict bool `json:"ignore-conflict"`
}

Info represents the shard DDL information. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. NOTE: `Task`, `Source`, `UpSchema` and `DownTable` are redundant in the etcd key path for convenient. Info is putted when receiving a shard DDL for a table in DM-worker, and is deleted when removing the lock by DM-master because we need the newest schema in Info to recover the lock when restarting DM-master. when new Info is putted to overwrite the old one, the DM-master should update the lock based on the new one.

func NewInfo

func NewInfo(task, source, upSchema, upTable, downSchema, downTable string,
	ddls []string, tableInfoBefore *model.TableInfo, tableInfosAfter []*model.TableInfo,
) Info

NewInfo creates a new Info instance.

func (*Info) ShortString

func (i *Info) ShortString() string

ShortString returns short string of Info.

func (Info) String

func (i Info) String() string

String implements Stringer interface.

type Lock

type Lock struct {
	ID   string // lock's ID
	Task string // lock's corresponding task name

	DownSchema string // downstream schema name
	DownTable  string // downstream table name
	// contains filtered or unexported fields
}

Lock represents the shard DDL lock in memory. This information does not need to be persistent, and can be re-constructed from the shard DDL info.

func NewLock

func NewLock(cli *clientv3.Client, id, task, downSchema, downTable string, initTable schemacmp.Table, tts []TargetTable, downstreamMeta *DownstreamMeta) *Lock

NewLock creates a new Lock instance.

func (*Lock) AddDroppedColumns

func (l *Lock) AddDroppedColumns(source, schema, table string, cols []string) error

AddDroppedColumns adds a dropped column name in both etcd and lock's column map.

func (*Lock) AddTable

func (l *Lock) AddTable(source, schema, table string, needLock bool)

AddTable create a table in lock.

func (*Lock) DeleteColumnsByOp

func (l *Lock) DeleteColumnsByOp(op Operation) error

DeleteColumnsByOp deletes the partially dropped columns that extracted from operation. We can not remove columns from the partially dropped columns map unless: this column is dropped in the downstream database, all the upstream source done the delete column operation that is to say, columns all done.

func (*Lock) FetchTableInfos

func (l *Lock) FetchTableInfos(task, source, schema, table string) (*model.TableInfo, error)

FetchTableInfos fetch all table infos for a lock.

func (*Lock) GetVersion

func (l *Lock) GetVersion(source string, schema string, table string) int64

GetVersion return version of info in lock.

func (*Lock) HasTables

func (l *Lock) HasTables() bool

HasTables check whether a lock has tables.

func (*Lock) IsDone

func (l *Lock) IsDone(source, schema, table string) bool

IsDone returns whether the operation of the source table has done.

func (*Lock) IsDroppedColumn

func (l *Lock) IsDroppedColumn(source, upSchema, upTable, col string) bool

IsDroppedColumn checks whether this column is a partially dropped column for this lock.

func (*Lock) IsResolved

func (l *Lock) IsResolved() bool

IsResolved returns whether the lock has resolved. return true if all tables have the same schema and all DDLs operations have done.

func (*Lock) IsSynced

func (l *Lock) IsSynced() (bool, int)

IsSynced returns whether the lock has synced. In the optimistic mode, we call it `synced` if table info of all tables are the same, and we define `remain` as the table count which have different table info with the joined one, e.g. for `ADD COLUMN`, it's the table count which have not added the column, for `DROP COLUMN`, it's the table count which have dropped the column.

func (*Lock) Joined

func (l *Lock) Joined() (schemacmp.Table, error)

Joined returns the joined table info.

func (*Lock) Ready

func (l *Lock) Ready() map[string]map[string]map[string]bool

Ready returns the source tables' sync status (whether they are ready). we define `ready` if the table's info is the same with the joined one, e.g for `ADD COLUMN`, it's true if it has added the column, for `DROP COLUMN`, it's true if it has not dropped the column.

func (*Lock) TryMarkDone

func (l *Lock) TryMarkDone(source, schema, table string) bool

TryMarkDone tries to mark the operation of the source table as done. it returns whether marked done. NOTE: this method can always mark a existing table as done, so the caller of this method should ensure that the table has done the DDLs operation. NOTE: a done table may revert to not-done if new table schema received and new DDLs operation need to be done.

func (*Lock) TryRemoveTable

func (l *Lock) TryRemoveTable(source, schema, table string) []string

TryRemoveTable tries to remove a table in the lock. it returns whether the table has been removed. TODO: it does NOT try to rebuild the joined schema after the table removed now. try to support this if needed later. NOTE: if no table exists in the lock after removed the table, it's the caller's responsibility to decide whether remove the lock or not.

func (*Lock) TryRemoveTableBySources

func (l *Lock) TryRemoveTableBySources(sources []string) []string

TryRemoveTable tries to remove tables in the lock by sources. return drop columns for later use.

func (*Lock) TrySync

func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, cols []string, err error)

TrySync tries to sync the lock, re-entrant. new upstream sources may join when the DDL lock is in syncing, so we need to merge these new sources. NOTE: now, any error returned, we treat it as conflict detected. NOTE: now, DDLs (not empty) returned when resolved the conflict, but in fact these DDLs should not be replicated to the downstream. NOTE: now, `TrySync` can detect and resolve conflicts in both of the following modes:

  • non-intrusive: update the schema of non-conflict tables to match the conflict tables. data from conflict tables are non-intrusive.
  • intrusive: revert the schema of the conflict tables to match the non-conflict tables. data from conflict tables are intrusive.

TODO: but both of these modes are difficult to be implemented in DM-worker now, try to do that later. for non-intrusive, a broadcast mechanism needed to notify conflict tables after the conflict has resolved, or even a block mechanism needed. for intrusive, a DML prune or transform mechanism needed for two different schemas (before and after the conflict resolved).

func (*Lock) UpdateTableAfterUnlock

func (l *Lock) UpdateTableAfterUnlock(info Info)

UpdateTableAfterUnlock updates table's schema info after unlock exec action.

type LockKeeper

type LockKeeper struct {
	// contains filtered or unexported fields
}

LockKeeper used to keep and handle DDL lock conveniently. The lock information do not need to be persistent, and can be re-constructed from the shard DDL info. But the drop columns should be persistent.

func NewLockKeeper

func NewLockKeeper(getDownstreamMetaFunc func(string) (*dbconfig.DBConfig, string)) *LockKeeper

NewLockKeeper creates a new LockKeeper instance.

func (*LockKeeper) Clear

func (lk *LockKeeper) Clear()

Clear clears all Locks.

func (*LockKeeper) FindLock

func (lk *LockKeeper) FindLock(lockID string) *Lock

FindLock finds a lock.

func (*LockKeeper) FindLockByInfo

func (lk *LockKeeper) FindLockByInfo(info Info) *Lock

FindLockByInfo finds a lock with a shard DDL info.

func (*LockKeeper) FindLocksByTask

func (lk *LockKeeper) FindLocksByTask(task string) []*Lock

FindLocksByTask finds locks by task.

func (*LockKeeper) Locks

func (lk *LockKeeper) Locks() map[string]*Lock

Locks return a copy of all Locks.

func (*LockKeeper) RemoveDownstreamMeta

func (lk *LockKeeper) RemoveDownstreamMeta(task string)

RemoveDownstreamMeta removes downstream mate by task.

func (*LockKeeper) RemoveLock

func (lk *LockKeeper) RemoveLock(lockID string) bool

RemoveLock removes a lock.

func (*LockKeeper) RemoveLockByInfo

func (lk *LockKeeper) RemoveLockByInfo(info Info) bool

RemoveLockByInfo removes a lock.

func (*LockKeeper) SetDropColumns

func (lk *LockKeeper) SetDropColumns(dropColumns map[string]map[string]map[string]map[string]map[string]DropColumnStage)

SetDropColumns set drop columns for lock keeper.

func (*LockKeeper) TrySync

func (lk *LockKeeper) TrySync(cli *clientv3.Client, info Info, tts []TargetTable) (string, []string, []string, error)

TrySync tries to sync the lock.

type LogInfo

type LogInfo struct {
	Task           string   `json:"task"`
	Source         string   `json:"source"`
	UpSchema       string   `json:"up-schema"`
	UpTable        string   `json:"up-table"`
	DownSchema     string   `json:"down-schema"`
	DownTable      string   `json:"down-table"`
	DDLs           []string `json:"ddls"`
	TableBefore    string   `json:"table-before"`
	TableAfter     string   `json:"table-after"`
	IsDeleted      bool     `json:"is-deleted"`
	Version        int64    `json:"version"`
	Revision       int64    `json:"revision"`
	IgnoreConflict bool     `json:"ignore-conflict"`
}

LogInfo replace TableInfo with schema.Table.String() for log.

type OldInfo

type OldInfo struct {
	Task       string   `json:"task"`
	Source     string   `json:"source"`
	UpSchema   string   `json:"up-schema"`
	UpTable    string   `json:"up-table"`
	DownSchema string   `json:"down-schema"`
	DownTable  string   `json:"down-table"`
	DDLs       []string `json:"ddls"`

	TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs
	TableInfoAfter  *model.TableInfo `json:"table-info-after"`  // the tracked table schema after applying the DDLs
}

OldInfo represents info in etcd before v2.0.2.

type Operation

type Operation struct {
	ID            string        `json:"id"`               // the corresponding DDL lock ID
	Task          string        `json:"task"`             // data migration task name
	Source        string        `json:"source"`           // upstream source ID
	UpSchema      string        `json:"up-schema"`        // upstream/source schema name, different sources can have the same schema name
	UpTable       string        `json:"up-table"`         // upstream/source table name, different sources can have the same table name
	DDLs          []string      `json:"ddls"`             // DDL statements need to apply to the downstream.
	ConflictStage ConflictStage `json:"conflict-stage"`   // current conflict stage.
	ConflictMsg   string        `json:"conflict-message"` // current conflict message
	Done          bool          `json:"done"`             // whether the operation has done
	Cols          []string      `json:"cols"`             // drop columns' name

	// only set it when get from etcd
	// use for sort infos in recovering locks
	Revision int64 `json:"-"`
}

Operation represents a shard DDL coordinate operation. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. NOTE: `Task`, `Source`, `UpSchema` and `UpTable` are redundant in the etcd key path for convenient. Operation is putted when coordinating a shard DDL operation for DM-worker by DM-master, and is updated (with `done`) after DM-worker has done the operation by DM-worker, and is deleted when removing the lock by DM-master. because we need the newest stage in Operation to recover the lock when restarting DM-master.

func GetOperation

func GetOperation(cli *clientv3.Client, task, source, upSchema, upTable string) (Operation, int64, error)

GetOperation gets shard DDL operation in etcd currently. This function should often be called by DM-worker. (task-name, source-ID, upstream-schema-name, upstream-table-name) -> shard DDL operation.

func NewOperation

func NewOperation(id, task, source, upSchema, upTable string,
	ddls []string, conflictStage ConflictStage, conflictMsg string, done bool, cols []string,
) Operation

NewOperation creates a new Operation instance.

func (Operation) String

func (o Operation) String() string

String implements Stringer interface.

type RouteTable

type RouteTable struct {
	UpSchema   string
	UpTable    string
	DownSchema string
	DownTable  string
}

RouteTable represents a table in upstream/downstream.

type SourceTables

type SourceTables struct {
	Task   string `json:"task"`   // data migration task name
	Source string `json:"source"` // upstream source ID

	// downstream-schema-name -> downstream-table-name -> upstream-schema-name -> upstream-table-name -> struct{},
	// multiple downstream/target tables (<downstream-schema-name, downstream-table-name> pair) may exist in one subtask.
	Tables map[string]map[string]map[string]map[string]struct{} `json:"tables"`

	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the SourceTables has been deleted in etcd.
	IsDeleted bool `json:"-"`
}

SourceTables represents the upstream/sources tables for a data migration **subtask**. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. We need this because only one shard group exists for **every** target table in the optimistic mode (in DM-master), so we need DM-worker to report its upstream table names to DM-master. NOTE: `Task` and `Source` are redundant in the etcd key path for convenient. SourceTables is putted when starting the subtask by DM-worker, and is updated when new tables added/removed in the upstream source by DM-worker, and **may be** deleted when stopping the subtask by DM-worker later.

func NewSourceTables

func NewSourceTables(task, source string) SourceTables

NewSourceTables creates a new SourceTables instances.

func (*SourceTables) AddTable

func (st *SourceTables) AddTable(upSchema, upTable, downSchema, downTable string) bool

AddTable adds a table into SourceTables. it returns whether added (not exist before).

func (*SourceTables) RemoveTable

func (st *SourceTables) RemoveTable(upSchema, upTable, downSchema, downTable string) bool

RemoveTable removes a table from SourceTables. it returns whether removed (exist before).

func (SourceTables) String

func (st SourceTables) String() string

String implements Stringer interface.

func (*SourceTables) TargetTable

func (st *SourceTables) TargetTable(downSchema, downTable string) TargetTable

TargetTable returns a TargetTable instance for a specified downstream table, returns an empty TargetTable instance if no tables exist.

type TableKeeper

type TableKeeper struct {
	// contains filtered or unexported fields
}

TableKeeper used to keep initial tables for a task in optimism mode.

func NewTableKeeper

func NewTableKeeper() *TableKeeper

NewTableKeeper creates a new TableKeeper instance.

func (*TableKeeper) AddTable

func (tk *TableKeeper) AddTable(task, source, upSchema, upTable, downSchema, downTable string) bool

AddTable adds a table into the source tables. it returns whether added (not exist before). NOTE: we only add for existing task now.

func (*TableKeeper) FindTables

func (tk *TableKeeper) FindTables(task, downSchema, downTable string) []TargetTable

FindTables finds source tables by task name and downstream table name.

func (*TableKeeper) Init

func (tk *TableKeeper) Init(stm map[string]map[string]SourceTables)

Init (re-)initializes the keeper with initial source tables.

func (*TableKeeper) RemoveTable

func (tk *TableKeeper) RemoveTable(task, source, upSchema, upTable, downSchema, downTable string) bool

RemoveTable removes a table from the source tables. it returns whether removed (exit before).

func (*TableKeeper) RemoveTableByTask

func (tk *TableKeeper) RemoveTableByTask(task string) bool

RemoveTableByTask removes tables from the source tables through task name. it returns whether removed (exit before).

func (*TableKeeper) RemoveTableByTaskAndSources

func (tk *TableKeeper) RemoveTableByTaskAndSources(task string, sources []string)

RemoveTableByTaskAndSource removes tables from the source tables through task name and sources.

func (*TableKeeper) SourceTableExist

func (tk *TableKeeper) SourceTableExist(task, source, upSchema, upTable, downSchema, downTable string) bool

SourceTableExist check whether a source table exist.

func (*TableKeeper) Update

func (tk *TableKeeper) Update(st SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})

Update adds/updates tables into the keeper or removes tables from the keeper. it returns the newly added and dropped tables.

type TargetTable

type TargetTable struct {
	Task       string `json:"task"`        // data migration task name
	Source     string `json:"source"`      // upstream source ID
	DownSchema string `json:"down-schema"` // downstream schema name
	DownTable  string `json:"down-table"`  // downstream table name

	// upstream-schema-name -> upstream-table-name -> struct{}
	UpTables map[string]map[string]struct{} `json:"up-tables"`
}

TargetTable represents some upstream/sources tables for **one** target table. It is often generated from `SourceTables` for the specified downstream table.

func TargetTablesForTask

func TargetTablesForTask(task, downSchema, downTable string, stm map[string]map[string]SourceTables) []TargetTable

TargetTablesForTask returns TargetTable list for a specified task and downstream table. stm: task name -> upstream source ID -> SourceTables.

func (TargetTable) IsEmpty

func (tt TargetTable) IsEmpty() bool

IsEmpty returns whether the TargetTable instance is empty.

type TargetTableSlice

type TargetTableSlice []TargetTable

TargetTableSlice attaches the methods of Interface to []TargetTable, sorting in increasing order according to `Source` field.

func (TargetTableSlice) Len

func (t TargetTableSlice) Len() int

Len implements Sorter.Len.

func (TargetTableSlice) Less

func (t TargetTableSlice) Less(i, j int) bool

Less implements Sorter.Less.

func (TargetTableSlice) Swap

func (t TargetTableSlice) Swap(i, j int)

Swap implements Sorter.Swap.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL