Package clone
Published: Oct 26, 2022 License: Apache-2.0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchTableWrites

func BatchTableWrites(ctx context.Context, diffs chan Diff, batches chan Batch) error

BatchTableWrites consumes diffs for a single table and groups them into batches by mutation type

func BatchWrites

func BatchWrites(ctx context.Context, diffs chan Diff, batches chan Batch) error

BatchWrites consumes diffs and groups them into batches by mutation type and table

func CloseConnections

func CloseConnections(conns []*sql.Conn)

func IndefiniteExponentialBackOff

func IndefiniteExponentialBackOff() *backoff.ExponentialBackOff

func OpenConnections

func OpenConnections(ctx context.Context, db *sql.DB, count int) ([]*sql.Conn, error)

OpenConnections opens count connections

func OpenSyncedConnections

func OpenSyncedConnections(ctx context.Context, source *sql.DB, count int) ([]*sql.Conn, error)

OpenSyncedConnections opens count connections that have a synchronized view of the database
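
For illustration, a minimal sketch of opening and releasing synced connections; the MySQL driver import, the DSN and the connection count are placeholders, and the import path example.com/clone is hypothetical:

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL driver

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Take four connections that share a synchronized view of the database.
	conns, err := clone.OpenSyncedConnections(context.Background(), db, 4)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.CloseConnections(conns)

	// ... use conns for consistent chunk reads ...
}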

func PKSetString

func PKSetString(t Transaction) string

func RestartLoop

func RestartLoop(ctx context.Context, b backoff.BackOff, loop func(b backoff.BackOff) error) func() error

func Retry

func Retry(ctx context.Context, options RetryOptions, f func(context.Context) error) error

Retry runs f, retrying with backoff according to options
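
A minimal sketch of Retry, assuming the zero values of Limiter and AcquireMetric are acceptable (that is an assumption; a real call may need both set) and a hypothetical import path:

package main

import (
	"context"
	"log"
	"time"

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	opts := clone.RetryOptions{
		MaxRetries: 5,                // give up after five retries
		Timeout:    30 * time.Second, // per-attempt timeout
		// Limiter and AcquireMetric left unset; assumed optional
	}
	err := clone.Retry(context.Background(), opts, func(ctx context.Context) error {
		// ... work that may fail transiently ...
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}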

func RowsEqual

func RowsEqual(sourceRow *Row, targetRow *Row) (bool, error)

Types

type Batch

type Batch struct {
	Type  MutationType
	Table *Table
	Rows  []*Row
}

func BatchTableWritesSync

func BatchTableWritesSync(diffs []Diff) ([]Batch, error)

BatchTableWritesSync synchronously groups diffs into batches by mutation type
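
A minimal sketch of the synchronous variant; building the diffs (for example from StreamDiff) is elided, and the import path is hypothetical:

package main

import (
	"fmt"
	"log"

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	var diffs []clone.Diff
	// ... collect diffs for a single table, e.g. from StreamDiff ...

	batches, err := clone.BatchTableWritesSync(diffs)
	if err != nil {
		log.Fatal(err)
	}
	for _, b := range batches {
		fmt.Printf("%s batch of %d rows\n", b.Type, len(b.Rows))
	}
}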

type Checksum

type Checksum struct {
	ReaderConfig
}

func (*Checksum) Run

func (cmd *Checksum) Run() error

Run finds any differences between source and target

type Chunk

type Chunk struct {
	Table *Table

	// Seq is the sequence number of chunks for this table
	Seq int64

	// Start is the first id of the chunk inclusive. If nil, chunk starts at -inf.
	Start []interface{}
	// End is the first id of the next chunk (i.e. the last id of this chunk exclusively). If nil, chunk ends at +inf.
	End []interface{} // exclusive

	// First chunk of a table
	First bool
	// Last chunk of a table
	Last bool

	// Size is the expected number of rows in the chunk
	Size int
}

Chunk is a chunk of rows, closed on the left and open on the right: [start, end)

func (*Chunk) ContainsKeys

func (c *Chunk) ContainsKeys(keys []interface{}) bool

func (*Chunk) ContainsRow

func (c *Chunk) ContainsRow(row []interface{}) bool

func (*Chunk) String

func (c *Chunk) String() string
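
A minimal sketch of the half-open semantics, assuming a single integer key column and that ContainsKeys works on a Chunk literal without Table set (both are assumptions), with a hypothetical import path:

package main

import (
	"fmt"

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	// A chunk covering ids [100, 200); Table omitted for brevity, which may not hold in practice.
	chunk := clone.Chunk{
		Start: []interface{}{int64(100)},
		End:   []interface{}{int64(200)},
	}
	fmt.Println(chunk.ContainsKeys([]interface{}{int64(100)})) // true: start is inclusive
	fmt.Println(chunk.ContainsKeys([]interface{}{int64(200)})) // false: end is exclusive
}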

type ChunkSnapshot

type ChunkSnapshot struct {
	InsideWatermarks bool
	Rows             []*Row
	Chunk            Chunk
}

ChunkSnapshot is a mutable struct representing the current reconciliation state of a chunk; it is used single-threaded by the replication thread only

type Clone

type Clone struct {
	WriterConfig
}

func (*Clone) Run

func (cmd *Clone) Run() error

Run finds any differences between source and target and writes them to the target
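
In normal use these structs are populated by a CLI parser that applies the defaults in the struct tags; a hand-constructed sketch like the one below therefore has to pick values for anything that would otherwise default. Hostnames, credentials and the import path are placeholders:

package main

import (
	"log"

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	cmd := &clone.Clone{}
	// Source and Target are promoted from the embedded SourceTargetConfig.
	cmd.Source = clone.DBConfig{
		Type:     clone.MySQL,
		Host:     "source-host", // placeholder
		Username: "user",
		Password: "secret",
		Database: "mydb",
	}
	cmd.Target = clone.DBConfig{
		Type:     clone.MySQL,
		Host:     "target-host", // placeholder
		Username: "user",
		Password: "secret",
		Database: "mydb",
	}
	if err := cmd.Run(); err != nil {
		log.Fatal(err)
	}
}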

type Config

type Config struct {
	Tables map[string]TableConfig `toml:"table"`
}

type DBConfig

type DBConfig struct {
	Type             DataSourceType `help:"Datasource name" enum:"mysql,vitess" optional:"" default:"mysql"`
	Host             string         `help:"Hostname" optional:""`
	EgressSocket     string         `help:"Use an egress socket when connecting to Vitess, for example '@egress.sock'" optional:""`
	Username         string         `help:"User" optional:""`
	Password         string         `help:"Password" optional:""`
	Database         string         `help:"Database or Vitess shard with format <keyspace>/<shard>" optional:""`
	MiskDatasource   string         `help:"Misk formatted config yaml file" optional:"" path:""`
	MiskReader       bool           `help:"Use the reader endpoint from Misk (defaults to writer)" optional:"" default:"false"`
	GrpcCustomHeader []string       `help:"Custom GRPC headers separated by ="`
	CA               string         `help:"CA root file, if this is specified then TLS will be enabled (PEM encoded)"`
	Cert             string         `help:"Certificate file for client side authentication (PEM encoded)"`
	Key              string         `help:"Key file for client side authentication (PEM encoded)"`
}

func (DBConfig) BinlogSyncerConfig

func (c DBConfig) BinlogSyncerConfig(serverID uint32) (replication.BinlogSyncerConfig, error)

func (DBConfig) DB

func (c DBConfig) DB() (*sql.DB, error)

func (DBConfig) IsSharded

func (c DBConfig) IsSharded() (bool, error)

func (DBConfig) ReaderDB

func (c DBConfig) ReaderDB() (*sql.DB, error)

func (DBConfig) Schema

func (c DBConfig) Schema() (string, error)

func (DBConfig) ShardingKeyrange

func (c DBConfig) ShardingKeyrange() ([]*topodata.KeyRange, error)

func (DBConfig) String

func (c DBConfig) String() string

func (DBConfig) VitessTarget

func (c DBConfig) VitessTarget() (*query.Target, error)

type DBReader

type DBReader interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

DBReader is an interface implemented by sql.Conn, sql.Tx and sql.DB so that the synchronization method can easily be changed
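
Since the doc comment names the three standard library types, their conformance can be stated as compile-time assertions (import path hypothetical):

package main

import (
	"database/sql"

	clone "example.com/clone" // hypothetical import path for this package
)

// Compile-time checks that a pool, a single connection and a transaction
// can all back the read side.
var (
	_ clone.DBReader = (*sql.DB)(nil)
	_ clone.DBReader = (*sql.Conn)(nil)
	_ clone.DBReader = (*sql.Tx)(nil)
)

func main() {}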

type DBWriter

type DBWriter interface {
	DBReader
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

type DataSourceType

type DataSourceType string
const (
	Vitess DataSourceType = "vitess"
	MySQL  DataSourceType = "mysql"
)

type DatabaseContainer

type DatabaseContainer struct {
	// contains filtered or unexported fields
}

func (*DatabaseContainer) Close

func (c *DatabaseContainer) Close() error

func (*DatabaseContainer) Config

func (c *DatabaseContainer) Config() DBConfig

type Diff

type Diff struct {
	Type MutationType
	// Row is the row to insert, update to, or delete
	Row *Row

	// Target is also set in the case of the Update MutationType so that it can be compared
	Target *Row
}

func StreamDiff

func StreamDiff(ctx context.Context, table *Table, source RowStream, target RowStream) ([]Diff, error)

StreamDiff returns the changes needed to make target become exactly like source
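
A minimal sketch of diffing one chunk: stream it from both sides, then let StreamDiff compute the mutations. Obtaining the table and chunk is elided, the empty hint and where clause are assumptions, and the import path is hypothetical:

package main

import (
	"context"
	"database/sql"

	clone "example.com/clone" // hypothetical import path for this package
)

func diffChunk(ctx context.Context, table *clone.Table, chunk clone.Chunk, source, target *sql.DB) ([]clone.Diff, error) {
	sourceRows, err := clone.StreamChunk(ctx, source, chunk, "", "") // no hint, no extra where clause
	if err != nil {
		return nil, err
	}
	defer sourceRows.Close()

	targetRows, err := clone.StreamChunk(ctx, target, chunk, "", "")
	if err != nil {
		return nil, err
	}
	defer targetRows.Close()

	// The returned diffs are the mutations that make target identical to source.
	return clone.StreamDiff(ctx, table, sourceRows, targetRows)
}

func main() {}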

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

Heartbeat periodically writes to the heartbeat table so that replication lag can be measured

func NewHeartbeat

func NewHeartbeat(config Replicate) (*Heartbeat, error)

func (*Heartbeat) Init

func (h *Heartbeat) Init(ctx context.Context) error

func (*Heartbeat) Run

func (h *Heartbeat) Run(ctx context.Context, b backoff.BackOff) error

type IdStreamer

type IdStreamer interface {
	Next(context.Context) ([]interface{}, error)
}

type Mutation

type Mutation struct {
	Type  MutationType
	Table *Table
	Rows  [][]interface{}

	// Chunk is only sent with a Repair mutation type
	Chunk Chunk
}

func (*Mutation) Write

func (m *Mutation) Write(ctx context.Context, tx DBWriter) error

type MutationType

type MutationType byte
const (
	Insert MutationType = iota
	Update
	Delete

	// Repair is a mutation that sends a full chunk, which is then diffed against the target so the diffs can be applied
	Repair
)

func (MutationType) String

func (m MutationType) String() string

type PeekingIdStreamer

type PeekingIdStreamer interface {
	// Next returns the next id and a boolean indicating whether there is another id after it
	Next(context.Context) ([]interface{}, bool, error)
	// Peek returns the id ahead of the current one; Next must be called first
	Peek() []interface{}
}
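
A minimal sketch of draining the streamer, relying only on the contract stated above (the behaviour on an empty stream is not specified here, and the import path is hypothetical):

package main

import (
	"context"

	clone "example.com/clone" // hypothetical import path for this package
)

func drain(ctx context.Context, ids clone.PeekingIdStreamer) error {
	for {
		id, hasNext, err := ids.Next(ctx)
		if err != nil {
			return err
		}
		_ = id // ... process id ...
		if !hasNext {
			return nil
		}
		// Peek is allowed here because Next has been called.
		_ = ids.Peek()
	}
}

func main() {}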

type Ping

type Ping struct {
	SourceTargetConfig

	Table string `help:"If set, select a row from this table" optional:""`
}

func (*Ping) Run

func (cmd *Ping) Run() error

type Position

type Position struct {
	File     string
	Position uint32
	// Gset can be nil in case the database does not support GTID
	Gset mysql.GTIDSet
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(
	config ReaderConfig,
	table *Table,
	source *sql.DB,
	sourceLimiter core.Limiter,
	target *sql.DB,
	targetLimiter core.Limiter,
) *Reader

func (*Reader) Diff

func (r *Reader) Diff(ctx context.Context, diffs chan Diff) error

func (*Reader) Read

func (r *Reader) Read(ctx context.Context, diffs chan Diff) error

type ReaderConfig

type ReaderConfig struct {
	SourceTargetConfig

	ChunkSize     int  `help:"Default size of the chunks to diff (can also be overridden per table)" default:"5000"`
	ShuffleChunks bool `` /* 161-byte string literal not displayed */

	TableParallelism  int64         `help:"Number of tables to process concurrently" default:"10"`
	ReaderCount       int           `help:"Number of reader connections" default:"20"`
	ReaderParallelism int64         `help:"Number of reader goroutines" default:"200"`
	ReadTimeout       time.Duration `help:"Timeout for faster reads like diffing a single chunk" default:"30s"`
	ReadRetries       uint64        `help:"How many times to retry reading a single chunk (with backoff)" default:"10"`

	UseCRC32Checksum bool `` /* 126-byte string literal not displayed */

	UseConcurrencyLimits bool `help:"Use concurrency limits to automatically find the throughput of the underlying databases" default:"false"`

	ConfigFile string `help:"TOML formatted config file" short:"f" optional:"" type:"path"`

	// WriteBatchSize doesn't strictly belong in ReaderConfig, but it is copied into TableConfig when a table is
	// loaded, and that code is shared by both checksum and clone, so it's easier to keep it here for now
	WriteBatchSize int `help:"Default size of the write batch per transaction (can also be overridden per table)" default:"100"`

	FailedChunkRetryCount int `help:"Retry a chunk if it fails the checksum, this can be used to checksum across a replica with a master" default:"0"`

	Config Config `kong:"-"`
}

ReaderConfig controls the read side and is shared between Clone and Checksum

func (*ReaderConfig) LoadConfig

func (c *ReaderConfig) LoadConfig() error

LoadConfig loads the ConfigFile if specified
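
A minimal sketch of loading a config file and inspecting the per-table settings; the file name and import path are placeholders:

package main

import (
	"fmt"
	"log"

	clone "example.com/clone" // hypothetical import path for this package
)

func main() {
	config := clone.ReaderConfig{ConfigFile: "cloner.toml"} // placeholder file name
	if err := config.LoadConfig(); err != nil {
		log.Fatal(err)
	}
	for name, table := range config.Config.Tables {
		fmt.Printf("%s: chunk_size=%d\n", name, table.ChunkSize)
	}
}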

type Replicate

type Replicate struct {
	WriterConfig

	TaskName string `` /* 163-byte string literal not displayed */
	ServerID uint32 `help:"Unique identifier of this server, defaults to a hash of the TaskName" optional:""`

	// TODO should this just be ReadParallelism
	ChunkParallelism int `help:"Number of chunks to snapshot concurrently" default:"10"`

	CheckpointTable    string        `` /* 142-byte string literal not displayed */
	WatermarkTable     string        `` /* 127-byte string literal not displayed */
	HeartbeatTable     string        `` /* 163-byte string literal not displayed */
	HeartbeatFrequency time.Duration `` /* 173-byte string literal not displayed */
	CreateTables       bool          `help:"Create the required tables if they do not exist" default:"true"`
	ChunkBufferSize    int           `help:"Size of internal queues" default:"100"`
	ReconnectTimeout   time.Duration `help:"How long to try to reconnect after a replication failure (set to 0 to retry forever)" default:"5m"`

	DoSnapshot      bool          `` /* 127-byte string literal not displayed */
	DoSnapshotDelay time.Duration `help:"How long to wait until running a snapshot" default:"60s"`

	ReplicationParallelism          int64         `help:"How many transactions to apply in parallel during replication" default:"1"`
	ParallelTransactionBatchMaxSize int           `help:"How large a batch of transactions to parallelize" default:"100"`
	ParallelTransactionBatchTimeout time.Duration `help:"How long to wait for a batch of transactions to fill up before executing them anyway" default:"5s"`
}

func (*Replicate) ReconnectBackoff

func (cmd *Replicate) ReconnectBackoff() backoff.BackOff

func (*Replicate) Run

func (cmd *Replicate) Run() error

Run replicates from source to target

type Replicator

type Replicator struct {
	// contains filtered or unexported fields
}

Replicator replicates from source to target

func NewReplicator

func NewReplicator(config Replicate) (*Replicator, error)

type RetryOptions

type RetryOptions struct {
	Limiter       core.Limiter
	AcquireMetric prometheus.Observer
	MaxRetries    uint64
	Timeout       time.Duration
}

type Row

type Row struct {
	Table *Table
	Data  []interface{}
}

func (*Row) AppendKeyValues

func (r *Row) AppendKeyValues(values []interface{}) []interface{}

func (*Row) KeyValues

func (r *Row) KeyValues() []interface{}

func (*Row) PkAfterOrEqual

func (r *Row) PkAfterOrEqual(row []interface{}) bool

PkAfterOrEqual returns true if the PK of the row is higher than or equal to the PK of the receiver row

func (*Row) PkEqual

func (r *Row) PkEqual(row []interface{}) bool

PkEqual returns true if the PK of the row is equal to the PK of the receiver row

func (*Row) Updated

func (r *Row) Updated(row []interface{}) *Row

type RowStream

type RowStream interface {
	// Next returns the next row or nil if we're done
	Next() (*Row, error)
	// Close releases any potential underlying resources
	Close() error
}
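
A minimal sketch of the consumption protocol stated in the interface comments: keep calling Next until it returns nil, and always Close (import path hypothetical):

package main

import (
	clone "example.com/clone" // hypothetical import path for this package
)

func drainRows(stream clone.RowStream) ([]*clone.Row, error) {
	defer stream.Close()
	var rows []*clone.Row
	for {
		row, err := stream.Next()
		if err != nil {
			return nil, err
		}
		if row == nil {
			// Next returns nil when the stream is exhausted.
			return rows, nil
		}
		rows = append(rows, row)
	}
}

func main() {}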

func StreamChunk

func StreamChunk(ctx context.Context, conn DBReader, chunk Chunk, hint string, extraWhereClause string) (RowStream, error)

type Snapshotter

type Snapshotter struct {
	// contains filtered or unexported fields
}

Snapshotter receives transactions and snapshot requests, and writes transactions and strongly consistent chunk snapshots

func NewSnapshotter

func NewSnapshotter(config Replicate) (*Snapshotter, error)

func (*Snapshotter) Init

func (s *Snapshotter) Init(ctx context.Context) error

func (*Snapshotter) Run

func (s *Snapshotter) Run(ctx context.Context, b backoff.BackOff, source chan Transaction, sink chan Transaction) error

type SourceTargetConfig

type SourceTargetConfig struct {
	Source DBConfig `help:"Database config of source to be copied from" prefix:"source-" embed:""`
	Target DBConfig `help:"Database config of target to be copied to" prefix:"target-" embed:""`
}

type Table

type Table struct {
	Name string

	// KeyColumns is the columns the table is chunked by, by default the primary key columns
	KeyColumns []string
	// KeyColumnList is KeyColumns quoted and comma separated
	KeyColumnList string
	// KeyColumnIndexes is the indexes of the KeyColumns in the Columns slice
	KeyColumnIndexes []int

	Config TableConfig

	Columns       []string
	ColumnsQuoted []string
	CRC32Columns  []string
	// ColumnList is a comma separated list of quoted strings
	ColumnList    string
	EstimatedRows int64

	// MysqlTable is the go-mysql schema; we should probably start using this one as much as possible
	MysqlTable *mysqlschema.Table
}

func LoadTable

func LoadTable(ctx context.Context, config ReaderConfig, databaseType DataSourceType, conn *sql.DB, schema, tableName string, tableConfig TableConfig) (*Table, error)

func LoadTables

func LoadTables(ctx context.Context, config ReaderConfig) ([]*Table, error)

func (*Table) KeysOfRow

func (t *Table) KeysOfRow(row []interface{}) []interface{}

func (*Table) PkOfRow

func (t *Table) PkOfRow(row []interface{}) int64

func (*Table) ToRow

func (t *Table) ToRow(raw []interface{}) *Row

type TableConfig

type TableConfig struct {
	IgnoreColumns  []string `toml:"ignore_columns" help:"Ignore columns in table"`
	TargetWhere    string   `toml:"target_where" help:"Extra where clause that is added on the target"`
	TargetHint     string   `toml:"target_hint" help:"Hint placed after the SELECT on target reads"`
	SourceWhere    string   `toml:"source_where" help:"Extra where clause that is added on the source"`
	SourceHint     string   `toml:"source_hint" help:"Hint placed after the SELECT on source reads"`
	ChunkSize      int      `toml:"chunk_size" help:"Chunk size for this table, overrides the global chunk size"`
	WriteBatchSize int      `toml:"write_batch_size" help:"Write batch size for this table, overrides the global write batch size"`
	WriteTimout    duration `toml:"write_timeout" help:"Write timeout for this table, overrides the global write timeout"`
	KeyColumns     []string `toml:"keys" help:"Use these columns as a unique key for this table, defaults to primary key columns"`
}
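
Going by the toml tags above and the Tables map in Config, a per-table section of the config file might look like the following hypothetical example (the table name customers and all values are made up):

[table.customers]
ignore_columns = ["updated_at"]
target_where = "tenant_id = 42"
chunk_size = 1000
write_batch_size = 50
keys = ["id"]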

type Transaction

type Transaction struct {
	Mutations     []Mutation
	FinalPosition Position
}

type TransactionStream

type TransactionStream struct {
	// contains filtered or unexported fields
}

TransactionStream consumes binlog events and emits full transactions

func NewTransactionStreamer

func NewTransactionStreamer(config Replicate) (*TransactionStream, error)

func (*TransactionStream) Init

func (s *TransactionStream) Init(ctx context.Context) error

func (*TransactionStream) Run

func (s *TransactionStream) Run(ctx context.Context, b backoff.BackOff, output chan Transaction) error

type TransactionWriter

type TransactionWriter struct {
	// contains filtered or unexported fields
}

TransactionWriter receives transactions and snapshot requests, and writes transactions and strongly consistent chunk snapshots

func NewTransactionWriter

func NewTransactionWriter(config Replicate) (*TransactionWriter, error)

func (*TransactionWriter) Init

func (w *TransactionWriter) Init(ctx context.Context) error

func (*TransactionWriter) Run

func (w *TransactionWriter) Run(ctx context.Context, b backoff.BackOff, transactions chan Transaction) error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(config WriterConfig, table *Table, writer *sql.DB, limiter core.Limiter) *Writer

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, g *errgroup.Group, diffs chan Diff)

Write forks off two goroutines: one batches diffs from the diffs channel and one writes the batches
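
A minimal sketch of wiring a Writer: since Write forks its goroutines onto the errgroup, the caller closes the diffs channel and then waits on the group. Constructing the table and config is elided, passing a nil limiter is an assumption, and the import path is hypothetical:

package main

import (
	"context"
	"database/sql"

	"golang.org/x/sync/errgroup"

	clone "example.com/clone" // hypothetical import path for this package
)

func writeDiffs(ctx context.Context, config clone.WriterConfig, table *clone.Table, target *sql.DB, diffs chan clone.Diff) error {
	writer := clone.NewWriter(config, table, target, nil) // nil limiter: assumed acceptable
	g, ctx := errgroup.WithContext(ctx)
	writer.Write(ctx, g, diffs)
	// ... send diffs on the channel, then close(diffs) so the goroutines finish ...
	return g.Wait()
}

func main() {}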

type WriterConfig

type WriterConfig struct {
	ReaderConfig

	WriteBatchStatementSize int           `help:"Size of the write batch per statement" default:"100"`
	WriterParallelism       int64         `help:"Number of writer goroutines" default:"200"`
	WriterCount             int           `help:"Number of writer connections" default:"10"`
	WriteRetries            uint64        `help:"Number of retries" default:"5"`
	WriteTimeout            time.Duration `help:"Timeout for each write" default:"30s"`

	NoDiff bool `help:"Clone without diffing using INSERT IGNORE can be faster as a first pass" default:"false"`
}
