syncer

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: Apache-2.0 Imports: 57 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// IncompatibleDDLFormat is for incompatible ddl
	IncompatibleDDLFormat = `` /* 407-byte string literal not displayed */

)
View Source
var (

	// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL
	MaxDDLConnectionTimeoutMinute = 5
)
View Source
var (
	// OnlineDDLSchemes is scheme name => online ddl handler
	OnlineDDLSchemes = map[string]func(*tcontext.Context, *config.SubTaskConfig) (OnlinePlugin, error){
		config.PT:    NewPT,
		config.GHOST: NewGhost,
	}
)

Functions

func GenTableID

func GenTableID(schema, table string) (ID string, isSchemaOnly bool)

GenTableID generates table ID

func InitStatusAndMetrics

func InitStatusAndMetrics(addr string)

InitStatusAndMetrics registers prometheus metrics and listens on the status port.

func RegisterMetrics

func RegisterMetrics(registry *prometheus.Registry)

RegisterMetrics registers metrics

func UnpackTableID

func UnpackTableID(id string) (string, string)

UnpackTableID unpacks table ID to <schema, table> pair

Types

type BinlogType

type BinlogType uint8

BinlogType represents binlog sync type

const (
	RemoteBinlog BinlogType = iota + 1
	LocalBinlog
)

binlog sync type

type CheckPoint

type CheckPoint interface {
	// Init initializes the CheckPoint
	Init(tctx *tcontext.Context) error

	// Close closes the CheckPoint
	Close()

	// ResetConn resets database connections owned by the Checkpoint
	ResetConn(tctx *tcontext.Context) error

	// Clear clears all checkpoints
	Clear(tctx *tcontext.Context) error

	// Load loads all checkpoints saved by CheckPoint
	Load(tctx *tcontext.Context) error

	// LoadMeta loads checkpoints from meta config item or file
	LoadMeta() error

	// SaveTablePoint saves checkpoint for specified table in memory
	SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)

	// DeleteTablePoint deletes checkpoint for specified table in memory and storage
	DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error

	// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
	IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool

	// SaveGlobalPoint saves the global binlog stream's checkpoint
	// corresponding to Meta.Save
	SaveGlobalPoint(pos mysql.Position)

	// FlushPointsExcept flushes the global checkpoint and tables'
	// checkpoints except exceptTables; it also flushes SQLs with Args provided
	// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
	// @exceptTables: [[schema, table]... ]
	// corresponding to Meta.Flush
	FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error

	// GlobalPoint returns the global binlog stream's checkpoint
	// corresponding to Meta.Pos
	GlobalPoint() mysql.Position

	// TablePoint returns all table's stream checkpoint
	TablePoint() map[string]map[string]mysql.Position

	// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
	// corresponding to Meta.Pos
	FlushedGlobalPoint() mysql.Position

	// CheckGlobalPoint checks whether we should save global checkpoint
	// corresponding to Meta.Check
	CheckGlobalPoint() bool

	// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
	Rollback()

	// String returns the text of the global position
	String() string
}

CheckPoint represents checkpoints status for syncer, including the global binlog's checkpoint and every table's checkpoint. When saving a checkpoint, we must distinguish saving in memory from saving (flushing) to DB (or file) permanently. For sharding merging, we must save the checkpoint in memory to support skipping when re-syncing for the special streamer; but before all DDLs for a sharding group have been synced and executed, we should not save the checkpoint permanently, because when restarting to continue the sync, all sharding DDLs must be try-synced again.

func NewRemoteCheckPoint

func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) CheckPoint

NewRemoteCheckPoint creates a new RemoteCheckPoint

type DBConn added in v1.0.2

type DBConn struct {
	// contains filtered or unexported fields
}

DBConn represents a live DB connection. It is not thread-safe.

type DDLExecInfo

type DDLExecInfo struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DDLExecInfo is used by syncer to execute or ignore sharding DDL. It's specific to syncer, and cannot be used by other process units.

func NewDDLExecInfo

func NewDDLExecInfo() *DDLExecInfo

NewDDLExecInfo creates a new DDLExecInfo

func (*DDLExecInfo) BlockingDDLs

func (i *DDLExecInfo) BlockingDDLs() []string

BlockingDDLs returns current blocking DDL

func (*DDLExecInfo) Chan

func (i *DDLExecInfo) Chan(ddls []string) <-chan *DDLExecItem

Chan returns a receive only DDLExecItem chan

func (*DDLExecInfo) ClearBlockingDDL

func (i *DDLExecInfo) ClearBlockingDDL()

ClearBlockingDDL clears current blocking DDL

func (*DDLExecInfo) Close

func (i *DDLExecInfo) Close()

Close closes the chan

func (*DDLExecInfo) Renew

func (i *DDLExecInfo) Renew()

Renew renews the chan

func (*DDLExecInfo) Send

func (i *DDLExecInfo) Send(ctx context.Context, item *DDLExecItem) error

Send sends an item (with request) to the chan

type DDLExecItem

type DDLExecItem struct {
	// contains filtered or unexported fields
}

DDLExecItem wraps request and response for a sharding DDL execution

type ExecErrorContext

type ExecErrorContext struct {
	// contains filtered or unexported fields
}

ExecErrorContext records a failed exec SQL information

type GenColCache

type GenColCache struct {
	// contains filtered or unexported fields
}

GenColCache stores generated column information for all tables

func NewGenColCache

func NewGenColCache() *GenColCache

NewGenColCache creates a GenColCache.

type Ghost

type Ghost struct {
	// contains filtered or unexported fields
}

Ghost handles gh-ost online ddls (not complete, don't need to review it). Table name patterns: _*_gho is the ghost table, _*_ghc is the ghost changelog table, _*_del is the ghost trash table.

func (*Ghost) Apply

func (g *Ghost) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)

Apply implements interface. returns ddls, real schema, real table, error

func (*Ghost) Clear

func (g *Ghost) Clear(tctx *tcontext.Context) error

Clear clears online ddl information

func (*Ghost) Close

func (g *Ghost) Close()

Close implements interface

func (*Ghost) Finish

func (g *Ghost) Finish(tctx *tcontext.Context, schema, table string) error

Finish implements interface

func (*Ghost) RealName

func (g *Ghost) RealName(schema, table string) (string, string)

RealName implements interface

func (*Ghost) ResetConn added in v1.0.2

func (g *Ghost) ResetConn(tctx *tcontext.Context) error

ResetConn implements interface

func (*Ghost) TableType

func (g *Ghost) TableType(table string) TableType

TableType implements interface

type GhostDDLInfo

type GhostDDLInfo struct {
	Schema string `json:"schema"`
	Table  string `json:"table"`

	DDLs []string `json:"ddls"`
}

GhostDDLInfo stores ghost information and ddls

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

Heartbeat represents a heartbeat mechanism to measure replication lag on mysql and tidb/mysql. Learned from: https://www.percona.com/doc/percona-toolkit/LATEST/pt-heartbeat.html

func GetHeartbeat

func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error)

GetHeartbeat gets singleton instance of Heartbeat

func (*Heartbeat) AddTask

func (h *Heartbeat) AddTask(name string) error

AddTask adds a new task

func (*Heartbeat) RemoveTask

func (h *Heartbeat) RemoveTask(name string) error

RemoveTask removes a previously added task

func (*Heartbeat) TryUpdateTaskTs

func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{})

TryUpdateTaskTs tries to update task's ts

type HeartbeatConfig

type HeartbeatConfig struct {
	// contains filtered or unexported fields
}

HeartbeatConfig represents Heartbeat configurations.

func (*HeartbeatConfig) Equal

func (cfg *HeartbeatConfig) Equal(other *HeartbeatConfig) error

Equal tests whether config equals to other

type OnlineDDLStorage

type OnlineDDLStorage struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OnlineDDLStorage stores sharding group online ddls information

func NewOnlineDDLStorage

func NewOnlineDDLStorage(logCtx *tcontext.Context, cfg *config.SubTaskConfig) *OnlineDDLStorage

NewOnlineDDLStorage creates a new online ddl storager

func (*OnlineDDLStorage) Clear

func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error

Clear clears online ddl information from storage

func (*OnlineDDLStorage) Close

func (s *OnlineDDLStorage) Close()

Close closes database connection

func (*OnlineDDLStorage) Delete

func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error

Delete deletes online ddl information

func (*OnlineDDLStorage) Get

func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo

Get returns ddls by given schema/table

func (*OnlineDDLStorage) Init

func (s *OnlineDDLStorage) Init(tctx *tcontext.Context) error

Init initializes the online handler

func (*OnlineDDLStorage) Load

func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error

Load loads information from storage

func (*OnlineDDLStorage) ResetConn added in v1.0.2

func (s *OnlineDDLStorage) ResetConn(tctx *tcontext.Context) error

ResetConn implements OnlinePlugin.ResetConn

func (*OnlineDDLStorage) Save

func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable, realSchema, realTable, ddl string) error

Save saves online ddl information

type OnlinePlugin

type OnlinePlugin interface {
	// Apply does:
	// * detect online ddl
	// * record changes
	// * apply online ddl on real table
	// returns sqls, replaced/self schema, replaced/self table, error
	Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
	// Finish would delete online ddl from memory and storage
	Finish(tctx *tcontext.Context, schema, table string) error
	// TableType returns ghost/real table
	TableType(table string) TableType
	// RealName returns real table name that removed ghost suffix and handled by table router
	RealName(schema, table string) (string, string)
	// ResetConn reset db connection
	ResetConn(tctx *tcontext.Context) error
	// Clear clears all online information
	Clear(tctx *tcontext.Context) error
	// Close closes online ddl plugin
	Close()
}

OnlinePlugin handles online ddl solutions like pt, gh-ost

func NewGhost

func NewGhost(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error)

NewGhost returns gh-ost online plugin

func NewPT

func NewPT(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error)

NewPT returns pt online schema changes plugin

type PT

type PT struct {
	// contains filtered or unexported fields
}

PT handles pt online schema changes. Table name patterns: (_*).*_new is the ghost table, (_*).*_old is the ghost trash table. The `--new-table-name` flag is not supported.

func (*PT) Apply

func (p *PT) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)

Apply implements interface. returns ddls, real schema, real table, error

func (*PT) Clear

func (p *PT) Clear(tctx *tcontext.Context) error

Clear clears online ddl information

func (*PT) Close

func (p *PT) Close()

Close implements interface

func (*PT) Finish

func (p *PT) Finish(tcxt *tcontext.Context, schema, table string) error

Finish implements interface

func (*PT) RealName

func (p *PT) RealName(schema, table string) (string, string)

RealName implements interface

func (*PT) ResetConn added in v1.0.2

func (p *PT) ResetConn(tctx *tcontext.Context) error

ResetConn implements interface

func (*PT) TableType

func (p *PT) TableType(table string) TableType

TableType implements interface

type RemoteCheckPoint

type RemoteCheckPoint struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RemoteCheckPoint implements CheckPoint, using the target database to store info. NOTE: currently we sync from relay log, so GTID support has not been added yet. It's not thread-safe.

func (*RemoteCheckPoint) CheckGlobalPoint

func (cp *RemoteCheckPoint) CheckGlobalPoint() bool

CheckGlobalPoint implements CheckPoint.CheckGlobalPoint

func (*RemoteCheckPoint) Clear

func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error

Clear implements CheckPoint.Clear

func (*RemoteCheckPoint) Close

func (cp *RemoteCheckPoint) Close()

Close implements CheckPoint.Close

func (*RemoteCheckPoint) DeleteTablePoint

func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error

DeleteTablePoint implements CheckPoint.DeleteTablePoint

func (*RemoteCheckPoint) FlushPointsExcept

func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error

FlushPointsExcept implements CheckPoint.FlushPointsExcept

func (*RemoteCheckPoint) FlushedGlobalPoint

func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position

FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint

func (*RemoteCheckPoint) GlobalPoint

func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position

GlobalPoint implements CheckPoint.GlobalPoint

func (*RemoteCheckPoint) Init

func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error

Init implements CheckPoint.Init

func (*RemoteCheckPoint) IsOlderThanTablePoint added in v1.0.7

func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool

IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. For binlog position replication, currently DM will split the row changes of an event into jobs, so some jobs may have the same position. If useLE is true, we use less than or equal.

func (*RemoteCheckPoint) Load

func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error

Load implements CheckPoint.Load

func (*RemoteCheckPoint) LoadMeta

func (cp *RemoteCheckPoint) LoadMeta() error

LoadMeta implements CheckPoint.LoadMeta

func (*RemoteCheckPoint) ResetConn

func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error

ResetConn implements CheckPoint.ResetConn

func (*RemoteCheckPoint) Rollback

func (cp *RemoteCheckPoint) Rollback()

Rollback implements CheckPoint.Rollback

func (*RemoteCheckPoint) SaveGlobalPoint

func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)

SaveGlobalPoint implements CheckPoint.SaveGlobalPoint

func (*RemoteCheckPoint) SaveTablePoint

func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)

SaveTablePoint implements CheckPoint.SaveTablePoint

func (*RemoteCheckPoint) String

func (cp *RemoteCheckPoint) String() string

String implements CheckPoint.String

func (*RemoteCheckPoint) TablePoint added in v1.0.4

func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position

TablePoint implements CheckPoint.TablePoint

type ShardingGroup

type ShardingGroup struct {
	sync.RWMutex

	IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later
	// contains filtered or unexported fields
}

ShardingGroup represents a sharding DDL sync group

func NewShardingGroup

func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources []string, meta *shardmeta.ShardingMeta, isSchemaOnly bool) *ShardingGroup

NewShardingGroup creates a new ShardingGroup

func (*ShardingGroup) ActiveDDLFirstPos

func (sg *ShardingGroup) ActiveDDLFirstPos() (mysql.Position, error)

ActiveDDLFirstPos returns the first binlog position of active DDL

func (*ShardingGroup) CheckSyncing

func (sg *ShardingGroup) CheckSyncing(source string, pos mysql.Position) (beforeActiveDDL bool)

CheckSyncing checks the source table's syncing status. It returns:

beforeActiveDDL: whether the position is before active DDL

func (*ShardingGroup) FirstEndPosUnresolved

func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position

FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil

func (*ShardingGroup) FirstPosUnresolved

func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position

FirstPosUnresolved returns the first DDL pos if un-resolved, else nil

func (*ShardingGroup) FlushData

func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})

FlushData returns sharding meta flush SQLs and args

func (*ShardingGroup) InSequenceSharding

func (sg *ShardingGroup) InSequenceSharding() bool

InSequenceSharding returns whether this sharding group is in sequence sharding

func (*ShardingGroup) IsUnresolved

func (sg *ShardingGroup) IsUnresolved() bool

IsUnresolved returns whether it's unresolved

func (*ShardingGroup) Leave

func (sg *ShardingGroup) Leave(sources []string) error

Leave leaves from the sharding group; it doesn't affect the in-syncing process. Use cases:

  • drop a database
  • drop table

func (*ShardingGroup) Merge

func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)

Merge merges new sources into an existing sharding group. Use cases:

  • add a new table to an existing sharding group
  • add new table(s) to the parent database's sharding group

If the group is in sequence sharding, it returns an error directly; otherwise it adds each new source, sets it to false (un-synced), and increments remain.

func (*ShardingGroup) Reset

func (sg *ShardingGroup) Reset()

Reset resets all sources to the un-synced state. When the previous sharding DDL has been synced and resolved, we need to reset it.

func (*ShardingGroup) ResolveShardingDDL

func (sg *ShardingGroup) ResolveShardingDDL() bool

ResolveShardingDDL resolves sharding DDL in sharding group

func (*ShardingGroup) Sources

func (sg *ShardingGroup) Sources() map[string]bool

Sources returns all sources (and whether synced)

func (*ShardingGroup) String

func (sg *ShardingGroup) String() string

String implements Stringer.String

func (*ShardingGroup) Tables

func (sg *ShardingGroup) Tables() [][]string

Tables returns all source tables' <schema, table> pair

func (*ShardingGroup) TrySync

func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, bool, int, error)

TrySync tries to sync the sharding group. It returns:

synced: whether the source table's sharding group synced
active: whether the DDL will be processed in this round
remain: remain un-synced source table's count

func (*ShardingGroup) UnresolvedGroupInfo

func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup

UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil

func (*ShardingGroup) UnresolvedTables

func (sg *ShardingGroup) UnresolvedTables() [][]string

UnresolvedTables returns all source tables' <schema, table> pair if is unresolved, else returns nil

type ShardingGroupKeeper

type ShardingGroupKeeper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ShardingGroupKeeper used to keep ShardingGroup

func NewShardingGroupKeeper

func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *ShardingGroupKeeper

NewShardingGroupKeeper creates a new ShardingGroupKeeper

func (*ShardingGroupKeeper) ActiveDDLFirstPos

func (k *ShardingGroupKeeper) ActiveDDLFirstPos(targetSchema, targetTable string) (mysql.Position, error)

ActiveDDLFirstPos returns the binlog position of active DDL

func (*ShardingGroupKeeper) AddGroup

func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)

AddGroup adds new group(s) according to target schema, table and source IDs

func (*ShardingGroupKeeper) AdjustGlobalPoint

func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position

AdjustGlobalPoint adjusts globalPoint with sharding groups' lowest first point

func (*ShardingGroupKeeper) Close

func (k *ShardingGroupKeeper) Close()

Close closes sharding group keeper

func (*ShardingGroupKeeper) Group

func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup

Group returns target table's group, nil if not exist

func (*ShardingGroupKeeper) Groups

func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup

Groups returns all sharding groups, often used for debugging. Caution: do not modify the returned groups directly.

func (*ShardingGroupKeeper) InSequenceSharding

func (k *ShardingGroupKeeper) InSequenceSharding() bool

InSequenceSharding returns whether there exists a sharding group in unfinished sequence sharding

func (*ShardingGroupKeeper) InSyncing

func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string, pos mysql.Position) bool

InSyncing checks whether the source is in sharding syncing

func (*ShardingGroupKeeper) Init

func (k *ShardingGroupKeeper) Init() error

Init does initialization stuff

func (*ShardingGroupKeeper) LeaveGroup

func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error

LeaveGroup leaves the group according to target schema, table and source IDs. LeaveGroup doesn't affect the in-syncing process.

func (*ShardingGroupKeeper) LoadShardMeta

func (k *ShardingGroupKeeper) LoadShardMeta() (map[string]*shardmeta.ShardingMeta, error)

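LoadShardMeta loads sharding meta. NOTE: the source comment says it implements CheckPoint.LoadShardMeta, but the CheckPoint interface documented in this package version declares no such method.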

func (*ShardingGroupKeeper) PrepareFlushSQLs

func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})

PrepareFlushSQLs returns all sharding meta flush SQLs except for the given table IDs

func (*ShardingGroupKeeper) ResetGroups

func (k *ShardingGroupKeeper) ResetGroups()

ResetGroups resets group's sync status

func (*ShardingGroupKeeper) ResolveShardingDDL

func (k *ShardingGroupKeeper) ResolveShardingDDL(targetSchema, targetTable string) (bool, error)

ResolveShardingDDL resolves one sharding DDL in specific group

func (*ShardingGroupKeeper) TrySync

func (k *ShardingGroupKeeper) TrySync(
	targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (
	needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error)

TrySync tries to sync the sharding group. It returns:

needShardingHandle: whether the source table is in a sharding group
group: the sharding group
synced: whether the source table's sharding group synced
active: whether is active DDL in sequence sharding DDL
remain: remain un-synced source table's count

func (*ShardingGroupKeeper) UnresolvedGroups

func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup

UnresolvedGroups returns sharding groups which are un-resolved. Caution: do not modify the returned groups directly.

func (*ShardingGroupKeeper) UnresolvedTables

func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string)

UnresolvedTables returns

all `target-schema.target-table` that has unresolved sharding DDL
all source tables which with DDLs are un-resolved

NOTE: this func only ensures the returned tables are currently un-resolved. If passing the returned tables to another func (like checkpoint), the caller must ensure their sync state does not change in the meantime.

type ShardingReSync

type ShardingReSync struct {
	// contains filtered or unexported fields
}

ShardingReSync represents re-sync info for a sharding DDL group

type StreamerProducer

type StreamerProducer interface {
	// contains filtered or unexported methods
}

StreamerProducer provides the ability to generate a binlog streamer by StartSync(). However, go-mysql's StartSync() returns (struct, err) rather than (interface, err), so we can't simply use the StartSync() method in StreamerProducer. Instead, generateStreamer wraps the StartSync() method so that *BinlogSyncer and *BinlogReader share the same interface. Other implementations that implement StreamerProducer and Streamer can easily take the place of Syncer.streamProducer, which also makes it easy to mock in tests.

type Syncer

type Syncer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Syncer can sync your MySQL data to another MySQL database.

func NewSyncer

func NewSyncer(cfg *config.SubTaskConfig) *Syncer

NewSyncer creates a new Syncer.

func (*Syncer) Close

func (s *Syncer) Close()

Close closes syncer.

func (*Syncer) DDLInfo

func (s *Syncer) DDLInfo() <-chan *pb.DDLInfo

DDLInfo returns a chan from which can receive DDLInfo

func (*Syncer) Error

func (s *Syncer) Error() interface{}

Error implements SubTaskUnit.Error

func (*Syncer) ExecuteDDL

func (s *Syncer) ExecuteDDL(ctx context.Context, execReq *pb.ExecDDLRequest) (<-chan error, error)

ExecuteDDL executes or skips a hanging-up DDL when in sharding

func (*Syncer) Init

func (s *Syncer) Init(ctx context.Context) (err error)

Init initializes syncer for a sync task, but does not start Process. If it fails, it should not call s.Close. Some checks may move to checker later.

func (*Syncer) InjectSQLs

func (s *Syncer) InjectSQLs(ctx context.Context, sqls []string) error

InjectSQLs injects ddl into syncer as binlog events when it meets an xid/query event. TODO: let the user specify a special xid/query event position. TODO: inject dml sqls.

func (*Syncer) IsFreshTask

func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error)

IsFreshTask implements Unit.IsFreshTask

func (*Syncer) Pause

func (s *Syncer) Pause()

Pause pauses the process, and it can be resumed later. The context should be canceled externally. TODO: it is not a true Pause because you can't stop it by calling Pause only.

func (*Syncer) Process

func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)

Process implements the dm.Unit interface.

func (*Syncer) Resume

func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)

Resume resumes the paused process

func (*Syncer) Run

func (s *Syncer) Run(ctx context.Context) (err error)

Run starts running for sync, we should guarantee it can rerun when paused.

func (*Syncer) SetSQLOperator

func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error

SetSQLOperator sets an SQL operator to syncer

func (*Syncer) Status

func (s *Syncer) Status() interface{}

Status implements SubTaskUnit.Status. It returns status, but does not calculate status.

func (*Syncer) Type

func (s *Syncer) Type() pb.UnitType

Type implements Unit.Type

func (*Syncer) Update

func (s *Syncer) Update(cfg *config.SubTaskConfig) error

Update implements Unit.Update. For now, it only supports updating config for routes, filters, column-mappings and black-white-list. No config diff is implemented yet, so it simply re-initializes using the new config.

func (*Syncer) UpdateFromConfig

func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error

UpdateFromConfig updates config for `From`

type TableType

type TableType string

TableType is type of table

type UpStreamConn added in v1.0.2

type UpStreamConn struct {
	BaseDB *conn.BaseDB
}

UpStreamConn connects to the upstream DB. Normally, we need to get some upstream information through some helper functions. These helper functions are all simple query functions, so we use a pool of connections here; this may change to one connection some day.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL