Documentation ¶
Overview ¶
vt binlog server: Serves binlog for out of band replication.
Copyright 2012, Google Inc. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
Index ¶
- Constants
- Variables
- func ConcurrentMap(concurrency, n int, fun MapFunc) error
- func DiffPermissions(leftName string, left *Permissions, rightName string, right *Permissions, ...)
- func DiffPermissionsToArray(leftName string, left *Permissions, rightName string, right *Permissions) (result []string)
- func DiffSchema(leftName string, left *SchemaDefinition, rightName string, ...)
- func DiffSchemaToArray(leftName string, left *SchemaDefinition, rightName string, ...) (result []string)
- func DirectoryList(cnf *Mycnf) []string
- func DisableBinlogServerService(blServer *BinlogServer)
- func DisableUpdateStreamService()
- func EnableBinlogServerService(blServer *BinlogServer, dbname string)
- func EnableUpdateStreamService(tabletType string, dbcfgs dbconfigs.DBConfigs)
- func GetDmlType(firstKw string) string
- func GetReplicationPosition() (*proto.ReplicationCoordinates, error)
- func GetSqlType(firstKw string) string
- func IgnoredStatement(line []byte) bool
- func Init(mt *Mysqld, mysqlWaitTime time.Duration) error
- func IsBinlogServerEnabled(blServer *BinlogServer) bool
- func IsMasterPositionValid(startCoordinates *proto.ReplicationCoordinates) bool
- func IsStartPositionValid(startPos *proto.BinlogPosition) bool
- func IsTxnStatement(line []byte, firstKw string) bool
- func IsUpdateStreamEnabled() bool
- func LogsDir() string
- func MakeMycnf(mycnf *Mycnf, cnfFiles []string) (string, error)
- func MycnfFile(uid uint32) string
- func NewBlplStats() *blplStats
- func NewEventBuffer(pos *proto.BinlogPosition, line []byte) *eventBuffer
- func ReadStartPosition(dbClient VtClient, uid uint32) (*binlogRecoveryState, error)
- func RegisterBinlogServerService(blServer *BinlogServer)
- func RegisterUpdateStreamService(mycnf *Mycnf)
- func SanityCheckManifests(ssms []*SplitSnapshotManifest) error
- func ServeUpdateStream(req *UpdateStreamRequest, sendReply SendUpdateStreamResponse) error
- func Shutdown(mt *Mysqld, waitForMysqld bool, mysqlWaitTime time.Duration) error
- func SnapshotDir(uid uint32) string
- func Start(mt *Mysqld, mysqlWaitTime time.Duration) error
- func StartReplicationCommands(mysqld *Mysqld, replState *ReplicationState) ([]string, error)
- func TabletDir(uid uint32) string
- func Teardown(mt *Mysqld, force bool) error
- func TopLevelDirs() []string
- type BinlogDecoder
- type BinlogParseError
- type BinlogPlayer
- type BinlogReader
- type BinlogServer
- type BinlogServerError
- type Blp
- type BlpPosition
- type Bls
- type ByReverseDataLength
- type CreateConnection
- type DBClient
- type DbPermission
- type DbPermissionList
- type DummyVtClient
- func (dc DummyVtClient) Begin() error
- func (dc DummyVtClient) Close()
- func (dc DummyVtClient) Commit() error
- func (dc DummyVtClient) Connect() error
- func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error)
- func (dc DummyVtClient) Rollback() error
- type EventData
- type FakeMysqlDaemon
- type HostPermission
- type HostPermissionList
- type MapFunc
- type Mycnf
- type MysqlBinlog
- type MysqlDaemon
- type Mysqld
- func (mysqld *Mysqld) Addr() string
- func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *SchemaChange) (*SchemaChangeResult, error)
- func (mysqld *Mysqld) BreakSlaves() error
- func (mysqld *Mysqld) CheckReplication(timeCheck int64) error
- func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyName string, sourceAddr string, ...) (snapshotManifestFilenames []string, err error)
- func (mysqld *Mysqld) CreateSnapshot(dbName, sourceAddr string, allowHierarchicalReplication bool, concurrency int, ...) (snapshotManifestUrlPath string, slaveStartRequired, readOnly bool, err error)
- func (mysqld *Mysqld) DemoteMaster() (*ReplicationPosition, error)
- func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error
- func (mysqld *Mysqld) FindSlaves() ([]string, error)
- func (mysqld *Mysqld) FindVtDatabases() ([]string, error)
- func (mysqld *Mysqld) GetColumns(dbName, table string) ([]string, error)
- func (mysqld *Mysqld) GetMasterAddr() (string, error)
- func (mysqld *Mysqld) GetPermissions() (*Permissions, error)
- func (mysqld *Mysqld) GetSchema(dbName string, tables []string, includeViews bool) (*SchemaDefinition, error)
- func (mysqld *Mysqld) IpAddr() string
- func (mysqld *Mysqld) IsReadOnly() (bool, error)
- func (mysqld *Mysqld) MasterStatus() (rp *ReplicationPosition, err error)
- func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRange, sourceAddrs []*url.URL, ...) (err error)
- func (mysqld *Mysqld) PreflightSchemaChange(dbName string, change string) (*SchemaChangeResult, error)
- func (mysqld *Mysqld) PromoteSlave(setReadWrite bool) (replicationState *ReplicationState, waitPosition *ReplicationPosition, ...)
- func (mysqld *Mysqld) ReparentPosition(slavePosition *ReplicationPosition) (rs *ReplicationState, waitPosition *ReplicationPosition, reparentTime int64, ...)
- func (mysqld *Mysqld) RestartSlave(replicationState *ReplicationState, waitPosition *ReplicationPosition, ...) error
- func (mysqld *Mysqld) RestoreFromSnapshot(snapshotManifest *SnapshotManifest, fetchConcurrency, fetchRetryCount int, ...) error
- func (mysqld *Mysqld) SetReadOnly(on bool) error
- func (mysqld *Mysqld) SlaveStatus() (*ReplicationPosition, error)
- func (mysqld *Mysqld) SnapshotSourceEnd(slaveStartRequired, readOnly, deleteSnapshot bool) error
- func (mysqld *Mysqld) StartSlave() error
- func (mysqld *Mysqld) StopSlave() error
- func (mysqld *Mysqld) ValidateCloneTarget(hookExtraEnv map[string]string) error
- func (mysqld *Mysqld) ValidateSnapshotPath() error
- func (mysqld *Mysqld) WaitBlpPos(bp *BlpPosition, waitTimeout int) error
- func (mysqld *Mysqld) WaitForSlave(maxLag int) (err error)
- func (mysqld *Mysqld) WaitForSlaveStart(slaveStartDeadline int) (err error)
- func (mysqld *Mysqld) WaitMasterPos(rp *ReplicationPosition, waitTimeout int) error
- type Permission
- type PermissionList
- type Permissions
- type ReplicationPosition
- type ReplicationState
- type SchemaChange
- type SchemaChangeResult
- type SchemaDefinition
- type SendUpdateStreamResponse
- type SnapshotFile
- type SnapshotFiles
- type SnapshotManifest
- type SplitSnapshotManifest
- type TableDefinition
- type TableDefinitions
- type UpdateResponse
- type UpdateStream
- type UpdateStreamRequest
- type UserPermission
- type UserPermissionList
- type VtClient
- type VtReplParams
Constants ¶
const ( MAX_TXN_BATCH = 1024 DML = "DML" DDL = "DDL" BEGIN = "BEGIN" COMMIT = "COMMIT" USE = "use" EOF = "EOF" )
const ( FATAL = "Fatal" SERVICE_ERROR = "Service Error" EVENT_ERROR = "Event Error" CODE_ERROR = "Code Error" REPLICATION_ERROR = "Replication Error" CONNECTION_ERROR = "Connection Error" )
Error types for update stream
const ( BINLOG_HEADER_SIZE = 4 // copied from mysqlbinlog.cc for mysql 5.0.33 EVENT_HEADER_SIZE = 19 // 4.0 and above, can be larger in 5.x BINLOG_BLOCK_SIZE = 16 * 1024 MYSQLBINLOG_CHUNK = 64 * 1024 MAX_WAIT_TIMEOUT = 30 * time.Second LOG_WAIT_TIMEOUT = time.Second / 2 )
const ( BINLOG_SERVER_DISABLED = iota BINLOG_SERVER_ENABLED )
const ( SlaveStartDeadline = 30 InvalidLagSeconds = 0xFFFFFFFF )
const ( TABLE_BASE_TABLE = "BASE TABLE" TABLE_VIEW = "VIEW" )
const ( SnapshotURLPath = "/snapshot" INSERT_INTO_RECOVERY = `` /* 212-byte string literal not displayed */ )
const ( DISABLED = iota ENABLED )
const (
MysqlWaitTime = 120 * time.Second // default number of seconds to wait
)
const (
SnapshotManifestFile = "snapshot_manifest.json"
)
Variables ¶
var ( STREAM_COMMENT_START = []byte("/* _stream ") BINLOG_DELIMITER = []byte("/*!*/;") BINLOG_POSITION_PREFIX = []byte("# at ") BINLOG_ROTATE_TO = []byte("Rotate to ") BINLOG_BEGIN = []byte("BEGIN") BINLOG_COMMIT = []byte("COMMIT") BINLOG_ROLLBACK = []byte("ROLLBACK") BINLOG_SET_TIMESTAMP = []byte("SET TIMESTAMP=") BINLOG_SET_INSERT = []byte("SET INSERT_ID=") BINLOG_END_LOG_POS = []byte("end_log_pos ") BINLOG_XID = []byte("Xid = ") BINLOG_GROUP_ID = []byte("group_id ") BINLOG_START = []byte("Start: binlog") BINLOG_DB_CHANGE = []byte(USE) POS = []byte(" pos: ") SPACE = []byte(" ") COMMENT = []byte("/*!") SET_SESSION_VAR = []byte("SET @@session") DELIMITER = []byte("DELIMITER ") BINLOG = []byte("BINLOG ") SEMICOLON_BYTE = []byte(";") )
var ( SLOW_TXN_THRESHOLD = time.Duration(100 * time.Millisecond) ROLLBACK = "rollback" BLPL_STREAM_COMMENT_START = "/* _stream " BLPL_SPACE = " " UPDATE_RECOVERY = "" /* 160-byte string literal not displayed */ UPDATE_RECOVERY_LAST_EOF = "update _vt.blp_checkpoint set last_eof_group_id='%v' where source_shard_uid=%v" SELECT_FROM_RECOVERY = "select * from _vt.blp_checkpoint where source_shard_uid=%v" )
var ( KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:") USER_ID = []byte("user_id") END_COMMENT = []byte("*/") HEARTBEAT = []byte("heartbeat") ADMIN = []byte("admin") )
var ( ErrNotSlave = errors.New("no slave status") ErrNotMaster = errors.New("no master status") )
var DefaultDbaParams = mysql.ConnectionParams{
Uname: "vt_dba",
Charset: "utf8",
}
var DefaultReplParams = mysql.ConnectionParams{
Uname: "vt_repl",
Charset: "utf8",
}
Functions ¶
func ConcurrentMap ¶
ConcurrentMap applies fun in a concurrent manner on integers from 0 to n-1 (they are assumed to be indexes of some slice containing items to be processed). The first error returned by a fun application will returned (subsequent errors will only be logged). It will use concurrency goroutines.
func DiffPermissions ¶
func DiffPermissions(leftName string, left *Permissions, rightName string, right *Permissions, er concurrency.ErrorRecorder)
func DiffPermissionsToArray ¶
func DiffPermissionsToArray(leftName string, left *Permissions, rightName string, right *Permissions) (result []string)
func DiffSchema ¶
func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition, er concurrency.ErrorRecorder)
generates a report on what's different between two SchemaDefinition for now, we skip the VIEW entirely.
func DiffSchemaToArray ¶
func DiffSchemaToArray(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition) (result []string)
func DirectoryList ¶
func DisableBinlogServerService ¶
func DisableBinlogServerService(blServer *BinlogServer)
DisableBinlogServerService disables the service for serving.
func DisableUpdateStreamService ¶
func DisableUpdateStreamService()
func EnableBinlogServerService ¶
func EnableBinlogServerService(blServer *BinlogServer, dbname string)
EnableBinlogServerService enabled the service for serving.
func GetDmlType ¶
func GetReplicationPosition ¶
func GetReplicationPosition() (*proto.ReplicationCoordinates, error)
func GetSqlType ¶
func IgnoredStatement ¶
func IsBinlogServerEnabled ¶
func IsBinlogServerEnabled(blServer *BinlogServer) bool
func IsMasterPositionValid ¶
func IsMasterPositionValid(startCoordinates *proto.ReplicationCoordinates) bool
func IsStartPositionValid ¶
func IsStartPositionValid(startPos *proto.BinlogPosition) bool
func IsTxnStatement ¶
func IsUpdateStreamEnabled ¶
func IsUpdateStreamEnabled() bool
func NewBlplStats ¶
func NewBlplStats() *blplStats
func NewEventBuffer ¶
func NewEventBuffer(pos *proto.BinlogPosition, line []byte) *eventBuffer
func ReadStartPosition ¶
func RegisterBinlogServerService ¶
func RegisterBinlogServerService(blServer *BinlogServer)
RegisterBinlogServerService registers the service for serving and stats.
func RegisterUpdateStreamService ¶
func RegisterUpdateStreamService(mycnf *Mycnf)
func SanityCheckManifests ¶
func SanityCheckManifests(ssms []*SplitSnapshotManifest) error
SanityCheckManifests checks if the ssms can be restored together.
func ServeUpdateStream ¶
func ServeUpdateStream(req *UpdateStreamRequest, sendReply SendUpdateStreamResponse) error
func Shutdown ¶
waitForMysqld: should the function block until mysqld has stopped?
This can actually take a *long* time if the buffer cache needs to be fully flushed - on the order of 20-30 minutes.
func SnapshotDir ¶
func StartReplicationCommands ¶
func StartReplicationCommands(mysqld *Mysqld, replState *ReplicationState) ([]string, error)
func TopLevelDirs ¶
func TopLevelDirs() []string
Types ¶
type BinlogDecoder ¶
type BinlogDecoder struct {
// contains filtered or unexported fields
}
func (*BinlogDecoder) DecodeMysqlBinlog ¶
return a Reader from which the decoded binlog can be read
func (*BinlogDecoder) Kill ¶
func (decoder *BinlogDecoder) Kill() error
type BinlogParseError ¶
type BinlogParseError struct {
// contains filtered or unexported fields
}
func NewBinlogParseError ¶
func NewBinlogParseError(errType, msg string) *BinlogParseError
func (BinlogParseError) Error ¶
func (err BinlogParseError) Error() string
func (*BinlogParseError) IsEOF ¶
func (err *BinlogParseError) IsEOF() bool
func (*BinlogParseError) IsFatal ¶
func (err *BinlogParseError) IsFatal() bool
type BinlogPlayer ¶
type BinlogPlayer struct {
// contains filtered or unexported fields
}
BinlogPlayer is handling reading a stream of updates from BinlogServer
func NewBinlogPlayer ¶
func (*BinlogPlayer) ApplyBinlogEvents ¶
func (blp *BinlogPlayer) ApplyBinlogEvents(interrupted chan struct{}) error
ApplyBinlogEvents makes a gob rpc request to BinlogServer and processes the events.
func (*BinlogPlayer) StatsJSON ¶
func (blp *BinlogPlayer) StatsJSON() string
type BinlogReader ¶
type BinlogReader struct { // these parameters will have reasonable default values but can be tuned BinlogBlockSize int64 MaxWaitTimeout time.Duration LogWaitTimeout time.Duration // contains filtered or unexported fields }
func NewBinlogReader ¶
func NewBinlogReader(binLogPrefix string) *BinlogReader
type BinlogServer ¶
type BinlogServer struct {
// contains filtered or unexported fields
}
func NewBinlogServer ¶
func NewBinlogServer(mysqld *Mysqld) *BinlogServer
func (*BinlogServer) ServeBinlog ¶
func (blServer *BinlogServer) ServeBinlog(req *proto.BinlogServerRequest, sendReply proto.SendBinlogResponse) error
type BinlogServerError ¶
type BinlogServerError struct {
Msg string
}
func (BinlogServerError) Error ¶
func (err BinlogServerError) Error() string
type Blp ¶
type Blp struct {
// contains filtered or unexported fields
}
func NewBlp ¶
func NewBlp(startCoordinates *proto.ReplicationCoordinates, updateStream *UpdateStream) *Blp
func (*Blp) ComputeBacklog ¶
func (Blp) DebugJsonString ¶
func (stats Blp) DebugJsonString() string
func (*Blp) StreamBinlog ¶
func (blp *Blp) StreamBinlog(sendReply SendUpdateStreamResponse, binlogPrefix string) (err error)
Main entry function for reading and parsing the binlog.
type BlpPosition ¶
type ByReverseDataLength ¶
type ByReverseDataLength struct {
TableDefinitions
}
sort by reverse DataLength
func (ByReverseDataLength) Less ¶
func (bdl ByReverseDataLength) Less(i, j int) bool
type CreateConnection ¶
type CreateConnection func() (*mysql.Connection, error)
type DBClient ¶
type DBClient struct {
// contains filtered or unexported fields
}
DBClient is a real VtClient backed by a mysql connection
func NewDbClient ¶
func NewDbClient(dbConfig *mysql.ConnectionParams) *DBClient
func (*DBClient) ExecuteFetch ¶
type DbPermission ¶
DbPermission describes a single row in the mysql.db table Primary key is Host+Db+User
func (*DbPermission) PrimaryKey ¶
func (dp *DbPermission) PrimaryKey() string
func (*DbPermission) String ¶
func (dp *DbPermission) String() string
type DbPermissionList ¶
type DbPermissionList []*DbPermission
func (DbPermissionList) Get ¶
func (upl DbPermissionList) Get(i int) Permission
func (DbPermissionList) Len ¶
func (upl DbPermissionList) Len() int
type DummyVtClient ¶
type DummyVtClient struct {
// contains filtered or unexported fields
}
DummyVtClient is a VtClient that writes to a writer instead of executing anything
func NewDummyVtClient ¶
func NewDummyVtClient() *DummyVtClient
func (DummyVtClient) Begin ¶
func (dc DummyVtClient) Begin() error
func (DummyVtClient) Close ¶
func (dc DummyVtClient) Close()
func (DummyVtClient) Commit ¶
func (dc DummyVtClient) Commit() error
func (DummyVtClient) Connect ¶
func (dc DummyVtClient) Connect() error
func (DummyVtClient) ExecuteFetch ¶
func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error)
func (DummyVtClient) Rollback ¶
func (dc DummyVtClient) Rollback() error
type FakeMysqlDaemon ¶
type FakeMysqlDaemon struct { // will be returned by GetMasterAddr(). Set to "" to return an error. MasterAddr string }
FakeMysqlDaemon implements MysqlDaemon and allows the user to fake everything.
func (*FakeMysqlDaemon) GetMasterAddr ¶
func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error)
type HostPermission ¶
HostPermission describes a single row in the mysql.host table Primary key is Host+Db
func (*HostPermission) PrimaryKey ¶
func (hp *HostPermission) PrimaryKey() string
func (*HostPermission) String ¶
func (hp *HostPermission) String() string
type HostPermissionList ¶
type HostPermissionList []*HostPermission
func (HostPermissionList) Get ¶
func (upl HostPermissionList) Get(i int) Permission
func (HostPermissionList) Len ¶
func (upl HostPermissionList) Len() int
type Mycnf ¶
type Mycnf struct { ServerId uint32 MysqlPort int DataDir string InnodbDataHomeDir string InnodbLogGroupHomeDir string SocketFile string StartKey string EndKey string ErrorLogPath string SlowLogPath string RelayLogPath string RelayLogIndexPath string RelayLogInfoPath string BinLogPath string MasterInfoFile string PidFile string TmpDir string SlaveLoadTmpDir string // contains filtered or unexported fields }
func NewMycnf ¶
func NewMycnf(uid uint32, mysqlPort int, vtRepl VtReplParams) *Mycnf
NewMycnf fills the Mycnf structure with vt root paths and derived values. This is used to fill out the cnfTemplate values and generate my.cnf. uid is a unique id for a particular tablet - it must be unique within the tabletservers deployed within a keyspace, lest there be collisions on disk. mysqldPort needs to be unique per instance per machine.
type MysqlBinlog ¶
type MysqlBinlog struct {
// contains filtered or unexported fields
}
func (*MysqlBinlog) Kill ¶
func (mbl *MysqlBinlog) Kill()
Kill terminates the current mysqlbinlog process.
func (*MysqlBinlog) Launch ¶
func (mbl *MysqlBinlog) Launch(dbname, filename string, pos uint64) (stdout io.ReadCloser, err error)
MysqlBinlog launches mysqlbinlog and returns a ReadCloser into which its output will be piped. The stderr will be redirected to the log.
func (*MysqlBinlog) Wait ¶
func (mbl *MysqlBinlog) Wait() error
Wait waits for the mysqlbinlog process to terminate and returns an error if there was any.
type MysqlDaemon ¶
type MysqlDaemon interface { // GetMasterAddr returns the mysql master address, as shown by // 'show slave status'. GetMasterAddr() (string, error) }
MysqlDaemon is the interface we use for abstracting Mysqld.
type Mysqld ¶
type Mysqld struct { TabletDir string SnapshotDir string // contains filtered or unexported fields }
func (*Mysqld) ApplySchemaChange ¶
func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *SchemaChange) (*SchemaChangeResult, error)
func (*Mysqld) BreakSlaves ¶
Force all slaves to error and stop. This is extreme, but helpful for emergencies and tests. Insert a row, block the propagation of its subsequent delete and reinsert it. This forces a failure on slaves only.
func (*Mysqld) CheckReplication ¶
Check for the magic row inserted under controlled reparenting.
func (*Mysqld) CreateMultiSnapshot ¶
func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyName string, sourceAddr string, allowHierarchicalReplication bool, snapshotConcurrency int, tables []string, skipSlaveRestart bool, maximumFilesize uint64, hookExtraEnv map[string]string) (snapshotManifestFilenames []string, err error)
func (*Mysqld) CreateSnapshot ¶
func (mysqld *Mysqld) CreateSnapshot(dbName, sourceAddr string, allowHierarchicalReplication bool, concurrency int, serverMode bool, hookExtraEnv map[string]string) (snapshotManifestUrlPath string, slaveStartRequired, readOnly bool, err error)
This function runs on the machine acting as the source for the clone.
Check master/slave status and determine restore needs. If this instance is a slave, stop replication, otherwise place in read-only mode. Record replication position. Shutdown mysql Check paths for storing data
Depending on the serverMode flag, we do the following: serverMode = false:
Compress /vt/vt_[0-9a-f]+/data/vt_.+ Compute hash (of compressed files, as we serve .gz files here) Place in /vt/clone_src where they will be served by http server (not rpc) Restart mysql
serverMode = true:
Make symlinks for /vt/vt_[0-9a-f]+/data/vt_.+ to innodb files Compute hash (of uncompressed files, as we serve uncompressed files) Place symlinks in /vt/clone_src where they will be served by http server Leave mysql stopped, return slaveStartRequired, readOnly
func (*Mysqld) DemoteMaster ¶
func (mysqld *Mysqld) DemoteMaster() (*ReplicationPosition, error)
if the master is still alive, then we need to demote it gracefully make it read-only, flush the writes and get the position
func (*Mysqld) ExecuteMysqlCommand ¶
executes some SQL commands using a mysql command line interface process
func (*Mysqld) FindSlaves ¶
Get IP addresses for all currently connected slaves. FIXME(msolomon) use command instead of user to find "rogue" slaves?
func (*Mysqld) FindVtDatabases ¶
func (*Mysqld) GetColumns ¶
GetColumns returns the columns of table.
func (*Mysqld) GetMasterAddr ¶
func (*Mysqld) GetPermissions ¶
func (mysqld *Mysqld) GetPermissions() (*Permissions, error)
func (*Mysqld) GetSchema ¶
func (mysqld *Mysqld) GetSchema(dbName string, tables []string, includeViews bool) (*SchemaDefinition, error)
GetSchema returns the schema for database for tables listed in tables. If tables is empty, return the schema for all tables.
func (*Mysqld) IsReadOnly ¶
func (*Mysqld) MasterStatus ¶
func (mysqld *Mysqld) MasterStatus() (rp *ReplicationPosition, err error)
mysql> show master status\G **************************** 1. row *************************** File: vt-000001c6-bin.000003 Position: 106 Binlog_Do_DB: Binlog_Ignore_DB: Group_ID:
func (*Mysqld) MultiRestore ¶
func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRange, sourceAddrs []*url.URL, snapshotConcurrency, fetchConcurrency, insertTableConcurrency, fetchRetryCount int, strategy string) (err error)
MultiRestore is the main entry point for multi restore.
- If the strategy contains the string 'writeBinLogs' then we will also write to the binary logs.
- If the strategy contains the command 'populateBlpCheckpoint' then we will populate the blp_checkpoint table with master positions to start from
func (*Mysqld) PreflightSchemaChange ¶
func (mysqld *Mysqld) PreflightSchemaChange(dbName string, change string) (*SchemaChangeResult, error)
func (*Mysqld) PromoteSlave ¶
func (mysqld *Mysqld) PromoteSlave(setReadWrite bool) (replicationState *ReplicationState, waitPosition *ReplicationPosition, timePromoted int64, err error)
setReadWrite: set the new master in read-write mode.
replicationState: info slaves need to reparent themselves waitPosition: slaves can wait for this position when restarting replication timePromoted: this timestamp (unix nanoseconds) is inserted into _vt.replication_log to verify the replication config
func (*Mysqld) ReparentPosition ¶
func (mysqld *Mysqld) ReparentPosition(slavePosition *ReplicationPosition) (rs *ReplicationState, waitPosition *ReplicationPosition, reparentTime int64, err error)
Return a replication state that will reparent a slave to the correct master for a specified position.
func (*Mysqld) RestartSlave ¶
func (mysqld *Mysqld) RestartSlave(replicationState *ReplicationState, waitPosition *ReplicationPosition, timeCheck int64) error
func (*Mysqld) RestoreFromSnapshot ¶
func (mysqld *Mysqld) RestoreFromSnapshot(snapshotManifest *SnapshotManifest, fetchConcurrency, fetchRetryCount int, dontWaitForSlaveStart bool, hookExtraEnv map[string]string) error
This piece runs on the presumably empty machine acting as the target in the create replica action.
validate target (self) shutdown_mysql() create temp data directory /vt/target/vt_<keyspace> copy compressed data files via HTTP verify hash of compressed files uncompress into /vt/vt_<target-uid>/data/vt_<keyspace> start_mysql() clean up compressed files
func (*Mysqld) SetReadOnly ¶
func (*Mysqld) SlaveStatus ¶
func (mysqld *Mysqld) SlaveStatus() (*ReplicationPosition, error)
func (*Mysqld) SnapshotSourceEnd ¶
func (*Mysqld) StartSlave ¶
func (*Mysqld) ValidateCloneTarget ¶
func (*Mysqld) ValidateSnapshotPath ¶
Helper function to make sure we can write to the local snapshot area, before we actually do any action (can be used for both partial and full snapshots)
func (*Mysqld) WaitBlpPos ¶
func (mysqld *Mysqld) WaitBlpPos(bp *BlpPosition, waitTimeout int) error
func (*Mysqld) WaitForSlave ¶
func (*Mysqld) WaitForSlaveStart ¶
func (*Mysqld) WaitMasterPos ¶
func (mysqld *Mysqld) WaitMasterPos(rp *ReplicationPosition, waitTimeout int) error
type Permission ¶
type PermissionList ¶
type PermissionList interface { Get(int) Permission Len() int }
type Permissions ¶
type Permissions struct { UserPermissions UserPermissionList DbPermissions DbPermissionList HostPermissions HostPermissionList }
Permissions have all the rows in mysql.{user,db,host} tables, (all rows are sorted by primary key)
func (*Permissions) String ¶
func (permissions *Permissions) String() string
type ReplicationPosition ¶
type ReplicationPosition struct { // MasterLogFile, MasterLogPosition and MasterLogGroupId are // the position on the logs for transactions that have been // applied (SQL position): // - on the master, it's File, Position and Group_ID from // 'show master status'. // - on the slave, it's Relay_Master_Log_File, Exec_Master_Log_Pos // and Exec_Master_Group_ID from 'show slave status'. MasterLogFile string MasterLogPosition uint MasterLogGroupId string // MasterLogFileIo and MasterLogPositionIo are the position on the logs // that have been downloaded from the master (IO position), // but not necessarely applied yet: // - on the master, same as MasterLogFile and MasterLogPosition. // - on the slave, it's Master_Log_File and Read_Master_Log_Pos // from 'show slave status'. MasterLogFileIo string MasterLogPositionIo uint // SecondsBehindMaster is how far behind we are in applying logs in // replication. If equal to InvalidLagSeconds, it means replication // is not running. SecondsBehindMaster uint }
ReplicationPosition tracks the replication position on both a master and a slave.
func (ReplicationPosition) MapKey ¶
func (rp ReplicationPosition) MapKey() string
func (ReplicationPosition) MapKeyIo ¶
func (rp ReplicationPosition) MapKeyIo() string
type ReplicationState ¶
type ReplicationState struct { // ReplicationPosition is not anonymous because the default json encoder has begun to fail here. ReplicationPosition ReplicationPosition MasterHost string MasterPort int MasterConnectRetry int }
func NewReplicationState ¶
func NewReplicationState(masterAddr string) (*ReplicationState, error)
func (ReplicationState) MasterAddr ¶
func (rs ReplicationState) MasterAddr() string
type SchemaChange ¶
type SchemaChange struct { Sql string Force bool AllowReplication bool BeforeSchema *SchemaDefinition AfterSchema *SchemaDefinition }
type SchemaChangeResult ¶
type SchemaChangeResult struct { BeforeSchema *SchemaDefinition AfterSchema *SchemaDefinition }
func (*SchemaChangeResult) String ¶
func (scr *SchemaChangeResult) String() string
type SchemaDefinition ¶
type SchemaDefinition struct { // the 'CREATE DATABASE...' statement, with db name as {{.DatabaseName}} DatabaseSchema string // ordered by TableDefinition.Name by default TableDefinitions TableDefinitions // the md5 of the concatenation of TableDefinition.Schema Version string }
func (*SchemaDefinition) GetTable ¶
func (sd *SchemaDefinition) GetTable(table string) (td *TableDefinition, ok bool)
func (*SchemaDefinition) SortByReverseDataLength ¶
func (sd *SchemaDefinition) SortByReverseDataLength()
func (*SchemaDefinition) String ¶
func (sd *SchemaDefinition) String() string
type SendUpdateStreamResponse ¶
type SendUpdateStreamResponse func(response interface{}) error
type SnapshotFile ¶
SnapshotFile describes a file to serve. 'Path' is the path component of the URL. SnapshotManifest.Addr is the host+port component of the URL. If path ends in '.gz', it is compressed. Size and Hash are computed on the Path itself if TableName is set, this file belongs to that table
type SnapshotFiles ¶
type SnapshotFiles []SnapshotFile
func (SnapshotFiles) Len ¶
func (s SnapshotFiles) Len() int
sort.Interface we sort by descending file size
func (SnapshotFiles) Less ¶
func (s SnapshotFiles) Less(i, j int) bool
func (SnapshotFiles) Swap ¶
func (s SnapshotFiles) Swap(i, j int)
type SnapshotManifest ¶
type SnapshotManifest struct { Addr string // this is the address of the tabletserver, not mysql DbName string Files SnapshotFiles ReplicationState *ReplicationState MasterState *ReplicationState }
a SnapshotManifest describes multiple SnapshotFiles and where to get them from.
func ReadSnapshotManifest ¶
func ReadSnapshotManifest(filename string) (*SnapshotManifest, error)
type SplitSnapshotManifest ¶
type SplitSnapshotManifest struct { // Source describes the files and our tablet Source *SnapshotManifest // KeyRange describes the data present in this snapshot // When splitting 40-80 into 40-60 and 60-80, this would // have 40-60 for instance. KeyRange key.KeyRange // The schema for this server SchemaDefinition *SchemaDefinition }
func NewSplitSnapshotManifest ¶
func NewSplitSnapshotManifest(myAddr, myMysqlAddr, masterAddr, dbName string, files []SnapshotFile, pos, myMasterPos *ReplicationPosition, keyRange key.KeyRange, sd *SchemaDefinition) (*SplitSnapshotManifest, error)
NewSplitSnapshotManifest creates a new SplitSnapshotManifest. myAddr and myMysqlAddr are the local server addresses. masterAddr is the address of the server to use as master. pos is the replication position to use on that master. myMasterPos is the local server master position
type TableDefinition ¶
type TableDefinition struct { Name string // the table name Schema string // the SQL to run to create the table Columns []string // the columns in the order that will be used to dump and load the data Type string // TABLE_BASE_TABLE or TABLE_VIEW DataLength uint64 // how much space the data file takes. }
type TableDefinitions ¶
type TableDefinitions []TableDefinition
helper methods for sorting
func (TableDefinitions) Len ¶
func (tds TableDefinitions) Len() int
func (TableDefinitions) Swap ¶
func (tds TableDefinitions) Swap(i, j int)
type UpdateResponse ¶
type UpdateResponse struct { Coord proto.BinlogPosition Data EventData }
Api Interface
type UpdateStream ¶
type UpdateStream struct {
// contains filtered or unexported fields
}
var UpdateStreamRpcService *UpdateStream
func (*UpdateStream) ServeUpdateStream ¶
func (updateStream *UpdateStream) ServeUpdateStream(req *UpdateStreamRequest, sendReply SendUpdateStreamResponse) (err error)
type UpdateStreamRequest ¶
type UpdateStreamRequest struct {
StartPosition proto.BinlogPosition
}
type UserPermission ¶
type UserPermission struct { Host string User string PasswordChecksum uint64 Privileges map[string]string }
UserPermission describes a single row in the mysql.user table Primary key is Host+User PasswordChecksum is the crc64 of the password, for security reasons
func (*UserPermission) PrimaryKey ¶
func (up *UserPermission) PrimaryKey() string
func (*UserPermission) String ¶
func (up *UserPermission) String() string
type UserPermissionList ¶
type UserPermissionList []*UserPermission
func (UserPermissionList) Get ¶
func (upl UserPermissionList) Get(i int) Permission
func (UserPermissionList) Len ¶
func (upl UserPermissionList) Len() int
type VtClient ¶
type VtClient interface { Connect() error Begin() error Commit() error Rollback() error Close() ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error) }
VtClient is a high level interface to the database