common

package
v0.0.0-...-503c688 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: Apache-2.0, Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// IndexEngineID is the engine ID for index engine.
	IndexEngineID = -1
)

Variables

View Source
var (
	ErrUnknown         = errors.Normalize("unknown error", errors.RFCCodeText("Lightning:Common:ErrUnknown"))
	ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("Lightning:Common:ErrInvalidArgument"))
	ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("Lightning:Common:ErrVersionMismatch"))

	ErrReadConfigFile     = errors.Normalize("cannot read config file '%s'", errors.RFCCodeText("Lightning:Config:ErrReadConfigFile"))
	ErrParseConfigFile    = errors.Normalize("cannot parse config file '%s'", errors.RFCCodeText("Lightning:Config:ErrParseConfigFile"))
	ErrInvalidConfig      = errors.Normalize("invalid config", errors.RFCCodeText("Lightning:Config:ErrInvalidConfig"))
	ErrInvalidTLSConfig   = errors.Normalize("invalid tls config", errors.RFCCodeText("Lightning:Config:ErrInvalidTLSConfig"))
	ErrInvalidSortedKVDir = errors.Normalize("invalid sorted-kv-dir '%s' for local backend, please change the config or delete the path", errors.RFCCodeText("Lightning:Config:ErrInvalidSortedKVDir"))

	ErrStorageUnknown       = errors.Normalize("unknown storage error", errors.RFCCodeText("Lightning:Storage:ErrStorageUnknown"))
	ErrInvalidPermission    = errors.Normalize("invalid permission", errors.RFCCodeText("Lightning:Storage:ErrInvalidPermission"))
	ErrInvalidStorageConfig = errors.Normalize("invalid data-source-dir", errors.RFCCodeText("Lightning:Storage:ErrInvalidStorageConfig"))
	ErrEmptySourceDir       = errors.Normalize("data-source-dir '%s' doesn't exist or contains no files", errors.RFCCodeText("Lightning:Storage:ErrEmptySourceDir"))

	ErrTableRoute         = errors.Normalize("table route error", errors.RFCCodeText("Lightning:Loader:ErrTableRoute"))
	ErrInvalidSchemaFile  = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile"))
	ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles"))

	ErrSystemRequirementNotMet  = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet"))
	ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict"))
	ErrPreCheckFailed           = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed"))
	ErrCheckClusterRegion       = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion"))
	ErrCheckLocalResource       = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource"))
	ErrCheckTableEmpty          = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
	ErrCheckCSVHeader           = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
	ErrCheckDataSource          = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
	ErrCheckCDCPiTR             = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))

	ErrOpenCheckpoint          = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint"))
	ErrReadCheckpoint          = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint"))
	ErrUpdateCheckpoint        = errors.Normalize("update checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrUpdateCheckpoint"))
	ErrUnknownCheckpointDriver = errors.Normalize("unknown checkpoint driver '%s'", errors.RFCCodeText("Lightning:Checkpoint:ErrUnknownCheckpointDriver"))
	ErrInvalidCheckpoint       = errors.Normalize("invalid checkpoint", errors.RFCCodeText("Lightning:Checkpoint:ErrInvalidCheckpoint"))
	ErrCheckpointNotFound      = errors.Normalize("checkpoint not found", errors.RFCCodeText("Lightning:Checkpoint:ErrCheckpointNotFound"))
	ErrInitCheckpoint          = errors.Normalize("init checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrInitCheckpoint"))
	ErrCleanCheckpoint         = errors.Normalize("clean checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrCleanCheckpoint"))

	ErrMetaMgrUnknown = errors.Normalize("unknown error occur on meta manager", errors.RFCCodeText("Lightning:MetaMgr:ErrMetaMgrUnknown"))

	ErrDBConnect       = errors.Normalize("failed to connect database", errors.RFCCodeText("Lightning:DB:ErrDBConnect"))
	ErrInitErrManager  = errors.Normalize("init error manager error", errors.RFCCodeText("Lightning:DB:ErrInitErrManager"))
	ErrInitMetaManager = errors.Normalize("init meta manager error", errors.RFCCodeText("Lightning:DB:ErrInitMetaManager"))

	ErrUpdatePD       = errors.Normalize("update pd error", errors.RFCCodeText("Lightning:PD:ErrUpdatePD"))
	ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
	ErrPauseGC        = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

	ErrCheckKVVersion        = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
	ErrCreateKVClient        = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
	ErrCheckMultiIngest      = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
	ErrKVEpochNotMatch       = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
	ErrKVNotLeader           = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
	ErrKVServerIsBusy        = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
	ErrKVRegionNotFound      = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
	ErrKVReadIndexNotReady   = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
	ErrKVIngestFailed        = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
	ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

	ErrUnknownBackend       = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
	ErrCheckLocalFile       = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
	ErrOpenDuplicateDB      = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
	ErrSchemaNotExists      = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
	ErrInvalidSchemaStmt    = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
	ErrCreateSchema         = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
	ErrUnknownColumns       = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
	ErrChecksumMismatch     = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
	ErrRestoreTable         = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
	ErrEncodeKV             = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
	ErrAllocTableRowIDs     = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
	ErrInvalidMetaStatus    = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
	ErrTableIsChecksuming   = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
	ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
	ErrFoundDuplicateKeys   = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey"))
	ErrAddIndexFailed       = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed"))
	ErrDropIndexFailed      = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed"))
)

error definitions

View Source
var DefaultImportVariablesTiDB = map[string]string{
	"tidb_row_format_version": "1",
}

DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system variables from downstream in local/importer backend. The values record the default values if missing.

View Source
var DefaultImportantVariables = map[string]string{
	"max_allowed_packet":      "67108864",
	"div_precision_increment": "4",
	"time_zone":               "SYSTEM",
	"lc_time_names":           "en_US",
	"default_week_format":     "0",
	"block_encryption_mode":   "aes-128-ecb",
	"group_concat_max_len":    "1024",
}

DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system variables from downstream which may affect KV encode result. The values record the default values if missing.

View Source
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint

EncodeIntRowIDToBuf encodes an int64 row id to a buffer.

Functions

func AllocGlobalAutoID

func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
	tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error)

AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.

func ConnectMySQL

func ConnectMySQL(cfg *mysql.Config) (*sql.DB, error)

ConnectMySQL connects MySQL with the dsn. If access is denied and the password is a valid base64 encoding, we will try to connect MySQL with the base64 decoding of the password.

func EncodeIntRowID

func EncodeIntRowID(rowID int64) []byte

EncodeIntRowID encodes an int64 row id.

func EscapeIdentifier

func EscapeIdentifier(identifier string) string

EscapeIdentifier quote and escape an sql identifier

func GetAutoRandomColumn

func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo

GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it. todo: better put in ddl package, but this will cause import cycle since ddl package import lightning

func GetJSON

func GetJSON(ctx context.Context, client *http.Client, url string, v interface{}) error

GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.

Example:

client := &http.Client{}
var resp struct { IP string }
if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil {
	return errors.Trace(err)
}
fmt.Println(resp.IP)

func InterpolateMySQLString

func InterpolateMySQLString(s string) string

InterpolateMySQLString interpolates a string into a MySQL string literal.

func IsContextCanceledError

func IsContextCanceledError(err error) bool

IsContextCanceledError returns whether the error is caused by context cancellation. This function should only be used when the code logic is affected by whether the error is canceling or not.

This function returns `false` (not a context-canceled error) if `err == nil`.

func IsDirExists

func IsDirExists(name string) bool

IsDirExists checks if dir exists.

func IsEmptyDir

func IsEmptyDir(name string) bool

IsEmptyDir checks if dir is empty.

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError returns whether the error is transient (e.g. network connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This function returns `false` (irrecoverable) if `err == nil`.

If the error is a multierr, returns true only if all suberrors are retryable.

func KillMySelf

func KillMySelf() error

KillMySelf sends sigint to current process, used in integration test only

Only works on Unix. Signaling on Windows is not supported.

func NormalizeError

func NormalizeError(err error) error

NormalizeError converts an arbitrary error to *errors.Error based above predefined errors. If the underlying err is already an *error.Error which is prefixed by "Lightning:", leave error ID unchanged. Otherwise, converts the error ID to Lightning's predefined error IDs.

func NormalizeOrWrapErr

func NormalizeOrWrapErr(rfcErr *errors.Error, err error, args ...interface{}) error

NormalizeOrWrapErr tries to normalize err. If the returned error is ErrUnknown, wraps it with the given rfcErr.

func RebaseGlobalAutoID

func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
	tblInfo *model.TableInfo) error

RebaseGlobalAutoID rebase the autoID base to newBase.

func Retry

func Retry(purpose string, parentLogger log.Logger, action func() error) error

Retry is shared by SQLWithRetry.perform, implementation of GlueCheckpointsDB and TiDB's glue implementation

func SameDisk

func SameDisk(dir1 string, dir2 string) (bool, error)

SameDisk is used to check dir1 and dir2 in the same disk.

func SchemaExists

func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (bool, error)

SchemaExists return whether schema with specified name exists.

func StringSliceEqual

func StringSliceEqual(a, b []string) bool

StringSliceEqual checks if two string slices are equal.

func TableExists

func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table string) (bool, error)

TableExists return whether table with specified name exists in target db

func TableHasAutoID

func TableHasAutoID(info *model.TableInfo) bool

TableHasAutoID return whether table has auto generated id.

func TableHasAutoRowID

func TableHasAutoRowID(info *model.TableInfo) bool

TableHasAutoRowID return whether table has auto generated row id

func UniqueTable

func UniqueTable(schema string, table string) string

UniqueTable returns an unique table name.

func WriteMySQLIdentifier

func WriteMySQLIdentifier(builder *strings.Builder, identifier string)

WriteMySQLIdentifier writes a MySQL identifier into the string builder. Writes a MySQL identifier into the string builder. The identifier is always escaped into the form "`foo`".

Types

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

ConnPool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.

func NewConnPool

func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error),
	logger log.Logger) *ConnPool

NewConnPool creates a new connPool by the specified conn factory function and capacity.

func (*ConnPool) Close

func (p *ConnPool) Close()

Close closes the conn pool.

func (*ConnPool) TakeConns

func (p *ConnPool) TakeConns() (conns []*grpc.ClientConn)

TakeConns takes all connections from the pool.

type GRPCConns

type GRPCConns struct {
	// contains filtered or unexported fields
}

GRPCConns is a pool of gRPC connections.

func NewGRPCConns

func NewGRPCConns() *GRPCConns

NewGRPCConns creates a new GRPCConns.

func (*GRPCConns) Close

func (conns *GRPCConns) Close()

Close closes all gRPC connections in the pool.

func (*GRPCConns) GetGrpcConn

func (conns *GRPCConns) GetGrpcConn(ctx context.Context, storeID uint64,
	tcpConcurrency int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) (*grpc.ClientConn, error)

GetGrpcConn gets a gRPC connection from the pool.

type KvPair

type KvPair struct {
	// Key is the key of the KV pair
	Key []byte
	// Val is the value of the KV pair
	Val []byte
	// RowID is the row id of the KV pair.
	RowID []byte
}

KvPair is a pair of key and value.

type MySQLConnectParam

type MySQLConnectParam struct {
	Host                     string
	Port                     int
	User                     string
	Password                 string
	SQLMode                  string
	MaxAllowedPacket         uint64
	TLSConfig                *tls.Config
	AllowFallbackToPlaintext bool
	Net                      string
	Vars                     map[string]string
}

MySQLConnectParam records the parameters needed to connect to a MySQL database.

func (*MySQLConnectParam) Connect

func (param *MySQLConnectParam) Connect() (*sql.DB, error)

Connect creates a new connection to the database.

func (*MySQLConnectParam) ToDriverConfig

func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config

ToDriverConfig converts the MySQLConnectParam to a mysql.Config.

type OnceError

type OnceError struct {
	// contains filtered or unexported fields
}

OnceError is an error value which will can be assigned once.

The zero value is ready for use.

func (*OnceError) Get

func (oe *OnceError) Get() error

Get returns the first error value stored in this instance.

func (*OnceError) Set

func (oe *OnceError) Set(e error)

Set assigns an error to this instance, if `e != nil`.

If this method is called multiple times, only the first call is effective.

type Pauser

type Pauser struct {
	// contains filtered or unexported fields
}

Pauser is a type which could allow multiple goroutines to wait on demand, similar to a gate or traffic light.

func NewPauser

func NewPauser() *Pauser

NewPauser returns an initialized pauser.

func (*Pauser) IsPaused

func (p *Pauser) IsPaused() bool

IsPaused gets whether the current state is paused or not.

func (*Pauser) Pause

func (p *Pauser) Pause()

Pause causes all calls to Wait() to block.

func (*Pauser) Resume

func (p *Pauser) Resume()

Resume causes all calls to Wait() to continue.

func (*Pauser) Wait

func (p *Pauser) Wait(ctx context.Context) error

Wait blocks the current goroutine if the current state is paused, until the pauser itself is resumed at least once.

If `ctx` is done, this method will also unblock immediately, and return the context error.

type SQLWithRetry

type SQLWithRetry struct {
	// either *sql.DB or *sql.Conn
	DB           utils.DBExecutor
	Logger       log.Logger
	HideQueryLog bool
}

SQLWithRetry constructs a retryable transaction.

func (SQLWithRetry) Exec

func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...interface{}) error

Exec executes a single SQL with optional retry.

func (SQLWithRetry) QueryRow

func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...interface{}) error

QueryRow executes a query that is expected to return at most one row.

func (SQLWithRetry) QueryStringRows

func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)

QueryStringRows executes a query that is expected to return multiple rows whose every column is string.

func (SQLWithRetry) Transact

func (t SQLWithRetry) Transact(ctx context.Context, purpose string, action func(context.Context, *sql.Tx) error) error

Transact executes an action in a transaction, and retry if the action failed with a retryable error.

type StorageSize

type StorageSize struct {
	Capacity  uint64
	Available uint64
}

StorageSize represents the storage's capacity and available size Learn from tidb-binlog source code.

func GetStorageSize

func GetStorageSize(dir string) (size StorageSize, err error)

GetStorageSize gets storage's capacity and available size

type TLS

type TLS struct {
	// contains filtered or unexported fields
}

TLS is a wrapper around a TLS configuration.

func NewTLS

func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error)

NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.

func NewTLSFromMockServer

func NewTLSFromMockServer(server *httptest.Server) *TLS

NewTLSFromMockServer constructs a new TLS instance from the certificates of an *httptest.Server.

func (*TLS) GetJSON

func (tc *TLS) GetJSON(ctx context.Context, path string, v interface{}) error

GetJSON performs a GET request to the given path and unmarshals the response

func (*TLS) TLSConfig

func (tc *TLS) TLSConfig() *tls.Config

TLSConfig returns the underlying TLS configuration.

func (*TLS) ToGRPCDialOption

func (tc *TLS) ToGRPCDialOption() grpc.DialOption

ToGRPCDialOption constructs a gRPC dial option.

func (*TLS) ToPDSecurityOption

func (tc *TLS) ToPDSecurityOption() pd.SecurityOption

ToPDSecurityOption converts the TLS configuration to a PD security option.

func (*TLS) ToTiKVSecurityConfig

func (tc *TLS) ToTiKVSecurityConfig() config.Security

ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config. TODO: TiKV does not support pass in content.

func (*TLS) WithHost

func (tc *TLS) WithHost(host string) *TLS

WithHost creates a new TLS instance with the host replaced.

func (*TLS) WrapListener

func (tc *TLS) WrapListener(l net.Listener) net.Listener

WrapListener places a TLS layer on top of the existing listener.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL