mysqlbatch

package
v0.9.71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2020 License: Apache-2.0 Imports: 29 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// String tags naming the kind of a scanned value.
	// NOTE(review): these look like the values stored in TablePosition.Type —
	// confirm against the position encoder/decoder.
	Unknown       = "*"
	PlainString   = "string"
	PlainInt      = "int"
	PlainUInt     = "uint"
	PlainBytes    = "bytes"
	PlainTime     = "time"
	SQLNullInt64  = "sqlNullInt64"
	SQLNullString = "sqlNullString"
	SQLNullBool   = "sqlNullBool"
	SQLNullTime   = "sqlNullTime"
	SQLRawBytes   = "sqlRawBytes"

	// ScanColumnForDump has the same literal value as Unknown ("*").
	// NOTE(review): presumably the scan-column marker meaning "dump the whole
	// table" (see IsScanColumnsForDump) — confirm.
	ScanColumnForDump = "*"

	// SchemaVersionV1 is the version string "v1.0".
	// NOTE(review): presumably written to the SchemaVersion field of
	// BatchPositionValueV1 — confirm.
	SchemaVersionV1 = "v1.0"
)
View Source
// Name is the string "mysql-batch".
// NOTE(review): presumably the name this plugin is registered under — confirm.
const Name = "mysql-batch"

Variables

View Source
var (
	// BatchQueryDuration is a histogram vector in namespace "gravity",
	// subsystem "output", named "batch_query_duration", labeled by
	// metrics.PipelineTag. It uses 22 exponential buckets starting at 0.0005
	// and doubling each step (largest bound 0.0005 * 2^21 = 1048.576).
	// NOTE(review): bucket unit is presumably seconds — confirm at the
	// observation site.
	BatchQueryDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "gravity",
			Subsystem: "output",
			Name:      "batch_query_duration",
			Help:      "bucketed histogram of batch fetch duration time",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{metrics.PipelineTag})

	// JobFetchedCount is a counter vector in namespace "gravity", subsystem
	// "output", named "job_fetched_count", labeled by metrics.PipelineTag.
	// Per its help text it counts data rows fetched by the scanner.
	JobFetchedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "gravity",
		Subsystem: "output",
		Name:      "job_fetched_count",
		Help:      "Number of data rows fetched by scanner",
	}, []string{metrics.PipelineTag})
)

Functions

func DecodeBatchPositionValue added in v0.9.19

func DecodeBatchPositionValue(s string) (interface{}, error)

func DetectScanColumns added in v0.9.27

func DetectScanColumns(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, maxFullDumpRowsCountLimit int64) ([]string, error)

DetectScanColumns finds the columns used to scan the table. It first tries the primary key columns, then a unique key, and finally falls back to dumping the whole table. Note that composite unique keys are not supported.

func EncodeBatchPositionValue added in v0.9.19

func EncodeBatchPositionValue(v interface{}) (string, error)

func FindMaxMinValueFromDB

func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumns []string, condition string) ([]interface{}, []interface{})

func GenerateNextScanQueryAndArgs added in v0.9.27

func GenerateNextScanQueryAndArgs(
	fullTableName string,
	scanColumns []string,
	currentMinValues []interface{},
	pivotIndex int,
	batch int,
	condition string) (string, []interface{})

func GenerateScanQueryAndArgs added in v0.9.27

func GenerateScanQueryAndArgs(
	fullTableName string,
	scanColumns []string,
	currentMinValues []interface{},
	batch int,
	pivotIndex int,
	condition string) (string, []interface{})

pivotIndex is the index in scanColumns

func GetMaxMin added in v0.9.17

func GetMaxMin(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, []TablePosition, bool, error)

func GetScanIdx

func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)

func GetStartBinlog added in v0.9.17

func GetTableColumnTypes

func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)

func GreaterThanMax added in v0.9.27

func GreaterThanMax(
	db *sql.DB,
	fullTableName string,
	scanColumns []string,
	scanValues []interface{},
	maxValues []interface{}) (bool, error)

func InitTablePosition added in v0.9.18

func InitTablePosition(
	db *sql.DB,
	positionCache position_cache.PositionCacheInterface,
	tableDef *schema_store.Table,
	scanColumns []string,
	tableConfig TableConfig,
	estimatedRowCount *int64) (bool, error)

func IsScanColumnsForDump added in v0.9.27

func IsScanColumnsForDump(scanColumns []string) bool

func NewBarrierMsg added in v0.9.17

func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg

func NewCloseInputStreamMsg

func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg

func NewCreateTableMsg

func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg

func NewMsg

func NewMsg(
	rowPtrs []interface{},
	columnTypes []*sql.ColumnType,
	sourceTableDef *schema_store.Table,
	callbackFunc core.MsgCallbackFunc,
	positions []TablePosition,
	scanTime time.Time) *core.Msg

NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the value is parsed as a time.

func NextBatchStartPoint added in v0.9.27

func NextBatchStartPoint(
	db *sql.DB,
	fullTableName string,
	scanColumns []string,
	columnTypes []*sql.ColumnType,
	scanIndexes []int,
	currentMinValues []interface{},
	maxValues []interface{},
	condition string) (nextMinValues []interface{}, continueNext bool, pivotIndex int, err error)

func NextScanElementForChunk added in v0.9.27

func NextScanElementForChunk(
	db *sql.DB,
	fullTableName string,
	columnTypes []*sql.ColumnType,
	scanColumns []string,
	currentScanValues []interface{},
	pivotIndex int,
	condition string) (nextRowValues []interface{}, exists bool, err error)

func PutCurrentPos added in v0.9.17

func PutCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string, pos []TablePosition, incScanCount bool) error

func PutDone added in v0.9.27

func PutDone(cache position_cache.PositionCacheInterface, fullTableName string) error

func PutEstimatedCount added in v0.9.18

func PutEstimatedCount(cache position_cache.PositionCacheInterface, fullTableName string, estimatedCount int64) error

func PutMaxMin added in v0.9.17

func PutMaxMin(cache position_cache.PositionCacheInterface, fullTableName string, max []TablePosition, min []TablePosition) error

func ScanValuesFromRowValues added in v0.9.27

func ScanValuesFromRowValues(rowValues []interface{}, scanIndexes []int) []interface{}

func SetupInitialPosition added in v0.9.17

func SetupInitialPosition(cache position_cache.PositionCacheInterface, sourceDB *sql.DB) error

Types

type BatchPositionValueV1 added in v0.9.27

// BatchPositionValueV1 is a position record made of a schema version string,
// a starting binlog position, and a map of per-table stats (TableStatsV1).
// It serializes under the keys "schema-version", "start-binlog" and
// "table-stats" for both TOML and JSON.
type BatchPositionValueV1 struct {
	SchemaVersion string                     `toml:"schema-version" json:"schema-version"`
	Start         config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
	// NOTE(review): map keys are presumably full table names — confirm.
	// The field is spelled TableStates but serialized as "table-stats".
	TableStates   map[string]TableStatsV1    `toml:"table-stats" json:"table-stats"`
}

type BatchPositionValueV1Beta1 added in v0.9.27

// BatchPositionValueV1Beta1 has the same fields and serialized keys as
// BatchPositionValueV1, except that its per-table entries are TableStats
// (a single position per Max/Min/Current) rather than TableStatsV1.
// NOTE(review): presumably the older on-disk layout kept for migration — confirm.
type BatchPositionValueV1Beta1 struct {
	SchemaVersion string                     `toml:"schema-version" json:"schema-version"`
	Start         config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
	TableStates   map[string]TableStats      `toml:"table-stats" json:"table-stats"`
}

type BatchPositionVersionMigrationWrapper added in v0.9.27

// BatchPositionVersionMigrationWrapper carries only the "schema-version" key.
// NOTE(review): presumably decoded first to learn which position layout
// (V1Beta1 or V1) the stored value uses — confirm.
type BatchPositionVersionMigrationWrapper struct {
	SchemaVersion string `toml:"schema-version" json:"schema-version"`
}

type PluginConfig

// PluginConfig is the configuration of the mysql batch plugin. Field names in
// configuration files are given by the mapstructure/toml/json tags.
// See ValidateAndSetDefault for validation and defaulting.
type PluginConfig struct {
	Source      *config.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple
	SourceSlave *config.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`

	// The tag pairs are separated by a space, as the conventional struct-tag
	// format (and go vet's structtag check) requires.
	SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config" json:"source-probe-config"`

	PositionRepo *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`

	// TableConfigs lists the schema/table patterns handled by this plugin.
	TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"`

	// IgnoreTables uses the same TableConfig shape as TableConfigs.
	// NOTE(review): presumably tables matched here are excluded — confirm in GetTables.
	IgnoreTables []TableConfig `mapstructure:"ignore-tables" json:"ignore-tables"`

	// NOTE(review): presumably the scanner count, the per-query batch size and
	// the row-count limit for a full dump, respectively — confirm.
	NrScanner        int   `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"`
	TableScanBatch   int   `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"`
	MaxFullDumpCount int64 `mapstructure:"max-full-dump-count"  toml:"max-full-dump-count"  json:"max-full-dump-count"`

	// NOTE(review): presumably a throttle on batches per second — confirm.
	BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"`
}

func (*PluginConfig) ValidateAndSetDefault

func (cfg *PluginConfig) ValidateAndSetDefault() error

type TableConfig

// TableConfig selects tables within one schema and optionally sets how they
// are scanned.
type TableConfig struct {
	Schema string `mapstructure:"schema" toml:"schema" json:"schema"`
	// Table is an array of string, each string is a glob expression
	// that describes the table name
	Table []string `mapstructure:"table" toml:"table" json:"table"`

	// ScanColumn is an array of string, that enforces these table's scan columns
	ScanColumn []string `mapstructure:"scan-column" toml:"scan-column" json:"scan-column"`

	// NOTE(review): presumably a SQL predicate applied when scanning the
	// table — confirm where the scan query is generated.
	Condition string `mapstructure:"condition"  toml:"condition" json:"condition"`
}

func DeleteEmptyTables added in v0.9.18

func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)

func GetTables

func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, ignoreTables []TableConfig, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)

GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.

func InitializePositionAndDeleteScannedTable added in v0.9.19

func InitializePositionAndDeleteScannedTable(
	db *sql.DB,
	positionCache position_cache.PositionCacheInterface,
	scanColumnsArray [][]string,
	estimatedRowCount []int64,
	tables []*schema_store.Table,
	tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig, [][]string, []int64, error)

type TablePosition added in v0.9.17

// TablePosition is a value together with a type tag and a column name.
// The type also declares MarshalJSON and UnmarshalJSON methods, so its JSON
// form is produced by those methods rather than by the tags alone.
type TablePosition struct {
	Value  interface{} `toml:"value" json:"value,omitempty"`
	// NOTE(review): presumably one of the type-tag constants such as
	// PlainInt or SQLNullString — confirm.
	Type   string      `toml:"type" json:"type"`
	Column string      `toml:"column" json:"column"`
}

func GetCurrentPos added in v0.9.17

func GetCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, bool, bool, error)

func (TablePosition) MapString added in v0.9.17

func (p TablePosition) MapString() (map[string]string, error)

func (TablePosition) MarshalJSON added in v0.9.17

func (p TablePosition) MarshalJSON() ([]byte, error)

func (*TablePosition) UnmarshalJSON added in v0.9.17

func (p *TablePosition) UnmarshalJSON(value []byte) error

type TableScanner

type TableScanner struct {
	// contains filtered or unexported fields
}

func NewTableScanner

func NewTableScanner(
	pipelineName string,
	tableWorkC chan *TableWork,
	db *sql.DB,
	positionCache position_cache.PositionCacheInterface,
	emitter core.Emitter,
	throttle *time.Ticker,
	schemaStore schema_store.SchemaStore,
	cfg *PluginConfig,
	ctx context.Context) *TableScanner

func (*TableScanner) AfterMsgCommit

func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error

func (*TableScanner) FindAll

func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)

func (*TableScanner) LoopInBatch

func (tableScanner *TableScanner) LoopInBatch(
	db *sql.DB,
	tableDef *schema_store.Table,
	tableConfig *TableConfig,
	scanColumns []string,
	max []TablePosition,
	min []TablePosition,
	batch int)

LoopInBatch iterates over the table with SQL like this: `SELECT * FROM a WHERE some_key > some_value LIMIT 10000`. It gets the min and max values of the column and iterates batch by batch.

func (*TableScanner) Start

func (tableScanner *TableScanner) Start() error

func (*TableScanner) Wait

func (tableScanner *TableScanner) Wait()

type TableStats added in v0.9.18

// TableStats records, for one table, a single max, min and current
// TablePosition plus row counters and a done flag.
// Only Max, Min and Current carry toml tags; the remaining fields declare
// json tags only.
type TableStats struct {
	Max               *TablePosition `toml:"max" json:"max"`
	Min               *TablePosition `toml:"min" json:"min"`
	Current           *TablePosition `toml:"current" json:"current"`
	EstimatedRowCount int64          `json:"estimated-count"`
	ScannedCount      int64          `json:"scanned-count"`
	Done              bool           `json:"done"`
}

type TableStatsV1 added in v0.9.27

// TableStatsV1 mirrors TableStats but holds a slice of TablePosition for each
// of Max, Min and Current.
// NOTE(review): presumably one entry per scan column — confirm.
// Only Max, Min and Current carry toml tags; the remaining fields declare
// json tags only.
type TableStatsV1 struct {
	Max               []TablePosition `toml:"max" json:"max"`
	Min               []TablePosition `toml:"min" json:"min"`
	Current           []TablePosition `toml:"current" json:"current"`
	EstimatedRowCount int64           `json:"estimated-count"`
	ScannedCount      int64           `json:"scanned-count"`
	Done              bool            `json:"done"`
}

type TableWork

// TableWork bundles a table definition with its TableConfig, scan columns,
// estimated row count and condition. NewTableScanner receives a channel of
// *TableWork.
type TableWork struct {
	TableDef          *schema_store.Table
	TableConfig       *TableConfig
	ScanColumns       []string
	EstimatedRowCount int64
	// NOTE(review): TableConfig also has a Condition field; which one takes
	// precedence is not visible here — confirm.
	Condition         string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL