ghostferry

package module
v0.0.0-...-cf24988 Latest Latest
Published: Apr 25, 2018 License: MIT Imports: 32 Imported by: 0

README

Ghostferry

Ghostferry is a library that enables you to selectively copy data from one MySQL instance to another with a minimal amount of downtime.

It is inspired by GitHub's gh-ost. However, instead of copying data from and to the same database, Ghostferry copies data from one database to another and can copy only part of the data.

There is an example application called ghostferry-copydb included (under the copydb directory) that demonstrates this library by copying an entire database from one machine to another.

Overview of How it Works

An overview of Ghostferry's high-level design is expressed in the TLA+ specification, under the tlaplus directory. It may be worth consulting, as it gives a concise definition. However, the specification might not be entirely correct, as proofs remain elusive.

At a high level, Ghostferry is broken into several components that together enable it to copy data. These are detailed in docs/source/technicaloverview.rst.

More detailed documentation is coming soon.

Development Setup

Install:

  • Have Docker installed
  • Clone the repo
  • docker-compose up -d mysql-1 mysql-2

Run tests:

  • make test

Test copydb:

  • make copydb && ghostferry-copydb -verbose examples/copydb/conf.json

Documentation

Constants

const (
	StateStarting          = "starting"
	StateCopying           = "copying"
	StateWaitingForCutover = "wait-for-cutover"
	StateCutover           = "cutover"
	StateDone              = "done"
)

Variables

var (
	VersionString string = "?.?.?+??????????????+???????"
	WebUiBasedir  string = ""
)

Functions

func DefaultBuildSelect

func DefaultBuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) squirrel.SelectBuilder

func GetMd5HashesSql

func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error)

func Int64Value

func Int64Value(value interface{}) (int64, bool)

func MaskedDSN

func MaskedDSN(c *mysql.Config) string

func MaxPrimaryKeys

func MaxPrimaryKeys(db *sql.DB, tables []*schema.Table, logger *logrus.Entry) (map[*schema.Table]uint64, []*schema.Table, error)

func QuotedTableName

func QuotedTableName(table *schema.Table) string

func QuotedTableNameFromString

func QuotedTableNameFromString(database, table string) string

func Uint64Value

func Uint64Value(value interface{}) (uint64, bool)

func WaitForThrottle

func WaitForThrottle(t Throttler)

func WithRetries

func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)

func WithRetriesContext

func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)

Types

type AtomicBoolean

type AtomicBoolean int32

func (*AtomicBoolean) Get

func (a *AtomicBoolean) Get() bool

func (*AtomicBoolean) Set

func (a *AtomicBoolean) Set(b bool)
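
Since AtomicBoolean is declared as an int32, one plausible way to implement Get and Set is with sync/atomic. The sketch below uses a hypothetical lowercase type, atomicBoolean, to make clear that it is an illustration under that assumption and not the package's own method bodies.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// atomicBoolean mirrors the documented underlying type (int32).
// It is a hypothetical stand-in for illustration only.
type atomicBoolean int32

// Set stores 1 for true and 0 for false with an atomic store.
func (a *atomicBoolean) Set(b bool) {
	var v int32
	if b {
		v = 1
	}
	atomic.StoreInt32((*int32)(a), v)
}

// Get atomically loads the value and reports whether it is non-zero.
func (a *atomicBoolean) Get() bool {
	return atomic.LoadInt32((*int32)(a)) != 0
}

func main() {
	var flag atomicBoolean
	fmt.Println(flag.Get()) // prints "false"
	flag.Set(true)
	fmt.Println(flag.Get()) // prints "true"
}
```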

type BatchWriter

type BatchWriter struct {
	DB *sql.DB

	DatabaseRewrites map[string]string
	TableRewrites    map[string]string

	WriteRetries int
	// contains filtered or unexported fields
}

func (*BatchWriter) Initialize

func (w *BatchWriter) Initialize()

func (*BatchWriter) WriteRowBatch

func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error

type BinlogDeleteEvent

type BinlogDeleteEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogDeleteEvent) AsSQLString

func (e *BinlogDeleteEvent) AsSQLString(target *schema.Table) (string, error)

func (*BinlogDeleteEvent) NewValues

func (e *BinlogDeleteEvent) NewValues() RowData

func (*BinlogDeleteEvent) OldValues

func (e *BinlogDeleteEvent) OldValues() RowData

func (*BinlogDeleteEvent) PK

func (e *BinlogDeleteEvent) PK() (uint64, error)

type BinlogInsertEvent

type BinlogInsertEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogInsertEvent) AsSQLString

func (e *BinlogInsertEvent) AsSQLString(target *schema.Table) (string, error)

func (*BinlogInsertEvent) NewValues

func (e *BinlogInsertEvent) NewValues() RowData

func (*BinlogInsertEvent) OldValues

func (e *BinlogInsertEvent) OldValues() RowData

func (*BinlogInsertEvent) PK

func (e *BinlogInsertEvent) PK() (uint64, error)

type BinlogStreamer

type BinlogStreamer struct {
	Db           *sql.DB
	Config       *Config
	ErrorHandler ErrorHandler
	Filter       CopyFilter

	TableSchema TableSchemaCache
	// contains filtered or unexported fields
}

func (*BinlogStreamer) AddEventListener

func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)

func (*BinlogStreamer) ConnectBinlogStreamerToMysql

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error

func (*BinlogStreamer) FlushAndStop

func (s *BinlogStreamer) FlushAndStop()

func (*BinlogStreamer) GetLastStreamedBinlogPosition

func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position

func (*BinlogStreamer) Initialize

func (s *BinlogStreamer) Initialize() (err error)

func (*BinlogStreamer) IsAlmostCaughtUp

func (s *BinlogStreamer) IsAlmostCaughtUp() bool

func (*BinlogStreamer) Run

func (s *BinlogStreamer) Run()

type BinlogUpdateEvent

type BinlogUpdateEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogUpdateEvent) AsSQLString

func (e *BinlogUpdateEvent) AsSQLString(target *schema.Table) (string, error)

func (*BinlogUpdateEvent) NewValues

func (e *BinlogUpdateEvent) NewValues() RowData

func (*BinlogUpdateEvent) OldValues

func (e *BinlogUpdateEvent) OldValues() RowData

func (*BinlogUpdateEvent) PK

func (e *BinlogUpdateEvent) PK() (uint64, error)

type BinlogWriter

type BinlogWriter struct {
	DB               *sql.DB
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	Throttler        Throttler

	BatchSize    int
	WriteRetries int

	ErrorHandler ErrorHandler
	// contains filtered or unexported fields
}

func (*BinlogWriter) BufferBinlogEvents

func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error

func (*BinlogWriter) Initialize

func (b *BinlogWriter) Initialize() error

func (*BinlogWriter) Run

func (b *BinlogWriter) Run()

func (*BinlogWriter) Stop

func (b *BinlogWriter) Stop()

type ChecksumTableVerifier

type ChecksumTableVerifier struct {
	Tables           []*schema.Table
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	SourceDB         *sql.DB
	TargetDB         *sql.DB
	// contains filtered or unexported fields
}

func (*ChecksumTableVerifier) Result

func (*ChecksumTableVerifier) StartInBackground

func (v *ChecksumTableVerifier) StartInBackground() error

func (*ChecksumTableVerifier) Verify

func (*ChecksumTableVerifier) Wait

func (v *ChecksumTableVerifier) Wait()

type Config

type Config struct {
	// Source database connection configuration
	//
	// Required
	Source DatabaseConfig

	// Target database connection configuration
	//
	// Required
	Target DatabaseConfig

	// Map database name on the source database (key of the map) to a
	// different name on the target database (value of the associated key).
	// This allows one to move data and change the database name in the
	// process.
	//
	// Optional: defaults to empty map/no rewrites
	DatabaseRewrites map[string]string

	// Map the table name on the source database to a different name on
	// the target database. See DatabaseRewrites.
	//
	// Optional: defaults to empty map/no rewrites
	TableRewrites map[string]string

	// The maximum number of retries for writes if the writes failed on
	// the target database.
	//
	// Optional: defaults to 5.
	DBWriteRetries int

	// Filter out the databases/tables when detecting the source databases
	// and tables.
	//
	// Required
	TableFilter TableFilter

	// Filter out unwanted data/events from being copied.
	//
	// Optional: defaults to nil/no filter.
	CopyFilter CopyFilter

	// The server id used by Ghostferry to connect to MySQL as a replication
	// slave. This id must be unique on the MySQL server. If 0 is specified,
	// a random id will be generated upon connecting to the MySQL server.
	//
	// Optional: defaults to an automatically generated one
	MyServerId uint32

	// The maximum number of binlog events to write at once. Note this is a
	// maximum: if there are not a lot of binlog events, they will be written
	// one at a time such that the binlog streamer lag is as low as possible.
	// This batch size will only be hit if there are a lot of binlog events at
	// the same time.
	//
	// Optional: defaults to 100
	BinlogEventBatchSize int

	// The batch size used to iterate the data during data copy. This batch size
	// is always used: if this is specified to be 100, 100 rows will be copied
	// per iteration.
	//
	// With the current implementation of Ghostferry, we need to lock the rows
	// we select. This means, the larger this number is, the longer we need to
	// hold this lock. On the flip side, the smaller this number is, the slower
	// the copy will likely be.
	//
	// Optional: defaults to 200
	DataIterationBatchSize uint64

	// The maximum number of retries for reads if the reads fail on the source
	// database.
	//
	// Optional: defaults to 5
	DBReadRetries int

	// This specifies the number of concurrent goroutines, each iterating over
	// a single table.
	//
	// At this point in time, iteration within a single table is not
	// parallelized. This may be added in the future.
	//
	// Optional: defaults to 4
	DataIterationConcurrency int

	// This specifies if Ghostferry will pause before cutover or not.
	//
	// Optional: defaults to false
	AutomaticCutover bool

	// Config for the ControlServer
	ServerBindAddr string
	WebBasedir     string
}

func (*Config) ValidateConfig

func (c *Config) ValidateConfig() error
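
The field comments on Config list a default for each optional tunable. The sketch below collects those documented defaults in one place. The configSketch type and applyDefaults method are hypothetical, and treating a zero value as "unset" is an assumption; where and how the package itself applies the defaults is not shown in this documentation.

```go
package main

import "fmt"

// configSketch copies only the tunable fields whose defaults are documented
// in the Config comments. It is a hypothetical stand-in for Config.
type configSketch struct {
	DBWriteRetries           int
	DBReadRetries            int
	BinlogEventBatchSize     int
	DataIterationBatchSize   uint64
	DataIterationConcurrency int
}

// applyDefaults fills zero-valued fields with the documented defaults.
// Treating zero as "unset" is an assumption of this sketch.
func (c *configSketch) applyDefaults() {
	if c.DBWriteRetries == 0 {
		c.DBWriteRetries = 5
	}
	if c.DBReadRetries == 0 {
		c.DBReadRetries = 5
	}
	if c.BinlogEventBatchSize == 0 {
		c.BinlogEventBatchSize = 100
	}
	if c.DataIterationBatchSize == 0 {
		c.DataIterationBatchSize = 200
	}
	if c.DataIterationConcurrency == 0 {
		c.DataIterationConcurrency = 4
	}
}

func main() {
	// Override one value and let the rest fall back to the defaults.
	c := configSketch{DataIterationBatchSize: 1000}
	c.applyDefaults()
	fmt.Printf("%+v\n", c)
}
```

A larger DataIterationBatchSize, as in the usage above, trades longer row locks for fewer iterations, as the field comment explains.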

type ControlServer

type ControlServer struct {
	F        *Ferry
	Verifier Verifier
	Addr     string
	Basedir  string
	// contains filtered or unexported fields
}

func (*ControlServer) HandleCutover

func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleIndex

func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandlePause

func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleStop

func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleUnpause

func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleVerify

func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Initialize

func (this *ControlServer) Initialize() (err error)

func (*ControlServer) Run

func (this *ControlServer) Run(wg *sync.WaitGroup)

func (*ControlServer) ServeHTTP

func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Shutdown

func (this *ControlServer) Shutdown() error

type CopyFilter

type CopyFilter interface {
	// BuildSelect is used to set up the query used for batch data copying,
	// allowing for restricting copying to a subset of data. Returning an error
	// here will cause the query to be retried, until the retry limit is
	// reached, at which point the ferry will be aborted. BuildSelect is passed
	// the columns to be selected, table being copied, the last primary key value
	// from the previous batch, and the batch size. Call DefaultBuildSelect to
	// generate the default query, which may be used as a starting point.
	BuildSelect([]string, *schema.Table, uint64, uint64) (sq.SelectBuilder, error)

	// ApplicableEvent is used to filter events for rows that have been
	// filtered in BuildSelect. ApplicableEvent should return true if the
	// event is for a row that would be selected by BuildSelect, and false
	// otherwise.
	// Returning an error here will cause the ferry to be aborted.
	ApplicableEvent(DMLEvent) (bool, error)
}

CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in the BuildSelect function, and returning false for unwanted rows in ApplicableEvent.
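
To make the two halves of a copy filter concrete, here is a self-contained sketch of a filter that keeps only rows belonging to one tenant. Everything in it is a stand-in: tenantFilter, rowEvent, buildSelect, and applicableEvent are hypothetical names, the query is built as a plain string rather than a squirrel SelectBuilder, and the table and column names are invented. The point is only that the WHERE condition used for batch copying and the predicate used for binlog events must agree.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// rowEvent is a hypothetical stand-in for DMLEvent that exposes only the
// row values this sketch needs.
type rowEvent struct {
	oldValues []interface{}
	newValues []interface{}
}

// tenantFilter copies only rows whose tenant column equals tenantID.
type tenantFilter struct {
	tenantID    int64
	tenantIndex int // position of the tenant column within a row
}

// buildSelect plays the role of BuildSelect: it returns the batch query and
// its arguments, with an extra WHERE condition restricting the rows copied.
// The real method returns a squirrel SelectBuilder instead of a string.
func (f tenantFilter) buildSelect(columns []string, table, pkColumn, tenantColumn string, lastPk, batchSize uint64) (string, []interface{}) {
	query := fmt.Sprintf(
		"SELECT %s FROM %s WHERE %s > ? AND %s = ? ORDER BY %s LIMIT %d",
		strings.Join(columns, ", "), table, pkColumn, tenantColumn, pkColumn, batchSize,
	)
	return query, []interface{}{lastPk, f.tenantID}
}

// applicableEvent plays the role of ApplicableEvent: it returns true only
// for rows that buildSelect would also have selected.
func (f tenantFilter) applicableEvent(ev rowEvent) (bool, error) {
	row := ev.newValues
	if row == nil {
		row = ev.oldValues // assumption: deletes carry only old values
	}
	if f.tenantIndex < 0 || f.tenantIndex >= len(row) {
		return false, errors.New("tenant column missing from row")
	}
	id, ok := row[f.tenantIndex].(int64)
	if !ok {
		return false, fmt.Errorf("unexpected tenant value type %T", row[f.tenantIndex])
	}
	return id == f.tenantID, nil
}

func main() {
	f := tenantFilter{tenantID: 7, tenantIndex: 1}
	query, args := f.buildSelect([]string{"id", "tenant_id"}, "orders", "id", "tenant_id", 100, 200)
	fmt.Println(query, args)

	keep, err := f.applicableEvent(rowEvent{newValues: []interface{}{int64(101), int64(7)}})
	fmt.Println(keep, err)
}
```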

type CountMetric

type CountMetric struct {
	MetricBase
	Value int64
}

type Cursor

type Cursor struct {
	CursorConfig

	Table         *schema.Table
	MaxPrimaryKey uint64
	RowLock       bool
	// contains filtered or unexported fields
}

func (*Cursor) Each

func (c *Cursor) Each(f func(*RowBatch) error) error

func (*Cursor) Fetch

func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, pkpos uint64, err error)

type CursorConfig

type CursorConfig struct {
	DB        *sql.DB
	Throttler Throttler

	ColumnsToSelect []string
	BuildSelect     func([]string, *schema.Table, uint64, uint64) (squirrel.SelectBuilder, error)
	BatchSize       uint64
	ReadRetries     int
}

func (*CursorConfig) NewCursor

func (c *CursorConfig) NewCursor(table *schema.Table, maxPk uint64) *Cursor

Returns a new Cursor with an embedded copy of the CursorConfig.

func (*CursorConfig) NewCursorWithoutRowLock

func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor

Returns a new Cursor, without row locking, with an embedded copy of the CursorConfig.

type DMLEvent

type DMLEvent interface {
	Database() string
	Table() string
	TableSchema() *schema.Table
	AsSQLString(target *schema.Table) (string, error)
	OldValues() RowData
	NewValues() RowData
	PK() (uint64, error)
}

func NewBinlogDMLEvents

func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error)

func NewBinlogDeleteEvents

func NewBinlogDeleteEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

func NewBinlogInsertEvents

func NewBinlogInsertEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

func NewBinlogUpdateEvents

func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

type DMLEventBase

type DMLEventBase struct {
	// contains filtered or unexported fields
}

The base of DMLEvent to provide the necessary methods. This desires a copy of the struct in case we want to deal with schema changes in the future.

func (*DMLEventBase) Database

func (e *DMLEventBase) Database() string

func (*DMLEventBase) Table

func (e *DMLEventBase) Table() string

func (*DMLEventBase) TableSchema

func (e *DMLEventBase) TableSchema() *schema.Table

type DataIterator

type DataIterator struct {
	DB          *sql.DB
	Tables      []*schema.Table
	Concurrency int

	ErrorHandler ErrorHandler
	CursorConfig *CursorConfig

	CurrentState *DataIteratorState
	// contains filtered or unexported fields
}

func (*DataIterator) AddBatchListener

func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error)

func (*DataIterator) AddDoneListener

func (d *DataIterator) AddDoneListener(listener func() error)

func (*DataIterator) Initialize

func (d *DataIterator) Initialize() error

func (*DataIterator) Run

func (d *DataIterator) Run()

type DataIteratorState

type DataIteratorState struct {
	// contains filtered or unexported fields
}

func (*DataIteratorState) CompletedTables

func (this *DataIteratorState) CompletedTables() map[string]bool

func (*DataIteratorState) EstimatedPKProcessedPerSecond

func (this *DataIteratorState) EstimatedPKProcessedPerSecond() float64

func (*DataIteratorState) LastSuccessfulPrimaryKeys

func (this *DataIteratorState) LastSuccessfulPrimaryKeys() map[string]uint64

func (*DataIteratorState) MarkTableAsCompleted

func (this *DataIteratorState) MarkTableAsCompleted(table string)

func (*DataIteratorState) TargetPrimaryKeys

func (this *DataIteratorState) TargetPrimaryKeys() map[string]uint64

func (*DataIteratorState) UpdateLastSuccessfulPK

func (this *DataIteratorState) UpdateLastSuccessfulPK(table string, pk uint64)

func (*DataIteratorState) UpdateTargetPK

func (this *DataIteratorState) UpdateTargetPK(table string, pk uint64)

type DatabaseConfig

type DatabaseConfig struct {
	Host      string
	Port      uint16
	User      string
	Pass      string
	Collation string
	Params    map[string]string

	TLS *TLSConfig
}

func (*DatabaseConfig) MySQLConfig

func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)

func (*DatabaseConfig) Validate

func (c *DatabaseConfig) Validate() error

type ErrorHandler

type ErrorHandler interface {
	Fatal(from string, err error)
}

type Ferry

type Ferry struct {
	*Config

	SourceDB *sql.DB
	TargetDB *sql.DB

	BinlogStreamer *BinlogStreamer
	BinlogWriter   *BinlogWriter

	DataIterator *DataIterator
	BatchWriter  *BatchWriter

	ErrorHandler ErrorHandler
	Throttler    Throttler

	Tables TableSchemaCache

	StartTime    time.Time
	DoneTime     time.Time
	OverallState string
	// contains filtered or unexported fields
}

func (*Ferry) FlushBinlogAndStopStreaming

func (f *Ferry) FlushBinlogAndStopStreaming()

After you have stopped writing to the source and made sure that all in-flight transactions to the source have completed, call this method to ensure that the binlog streaming has caught up and to stop the binlog streaming.

This method does not actually shut down the BinlogStreamer immediately. You will know that the BinlogStreamer has finished when .Run() returns.

func (*Ferry) Initialize

func (f *Ferry) Initialize() (err error)

Initialize all the components of Ghostferry and connect to the Database

func (*Ferry) Run

func (f *Ferry) Run()

Spawns the background tasks that actually perform the run, then waits for them to finish.

func (*Ferry) RunStandaloneDataCopy

func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error

func (*Ferry) Start

func (f *Ferry) Start() error

Determines the binlog coordinates and the table mapping for the pending Ghostferry run.

func (*Ferry) WaitUntilBinlogStreamerCatchesUp

func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()

func (*Ferry) WaitUntilRowCopyIsComplete

func (f *Ferry) WaitUntilRowCopyIsComplete()

Call this method and perform the cutover after this method returns.

type GaugeMetric

type GaugeMetric struct {
	MetricBase
	Value float64
}

type IncompleteVerificationError

type IncompleteVerificationError struct{}

func (IncompleteVerificationError) Error

type IterativeVerifier

type IterativeVerifier struct {
	CursorConfig     *CursorConfig
	BinlogStreamer   *BinlogStreamer
	TableSchemaCache TableSchemaCache
	SourceDB         *sql.DB
	TargetDB         *sql.DB

	Tables           []*schema.Table
	IgnoredTables    []string
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	Concurrency      int
	// contains filtered or unexported fields
}

func (*IterativeVerifier) GetHashes

func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error)

func (*IterativeVerifier) Initialize

func (v *IterativeVerifier) Initialize() error

func (*IterativeVerifier) Result

func (*IterativeVerifier) SanityCheckParameters

func (v *IterativeVerifier) SanityCheckParameters() error

func (*IterativeVerifier) StartInBackground

func (v *IterativeVerifier) StartInBackground() error

func (*IterativeVerifier) VerifyBeforeCutover

func (v *IterativeVerifier) VerifyBeforeCutover() error

func (*IterativeVerifier) VerifyDuringCutover

func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*IterativeVerifier) VerifyOnce

func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)

func (*IterativeVerifier) Wait

func (v *IterativeVerifier) Wait()

type LagThrottler

type LagThrottler struct {
	ThrottlerBase
	PauserThrottler

	DB *sql.DB
	// contains filtered or unexported fields
}

func NewLagThrottler

func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)

func (*LagThrottler) Run

func (t *LagThrottler) Run(ctx context.Context) error

func (*LagThrottler) Throttled

func (t *LagThrottler) Throttled() bool

type LagThrottlerConfig

type LagThrottlerConfig struct {
	Connection     DatabaseConfig
	MaxLag         int
	Query          string
	UpdateInterval string
}

type MetricBase

type MetricBase struct {
	Key        string
	Tags       []MetricTag
	SampleRate float64
}

type MetricTag

type MetricTag struct {
	Name  string
	Value string
}

type Metrics

type Metrics struct {
	Prefix      string
	DefaultTags []MetricTag
	Sink        chan interface{}
	// contains filtered or unexported fields
}

func SetGlobalMetrics

func SetGlobalMetrics(prefix string, sink chan interface{}) *Metrics

func (*Metrics) AddConsumer

func (m *Metrics) AddConsumer()

func (*Metrics) Count

func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)

func (*Metrics) DoneConsumer

func (m *Metrics) DoneConsumer()

func (*Metrics) Gauge

func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)

func (*Metrics) Measure

func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())

func (*Metrics) StopAndFlush

func (m *Metrics) StopAndFlush()

func (*Metrics) Timer

func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)

type PKPositionLog

type PKPositionLog struct {
	Position uint64
	At       time.Time
}

type PanicErrorHandler

type PanicErrorHandler struct {
	Ferry *Ferry
	// contains filtered or unexported fields
}

func (*PanicErrorHandler) Fatal

func (this *PanicErrorHandler) Fatal(from string, err error)

type PauserThrottler

type PauserThrottler struct {
	ThrottlerBase
	// contains filtered or unexported fields
}

func (*PauserThrottler) Run

func (t *PauserThrottler) Run(ctx context.Context) error

func (*PauserThrottler) SetPaused

func (t *PauserThrottler) SetPaused(paused bool)

func (*PauserThrottler) Throttled

func (t *PauserThrottler) Throttled() bool

type ReverifyBatch

type ReverifyBatch struct {
	Pks   []uint64
	Table TableIdentifier
}

type ReverifyEntry

type ReverifyEntry struct {
	Pk    uint64
	Table *schema.Table
}

type ReverifyStore

type ReverifyStore struct {
	MapStore map[TableIdentifier]map[uint64]struct{}

	BatchStore         []ReverifyBatch
	RowCount           uint64
	EmitLogPerRowCount uint64
	// contains filtered or unexported fields
}

func NewReverifyStore

func NewReverifyStore() *ReverifyStore

func (*ReverifyStore) Add

func (r *ReverifyStore) Add(entry ReverifyEntry)

func (ReverifyStore) FlushAndBatchByTable

func (r ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch

type RowBatch

type RowBatch struct {
	// contains filtered or unexported fields
}

func NewRowBatch

func NewRowBatch(table *schema.Table, values []RowData, pkIndex int) *RowBatch

func (*RowBatch) AsSQLQuery

func (e *RowBatch) AsSQLQuery(target *schema.Table) (string, []interface{}, error)

func (*RowBatch) PkIndex

func (e *RowBatch) PkIndex() int

func (*RowBatch) Size

func (e *RowBatch) Size() int

func (*RowBatch) TableSchema

func (e *RowBatch) TableSchema() *schema.Table

func (*RowBatch) Values

func (e *RowBatch) Values() []RowData

func (*RowBatch) ValuesContainPk

func (e *RowBatch) ValuesContainPk() bool

type RowData

type RowData []interface{}

func ScanGenericRow

func ScanGenericRow(rows *sql.Rows, columnCount int) (RowData, error)

func (RowData) GetUint64

func (r RowData) GetUint64(colIdx int) (res uint64, err error)

The MySQL driver never actually gives you a uint64 from Scan. Instead, you get an int64 for values that fit in int64, or a byte slice containing the uint64 value as a decimal string.
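
The two cases described for GetUint64 can be handled with a type switch. The following is a minimal sketch with a hypothetical helper, toUint64; it is not the method's actual body, and rejecting negative int64 values is a choice made for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// toUint64 is a hypothetical helper covering the value shapes described
// above: an int64, or a byte slice holding a decimal string.
func toUint64(v interface{}) (uint64, error) {
	switch x := v.(type) {
	case uint64:
		return x, nil
	case int64:
		if x < 0 {
			return 0, fmt.Errorf("negative value %d cannot be a uint64", x)
		}
		return uint64(x), nil
	case []byte:
		// Values above the int64 range arrive as decimal text.
		return strconv.ParseUint(string(x), 10, 64)
	default:
		return 0, fmt.Errorf("unsupported type %T", v)
	}
}

func main() {
	small, _ := toUint64(int64(42))
	big, _ := toUint64([]byte("18446744073709551615"))
	fmt.Println(small, big) // prints "42 18446744073709551615"
}
```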

type SqlDBWithFakeRollback

type SqlDBWithFakeRollback struct {
	*sql.DB
}

func (*SqlDBWithFakeRollback) Rollback

func (d *SqlDBWithFakeRollback) Rollback() error

type SqlPreparer

type SqlPreparer interface {
	Prepare(string) (*sql.Stmt, error)
}

Both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d.

type SqlPreparerAndRollbacker

type SqlPreparerAndRollbacker interface {
	SqlPreparer
	Rollback() error
}

sql.DB does not implement Rollback, but it can be wrapped in SqlDBWithFakeRollback, whose Rollback performs a no-op.
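
The relationship between these types can be checked at compile time. The sketch below redeclares the two interfaces and the wrapper under hypothetical lowercase names so that it stands alone; it shows that a *sql.Tx satisfies the combined interface directly, while a *sql.DB needs the wrapper.

```go
package main

import (
	"database/sql"
	"fmt"
)

// preparer and preparerAndRollbacker mirror the documented interfaces.
type preparer interface {
	Prepare(string) (*sql.Stmt, error)
}

type preparerAndRollbacker interface {
	preparer
	Rollback() error
}

// dbWithFakeRollback embeds *sql.DB, which promotes Prepare, and adds a
// Rollback that does nothing.
type dbWithFakeRollback struct {
	*sql.DB
}

func (d *dbWithFakeRollback) Rollback() error { return nil }

// Compile-time checks: both types satisfy the combined interface.
var (
	_ preparerAndRollbacker = (*sql.Tx)(nil)
	_ preparerAndRollbacker = (*dbWithFakeRollback)(nil)
)

func main() {
	// No database connection is needed to call the no-op Rollback.
	var p preparerAndRollbacker = &dbWithFakeRollback{}
	fmt.Println(p.Rollback()) // prints "<nil>"
}
```

This lets code that reads rows accept either a transaction or a plain connection pool and always call Rollback when it is done.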

type Status

type Status struct {
	GhostferryVersion string

	SourceHostPort string
	TargetHostPort string

	OverallState      string
	StartTime         time.Time
	CurrentTime       time.Time
	TimeTaken         time.Duration
	ETA               time.Duration
	BinlogStreamerLag time.Duration
	PKsPerSecond      uint64

	AutomaticCutover            bool
	BinlogStreamerStopRequested bool
	LastSuccessfulBinlogPos     mysql.Position
	TargetBinlogPos             mysql.Position

	Throttled bool

	CompletedTableCount int
	TotalTableCount     int
	TableStatuses       []*TableStatus
	AllTableNames       []string
	AllDatabaseNames    []string

	VerifierSupport     bool
	VerifierAvailable   bool
	VerificationStarted bool
	VerificationDone    bool
	VerificationResult  VerificationResult
	VerificationErr     error
}

func FetchStatus

func FetchStatus(f *Ferry, v Verifier) *Status

type TLSConfig

type TLSConfig struct {
	CertPath   string
	ServerName string
	// contains filtered or unexported fields
}

func (*TLSConfig) BuildConfig

func (this *TLSConfig) BuildConfig() (*tls.Config, error)

type TableFilter

type TableFilter interface {
	ApplicableTables([]*schema.Table) ([]*schema.Table, error)
	ApplicableDatabases([]string) ([]string, error)
}

type TableIdentifier

type TableIdentifier struct {
	SchemaName string
	TableName  string
}

A comparable and lightweight type that stores the schema and table name.

func NewTableIdentifierFromSchemaTable

func NewTableIdentifierFromSchemaTable(table *schema.Table) TableIdentifier

type TableSchemaCache

type TableSchemaCache map[string]*schema.Table

func LoadTables

func LoadTables(db *sql.DB, tableFilter TableFilter) (TableSchemaCache, error)

func (TableSchemaCache) AllTableNames

func (c TableSchemaCache) AllTableNames() (tableNames []string)

func (TableSchemaCache) AsSlice

func (c TableSchemaCache) AsSlice() (tables []*schema.Table)

func (TableSchemaCache) Get

func (c TableSchemaCache) Get(database, table string) *schema.Table

type TableStatus

type TableStatus struct {
	TableName        string
	PrimaryKeyName   string
	Status           string
	LastSuccessfulPK uint64
	TargetPK         uint64
}

type Throttler

type Throttler interface {
	Throttled() bool
	Disabled() bool
	SetDisabled(bool)
	SetPaused(bool)
	Run(context.Context) error
}

type ThrottlerBase

type ThrottlerBase struct {
	// contains filtered or unexported fields
}

func (*ThrottlerBase) Disabled

func (t *ThrottlerBase) Disabled() bool

func (*ThrottlerBase) SetDisabled

func (t *ThrottlerBase) SetDisabled(disabled bool)

type TimerMetric

type TimerMetric struct {
	MetricBase
	Value time.Duration
}

type VerificationResult

type VerificationResult struct {
	DataCorrect bool
	Message     string
}

func (VerificationResult) Error

func (e VerificationResult) Error() string

type VerificationResultAndStatus

type VerificationResultAndStatus struct {
	VerificationResult

	StartTime time.Time
	DoneTime  time.Time
}

func (VerificationResultAndStatus) IsDone

func (r VerificationResultAndStatus) IsDone() bool

func (VerificationResultAndStatus) IsStarted

func (r VerificationResultAndStatus) IsStarted() bool

type Verifier

type Verifier interface {
	// Start the verifier in the background during the cutover phase.
	// Traditionally, this is called from within the ControlServer.
	//
	// This method may be called multiple times and it's up to the verifier
	// to decide if it is possible to re-run the verification.
	StartInBackground() error

	// Wait for the verifier until it finishes verification after it was
	// started with the StartInBackground.
	//
	// A verification is "done" when it verified the dbs (either
	// correct or incorrect) OR when it experiences an error.
	Wait()

	// Returns the result and the status of the verification.
	// To check the status, call IsStarted() and IsDone() on
	// VerificationResultAndStatus.
	//
	// If the verification has been completed successfully (without errors) and
	// the data checks out to be "correct", the result will be
	// VerificationResult{true, ""}, with error = nil.
	// Otherwise, the result will be VerificationResult{false, "message"}, with
	// error = nil.
	//
	// If the verification is "done" but experienced an error during the check,
	// the result will be VerificationResult{}, with err = yourErr.
	Result() (VerificationResultAndStatus, error)
}

The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.

type WorkerPool

type WorkerPool struct {
	Concurrency int
	Process     func(int) (interface{}, error)
}

func (*WorkerPool) Run

func (p *WorkerPool) Run(n int) ([]interface{}, error)

Returns a list of results whose size equals the concurrency number. Returns the first error that occurs during the run. As soon as a single worker errors, all workers terminate.

Directories

Path Synopsis
cmd
cmd
