binlogmsg

package
v0.11.4
Warning

This package is not in the latest version of its module.
Published: Jun 8, 2020 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package binlogmsg contains BinlogMsgPipe, which serves as a publishing pipeline from the MySQL 8 binlog to a downstream publisher.

Typical workflow:

  • `CreateMsgTable` creates a msg table to store msgs.
  • `NewBinlogMsgPipe` creates a pipe to connect MySQL binlog and downstream publisher.
  • `NewBinlogMsgPublisher` creates a publisher, which is then used to publish messages (by inserting them into the msg table).

Constants

const (
	// DefaultLockName is the default lock name for msg pipe.
	DefaultLockName = "nproto.binlogmsg"

	// DefaultMaxInflight is the default max number of messages being processed.
	DefaultMaxInflight = 512

	// DefaultRetryWait is the default interval between reconnections.
	DefaultRetryWait = 5 * time.Second
)

Variables

This section is empty.

Functions

func CreateMsgTable

func CreateMsgTable(ctx context.Context, q sqlh.Queryer, schema, table string) error

CreateMsgTable creates a msg table to store msgs.

Types

type BinlogMsgPipe

type BinlogMsgPipe struct {
	// contains filtered or unexported fields
}

BinlogMsgPipe is used to pipe messages from message tables to downstream.

func NewBinlogMsgPipe

func NewBinlogMsgPipe(
	downstream nproto.MsgPublisher,
	masterCfg *mycanal.FullDumpConfig,
	slaveCfg *mycanal.IncrDumpConfig,
	tableFilter MsgTableFilter,
	opts ...Option,
) (*BinlogMsgPipe, error)

NewBinlogMsgPipe creates a new BinlogMsgPipe. Params:

  • downstream: the downstream publisher; can be nproto.MsgPublisher or nproto.MsgAsyncPublisher
  • masterCfg: master connection config used to read (full dump) and write (delete published messages)
  • slaveCfg: slave config for binlog subscription
  • tableFilter: determines whether a table is used to store messages

func (*BinlogMsgPipe) Run

func (pipe *BinlogMsgPipe) Run(ctx context.Context) (err error)

Run runs the main loop (flushing messages to downstream) until ctx is done.
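To illustrate the general shape of a loop that keeps working until its context is done, here is a minimal, self-contained sketch. It is not the package's implementation: `runUntilDone`, `session`, and `demo` are hypothetical names, and the assumption that a failed session is retried after a fixed wait is borrowed only loosely from the `DefaultRetryWait` constant above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runUntilDone is a hypothetical helper. It runs session repeatedly; whenever
// session returns, it waits retryWait before starting a new one, and it
// returns ctx.Err() as soon as ctx is done.
func runUntilDone(ctx context.Context, retryWait time.Duration, session func(context.Context) error) error {
	for {
		// In this sketch the session error is ignored; a real loop would log it.
		_ = session(ctx)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryWait):
			// Wait elapsed: start another session.
		}
	}
}

// demo runs the loop with a session that always fails immediately and
// reports how many sessions were attempted before the context timed out.
func demo() (attempts int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err = runUntilDone(ctx, 5*time.Millisecond, func(context.Context) error {
		attempts++
		return errors.New("connection lost") // simulated failure
	})
	return attempts, err
}

func main() {
	attempts, err := demo()
	fmt.Println("attempted at least once:", attempts >= 1)
	fmt.Println("stopped by deadline:", errors.Is(err, context.DeadlineExceeded))
}
```

With the real package, the caller would typically cancel the context passed to `Run` to stop the pipe.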

type BinlogMsgPublisher

type BinlogMsgPublisher struct {
	// contains filtered or unexported fields
}

BinlogMsgPublisher 'publishes' msgs to the MySQL (>=8.0.2) binlog: it simply inserts each msg into a table.

func NewBinlogMsgPublisher

func NewBinlogMsgPublisher(schema, table string, q sqlh.Queryer) (*BinlogMsgPublisher, error)

NewBinlogMsgPublisher creates a new BinlogMsgPublisher. schema and table identify the message table used to store messages.

func (*BinlogMsgPublisher) Publish

func (p *BinlogMsgPublisher) Publish(ctx context.Context, subject string, msgData []byte) error

Publish implements the nproto.MsgPublisher interface. MetaData attached to `ctx` will be passed unmodified to the downstream publisher.

type MsgTableFilter

type MsgTableFilter func(schema, table string) bool

MsgTableFilter returns true if a given table is a msg table.
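As an illustration, a filter could select msg tables by a naming convention. The sketch below redeclares the `MsgTableFilter` type locally so that it compiles on its own; `suffixFilter`, the schema name `app`, and the `_msg` suffix are hypothetical choices, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// MsgTableFilter mirrors the type documented above. It is redeclared here
// only so that the sketch is self-contained.
type MsgTableFilter func(schema, table string) bool

// suffixFilter is a hypothetical helper: it treats every table in the given
// schema whose name ends with suffix as a msg table.
func suffixFilter(schema, suffix string) MsgTableFilter {
	return func(s, t string) bool {
		return s == schema && strings.HasSuffix(t, suffix)
	}
}

func main() {
	filter := suffixFilter("app", "_msg")
	fmt.Println(filter("app", "orders_msg"))   // true
	fmt.Println(filter("app", "orders"))       // false
	fmt.Println(filter("other", "orders_msg")) // false
}
```

A function of this shape could be passed as the `tableFilter` argument of `NewBinlogMsgPipe`.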

type Option

type Option func(*BinlogMsgPipe) error

Option is an option for BinlogMsgPipe.
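The signature `func(*BinlogMsgPipe) error` follows the common functional-options pattern. The following sketch shows how such options are typically applied by a constructor. It uses a stand-in `pipe` struct because the real struct's fields are unexported; the field names, the lowercase helper names, and the validation of non-positive values are assumptions made for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pipe is a stand-in for BinlogMsgPipe. The field names are assumptions.
type pipe struct {
	maxInflight int
	retryWait   time.Duration
}

// option has the same shape as the documented Option type.
type option func(*pipe) error

// optMaxInflight sets the max number of messages being processed.
// Rejecting non-positive values is an assumption of this sketch.
func optMaxInflight(n int) option {
	return func(p *pipe) error {
		if n <= 0 {
			return errors.New("maxInflight must be positive")
		}
		p.maxInflight = n
		return nil
	}
}

// optRetryWait sets the interval between reconnections.
func optRetryWait(d time.Duration) option {
	return func(p *pipe) error {
		if d <= 0 {
			return errors.New("retryWait must be positive")
		}
		p.retryWait = d
		return nil
	}
}

// newPipe starts from the documented defaults (512 and 5s) and applies
// each option in order, stopping at the first error.
func newPipe(opts ...option) (*pipe, error) {
	p := &pipe{maxInflight: 512, retryWait: 5 * time.Second}
	for _, opt := range opts {
		if err := opt(p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

func main() {
	p, err := newPipe(optMaxInflight(64))
	fmt.Println(p.maxInflight, p.retryWait, err) // 64 5s <nil>
}
```

Because each option returns an error, a constructor written this way can reject an invalid setting before any connection is made.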

func OptLockName

func OptLockName(lockName string) Option

OptLockName sets the lock name used by 'SELECT GET_LOCK'.

func OptLogger

func OptLogger(logger *zerolog.Logger) Option

OptLogger sets the structured logger.

func OptMaxInflight

func OptMaxInflight(maxInflight int) Option

OptMaxInflight sets the max number of messages being processed.

func OptRetryWait

func OptRetryWait(retryWait time.Duration) Option

OptRetryWait sets the interval between reconnections.

type PublisherOption

type PublisherOption func(*BinlogMsgPublisher) error

PublisherOption is an option for BinlogMsgPublisher.
