Documentation ¶
Index ¶
- Variables
- func CreateLoader(db *sql.DB, cfg *DBConfig, worker int, batchSize int, ...) (ld loader.Loader, err error)
- func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error)
- type CheckpointConfig
- type DBConfig
- type Item
- type KafkaSyncer
- type MysqlSyncer
- type OracleSyncer
- type Syncer
Constants ¶
This section is empty.
Variables ¶
var QueueSizeGauge *prometheus.GaugeVec
QueueSizeGauge to be used.
Functions ¶
func CreateLoader ¶
func CreateLoader( db *sql.DB, cfg *DBConfig, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync, enableDispatch bool, enableCausility bool, ) (ld loader.Loader, err error)
CreateLoader create the Loader instance.
func NewPBSyncer ¶
func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error)
NewPBSyncer sync binlog to files
Types ¶
type CheckpointConfig ¶
type CheckpointConfig struct { Type string `toml:"type" json:"type"` Schema string `toml:"schema" json:"schema"` Table string `toml:"table" json:"table"` Host string `toml:"host" json:"host"` User string `toml:"user" json:"user"` Password string `toml:"password" json:"password"` // if EncryptedPassword is not empty, Password will be ignore. EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` Port int `toml:"port" json:"port"` Security security.Config `toml:"security" json:"security"` TLS *tls.Config `toml:"-" json:"-"` //for oracle database OracleServiceName string `toml:"oracle-service-name" json:"oracle-service-name"` OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"` }
CheckpointConfig is the Checkpoint configuration.
type DBConfig ¶
type DBConfig struct { Host string `toml:"host" json:"host"` User string `toml:"user" json:"user"` Password string `toml:"password" json:"password"` Security security.Config `toml:"security" json:"security"` TLS *tls.Config `toml:"-" json:"-"` // if EncryptedPassword is not empty, Password will be ignore. EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` SyncMode int `toml:"sync-mode" json:"sync-mode"` Port int `toml:"port" json:"port"` Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` BinlogFileDir string `toml:"dir" json:"dir"` BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"` Params map[string]string `toml:"params" json:"params"` ReadTimeout time.Duration `toml:"-" json:"-"` ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"` Merge bool `toml:"merge" json:"merge"` ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` KafkaVersion string `toml:"kafka-version" json:"kafka-version"` KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"` KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"` KafkaMaxMessageSize int32 `toml:"kafka-max-message-size" json:"kafka-max-message-size"` TopicName string `toml:"topic-name" json:"topic-name"` // get it from pd ClusterID uint64 `toml:"-" json:"-"` //for oracle database OracleServiceName string `toml:"oracle-service-name" json:"oracle-service-name"` OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"` }
DBConfig is the DB configuration.
type Item ¶
type Item struct { Binlog *pb.Binlog PrewriteValue *pb.PrewriteValue // only for DML Schema string Table string RelayLogPos pb.Pos // Each item has a schemaVersion. with amend txn feature the prewrite DML's SchemaVersion could change. // which makes restart & reload history DDL with previous SchemaVersion not reliable. // so we should save this version as checkpoint. SchemaVersion int64 // the applied TS executed in downstream, only for tidb AppliedTS int64 // should skip to replicate this item at downstream // currently only used for signal the syncer to learn that the downstream schema is changed // when we don't replicate DDL. ShouldSkip bool }
Item contains information about binlog
type KafkaSyncer ¶
type KafkaSyncer struct {
// contains filtered or unexported fields
}
KafkaSyncer sync data to kafka
func NewKafka ¶
func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*KafkaSyncer, error)
NewKafka returns a instance of KafkaSyncer
func (KafkaSyncer) Error ¶
func (s KafkaSyncer) Error() <-chan error
Error implements Syncer interface
func (*KafkaSyncer) SetSafeMode ¶
func (p *KafkaSyncer) SetSafeMode(mode bool) bool
SetSafeMode should be ignore by KafkaSyncer
func (KafkaSyncer) Successes ¶
func (s KafkaSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*KafkaSyncer) Sync ¶
func (p *KafkaSyncer) Sync(item *Item) error
Sync implements Syncer interface
type MysqlSyncer ¶
type MysqlSyncer struct {
// contains filtered or unexported fields
}
MysqlSyncer sync binlog to Mysql
func NewMysqlSyncer ¶
func NewMysqlSyncer( cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer, info *loopbacksync.LoopBackSync, enableDispatch bool, enableCausility bool, ) (*MysqlSyncer, error)
NewMysqlSyncer returns a instance of MysqlSyncer
func (MysqlSyncer) Error ¶
func (s MysqlSyncer) Error() <-chan error
Error implements Syncer interface
func (*MysqlSyncer) SetSafeMode ¶
func (m *MysqlSyncer) SetSafeMode(mode bool) bool
SetSafeMode make the MysqlSyncer to use safe mode or not
func (MysqlSyncer) Successes ¶
func (s MysqlSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*MysqlSyncer) Sync ¶
func (m *MysqlSyncer) Sync(item *Item) error
Sync implements Syncer interface
type OracleSyncer ¶
type OracleSyncer struct {
// contains filtered or unexported fields
}
OracleSyncer sync binlog to Oracle
func NewOracleSyncer ¶
func NewOracleSyncer( cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer, enableDispatch bool, enableCausility bool, tableRouter *router.Table, ) (*OracleSyncer, error)
NewOracleSyncer returns a instance of OracleSyncer
func (OracleSyncer) Error ¶
func (s OracleSyncer) Error() <-chan error
Error implements Syncer interface
func (*OracleSyncer) SetSafeMode ¶
func (m *OracleSyncer) SetSafeMode(mode bool) bool
SetSafeMode make the OracleSyncer to use safe mode or not
func (OracleSyncer) Successes ¶
func (s OracleSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*OracleSyncer) Sync ¶
func (m *OracleSyncer) Sync(item *Item) error
Sync implements Syncer interface
type Syncer ¶
type Syncer interface { // Sync the binlog item to downstream Sync(item *Item) error // will be close if Close normally or meet error, call Error() to check it Successes() <-chan *Item // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` // will drain all items and return nil if all successfully sync into downstream Close() error // SetSafeMode make the Syncer to use safe mode or not. If no need to handle, it should return false SetSafeMode(mode bool) bool }
Syncer sync binlog item to downstream