mysqlstream

package
v0.9.71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2020 License: Apache-2.0 Imports: 35 Imported by: 2

Documentation

Index

Constants

View Source
// MySQLPrimaryKeyName is the key name "PRIMARY".
// NOTE(review): presumably the name under which MySQL reports a table's
// primary key; confirm against the code that uses this constant.
const MySQLPrimaryKeyName = "PRIMARY"
View Source
// Name is the identifier string "mysql-stream".
// NOTE(review): presumably the name this input plugin is registered under;
// confirm against the registration code.
const Name = "mysql-stream"

Variables

View Source
var (
	// RetryTimeout is the re-sync retry timeout (3 seconds by default).
	// It is a variable rather than a constant, so it can be reassigned.
	RetryTimeout = 3 * time.Second

	// BinlogProbeInterval defaults to 3 seconds.
	// NOTE(review): presumably the interval between binlog probes; the
	// consumer is not visible here, so confirm before relying on this.
	BinlogProbeInterval = 3 * time.Second
)

Functions

func GenerateDMLDepHashes added in v0.9.29

func GenerateDMLDepHashes(msg *core.Msg, tableDef *schema_store.Table) []core.OutputHash

func GenerateDataHashes added in v0.9.29

func GenerateDataHashes(
	schema string,
	table string,
	uniqKeys map[string][]string,
	oldData map[string]interface{},
	newData map[string]interface{}) []core.OutputHash

func GetCurrentPositionValue added in v0.9.17

func GetCurrentPositionValue(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)

func IsEventBelongsToMyself

func IsEventBelongsToMyself(event *replication.RowsEvent, pipelineName string) bool

func NewBarrierMsg

func NewBarrierMsg(callback core.MsgCallbackFunc) *core.Msg

func NewDDLMsg

func NewDDLMsg(
	callback core.MsgCallbackFunc,
	dbName string,
	table string,
	ast ast.StmtNode,
	ddlSQL string,
	ts int64,
	received time.Time) *core.Msg

func NewDeleteMsgs

func NewDeleteMsgs(
	host string,
	database string,
	table string,
	ts int64,
	received time.Time,
	ev *replication.RowsEvent,
	tableDef *schema_store.Table) ([]*core.Msg, error)

func NewHeartbeatMsg added in v0.9.70

func NewHeartbeatMsg(callback core.MsgCallbackFunc) *core.Msg

func NewInsertMsgs

func NewInsertMsgs(
	host string,
	database string,
	table string,
	ts int64,
	received time.Time,
	ev *replication.RowsEvent,
	tableDef *schema_store.Table) ([]*core.Msg, error)

func NewUpdateMsgs

func NewUpdateMsgs(
	host string,
	database string,
	table string,
	ts int64,
	received time.Time,
	ev *replication.RowsEvent,
	tableDef *schema_store.Table) ([]*core.Msg, error)

func NewXIDMsg

func NewXIDMsg(ts int64, received time.Time, callback core.MsgCallbackFunc, position config.MySQLBinlogPosition) *core.Msg

func SetupInitialPosition added in v0.9.17

func SetupInitialPosition(db *sql.DB, positionCache position_cache.PositionCacheInterface, startPositionSpec *config.MySQLBinlogPosition) error

func ToGoMySQLPosition added in v0.9.17

func UpdateCurrentPositionValue added in v0.9.17

func UpdateCurrentPositionValue(cache position_cache.PositionCacheInterface, currentPosition config.MySQLBinlogPosition) error

Types

type BinlogEventSchemaFilterFunc

// BinlogEventSchemaFilterFunc is a predicate evaluated on a schema name.
// NOTE(review): whether a true result means the schema's binlog events are
// kept or dropped is not visible here; confirm against the caller.
type BinlogEventSchemaFilterFunc func(schemaName string) bool

type BinlogTailer

type BinlogTailer struct {
	// contains filtered or unexported fields
}

func NewBinlogTailer

func NewBinlogTailer(
	pipelineName string,
	cfg *MySQLBinlogInputPluginConfig,
	gravityServerID uint32,
	positionCache position_cache.PositionCacheInterface,
	sourceSchemaStore schema_store.SchemaStore,
	sourceDB *sql.DB,
	emitter core.Emitter,
	router core.Router,
	binlogChecker binlog_checker.BinlogChecker,
	binlogEventSchemaFilter BinlogEventSchemaFilterFunc,
) (*BinlogTailer, error)

NewBinlogTailer creates a new binlog tailer.

func (*BinlogTailer) AfterMsgCommit

func (tailer *BinlogTailer) AfterMsgCommit(msg *core.Msg) error

func (*BinlogTailer) AppendMsgTxnBuffer

func (tailer *BinlogTailer) AppendMsgTxnBuffer(msg *core.Msg)

AppendMsgTxnBuffer adds basic job information to the txn buffer.

func (*BinlogTailer) ClearMsgTxnBuffer

func (tailer *BinlogTailer) ClearMsgTxnBuffer()

func (*BinlogTailer) Close

func (tailer *BinlogTailer) Close()

func (*BinlogTailer) FlushMsgTxnBuffer

func (tailer *BinlogTailer) FlushMsgTxnBuffer()

FlushMsgTxnBuffer flushes the jobs in the txn buffer to the queue. It also filters out jobs that do not need to be sent out at this stage.

func (*BinlogTailer) Start

func (tailer *BinlogTailer) Start() error

func (*BinlogTailer) Wait

func (tailer *BinlogTailer) Wait()

type MySQLBinlogInputPluginConfig

// MySQLBinlogInputPluginConfig holds the configuration of the MySQL binlog
// input plugin. Each field carries the struct tags (mapstructure, toml, json)
// under which it is decoded or encoded; a tag value of "-" excludes the field
// from that format.
type MySQLBinlogInputPluginConfig struct {
	Source                  *config.DBConfig            `mapstructure:"source" toml:"source" json:"source"`
	IgnoreBiDirectionalData bool                        `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
	StartPosition           *config.MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"`

	SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config" json:"source-probe-config"`

	PositionRepo *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`

	// FailOnTxnTags lists internal txn tags: if any detected internal txn tag
	// matches one of them, just fail.
	FailOnTxnTags []string `mapstructure:"fail-on-txn-tags" toml:"fail-on-txn-tags"`

	// HeartbeatPeriodStr is the user-facing "heartbeat-period" setting.
	// HeartbeatPeriod is excluded from every format ("-" tags).
	// NOTE(review): presumably HeartbeatPeriod is parsed from
	// HeartbeatPeriodStr elsewhere; confirm.
	HeartbeatPeriodStr string        `toml:"heartbeat-period" json:"heartbeat-period" mapstructure:"heartbeat-period"`
	HeartbeatPeriod    time.Duration `toml:"-" json:"-" mapstructure:"-"`

	//
	// Internal configurations that are not exposed to users.
	//
	DisableBinlogChecker bool   `mapstructure:"-" json:"-"`
	DebugBinlog          bool   `mapstructure:"-" json:"-"`
	BinlogSyncerTimeout  string `mapstructure:"-" json:"-"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL