go-vitess.v1: gopkg.in/src-d/go-vitess.v1/vt/binlog

package binlog

import "gopkg.in/src-d/go-vitess.v1/vt/binlog"

Index

Package Files

binlog_streamer.go event_streamer.go keyrange_filter.go keyspace_id_resolver.go slave_connection.go tables_filter.go updatestream.go updatestreamctl.go

Variables

var (

    // ErrClientEOF is returned by Streamer if the stream ended because the
    // consumer of the stream indicated it doesn't want any more events.
    ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream")
    // ErrServerEOF is returned by Streamer if the stream ended because the
    // connection to the mysqld server was lost, or the stream was terminated by
    // mysqld.
    ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld")
)
var (
    // ErrBinlogUnavailable is returned by this library when we
    // cannot find a suitable binlog to satisfy the request.
    ErrBinlogUnavailable = fmt.Errorf("cannot find relevant binlogs on this server")
)
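
A caller will usually want to tell these sentinel errors apart once a stream ends. The following is a minimal sketch of that idea, not code from this package: `errClientEOF`, `errServerEOF`, and `classify` are hypothetical local stand-ins, and the action names returned by `classify` are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors (assumption: the real
// values are the exported variables of the binlog package).
var (
	errClientEOF = errors.New("binlog stream consumer ended the reply stream")
	errServerEOF = errors.New("binlog stream connection was closed by mysqld")
)

// classify is a hypothetical helper that maps a stream error to an action.
func classify(err error) string {
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, errClientEOF):
		return "consumer stopped"
	case errors.Is(err, errServerEOF):
		return "reconnect"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(errServerEOF))
	// errors.Is also matches a sentinel that was wrapped with %w.
	fmt.Println(classify(fmt.Errorf("stream: %w", errClientEOF)))
}
```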
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc

RegisterUpdateStreamServices is the list of all registration callbacks to invoke

func KeyRangeFilterFunc

func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc

KeyRangeFilterFunc returns a function that calls callback only if statements in the transaction match the specified keyrange. The resulting function can be passed to the Streamer in place of the unfiltered callback: where sendTransaction was passed, pass KeyRangeFilterFunc(keyrange, sendTransaction) instead.
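
The wrapping described above can be sketched with simplified types. This is an illustration under stated assumptions, not the package's implementation: `statement`, `transaction`, `keyRange`, and `keyRangeFilter` are hypothetical stand-ins, the key range is treated as a half-open interval with an empty end meaning unbounded, and transactions with no matching statement are simply dropped.

```go
package main

import (
	"bytes"
	"fmt"
)

// statement and transaction are simplified, hypothetical stand-ins for the
// protobuf types used by the real package.
type statement struct {
	KeyspaceID []byte
	SQL        string
}

type transaction struct {
	Statements []statement
}

// keyRange is a half-open interval [Start, End); an empty End is unbounded.
type keyRange struct {
	Start, End []byte
}

func (kr keyRange) contains(id []byte) bool {
	return bytes.Compare(kr.Start, id) <= 0 &&
		(len(kr.End) == 0 || bytes.Compare(id, kr.End) < 0)
}

// keyRangeFilter wraps callback so that it only sees statements whose
// keyspace ID falls inside kr. Dropping transactions that have no matching
// statement is an assumption of this sketch.
func keyRangeFilter(kr keyRange, callback func(*transaction) error) func(*transaction) error {
	return func(in *transaction) error {
		out := &transaction{}
		for _, s := range in.Statements {
			if kr.contains(s.KeyspaceID) {
				out.Statements = append(out.Statements, s)
			}
		}
		if len(out.Statements) == 0 {
			return nil
		}
		return callback(out)
	}
}

func main() {
	send := func(t *transaction) error {
		fmt.Println("forwarded statements:", len(t.Statements))
		return nil
	}
	filtered := keyRangeFilter(keyRange{Start: []byte{0x00}, End: []byte{0x80}}, send)
	_ = filtered(&transaction{Statements: []statement{
		{KeyspaceID: []byte{0x10}, SQL: "insert ..."},
		{KeyspaceID: []byte{0x90}, SQL: "update ..."},
	}})
}
```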

func TablesFilterFunc

func TablesFilterFunc(tables []string, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc

TablesFilterFunc returns a function that calls callback only if statements in the transaction match the specified tables. The resulting function can be passed to the Streamer in place of the unfiltered callback: where sendTransaction was passed, pass TablesFilterFunc(tables, sendTransaction) instead.
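
A table-based filter follows the same wrapping pattern. The sketch below assumes each statement already carries its table name in a hypothetical `Table` field; how the real package determines the table for a statement is not shown here, and `tablesFilter` is an illustrative name.

```go
package main

import "fmt"

// statement and transaction are hypothetical, simplified stand-ins.
type statement struct {
	Table string
	SQL   string
}

type transaction struct {
	Statements []statement
}

// tablesFilter wraps callback so that it only receives statements that touch
// one of the listed tables. Transactions without any match are skipped.
func tablesFilter(tables []string, callback func(*transaction) error) func(*transaction) error {
	wanted := make(map[string]bool, len(tables))
	for _, t := range tables {
		wanted[t] = true
	}
	return func(in *transaction) error {
		var kept []statement
		for _, s := range in.Statements {
			if wanted[s.Table] {
				kept = append(kept, s)
			}
		}
		if len(kept) == 0 {
			return nil
		}
		return callback(&transaction{Statements: kept})
	}
}

func main() {
	filtered := tablesFilter([]string{"users"}, func(t *transaction) error {
		fmt.Println("kept:", t.Statements[0].Table)
		return nil
	})
	_ = filtered(&transaction{Statements: []statement{
		{Table: "users", SQL: "insert ..."},
		{Table: "audit", SQL: "insert ..."},
	}})
}
```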

type EventStreamer

type EventStreamer struct {
    // contains filtered or unexported fields
}

EventStreamer is an adapter on top of a binlog Streamer that converts the events into StreamEvent objects.

func NewEventStreamer

func NewEventStreamer(cp *mysql.ConnParams, se *schema.Engine, startPos mysql.Position, timestamp int64, sendEvent sendEventFunc) *EventStreamer

NewEventStreamer returns a new EventStreamer on top of a Streamer

func (*EventStreamer) Stream

func (evs *EventStreamer) Stream(ctx context.Context) error

Stream starts streaming updates

type FullBinlogStatement

type FullBinlogStatement struct {
    Statement  *binlogdatapb.BinlogTransaction_Statement
    Table      string
    KeyspaceID []byte
    PKNames    []*querypb.Field
    PKValues   []sqltypes.Value
}

FullBinlogStatement has all the information we can gather for an event. Some fields are only set if asked for, and only if row-based replication (RBR) is used. Otherwise, for statement-based replication (SBR), we fall back to using the SQL comments.

type RegisterUpdateStreamServiceFunc

type RegisterUpdateStreamServiceFunc func(UpdateStream)

RegisterUpdateStreamServiceFunc is the type to use for delayed registration of RPC servers until we have all the objects
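
The delayed-registration idea can be sketched as a slice of callbacks that is filled early and invoked once the service object exists. Everything in this example is a hypothetical stand-in: `service` plays the role of UpdateStream, `registrations` plays the role of RegisterUpdateStreamServices, and `registerAll` is an assumed helper.

```go
package main

import "fmt"

// service is a hypothetical stand-in for the UpdateStream interface.
type service interface {
	Name() string
}

// registerFunc mirrors the shape of RegisterUpdateStreamServiceFunc.
type registerFunc func(service)

// registrations is filled by RPC transports before the service exists.
var registrations []registerFunc

type fakeService struct{}

func (fakeService) Name() string { return "update-stream" }

// registerAll invokes every stored callback once the service is available.
func registerAll(s service) {
	for _, f := range registrations {
		f(s)
	}
}

func main() {
	var exposed []string
	// A transport registers its callback early, for example from init().
	registrations = append(registrations, func(s service) {
		exposed = append(exposed, "rpc:"+s.Name())
	})
	// Later, when all objects are ready, the callbacks run.
	registerAll(fakeService{})
	fmt.Println(exposed)
}
```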

type SlaveConnection

type SlaveConnection struct {
    *mysql.Conn
    // contains filtered or unexported fields
}

SlaveConnection represents a connection to mysqld that pretends to be a slave connecting for replication. Each such connection must identify itself to mysqld with a server ID that is unique both among other SlaveConnections and among actual slaves in the topology.

func NewSlaveConnection

func NewSlaveConnection(cp *mysql.ConnParams) (*SlaveConnection, error)

NewSlaveConnection creates a new slave connection to the mysqld instance. It uses a pools.IDPool to ensure that the server IDs used to connect are unique within this process. This is done with the assumptions that:

1) No other processes are making fake slave connections to our mysqld.
2) No real slave servers will have IDs in the range 1-N where N is the peak number of concurrent fake slave connections we will ever make.
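
To make the ID recycling concrete, here is a minimal sketch of a pool that never hands out an ID larger than the peak number of concurrent users. It is a hypothetical re-creation for illustration, not the pools.IDPool source; `idPool`, `get`, and `put` are assumed names.

```go
package main

import (
	"fmt"
	"sync"
)

// idPool is a hypothetical, simplified pool of reusable IDs starting at 1.
type idPool struct {
	mu      sync.Mutex
	free    []uint32 // IDs handed back by put, available for reuse
	maxUsed uint32   // highest ID handed out so far
}

// get returns a recycled ID when one is available, otherwise the next new ID.
func (p *idPool) get() uint32 {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.free); n > 0 {
		id := p.free[n-1]
		p.free = p.free[:n-1]
		return id
	}
	p.maxUsed++
	return p.maxUsed
}

// put recycles an ID previously returned by get.
func (p *idPool) put(id uint32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.free = append(p.free, id)
}

func main() {
	p := &idPool{}
	a, b := p.get(), p.get()
	p.put(a) // the first connection closes
	c := p.get()
	fmt.Println(a, b, c) // prints "1 2 1"
}
```

With two concurrent users the pool never issues an ID above 2, which is why the second assumption only needs real servers to stay out of the range 1-N.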

func (*SlaveConnection) Close

func (sc *SlaveConnection) Close()

Close closes the slave connection, which also signals an ongoing dump started with one of the StartBinlogDump methods to stop and close its BinlogEvent channel. The ID for the slave connection is recycled back into the pool.

func (*SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp

func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error)

StartBinlogDumpFromBinlogBeforeTimestamp requests a replication binlog dump from the master mysqld starting with a file that has timestamps smaller than the provided timestamp, and then sends binlog events to the returned channel.

The startup phase lists all the binary logs and finds the one whose events start strictly before the provided timestamp. It then starts from there and streams all events. It is the responsibility of the caller to filter the events further.

MySQL 5.6+ note: we need to do it this way because of how the GTIDSet works. In the other two streaming functions, we pass in the full GTIDSet (the list of all transactions seen in the replication stream). In this case we don't know it; all we have is the binlog file names. We depend on parsing the first PREVIOUS_GTIDS_EVENT event in the logs to get it. So the caller needs to parse that event, and it can't be skipped just because its timestamp is lower. Then, for each subsequent event, the caller also needs to add the event GTID to its GTIDSet; otherwise the set will never be correct. So the caller really needs to build up its GTIDSet along the entire file, not just for events whose timestamp is in a given range.

The stream will continue in the background, waiting for new events if necessary, until the connection is closed, either by the master or by canceling the context.

Note that the context is valid and used until the returned channel is closed.
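
The file selection in the startup phase can be sketched as follows. This is an illustration under assumptions, not the package's code: `binlogFile` and `pickBinlog` are hypothetical, the list is assumed to be ordered oldest to newest, and choosing the newest qualifying file is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// binlogFile is a hypothetical summary of one binary log.
type binlogFile struct {
	Name           string
	FirstTimestamp int64 // timestamp of the first event in the file
}

// errBinlogUnavailable is a local stand-in for ErrBinlogUnavailable.
var errBinlogUnavailable = errors.New("cannot find relevant binlogs on this server")

// pickBinlog scans files from newest to oldest and returns the first one
// whose first event is strictly before ts. files must be ordered oldest first.
func pickBinlog(files []binlogFile, ts int64) (string, error) {
	for i := len(files) - 1; i >= 0; i-- {
		if files[i].FirstTimestamp < ts {
			return files[i].Name, nil
		}
	}
	return "", errBinlogUnavailable
}

func main() {
	files := []binlogFile{
		{Name: "binlog.000001", FirstTimestamp: 100},
		{Name: "binlog.000002", FirstTimestamp: 200},
		{Name: "binlog.000003", FirstTimestamp: 300},
	}
	name, err := pickBinlog(files, 250)
	fmt.Println(name, err) // prints "binlog.000002 <nil>"
}
```

Streaming then starts at the beginning of the chosen file, so the consumer still sees events older than the timestamp and has to filter them itself.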

func (*SlaveConnection) StartBinlogDumpFromCurrent

func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error)

StartBinlogDumpFromCurrent requests a replication binlog dump from the current position.

func (*SlaveConnection) StartBinlogDumpFromPosition

func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error)

StartBinlogDumpFromPosition requests a replication binlog dump from the master mysqld at the given Position and then sends binlog events to the returned channel. The stream will continue in the background, waiting for new events if necessary, until the connection is closed, either by the master or by canceling the context.

Note that the context is valid and used until the returned channel is closed.

type StreamList

type StreamList struct {
    sync.Mutex
    // contains filtered or unexported fields
}

StreamList is a map of context.CancelFunc to mass-interrupt ongoing calls.

func (*StreamList) Add

func (sl *StreamList) Add(c context.CancelFunc) int

Add adds a CancelFunc to the map.

func (*StreamList) Delete

func (sl *StreamList) Delete(i int)

Delete removes a CancelFunc from the list.

func (*StreamList) Init

func (sl *StreamList) Init()

Init must be called before using the list.

func (*StreamList) Stop

func (sl *StreamList) Stop()

Stop stops all the current streams.

type Streamer

type Streamer struct {
    // contains filtered or unexported fields
}

Streamer streams binlog events from MySQL by connecting as a slave. A Streamer should only be used once. To start another stream, call NewStreamer() again.

func NewStreamer

func NewStreamer(cp *mysql.ConnParams, se *schema.Engine, clientCharset *binlogdatapb.Charset, startPos mysql.Position, timestamp int64, sendTransaction sendTransactionFunc) *Streamer

NewStreamer creates a binlog Streamer.

cp holds the parameters for connecting to the local mysqld, including the database to stream events for.
clientCharset is the default character set on the BinlogPlayer side.
startPos is the position to start streaming at. Incompatible with timestamp.
timestamp is the timestamp to start streaming at. Incompatible with startPos.
sendTransaction is called each time a transaction is committed or rolled back.

func (*Streamer) Stream

func (bls *Streamer) Stream(ctx context.Context) (err error)

Stream starts streaming binlog events using the settings from NewStreamer().

type UpdateStream

type UpdateStream interface {
    // StreamKeyRange streams events related to a KeyRange only
    StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) error

    // StreamTables streams events related to a set of Tables only
    StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) error

    // HandlePanic should be called in a defer,
    // first thing in the RPC implementation.
    HandlePanic(*error)
}

UpdateStream is the interface for the binlog server
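
The "call HandlePanic in a defer, first thing in the RPC implementation" convention relies on a deferred function that recovers and writes into the named error result. A minimal sketch of that pattern, assuming hypothetical names `handlePanic` and `streamRPC` and an assumed error message format:

```go
package main

import "fmt"

// handlePanic converts a panic into an error stored through err. It must be
// the deferred function itself so that recover sees the panic.
func handlePanic(err *error) {
	if x := recover(); x != nil {
		*err = fmt.Errorf("uncaught panic: %v", x)
	}
}

// streamRPC is a hypothetical RPC body. The named result err lets the
// deferred handler replace the return value after a panic.
func streamRPC(fail bool) (err error) {
	defer handlePanic(&err)
	if fail {
		panic("boom")
	}
	return nil
}

func main() {
	fmt.Println(streamRPC(false)) // prints "<nil>"
	fmt.Println(streamRPC(true))  // prints "uncaught panic: boom"
}
```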

type UpdateStreamControl

type UpdateStreamControl interface {
    // Enable will allow any new RPC calls
    Enable()

    // Disable will interrupt all current calls, and disallow any new call
    Disable()

    // IsEnabled returns true iff the service is enabled
    IsEnabled() bool
}

UpdateStreamControl is the interface an UpdateStream service implements to bring it up or down.

type UpdateStreamControlMock

type UpdateStreamControlMock struct {
    sync.Mutex
    // contains filtered or unexported fields
}

UpdateStreamControlMock is an implementation of UpdateStreamControl to be used in tests

func NewUpdateStreamControlMock

func NewUpdateStreamControlMock() *UpdateStreamControlMock

NewUpdateStreamControlMock creates a new UpdateStreamControlMock

func (*UpdateStreamControlMock) Disable

func (m *UpdateStreamControlMock) Disable()

Disable is part of UpdateStreamControl

func (*UpdateStreamControlMock) Enable

func (m *UpdateStreamControlMock) Enable()

Enable is part of UpdateStreamControl

func (*UpdateStreamControlMock) IsEnabled

func (m *UpdateStreamControlMock) IsEnabled() bool

IsEnabled is part of UpdateStreamControl

type UpdateStreamImpl

type UpdateStreamImpl struct {
    // contains filtered or unexported fields
}

UpdateStreamImpl is the real implementation of UpdateStream and UpdateStreamControl

func NewUpdateStream

func NewUpdateStream(ts *topo.Server, keyspace string, cell string, cp *mysql.ConnParams, se *schema.Engine) *UpdateStreamImpl

NewUpdateStream returns a new UpdateStreamImpl object

func (*UpdateStreamImpl) Disable

func (updateStream *UpdateStreamImpl) Disable()

Disable will disallow any connection to the service

func (*UpdateStreamImpl) Enable

func (updateStream *UpdateStreamImpl) Enable()

Enable will allow connections to the service

func (*UpdateStreamImpl) HandlePanic

func (updateStream *UpdateStreamImpl) HandlePanic(err *error)

HandlePanic is part of the UpdateStream interface

func (*UpdateStreamImpl) IsEnabled

func (updateStream *UpdateStreamImpl) IsEnabled() bool

IsEnabled returns true if UpdateStreamImpl is enabled

func (*UpdateStreamImpl) RegisterService

func (updateStream *UpdateStreamImpl) RegisterService()

RegisterService needs to be called to publish stats, and to start listening to clients. Only one instance can call this in a process.

func (*UpdateStreamImpl) StreamKeyRange

func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) (err error)

StreamKeyRange is part of the UpdateStream interface

func (*UpdateStreamImpl) StreamTables

func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) (err error)

StreamTables is part of the UpdateStream interface

Directories

Path                 Synopsis
binlogplayer         Package binlogplayer contains the code that plays a vreplication stream on a client database.
binlogplayertest
eventtoken           Package eventtoken includes utility methods for event token handling.
grpcbinlogplayer
grpcbinlogstreamer   Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.

Package binlog imports 28 packages and is imported by 5 packages. Updated 2019-06-13.