sync

package
v0.0.0-...-6fba4f8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var QueueSizeGauge *prometheus.GaugeVec

QueueSizeGauge to be used.

Functions

func CreateLoader

func CreateLoader(
	db *sql.DB,
	cfg *DBConfig,
	worker int,
	batchSize int,
	queryHistogramVec *prometheus.HistogramVec,
	sqlMode *string,
	destDBType string,
	info *loopbacksync.LoopBackSync,
	enableDispatch bool,
	enableCausility bool,
) (ld loader.Loader, err error)

CreateLoader create the Loader instance.

func NewPBSyncer

func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error)

NewPBSyncer sync binlog to files

Types

type CheckpointConfig

type CheckpointConfig struct {
	Type     string `toml:"type" json:"type"`
	Schema   string `toml:"schema" json:"schema"`
	Table    string `toml:"table" json:"table"`
	Host     string `toml:"host" json:"host"`
	User     string `toml:"user" json:"user"`
	Password string `toml:"password" json:"password"`
	// if EncryptedPassword is not empty, Password will be ignore.
	EncryptedPassword string          `toml:"encrypted_password" json:"encrypted_password"`
	Port              int             `toml:"port" json:"port"`
	Security          security.Config `toml:"security" json:"security"`
	TLS               *tls.Config     `toml:"-" json:"-"`
	//for oracle database
	OracleServiceName   string `toml:"oracle-service-name" json:"oracle-service-name"`
	OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"`
}

CheckpointConfig is the Checkpoint configuration.

type DBConfig

type DBConfig struct {
	Host     string          `toml:"host" json:"host"`
	User     string          `toml:"user" json:"user"`
	Password string          `toml:"password" json:"password"`
	Security security.Config `toml:"security" json:"security"`
	TLS      *tls.Config     `toml:"-" json:"-"`
	// if EncryptedPassword is not empty, Password will be ignore.
	EncryptedPassword       string            `toml:"encrypted_password" json:"encrypted_password"`
	SyncMode                int               `toml:"sync-mode" json:"sync-mode"`
	Port                    int               `toml:"port" json:"port"`
	Checkpoint              CheckpointConfig  `toml:"checkpoint" json:"checkpoint"`
	BinlogFileDir           string            `toml:"dir" json:"dir"`
	BinlogFileRetentionTime int               `toml:"retention-time" json:"retention-time"`
	Params                  map[string]string `toml:"params" json:"params"`

	ReadTimeout    time.Duration `toml:"-" json:"-"`
	ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"`

	Merge bool `toml:"merge" json:"merge"`

	ZKAddrs             string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
	KafkaAddrs          string `toml:"kafka-addrs" json:"kafka-addrs"`
	KafkaVersion        string `toml:"kafka-version" json:"kafka-version"`
	KafkaMaxMessages    int    `toml:"kafka-max-messages" json:"kafka-max-messages"`
	KafkaClientID       string `toml:"kafka-client-id" json:"kafka-client-id"`
	KafkaMaxMessageSize int32  `toml:"kafka-max-message-size" json:"kafka-max-message-size"`
	TopicName           string `toml:"topic-name" json:"topic-name"`

	// get it from pd
	ClusterID uint64 `toml:"-" json:"-"`
	//for oracle database
	OracleServiceName   string `toml:"oracle-service-name" json:"oracle-service-name"`
	OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"`
}

DBConfig is the DB configuration.

type Item

type Item struct {
	Binlog        *pb.Binlog
	PrewriteValue *pb.PrewriteValue // only for DML
	Schema        string
	Table         string
	RelayLogPos   pb.Pos

	// Each item has a schemaVersion. with amend txn feature the prewrite DML's SchemaVersion could change.
	// which makes restart & reload history DDL with previous SchemaVersion not reliable.
	// so we should save this version as checkpoint.
	SchemaVersion int64

	// the applied TS executed in downstream, only for tidb
	AppliedTS int64
	// should skip to replicate this item at downstream
	// currently only used for signal the syncer to learn that the downstream schema is changed
	// when we don't replicate DDL.
	ShouldSkip bool
}

Item contains information about binlog

func (*Item) String

func (i *Item) String() string

type KafkaSyncer

type KafkaSyncer struct {
	// contains filtered or unexported fields
}

KafkaSyncer sync data to kafka

func NewKafka

func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*KafkaSyncer, error)

NewKafka returns a instance of KafkaSyncer

func (*KafkaSyncer) Close

func (p *KafkaSyncer) Close() error

Close implements Syncer interface

func (KafkaSyncer) Error

func (s KafkaSyncer) Error() <-chan error

Error implements Syncer interface

func (*KafkaSyncer) SetSafeMode

func (p *KafkaSyncer) SetSafeMode(mode bool) bool

SetSafeMode should be ignore by KafkaSyncer

func (KafkaSyncer) Successes

func (s KafkaSyncer) Successes() <-chan *Item

Successes implements Syncer interface

func (*KafkaSyncer) Sync

func (p *KafkaSyncer) Sync(item *Item) error

Sync implements Syncer interface

type MysqlSyncer

type MysqlSyncer struct {
	// contains filtered or unexported fields
}

MysqlSyncer sync binlog to Mysql

func NewMysqlSyncer

func NewMysqlSyncer(
	cfg *DBConfig,
	tableInfoGetter translator.TableInfoGetter,
	worker int,
	batchSize int,
	queryHistogramVec *prometheus.HistogramVec,
	sqlMode *string,
	destDBType string,
	relayer relay.Relayer,
	info *loopbacksync.LoopBackSync,
	enableDispatch bool,
	enableCausility bool,
) (*MysqlSyncer, error)

NewMysqlSyncer returns a instance of MysqlSyncer

func (*MysqlSyncer) Close

func (m *MysqlSyncer) Close() error

Close implements Syncer interface

func (MysqlSyncer) Error

func (s MysqlSyncer) Error() <-chan error

Error implements Syncer interface

func (*MysqlSyncer) SetSafeMode

func (m *MysqlSyncer) SetSafeMode(mode bool) bool

SetSafeMode make the MysqlSyncer to use safe mode or not

func (MysqlSyncer) Successes

func (s MysqlSyncer) Successes() <-chan *Item

Successes implements Syncer interface

func (*MysqlSyncer) Sync

func (m *MysqlSyncer) Sync(item *Item) error

Sync implements Syncer interface

type OracleSyncer

type OracleSyncer struct {
	// contains filtered or unexported fields
}

OracleSyncer sync binlog to Oracle

func NewOracleSyncer

func NewOracleSyncer(
	cfg *DBConfig,
	tableInfoGetter translator.TableInfoGetter,
	worker int,
	batchSize int,
	queryHistogramVec *prometheus.HistogramVec,
	sqlMode *string,
	destDBType string,
	relayer relay.Relayer,
	enableDispatch bool,
	enableCausility bool,
	tableRouter *router.Table,
) (*OracleSyncer, error)

NewOracleSyncer returns a instance of OracleSyncer

func (*OracleSyncer) Close

func (m *OracleSyncer) Close() error

Close implements Syncer interface

func (OracleSyncer) Error

func (s OracleSyncer) Error() <-chan error

Error implements Syncer interface

func (*OracleSyncer) SetSafeMode

func (m *OracleSyncer) SetSafeMode(mode bool) bool

SetSafeMode make the OracleSyncer to use safe mode or not

func (OracleSyncer) Successes

func (s OracleSyncer) Successes() <-chan *Item

Successes implements Syncer interface

func (*OracleSyncer) Sync

func (m *OracleSyncer) Sync(item *Item) error

Sync implements Syncer interface

type Syncer

type Syncer interface {
	// Sync the binlog item to downstream
	Sync(item *Item) error
	// will be close if Close normally or meet error, call Error() to check it
	Successes() <-chan *Item
	// Return not nil if fail to sync data to downstream or nil if closed normally
	Error() <-chan error
	// Close the Syncer, no more item can be added by `Sync`
	// will drain all items and return nil if all successfully sync into downstream
	Close() error
	// SetSafeMode make the Syncer to use safe mode or not. If no need to handle, it should return false
	SetSafeMode(mode bool) bool
}

Syncer sync binlog item to downstream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL