cdc

package
v0.0.0-...-c874f44 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2019 License: BSD-2-Clause Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error
}

Connection is a subset of net.Connection to allow testing

type Dialer

type Dialer interface {
	Dial(ctx context.Context) (Connection, error)
}

Dialer interface for open a new connection

type GTID

type GTID struct {
	Domain   uint32 `json:"domain"`
	ServerId uint32 `json:"server_id"`
	Sequence uint64 `json:"sequence"`
}

GTID is the Mariadb Global Transaction ID https://mariadb.com/kb/en/library/gtid/

func ParseGTID

func ParseGTID(gtid string) (*GTID, error)

ParseGTID return GTID for the given string. For empty string it returns nil with error

func (*GTID) String

func (g *GTID) String() string

String representation of the gtid. Return empty string if nil.

type GTIDExtractor

type GTIDExtractor interface {
	Parse(line []byte) (*GTID, error)
}

GTIDExtractor decodes GTID from records

func NewGTIDExtractor

func NewGTIDExtractor(
	format string,
) GTIDExtractor

type GTIDStore

type GTIDStore interface {
	Read() (*GTID, error)
	Write(gtid *GTID) error
}

GTIDStore save GTID to disk

func NewGTIDStore

func NewGTIDStore(
	dataDir string,
) GTIDStore

type KafkaSender

type KafkaSender interface {
	Send(ctx context.Context, ch <-chan []byte) error
}

KafkaSender takes a channel of []byte and send them to the given topic

func NewKafkaSender

func NewKafkaSender(
	producer sarama.SyncProducer,
	kafkaTopic string,
	gtidStore GTIDStore,
	gtidExtractor GTIDExtractor,
) KafkaSender

type MaxscaleReader

type MaxscaleReader struct {
	Dialer   Dialer
	User     string
	Password string
	UUID     string
	Format   string // JSON or AVRO
	Database string
	Table    string
	Version  string
}

MaxscaleReader of CDC messages from Maxscale

func (*MaxscaleReader) Read

func (m *MaxscaleReader) Read(ctx context.Context, gtid *GTID, ch chan<- []byte) error

Read all cdc and send them to the given channel https://mariadb.com/resources/blog/how-to-stream-change-data-through-mariadb-maxscale-using-cdc-api/

type Reader

type Reader interface {
	// Read changes and send them to the given channel
	Read(ctx context.Context, gtid *GTID, ch chan<- []byte) error
}

Reader interface for the Streamer

type RetryReader

type RetryReader interface {
	Read(ctx context.Context, gtid *GTID, outch chan<- []byte) error
}

RetryReader store the gtid of the last message and resume there on failure

func NewRetryReader

func NewRetryReader(
	reader Reader,
	gtidExtractor GTIDExtractor,
) RetryReader

type Sender

type Sender interface {
	Send(ctx context.Context, ch <-chan []byte) error
}

Sender interface for the Streamer

type Streamer

type Streamer interface {
	Run(ctx context.Context) error
}

Streamer coordinates read and send of CDC records

func NewStreamer

func NewStreamer(
	gtid *GTID,
	reader Reader,
	sender Sender,
) Streamer

type TcpDialer

type TcpDialer interface {
	Dial(ctx context.Context) (Connection, error)
}

TcpDialer opens a TCP connection to the given address

func NewTcpDialer

func NewTcpDialer(
	address string,
) TcpDialer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL