shift

package
v0.0.0-...-2d50ede
Warning

This package is not in the latest version of its module.
Published: Jun 17, 2023 | License: GPL-3.0 | Imports: 17 | Imported by: 0

Documentation

Constants

const (
	ToMySQLFlavor   = "mysql"
	ToMariaDBFlavor = "mariadb"
	ToNeoDBFlavor   = "neodb"
)

The flavor constants select the type of the target cluster.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	ToFlavor string

	From         string
	FromUser     string
	FromPassword string
	FromDatabase string
	FromTable    string

	To         string
	ToUser     string
	ToPassword string
	ToDatabase string
	ToTable    string

	Rebalance              bool
	Cleanup                bool
	MySQLDump              string
	Threads                int
	Behinds                int
	NeoDBURL               string
	Checksum               bool
	WaitTimeBeforeChecksum int // seconds
}
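A Config might be filled in and sanity-checked along the following lines. This is a minimal sketch, not the package's own code: the Config type below is a trimmed local copy of a few documented fields so that the example compiles on its own, the addresses and table names are placeholders, and `validate` is a hypothetical helper whose rules (non-empty addresses, positive Threads) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// Flavor constants copied from the package documentation.
const (
	ToMySQLFlavor   = "mysql"
	ToMariaDBFlavor = "mariadb"
	ToNeoDBFlavor   = "neodb"
)

// Config is a trimmed local copy of a few documented fields.
type Config struct {
	ToFlavor     string
	From         string
	FromDatabase string
	FromTable    string
	To           string
	ToDatabase   string
	ToTable      string
	Threads      int
}

// validate is a hypothetical helper (not part of the package).
// The rules it enforces are assumptions made for illustration.
func validate(cfg *Config) error {
	switch cfg.ToFlavor {
	case ToMySQLFlavor, ToMariaDBFlavor, ToNeoDBFlavor:
		// known flavor
	default:
		return fmt.Errorf("unknown ToFlavor %q", cfg.ToFlavor)
	}
	if cfg.From == "" || cfg.To == "" {
		return errors.New("source and target addresses are required")
	}
	if cfg.Threads <= 0 {
		return errors.New("thread count must be positive")
	}
	return nil
}

func main() {
	cfg := &Config{
		ToFlavor:     ToMySQLFlavor,
		From:         "127.0.0.1:3306", // placeholder address
		FromDatabase: "sbtest",
		FromTable:    "sbtest1",
		To:           "127.0.0.1:3307", // placeholder address
		ToDatabase:   "sbtest",
		ToTable:      "sbtest1",
		Threads:      16,
	}
	fmt.Println(validate(cfg))
}
```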

type EventHandler

type EventHandler struct {
	canal.DummyEventHandler
	// contains filtered or unexported fields
}

func NewEventHandler

func NewEventHandler(log *xlog.Log, shift *Shift) *EventHandler

func (*EventHandler) DeleteRow

func (h *EventHandler) DeleteRow(e *canal.RowsEvent)

func (*EventHandler) InsertMySQLRow

func (h *EventHandler) InsertMySQLRow(e *canal.RowsEvent, systemTable bool)

func (*EventHandler) InsertNeoDBRow

func (h *EventHandler) InsertNeoDBRow(e *canal.RowsEvent, systemTable bool)

func (*EventHandler) OnRow

func (h *EventHandler) OnRow(e *canal.RowsEvent) error

OnRow handles the Insert, Delete, and Update row events.
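The routing that OnRow performs can be pictured roughly as below. This is a sketch under assumptions, not the package's implementation: `rowsEvent` is a hypothetical stand-in for canal.RowsEvent, the action strings follow the convention used by go-mysql-style canal packages, and the rule that picks InsertNeoDBRow for the neodb flavor and InsertMySQLRow otherwise is a guess based only on the method names listed here.

```go
package main

import "fmt"

// rowsEvent is a hypothetical stand-in for canal.RowsEvent.
type rowsEvent struct {
	Action string
}

// Action names assumed to match a go-mysql-style canal package.
const (
	insertAction = "insert"
	updateAction = "update"
	deleteAction = "delete"
)

// dispatch returns the name of the EventHandler method that a row event
// would plausibly be routed to. The flavor rule is an assumption.
func dispatch(flavor string, e *rowsEvent) (string, error) {
	switch e.Action {
	case insertAction:
		if flavor == "neodb" {
			return "InsertNeoDBRow", nil
		}
		return "InsertMySQLRow", nil
	case updateAction:
		return "UpdateRow", nil
	case deleteAction:
		return "DeleteRow", nil
	default:
		return "", fmt.Errorf("unsupported action %q", e.Action)
	}
}

func main() {
	for _, action := range []string{insertAction, updateAction, deleteAction} {
		name, err := dispatch("mysql", &rowsEvent{Action: action})
		fmt.Println(action, name, err)
	}
}
```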

func (*EventHandler) OnTableChanged

func (h *EventHandler) OnTableChanged(schema string, table string) error

OnTableChanged handles the QueryEvent and XAEvent.

func (*EventHandler) OnXA

func (h *EventHandler) OnXA(e *canal.XAEvent) error

func (*EventHandler) ParseValue

func (h *EventHandler) ParseValue(e *canal.RowsEvent, idx int, v interface{}) string

func (*EventHandler) UpdateRow

func (h *EventHandler) UpdateRow(e *canal.RowsEvent)

func (*EventHandler) WaitWorkerDone

func (h *EventHandler) WaitWorkerDone()

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a blocking connection pool.

func NewPool

func NewPool(log *xlog.Log, cap int, host string, user string, password string) (*Pool, error)

func (*Pool) Close

func (p *Pool) Close()

func (*Pool) Get

func (p *Pool) Get() *client.Conn

func (*Pool) Put

func (p *Pool) Put(conn *client.Conn)
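One common way to build a blocking pool in Go is a buffered channel, where a receive blocks until a connection is free. The sketch below illustrates that idea only; whether Pool is implemented this way is not stated in the documentation. The `conn` type is a hypothetical stand-in for *client.Conn, and the lowercase names are illustrative.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for *client.Conn.
type conn struct{ id int }

// pool hands out a fixed set of connections.
type pool struct {
	conns chan *conn
}

// newPool creates a pool pre-filled with capacity connections.
func newPool(capacity int) *pool {
	p := &pool{conns: make(chan *conn, capacity)}
	for i := 0; i < capacity; i++ {
		p.conns <- &conn{id: i}
	}
	return p
}

// get blocks until a connection is available.
func (p *pool) get() *conn { return <-p.conns }

// put returns a connection to the pool.
func (p *pool) put(c *conn) { p.conns <- c }

func main() {
	p := newPool(2)
	a, b := p.get(), p.get() // the pool is now empty

	done := make(chan *conn)
	go func() { done <- p.get() }() // blocks until a connection is returned

	p.put(a)
	c := <-done
	fmt.Println("reused connection:", c.id == a.id, "still held:", b.id)
}
```

With this design the channel capacity bounds how many connections are in use at once, so callers never need an explicit lock.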

type Query

type Query struct {
	// contains filtered or unexported fields
}

type QueryType

type QueryType int

const (
	QueryType_INSERT      QueryType = 0
	QueryType_DELETE      QueryType = 1
	QueryType_UPDATE      QueryType = 2
	QueryType_XA_ROLLBACK QueryType = 3
)
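For logging, it can help to print these values by name. The documentation lists no String method for QueryType, so the one below is a hypothetical addition on a local copy of the type, shown only to illustrate how the constants map to names.

```go
package main

import "fmt"

// QueryType and its constants are copied from the package documentation.
type QueryType int

const (
	QueryType_INSERT      QueryType = 0
	QueryType_DELETE      QueryType = 1
	QueryType_UPDATE      QueryType = 2
	QueryType_XA_ROLLBACK QueryType = 3
)

// String is a hypothetical helper; the package does not document one.
func (t QueryType) String() string {
	switch t {
	case QueryType_INSERT:
		return "INSERT"
	case QueryType_DELETE:
		return "DELETE"
	case QueryType_UPDATE:
		return "UPDATE"
	case QueryType_XA_ROLLBACK:
		return "XA_ROLLBACK"
	default:
		// Convert to int first so Sprintf does not call String again.
		return fmt.Sprintf("QueryType(%d)", int(t))
	}
}

func main() {
	fmt.Println(QueryType_UPDATE, QueryType(9))
}
```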

type Shift

type Shift struct {
	// contains filtered or unexported fields
}

func MockShift

func MockShift(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftDDL

func MockShiftDDL(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftMysqlTable

func MockShiftMysqlTable(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftMysqlTableWithData

func MockShiftMysqlTableWithData(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftWithCleanup

func MockShiftWithCleanup(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftWithData

func MockShiftWithData(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftWithNeoDBReadonlyError

func MockShiftWithNeoDBReadonlyError(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftWithNeoDBShardRuleError

func MockShiftWithNeoDBShardRuleError(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftWithRebalance

func MockShiftWithRebalance(log *xlog.Log, hasPK bool) (*Shift, func())

func MockShiftXa

func MockShiftXa(log *xlog.Log, hasPK bool) (*Shift, func())

func (*Shift) ChecksumTable

func (shift *Shift) ChecksumTable() error

ChecksumTable ensures that FromTable and ToTable are consistent. For reference, the MySQL CHECKSUM TABLE statement produces output like this:

mysql> checksum table sbtest.sbtest1;
+----------------+-----------+
| Table          | Checksum  |
+----------------+-----------+
| sbtest.sbtest1 | 410139351 |
+----------------+-----------+

func (*Shift) Cleanup

func (shift *Shift) Cleanup() error

Cleanup removes the table on the source (From) side once it has been shifted, or removes the tables on the target (To) side that were only half shifted. It must be called after canal is closed; otherwise the cleanup may be replicated by canal.

func (*Shift) SetStopSignal

func (shift *Shift) SetStopSignal()

SetStopSignal sends a stop signal to the shift.

func (*Shift) Start

func (shift *Shift) Start() error

Start starts canal and the behinds ticker.

func (*Shift) WaitFinish

func (shift *Shift) WaitFinish() error

When shift is used as a standalone program, WaitFinish should also handle kill signals.

type ShiftHandler

type ShiftHandler interface {
	// Start starts a shift job.
	Start() error

	// WaitFinish waits for the success or failure signal that ends the job.
	WaitFinish() error

	// ChecksumTable checksums the source and destination tables.
	ChecksumTable() error

	// SetStopSignal sets a stop signal to stop a shift job.
	SetStopSignal()
}

func NewShift

func NewShift(log *xlog.Log, cfg *Config) ShiftHandler
