Documentation ¶
Overview ¶
Package binlogmsg contains BinlogMsgPipe, which serves as a publishing pipeline from MySQL 8 to a downstream publisher.
Typical workflow:
- `CreateMsgTable` creates a msg table to store msgs.
- `NewBinlogMsgPipe` creates a pipe to connect MySQL binlog and downstream publisher.
- `NewBinlogMsgPublisher` creates a publisher that then publishes messages by inserting them into the msg table.
Constants ¶
const (
	// DefaultLockName is the default lock name for msg pipe.
	DefaultLockName = "nproto.binlogmsg"

	// DefaultMaxInflight is the default max number of messages being processed.
	DefaultMaxInflight = 512

	// DefaultRetryWait is the default interval between reconnections.
	DefaultRetryWait = 5 * time.Second
)
Functions ¶
Types ¶
type BinlogMsgPipe ¶
type BinlogMsgPipe struct {
// contains filtered or unexported fields
}
BinlogMsgPipe is used to pipe messages from message tables to downstream.
func NewBinlogMsgPipe ¶
func NewBinlogMsgPipe(
	downstream nproto.MsgPublisher,
	masterCfg *mycanal.FullDumpConfig,
	slaveCfg *mycanal.IncrDumpConfig,
	tableFilter MsgTableFilter,
	opts ...Option,
) (*BinlogMsgPipe, error)
NewBinlogMsgPipe creates a new BinlogMsgPipe. Params:
- downstream: the downstream publisher; can be an nproto.MsgPublisher or an nproto.MsgAsyncPublisher
- masterCfg: master connection config, used to read (full dump) and write (delete published messages)
- slaveCfg: slave config for binlog subscription
- tableFilter: determines whether a table is used to store messages
type BinlogMsgPublisher ¶
type BinlogMsgPublisher struct {
// contains filtered or unexported fields
}
BinlogMsgPublisher 'publishes' msgs to the MySQL (>=8.0.2) binlog: it simply inserts each msg into a table.
func NewBinlogMsgPublisher ¶
func NewBinlogMsgPublisher(schema, table string, q sqlh.Queryer) (*BinlogMsgPublisher, error)
NewBinlogMsgPublisher creates a new BinlogMsgPublisher. schema and table identify the message table used to store messages.
type MsgTableFilter ¶
MsgTableFilter returns true if a given table is a msg table.
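To illustrate what such a filter might look like, here is a minimal sketch. The type declaration is not shown on this page, so the signature `func(schema, table string) bool` is an assumption, and `msgTableFilter`, `suffixFilter`, the schema `app`, and the `_msgs` suffix are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// msgTableFilter is an ASSUMED shape for MsgTableFilter; the real
// declaration is not shown in this documentation.
type msgTableFilter func(schema, table string) bool

// suffixFilter (hypothetical helper) accepts tables in the given schema
// whose names end with the given suffix.
func suffixFilter(schema, suffix string) msgTableFilter {
	return func(s, t string) bool {
		return s == schema && strings.HasSuffix(t, suffix)
	}
}

func main() {
	isMsgTable := suffixFilter("app", "_msgs")
	fmt.Println(isMsgTable("app", "order_msgs"))   // true
	fmt.Println(isMsgTable("app", "orders"))       // false
	fmt.Println(isMsgTable("other", "order_msgs")) // false
}
```

A naming convention like this keeps the filter cheap to evaluate for every table seen in the binlog, though any predicate over schema and table names would do.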
type Option ¶
type Option func(*BinlogMsgPipe) error
Option is an option for BinlogMsgPipe.
func OptLockName ¶
OptLockName sets the lock name used by 'SELECT GET_LOCK'.
func OptMaxInflight ¶
OptMaxInflight sets the max number of messages being processed at the same time.
func OptRetryWait ¶
OptRetryWait sets the interval between reconnection attempts.
type PublisherOption ¶
type PublisherOption func(*BinlogMsgPublisher) error
PublisherOption is an option for BinlogMsgPublisher.