Documentation ¶
Index ¶
- Variables
- type Action
- type Canal
- func (c *Canal) AddDumpDatabases(dbs ...string)
- func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)
- func (c *Canal) AddDumpTables(db string, tables ...string)
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) ClearTableCache(db []byte, table []byte)
- func (c *Canal) Close()
- func (c *Canal) Dump(ctx context.Context) error
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDelay() uint32
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) GetTable(db, table string) (*schema.Table, error)
- func (c *Canal) Run(ctx context.Context) error
- func (c *Canal) RunFrom(ctx context.Context, pos mysql.Position) error
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) SyncedGTIDSet() mysql.GTIDSet
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) SyncedTimestamp() uint32
- func (c *Canal) WaitDumpDone() <-chan struct{}
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (h *DummyEventHandler) OnDDL(uint32, mysql.Position, *replication.QueryEvent, string) error
- func (h *DummyEventHandler) OnGTID(uint32, mysql.GTIDSet) error
- func (h *DummyEventHandler) OnPosSynced(uint32, mysql.Position, mysql.GTIDSet, bool) error
- func (h *DummyEventHandler) OnRotate(uint32, *replication.RotateEvent) error
- func (h *DummyEventHandler) OnRow(uint32, *RowsEvent) error
- func (h *DummyEventHandler) OnTableChanged(uint32, string, string) error
- func (h *DummyEventHandler) OnXID(uint32, mysql.Position) error
- func (h *DummyEventHandler) String() string
- type DumpConfig
- type EventHandler
- type RowsEvent
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	UnknownTableRetryPeriod = time.Second * time.Duration(10)
	ErrExcludedTable        = errors.New("excluded table meta")
)
Canal will retry fetching an unknown table's meta after UnknownTableRetryPeriod.
Functions ¶
This section is empty.
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data to anywhere, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.
func (*Canal) AddDumpDatabases ¶
func (*Canal) AddDumpIgnoreTables ¶
func (*Canal) AddDumpTables ¶
func (*Canal) CheckBinlogRowImage ¶
CheckBinlogRowImage checks the MySQL binlog row image, which must be one of FULL, MINIMAL, or NOBLOB.
func (*Canal) ClearTableCache ¶
ClearTableCache clears the table cache.
func (*Canal) FlushBinlog ¶
func (*Canal) Run ¶
Run will first try to dump all data from the MySQL master with `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or the Canal is closed.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.
func (*Canal) SyncedGTIDSet ¶
func (*Canal) SyncedPosition ¶
func (*Canal) SyncedTimestamp ¶
func (*Canal) WaitDumpDone ¶
func (c *Canal) WaitDumpDone() <-chan struct{}
type Config ¶
type Config struct {
	Addr string `toml:"addr"`
	User string `toml:"user"`
	Password string `toml:"password"`
	Charset string `toml:"charset"`
	ServerID uint32 `toml:"server_id"`
	Flavor string `toml:"flavor"`
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
	ReadTimeout time.Duration `toml:"read_timeout"`
	SyncMarkTable string
	// discard row event without table meta
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`
	Dump DumpConfig `toml:"dump"`
	UseDecimal bool `toml:"use_decimal"`
	ParseTime bool `toml:"parse_time"`
	TimestampStringLocation *time.Location
	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`
	// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
	// this configuration will not work if DisableRetrySync is true
	MaxReconnectAttempts int `toml:"max_reconnect_attempts"`
	// whether disable re-sync for broken connection
	DisableRetrySync bool `toml:"disable_retry_sync"`
	// Set TLS config
	TLSConfig *tls.Config
	TableFilter *xx.TableFilter
}
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type DummyEventHandler ¶
type DummyEventHandler struct { }
func (*DummyEventHandler) OnDDL ¶
func (h *DummyEventHandler) OnDDL(uint32, mysql.Position, *replication.QueryEvent, string) error
func (*DummyEventHandler) OnPosSynced ¶
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(uint32, *replication.RotateEvent) error
func (*DummyEventHandler) OnTableChanged ¶
func (h *DummyEventHandler) OnTableChanged(uint32, string, string) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type DumpConfig ¶
type DumpConfig struct {
	// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
	// If not set, ignore using mysqldump.
	ExecutionPath string `toml:"mysqldump"`
	// Will override Databases, tables is in database table_db
	Tables []string `toml:"tables"`
	TableDB string `toml:"table_db"`
	Databases []string `toml:"dbs"`
	// Ignore table format is db.table
	IgnoreTables []string `toml:"ignore_tables"`
	// Dump only selected records. Quotes are mandatory
	Where string `toml:"where"`
	// If true, discard error msg, else, output to stderr
	DiscardErr bool `toml:"discard_err"`
	// Set true to skip --master-data if we have no privilege to do
	// 'FLUSH TABLES WITH READ LOCK'
	SkipMasterData bool `toml:"skip_master_data"`
	// Set to change the default max_allowed_packet size
	MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
	// Set to change the default protocol to connect with
	Protocol string `toml:"protocol"`
	// Set extra options
	ExtraOptions []string `toml:"extra_options"`
}
type EventHandler ¶
type EventHandler interface {
	OnRotate(timestamp uint32, rotateEvent *replication.RotateEvent) error
	// OnTableChanged is called when the table is created, altered, renamed or dropped.
	// You need to clear the associated data like cache with the table.
	// It will be called before OnDDL.
	OnTableChanged(timestamp uint32, schema string, table string) error
	OnDDL(timestamp uint32, nextPos mysql.Position, queryEvent *replication.QueryEvent, stmt string) error
	OnRow(timestamp uint32, e *RowsEvent) error
	OnXID(timestamp uint32, nextPos mysql.Position) error
	OnGTID(timestamp uint32, gtid mysql.GTIDSet) error
	// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
	OnPosSynced(timestamp uint32, pos mysql.Position, set mysql.GTIDSet, force bool) error
	String() string
}
type RowsEvent ¶
type RowsEvent struct {
	Table *schema.Table
	Action Action
	// changed row list
	// binlog has three update event version, v0, v1 and v2.
	// for v1 and v2, the rows number must be even.
	// Two rows for one event, format is [before update row, after update row]
	// for update v0, only one row for a event, and we don't support this version.
	Rows [][]interface{}
	// Header can be used to inspect the event
	Header *replication.EventHeader
}
RowsEvent is the event for row replication.
func (*RowsEvent) GetBytesColumns ¶
func (*RowsEvent) GetColumnNames ¶
func (*RowsEvent) GetPKColumns ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.