binlogmsg

Version: v2.4.4
Published: Feb 15, 2022 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package binlogmsg contains a publisher implementation that 'publishes' (stores) messages to MySQL 8 tables, then flushes them to a downstream publisher using binlog notifications.

Because messages are stored in ordinary MySQL tables, all ACID properties apply to them.

Variables

var (
	// DefaultMaxInflight is the default value of PipeOptMaxInflight.
	DefaultMaxInflight = 4096

	// DefaultRetryWait is the default value of PipeOptRetryWait.
	DefaultRetryWait = 5 * time.Second
)

Functions

func CreateMsgTable

func CreateMsgTable(ctx context.Context, q sqlh.Queryer, schema, table string) error

CreateMsgTable creates a msg table to store msgs.

func NewMsgPublisher

func NewMsgPublisher(encoder npenc.Encoder, q sqlh.Queryer, schema, table string) MsgPublisherFunc

NewMsgPublisher creates a publisher that publishes (stores) messages to MySQL msg tables:

  • encoder: encoder for messages.
  • q: *sql.DB/*sql.Tx/*sql.Conn/...
  • schema: database name.
  • table: msg table name, the table must be created by CreateMsgTable.

func NewPbJsonPublisher

func NewPbJsonPublisher(q sqlh.Queryer, schema, table string) MsgPublisherFunc

NewPbJsonPublisher creates a msg publisher using protobuf or json for encoding:

  • If msg is proto.Message, then use protobuf.
  • Otherwise use json.

Types

type MsgPipe

type MsgPipe struct {
	// contains filtered or unexported fields
}

MsgPipe pipes messages from MySQL (>=8.0.2) msg tables to downstream. Messages from MsgPipe have type *rawenc.RawData, so downstream must be able to handle messages of this type.

func NewMsgPipe

func NewMsgPipe(
	downstream interface{},
	masterCfg *mycanal.Config,
	slaveCfg *mycanal.Config,
	tableFilter MsgTableFilter,
	opts ...MsgPipeOption,
) (*MsgPipe, error)

NewMsgPipe creates a new msg pipe:

  • downstream: must be MsgPublisher or MsgAsyncPublisher.
  • masterCfg: connection to the master, used to delete published messages.
  • slaveCfg: connection to the slave, used for fulldump and incrdump.
  • tableFilter: determines whether a table is used to store messages.

func (*MsgPipe) Run

func (pipe *MsgPipe) Run(ctx context.Context) (err error)

Run runs the main loop (flushing messages to downstream) until the context is done.

type MsgPipeOption

type MsgPipeOption func(*MsgPipe) error

MsgPipeOption is an option for creating a MsgPipe.

func PipeOptLogger

func PipeOptLogger(logger logr.Logger) MsgPipeOption

PipeOptLogger sets logger for MsgPipe.

func PipeOptMaxInflight

func PipeOptMaxInflight(maxInflight int) MsgPipeOption

PipeOptMaxInflight sets the max number of messages inflight (publishing).

func PipeOptRetryWait

func PipeOptRetryWait(t time.Duration) MsgPipeOption

PipeOptRetryWait sets the interval between retries due to all kinds of errors.

type MsgTableFilter

type MsgTableFilter func(schema, table string) bool

MsgTableFilter returns true if a given table is a msg table.
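
As an illustration, a filter might select tables by schema and name prefix. The sketch below redeclares the function type locally so it compiles on its own; PrefixFilter, the "app" schema, and the "msg_" prefix are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// MsgTableFilter mirrors the signature documented above. It is redeclared
// here only so that the sketch is self-contained.
type MsgTableFilter func(schema, table string) bool

// PrefixFilter is a hypothetical helper: it accepts tables in the given
// schema whose names start with prefix.
func PrefixFilter(schema, prefix string) MsgTableFilter {
	return func(s, t string) bool {
		return s == schema && strings.HasPrefix(t, prefix)
	}
}

func main() {
	filter := PrefixFilter("app", "msg_")
	fmt.Println(filter("app", "msg_orders"))   // true
	fmt.Println(filter("app", "users"))        // false
	fmt.Println(filter("other", "msg_orders")) // false
}
```

A naming convention like this keeps the filter cheap to evaluate, which may matter if it is consulted for every table that appears in the binlog.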
