mysqlctl

package
v0.0.0-...-7c5168d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2013 License: BSD-3-Clause Imports: 48 Imported by: 0

Documentation

Overview

vt binlog server: Serves binlog for out of band replication.

Copyright 2012, Google Inc. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

Index

Constants

View Source
const (
	MAX_TXN_BATCH = 1024
	DML           = "DML"
	DDL           = "DDL"
	BEGIN         = "BEGIN"
	COMMIT        = "COMMIT"
	USE           = "use"
	EOF           = "EOF"
)
View Source
const (
	FATAL             = "Fatal"
	SERVICE_ERROR     = "Service Error"
	EVENT_ERROR       = "Event Error"
	CODE_ERROR        = "Code Error"
	REPLICATION_ERROR = "Replication Error"
	CONNECTION_ERROR  = "Connection Error"
)

Error types for update stream

View Source
const (
	BINLOG_HEADER_SIZE = 4  // copied from mysqlbinlog.cc for mysql 5.0.33
	EVENT_HEADER_SIZE  = 19 // 4.0 and above, can be larger in 5.x
	BINLOG_BLOCK_SIZE  = 16 * 1024
	MYSQLBINLOG_CHUNK  = 64 * 1024
	MAX_WAIT_TIMEOUT   = 30 * time.Second
	LOG_WAIT_TIMEOUT   = time.Second / 2
)
View Source
const (
	BINLOG_SERVER_DISABLED = iota
	BINLOG_SERVER_ENABLED
)
View Source
const (
	SlaveStartDeadline = 30
	InvalidLagSeconds  = 0xFFFFFFFF
)
View Source
const (
	TABLE_BASE_TABLE = "BASE TABLE"
	TABLE_VIEW       = "VIEW"
)
View Source
const (
	SnapshotURLPath = "/snapshot"

	INSERT_INTO_RECOVERY = `` /* 212-byte string literal not displayed */

)
View Source
const (
	DISABLED = iota
	ENABLED
)
View Source
const (
	MysqlWaitTime = 120 * time.Second // default number of seconds to wait
)
View Source
const (
	SnapshotManifestFile = "snapshot_manifest.json"
)

Variables

View Source
var (
	STREAM_COMMENT_START   = []byte("/* _stream ")
	BINLOG_DELIMITER       = []byte("/*!*/;")
	BINLOG_POSITION_PREFIX = []byte("# at ")
	BINLOG_ROTATE_TO       = []byte("Rotate to ")
	BINLOG_BEGIN           = []byte("BEGIN")
	BINLOG_COMMIT          = []byte("COMMIT")
	BINLOG_ROLLBACK        = []byte("ROLLBACK")
	BINLOG_SET_TIMESTAMP   = []byte("SET TIMESTAMP=")
	BINLOG_SET_INSERT      = []byte("SET INSERT_ID=")
	BINLOG_END_LOG_POS     = []byte("end_log_pos ")
	BINLOG_XID             = []byte("Xid = ")
	BINLOG_GROUP_ID        = []byte("group_id ")
	BINLOG_START           = []byte("Start: binlog")
	BINLOG_DB_CHANGE       = []byte(USE)
	POS                    = []byte(" pos: ")
	SPACE                  = []byte(" ")
	COMMENT                = []byte("/*!")
	SET_SESSION_VAR        = []byte("SET @@session")
	DELIMITER              = []byte("DELIMITER ")
	BINLOG                 = []byte("BINLOG ")
	SEMICOLON_BYTE         = []byte(";")
)
View Source
var (
	SLOW_TXN_THRESHOLD        = time.Duration(100 * time.Millisecond)
	ROLLBACK                  = "rollback"
	BLPL_STREAM_COMMENT_START = "/* _stream "
	BLPL_SPACE                = " "
	UPDATE_RECOVERY           = "" /* 160-byte string literal not displayed */
	UPDATE_RECOVERY_LAST_EOF  = "update _vt.blp_checkpoint set last_eof_group_id='%v' where source_shard_uid=%v"
	SELECT_FROM_RECOVERY      = "select * from _vt.blp_checkpoint where source_shard_uid=%v"
)
View Source
var (
	KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:")
	USER_ID             = []byte("user_id")
	END_COMMENT         = []byte("*/")

	HEARTBEAT = []byte("heartbeat")
	ADMIN     = []byte("admin")
)
View Source
var (
	ErrNotSlave  = errors.New("no slave status")
	ErrNotMaster = errors.New("no master status")
)
View Source
var DefaultDbaParams = mysql.ConnectionParams{
	Uname:   "vt_dba",
	Charset: "utf8",
}
View Source
var DefaultReplParams = mysql.ConnectionParams{
	Uname:   "vt_repl",
	Charset: "utf8",
}
View Source
var SqlKwMap = map[string]string{
	"create":   DDL,
	"alter":    DDL,
	"drop":     DDL,
	"truncate": DDL,
	"rename":   DDL,
	"insert":   DML,
	"update":   DML,
	"delete":   DML,
	"begin":    BEGIN,
	"commit":   COMMIT,
}

Functions

func ConcurrentMap

func ConcurrentMap(concurrency, n int, fun MapFunc) error

ConcurrentMap applies fun in a concurrent manner on integers from 0 to n-1 (they are assumed to be indexes of some slice containing items to be processed). The first error returned by a fun application will be returned (subsequent errors will only be logged). It will use concurrency goroutines.

func DiffPermissions

func DiffPermissions(leftName string, left *Permissions, rightName string, right *Permissions, er concurrency.ErrorRecorder)

func DiffPermissionsToArray

func DiffPermissionsToArray(leftName string, left *Permissions, rightName string, right *Permissions) (result []string)

func DiffSchema

func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition, er concurrency.ErrorRecorder)

DiffSchema generates a report on what's different between two SchemaDefinitions. For now, we skip the VIEW entirely.

func DiffSchemaToArray

func DiffSchemaToArray(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition) (result []string)

func DirectoryList

func DirectoryList(cnf *Mycnf) []string

func DisableBinlogServerService

func DisableBinlogServerService(blServer *BinlogServer)

DisableBinlogServerService disables the service for serving.

func DisableUpdateStreamService

func DisableUpdateStreamService()

func EnableBinlogServerService

func EnableBinlogServerService(blServer *BinlogServer, dbname string)

EnableBinlogServerService enables the service for serving.

func EnableUpdateStreamService

func EnableUpdateStreamService(tabletType string, dbcfgs dbconfigs.DBConfigs)

func GetDmlType

func GetDmlType(firstKw string) string

func GetReplicationPosition

func GetReplicationPosition() (*proto.ReplicationCoordinates, error)

func GetSqlType

func GetSqlType(firstKw string) string

func IgnoredStatement

func IgnoredStatement(line []byte) bool

func Init

func Init(mt *Mysqld, mysqlWaitTime time.Duration) error

func IsBinlogServerEnabled

func IsBinlogServerEnabled(blServer *BinlogServer) bool

func IsMasterPositionValid

func IsMasterPositionValid(startCoordinates *proto.ReplicationCoordinates) bool

func IsStartPositionValid

func IsStartPositionValid(startPos *proto.BinlogPosition) bool

func IsTxnStatement

func IsTxnStatement(line []byte, firstKw string) bool

func IsUpdateStreamEnabled

func IsUpdateStreamEnabled() bool

func LogsDir

func LogsDir() string

func MakeMycnf

func MakeMycnf(mycnf *Mycnf, cnfFiles []string) (string, error)

MakeMycnf joins the cnf files listed in cnfFiles and substitutes in the right values.

func MycnfFile

func MycnfFile(uid uint32) string

func NewBlplStats

func NewBlplStats() *blplStats

func NewEventBuffer

func NewEventBuffer(pos *proto.BinlogPosition, line []byte) *eventBuffer

func ReadStartPosition

func ReadStartPosition(dbClient VtClient, uid uint32) (*binlogRecoveryState, error)

func RegisterBinlogServerService

func RegisterBinlogServerService(blServer *BinlogServer)

RegisterBinlogServerService registers the service for serving and stats.

func RegisterUpdateStreamService

func RegisterUpdateStreamService(mycnf *Mycnf)

func SanityCheckManifests

func SanityCheckManifests(ssms []*SplitSnapshotManifest) error

SanityCheckManifests checks if the ssms can be restored together.

func ServeUpdateStream

func ServeUpdateStream(req *UpdateStreamRequest, sendReply SendUpdateStreamResponse) error

func Shutdown

func Shutdown(mt *Mysqld, waitForMysqld bool, mysqlWaitTime time.Duration) error
waitForMysqld: should the function block until mysqld has stopped?

This can actually take a *long* time if the buffer cache needs to be fully flushed - on the order of 20-30 minutes.

func SnapshotDir

func SnapshotDir(uid uint32) string

func Start

func Start(mt *Mysqld, mysqlWaitTime time.Duration) error

func StartReplicationCommands

func StartReplicationCommands(mysqld *Mysqld, replState *ReplicationState) ([]string, error)

func TabletDir

func TabletDir(uid uint32) string

func Teardown

func Teardown(mt *Mysqld, force bool) error

func TopLevelDirs

func TopLevelDirs() []string

Types

type BinlogDecoder

type BinlogDecoder struct {
	// contains filtered or unexported fields
}

func (*BinlogDecoder) DecodeMysqlBinlog

func (decoder *BinlogDecoder) DecodeMysqlBinlog(binlog *os.File) (io.Reader, error)

return a Reader from which the decoded binlog can be read

func (*BinlogDecoder) Kill

func (decoder *BinlogDecoder) Kill() error

type BinlogParseError

type BinlogParseError struct {
	// contains filtered or unexported fields
}

func NewBinlogParseError

func NewBinlogParseError(errType, msg string) *BinlogParseError

func (BinlogParseError) Error

func (err BinlogParseError) Error() string

func (*BinlogParseError) IsEOF

func (err *BinlogParseError) IsEOF() bool

func (*BinlogParseError) IsFatal

func (err *BinlogParseError) IsFatal() bool

type BinlogPlayer

type BinlogPlayer struct {
	// contains filtered or unexported fields
}

BinlogPlayer handles reading a stream of updates from BinlogServer.

func NewBinlogPlayer

func NewBinlogPlayer(dbClient VtClient, keyRange key.KeyRange, uid uint32, startPosition *binlogRecoveryState, tables []string, txnBatch int, maxTxnInterval time.Duration, execDdl bool) (*BinlogPlayer, error)

func (*BinlogPlayer) ApplyBinlogEvents

func (blp *BinlogPlayer) ApplyBinlogEvents(interrupted chan struct{}) error

ApplyBinlogEvents makes a gob rpc request to BinlogServer and processes the events.

func (*BinlogPlayer) StatsJSON

func (blp *BinlogPlayer) StatsJSON() string

type BinlogReader

type BinlogReader struct {

	// these parameters will have reasonable default values but can be tuned
	BinlogBlockSize int64
	MaxWaitTimeout  time.Duration
	LogWaitTimeout  time.Duration
	// contains filtered or unexported fields
}

func NewBinlogReader

func NewBinlogReader(binLogPrefix string) *BinlogReader

func (*BinlogReader) ServeData

func (blr *BinlogReader) ServeData(writer io.Writer, filename string, startPosition int64)

type BinlogServer

type BinlogServer struct {
	// contains filtered or unexported fields
}

func NewBinlogServer

func NewBinlogServer(mysqld *Mysqld) *BinlogServer

func (*BinlogServer) ServeBinlog

func (blServer *BinlogServer) ServeBinlog(req *proto.BinlogServerRequest, sendReply proto.SendBinlogResponse) error

type BinlogServerError

type BinlogServerError struct {
	Msg string
}

func (BinlogServerError) Error

func (err BinlogServerError) Error() string

type Blp

type Blp struct {
	// contains filtered or unexported fields
}

func NewBlp

func NewBlp(startCoordinates *proto.ReplicationCoordinates, updateStream *UpdateStream) *Blp

func (*Blp) ComputeBacklog

func (blp *Blp) ComputeBacklog() int64

func (Blp) DebugJsonString

func (stats Blp) DebugJsonString() string

func (*Blp) StreamBinlog

func (blp *Blp) StreamBinlog(sendReply SendUpdateStreamResponse, binlogPrefix string) (err error)

Main entry function for reading and parsing the binlog.

type BlpPosition

type BlpPosition struct {
	Uid     uint32
	GroupId string
}

type Bls

type Bls struct {
	// contains filtered or unexported fields
}

type ByReverseDataLength

type ByReverseDataLength struct {
	TableDefinitions
}

sort by reverse DataLength

func (ByReverseDataLength) Less

func (bdl ByReverseDataLength) Less(i, j int) bool

type CreateConnection

type CreateConnection func() (*mysql.Connection, error)

type DBClient

type DBClient struct {
	// contains filtered or unexported fields
}

DBClient is a real VtClient backed by a mysql connection

func NewDbClient

func NewDbClient(dbConfig *mysql.ConnectionParams) *DBClient

func (*DBClient) Begin

func (dc *DBClient) Begin() error

func (*DBClient) Close

func (dc *DBClient) Close()

func (*DBClient) Commit

func (dc *DBClient) Commit() error

func (*DBClient) Connect

func (dc *DBClient) Connect() error

func (*DBClient) ExecuteFetch

func (dc *DBClient) ExecuteFetch(query string, maxrows int, wantfields bool) (*proto.QueryResult, error)

func (*DBClient) Rollback

func (dc *DBClient) Rollback() error

type DbPermission

type DbPermission struct {
	Host       string
	Db         string
	User       string
	Privileges map[string]string
}

DbPermission describes a single row in the mysql.db table. Primary key is Host+Db+User.

func (*DbPermission) PrimaryKey

func (dp *DbPermission) PrimaryKey() string

func (*DbPermission) String

func (dp *DbPermission) String() string

type DbPermissionList

type DbPermissionList []*DbPermission

func (DbPermissionList) Get

func (upl DbPermissionList) Get(i int) Permission

func (DbPermissionList) Len

func (upl DbPermissionList) Len() int

type DummyVtClient

type DummyVtClient struct {
	// contains filtered or unexported fields
}

DummyVtClient is a VtClient that writes to a writer instead of executing anything

func NewDummyVtClient

func NewDummyVtClient() *DummyVtClient

func (DummyVtClient) Begin

func (dc DummyVtClient) Begin() error

func (DummyVtClient) Close

func (dc DummyVtClient) Close()

func (DummyVtClient) Commit

func (dc DummyVtClient) Commit() error

func (DummyVtClient) Connect

func (dc DummyVtClient) Connect() error

func (DummyVtClient) ExecuteFetch

func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error)

func (DummyVtClient) Rollback

func (dc DummyVtClient) Rollback() error

type EventData

type EventData struct {
	SqlType    string
	TableName  string
	Sql        string
	PkColNames []string
	PkValues   [][]interface{}
}

type FakeMysqlDaemon

type FakeMysqlDaemon struct {
	// will be returned by GetMasterAddr(). Set to "" to return an error.
	MasterAddr string
}

FakeMysqlDaemon implements MysqlDaemon and allows the user to fake everything.

func (*FakeMysqlDaemon) GetMasterAddr

func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error)

type HostPermission

type HostPermission struct {
	Host       string
	Db         string
	Privileges map[string]string
}

HostPermission describes a single row in the mysql.host table. Primary key is Host+Db.

func (*HostPermission) PrimaryKey

func (hp *HostPermission) PrimaryKey() string

func (*HostPermission) String

func (hp *HostPermission) String() string

type HostPermissionList

type HostPermissionList []*HostPermission

func (HostPermissionList) Get

func (upl HostPermissionList) Get(i int) Permission

func (HostPermissionList) Len

func (upl HostPermissionList) Len() int

type MapFunc

type MapFunc func(index int) error

type Mycnf

type Mycnf struct {
	ServerId              uint32
	MysqlPort             int
	DataDir               string
	InnodbDataHomeDir     string
	InnodbLogGroupHomeDir string
	SocketFile            string
	StartKey              string
	EndKey                string
	ErrorLogPath          string
	SlowLogPath           string
	RelayLogPath          string
	RelayLogIndexPath     string
	RelayLogInfoPath      string
	BinLogPath            string
	MasterInfoFile        string
	PidFile               string
	TmpDir                string
	SlaveLoadTmpDir       string
	// contains filtered or unexported fields
}

func NewMycnf

func NewMycnf(uid uint32, mysqlPort int, vtRepl VtReplParams) *Mycnf

NewMycnf fills the Mycnf structure with vt root paths and derived values. This is used to fill out the cnfTemplate values and generate my.cnf. uid is a unique id for a particular tablet - it must be unique within the tabletservers deployed within a keyspace, lest there be collisions on disk. mysqlPort needs to be unique per instance per machine.

func ReadMycnf

func ReadMycnf(cnfFile string) (mycnf *Mycnf, err error)

type MysqlBinlog

type MysqlBinlog struct {
	// contains filtered or unexported fields
}

func (*MysqlBinlog) Kill

func (mbl *MysqlBinlog) Kill()

Kill terminates the current mysqlbinlog process.

func (*MysqlBinlog) Launch

func (mbl *MysqlBinlog) Launch(dbname, filename string, pos uint64) (stdout io.ReadCloser, err error)

Launch launches mysqlbinlog and returns a ReadCloser into which its output will be piped. The stderr will be redirected to the log.

func (*MysqlBinlog) Wait

func (mbl *MysqlBinlog) Wait() error

Wait waits for the mysqlbinlog process to terminate and returns an error if there was any.

type MysqlDaemon

type MysqlDaemon interface {
	// GetMasterAddr returns the mysql master address, as shown by
	// 'show slave status'.
	GetMasterAddr() (string, error)
}

MysqlDaemon is the interface we use for abstracting Mysqld.

type Mysqld

type Mysqld struct {
	TabletDir   string
	SnapshotDir string
	// contains filtered or unexported fields
}

func NewMysqld

func NewMysqld(config *Mycnf, dba, repl mysql.ConnectionParams) *Mysqld

func (*Mysqld) Addr

func (mysqld *Mysqld) Addr() string

func (*Mysqld) ApplySchemaChange

func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *SchemaChange) (*SchemaChangeResult, error)

func (*Mysqld) BreakSlaves

func (mysqld *Mysqld) BreakSlaves() error

Force all slaves to error and stop. This is extreme, but helpful for emergencies and tests. Insert a row, block the propagation of its subsequent delete and reinsert it. This forces a failure on slaves only.

func (*Mysqld) CheckReplication

func (mysqld *Mysqld) CheckReplication(timeCheck int64) error

Check for the magic row inserted under controlled reparenting.

func (*Mysqld) CreateMultiSnapshot

func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyName string, sourceAddr string, allowHierarchicalReplication bool, snapshotConcurrency int, tables []string, skipSlaveRestart bool, maximumFilesize uint64, hookExtraEnv map[string]string) (snapshotManifestFilenames []string, err error)

func (*Mysqld) CreateSnapshot

func (mysqld *Mysqld) CreateSnapshot(dbName, sourceAddr string, allowHierarchicalReplication bool, concurrency int, serverMode bool, hookExtraEnv map[string]string) (snapshotManifestUrlPath string, slaveStartRequired, readOnly bool, err error)

This function runs on the machine acting as the source for the clone.

Check master/slave status and determine restore needs. If this instance is a slave, stop replication, otherwise place in read-only mode. Record replication position. Shut down mysql. Check paths for storing data.

Depending on the serverMode flag, we do the following: serverMode = false:

Compress /vt/vt_[0-9a-f]+/data/vt_.+
Compute hash (of compressed files, as we serve .gz files here)
Place in /vt/clone_src where they will be served by http server (not rpc)
Restart mysql

serverMode = true:

Make symlinks for /vt/vt_[0-9a-f]+/data/vt_.+ to innodb files
Compute hash (of uncompressed files, as we serve uncompressed files)
Place symlinks in /vt/clone_src where they will be served by http server
Leave mysql stopped, return slaveStartRequired, readOnly

func (*Mysqld) DemoteMaster

func (mysqld *Mysqld) DemoteMaster() (*ReplicationPosition, error)

If the master is still alive, then we need to demote it gracefully: make it read-only, flush the writes and get the position.

func (*Mysqld) ExecuteMysqlCommand

func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error

executes some SQL commands using a mysql command line interface process

func (*Mysqld) FindSlaves

func (mysqld *Mysqld) FindSlaves() ([]string, error)

Get IP addresses for all currently connected slaves. FIXME(msolomon) use command instead of user to find "rogue" slaves?

func (*Mysqld) FindVtDatabases

func (mysqld *Mysqld) FindVtDatabases() ([]string, error)

func (*Mysqld) GetColumns

func (mysqld *Mysqld) GetColumns(dbName, table string) ([]string, error)

GetColumns returns the columns of table.

func (*Mysqld) GetMasterAddr

func (mysqld *Mysqld) GetMasterAddr() (string, error)

func (*Mysqld) GetPermissions

func (mysqld *Mysqld) GetPermissions() (*Permissions, error)

func (*Mysqld) GetSchema

func (mysqld *Mysqld) GetSchema(dbName string, tables []string, includeViews bool) (*SchemaDefinition, error)

GetSchema returns the schema for database for tables listed in tables. If tables is empty, return the schema for all tables.

func (*Mysqld) IpAddr

func (mysqld *Mysqld) IpAddr() string

func (*Mysqld) IsReadOnly

func (mysqld *Mysqld) IsReadOnly() (bool, error)

func (*Mysqld) MasterStatus

func (mysqld *Mysqld) MasterStatus() (rp *ReplicationPosition, err error)

mysql> show master status\G
**************************** 1. row ***************************
File: vt-000001c6-bin.000003
Position: 106
Binlog_Do_DB:
Binlog_Ignore_DB:
Group_ID:

func (*Mysqld) MultiRestore

func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRange, sourceAddrs []*url.URL, snapshotConcurrency, fetchConcurrency, insertTableConcurrency, fetchRetryCount int, strategy string) (err error)

MultiRestore is the main entry point for multi restore.

  • If the strategy contains the string 'writeBinLogs' then we will also write to the binary logs.
  • If the strategy contains the command 'populateBlpCheckpoint' then we will populate the blp_checkpoint table with master positions to start from

func (*Mysqld) PreflightSchemaChange

func (mysqld *Mysqld) PreflightSchemaChange(dbName string, change string) (*SchemaChangeResult, error)

func (*Mysqld) PromoteSlave

func (mysqld *Mysqld) PromoteSlave(setReadWrite bool) (replicationState *ReplicationState, waitPosition *ReplicationPosition, timePromoted int64, err error)

setReadWrite: set the new master in read-write mode.

replicationState: info slaves need to reparent themselves.
waitPosition: slaves can wait for this position when restarting replication.
timePromoted: this timestamp (unix nanoseconds) is inserted into _vt.replication_log to verify the replication config.

func (*Mysqld) ReparentPosition

func (mysqld *Mysqld) ReparentPosition(slavePosition *ReplicationPosition) (rs *ReplicationState, waitPosition *ReplicationPosition, reparentTime int64, err error)

Return a replication state that will reparent a slave to the correct master for a specified position.

func (*Mysqld) RestartSlave

func (mysqld *Mysqld) RestartSlave(replicationState *ReplicationState, waitPosition *ReplicationPosition, timeCheck int64) error

func (*Mysqld) RestoreFromSnapshot

func (mysqld *Mysqld) RestoreFromSnapshot(snapshotManifest *SnapshotManifest, fetchConcurrency, fetchRetryCount int, dontWaitForSlaveStart bool, hookExtraEnv map[string]string) error

This piece runs on the presumably empty machine acting as the target in the create replica action.

validate target (self)
shutdown_mysql()
create temp data directory /vt/target/vt_<keyspace>
copy compressed data files via HTTP
verify hash of compressed files
uncompress into /vt/vt_<target-uid>/data/vt_<keyspace>
start_mysql()
clean up compressed files

func (*Mysqld) SetReadOnly

func (mysqld *Mysqld) SetReadOnly(on bool) error

func (*Mysqld) SlaveStatus

func (mysqld *Mysqld) SlaveStatus() (*ReplicationPosition, error)

func (*Mysqld) SnapshotSourceEnd

func (mysqld *Mysqld) SnapshotSourceEnd(slaveStartRequired, readOnly, deleteSnapshot bool) error

func (*Mysqld) StartSlave

func (mysqld *Mysqld) StartSlave() error

func (*Mysqld) StopSlave

func (mysqld *Mysqld) StopSlave() error

func (*Mysqld) ValidateCloneTarget

func (mysqld *Mysqld) ValidateCloneTarget(hookExtraEnv map[string]string) error

func (*Mysqld) ValidateSnapshotPath

func (mysqld *Mysqld) ValidateSnapshotPath() error

Helper function to make sure we can write to the local snapshot area, before we actually do any action (can be used for both partial and full snapshots)

func (*Mysqld) WaitBlpPos

func (mysqld *Mysqld) WaitBlpPos(bp *BlpPosition, waitTimeout int) error

func (*Mysqld) WaitForSlave

func (mysqld *Mysqld) WaitForSlave(maxLag int) (err error)

func (*Mysqld) WaitForSlaveStart

func (mysqld *Mysqld) WaitForSlaveStart(slaveStartDeadline int) (err error)

func (*Mysqld) WaitMasterPos

func (mysqld *Mysqld) WaitMasterPos(rp *ReplicationPosition, waitTimeout int) error

type Permission

type Permission interface {
	PrimaryKey() string
	String() string
}

type PermissionList

type PermissionList interface {
	Get(int) Permission
	Len() int
}

type Permissions

type Permissions struct {
	UserPermissions UserPermissionList
	DbPermissions   DbPermissionList
	HostPermissions HostPermissionList
}

Permissions has all the rows in the mysql.{user,db,host} tables (all rows are sorted by primary key).

func (*Permissions) String

func (permissions *Permissions) String() string

type ReplicationPosition

type ReplicationPosition struct {
	// MasterLogFile, MasterLogPosition and MasterLogGroupId are
	// the position on the logs for transactions that have been
	// applied (SQL position):
	// - on the master, it's File, Position and Group_ID from
	//   'show master status'.
	// - on the slave, it's Relay_Master_Log_File, Exec_Master_Log_Pos
	//   and Exec_Master_Group_ID from 'show slave status'.
	MasterLogFile     string
	MasterLogPosition uint
	MasterLogGroupId  string

	// MasterLogFileIo and MasterLogPositionIo are the position on the logs
	// that have been downloaded from the master (IO position),
	// but not necessarily applied yet:
	// - on the master, same as MasterLogFile and MasterLogPosition.
	// - on the slave, it's Master_Log_File and Read_Master_Log_Pos
	//   from 'show slave status'.
	MasterLogFileIo     string
	MasterLogPositionIo uint

	// SecondsBehindMaster is how far behind we are in applying logs in
	// replication. If equal to InvalidLagSeconds, it means replication
	// is not running.
	SecondsBehindMaster uint
}

ReplicationPosition tracks the replication position on both a master and a slave.

func (ReplicationPosition) MapKey

func (rp ReplicationPosition) MapKey() string

func (ReplicationPosition) MapKeyIo

func (rp ReplicationPosition) MapKeyIo() string

type ReplicationState

type ReplicationState struct {
	// ReplicationPosition is not anonymous because the default json encoder has begun to fail here.
	ReplicationPosition ReplicationPosition
	MasterHost          string
	MasterPort          int
	MasterConnectRetry  int
}

func NewReplicationState

func NewReplicationState(masterAddr string) (*ReplicationState, error)

func (ReplicationState) MasterAddr

func (rs ReplicationState) MasterAddr() string

type SchemaChange

type SchemaChange struct {
	Sql              string
	Force            bool
	AllowReplication bool
	BeforeSchema     *SchemaDefinition
	AfterSchema      *SchemaDefinition
}

type SchemaChangeResult

type SchemaChangeResult struct {
	BeforeSchema *SchemaDefinition
	AfterSchema  *SchemaDefinition
}

func (*SchemaChangeResult) String

func (scr *SchemaChangeResult) String() string

type SchemaDefinition

type SchemaDefinition struct {
	// the 'CREATE DATABASE...' statement, with db name as {{.DatabaseName}}
	DatabaseSchema string

	// ordered by TableDefinition.Name by default
	TableDefinitions TableDefinitions

	// the md5 of the concatenation of TableDefinition.Schema
	Version string
}

func (*SchemaDefinition) GetTable

func (sd *SchemaDefinition) GetTable(table string) (td *TableDefinition, ok bool)

func (*SchemaDefinition) SortByReverseDataLength

func (sd *SchemaDefinition) SortByReverseDataLength()

func (*SchemaDefinition) String

func (sd *SchemaDefinition) String() string

type SendUpdateStreamResponse

type SendUpdateStreamResponse func(response interface{}) error

type SnapshotFile

type SnapshotFile struct {
	Path      string
	Size      int64
	Hash      string
	TableName string
}

SnapshotFile describes a file to serve. 'Path' is the path component of the URL. SnapshotManifest.Addr is the host+port component of the URL. If path ends in '.gz', it is compressed. Size and Hash are computed on the Path itself. If TableName is set, this file belongs to that table.

type SnapshotFiles

type SnapshotFiles []SnapshotFile

func (SnapshotFiles) Len

func (s SnapshotFiles) Len() int

Implements sort.Interface; we sort by descending file size.

func (SnapshotFiles) Less

func (s SnapshotFiles) Less(i, j int) bool

func (SnapshotFiles) Swap

func (s SnapshotFiles) Swap(i, j int)

type SnapshotManifest

type SnapshotManifest struct {
	Addr string // this is the address of the tabletserver, not mysql

	DbName string
	Files  SnapshotFiles

	ReplicationState *ReplicationState
	MasterState      *ReplicationState
}

a SnapshotManifest describes multiple SnapshotFiles and where to get them from.

func ReadSnapshotManifest

func ReadSnapshotManifest(filename string) (*SnapshotManifest, error)

type SplitSnapshotManifest

type SplitSnapshotManifest struct {
	// Source describes the files and our tablet
	Source *SnapshotManifest

	// KeyRange describes the data present in this snapshot
	// When splitting 40-80 into 40-60 and 60-80, this would
	// have 40-60 for instance.
	KeyRange key.KeyRange

	// The schema for this server
	SchemaDefinition *SchemaDefinition
}

func NewSplitSnapshotManifest

func NewSplitSnapshotManifest(myAddr, myMysqlAddr, masterAddr, dbName string, files []SnapshotFile, pos, myMasterPos *ReplicationPosition, keyRange key.KeyRange, sd *SchemaDefinition) (*SplitSnapshotManifest, error)

NewSplitSnapshotManifest creates a new SplitSnapshotManifest. myAddr and myMysqlAddr are the local server addresses. masterAddr is the address of the server to use as master. pos is the replication position to use on that master. myMasterPos is the local server master position

type TableDefinition

type TableDefinition struct {
	Name       string   // the table name
	Schema     string   // the SQL to run to create the table
	Columns    []string // the columns in the order that will be used to dump and load the data
	Type       string   // TABLE_BASE_TABLE or TABLE_VIEW
	DataLength uint64   // how much space the data file takes.
}

type TableDefinitions

type TableDefinitions []TableDefinition

helper methods for sorting

func (TableDefinitions) Len

func (tds TableDefinitions) Len() int

func (TableDefinitions) Swap

func (tds TableDefinitions) Swap(i, j int)

type UpdateResponse

type UpdateResponse struct {
	Coord proto.BinlogPosition
	Data  EventData
}

Api Interface

type UpdateStream

type UpdateStream struct {
	// contains filtered or unexported fields
}
var UpdateStreamRpcService *UpdateStream

func (*UpdateStream) ServeUpdateStream

func (updateStream *UpdateStream) ServeUpdateStream(req *UpdateStreamRequest, sendReply SendUpdateStreamResponse) (err error)

type UpdateStreamRequest

type UpdateStreamRequest struct {
	StartPosition proto.BinlogPosition
}

type UserPermission

type UserPermission struct {
	Host             string
	User             string
	PasswordChecksum uint64
	Privileges       map[string]string
}

UserPermission describes a single row in the mysql.user table. Primary key is Host+User. PasswordChecksum is the crc64 of the password, for security reasons.

func (*UserPermission) PrimaryKey

func (up *UserPermission) PrimaryKey() string

func (*UserPermission) String

func (up *UserPermission) String() string

type UserPermissionList

type UserPermissionList []*UserPermission

func (UserPermissionList) Get

func (upl UserPermissionList) Get(i int) Permission

func (UserPermissionList) Len

func (upl UserPermissionList) Len() int

type VtClient

type VtClient interface {
	Connect() error
	Begin() error
	Commit() error
	Rollback() error
	Close()
	ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error)
}

VtClient is a high level interface to the database

type VtReplParams

type VtReplParams struct {
	StartKey string
	EndKey   string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL