Package importer

Version: v0.0.0-...-503c688 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 · License: Apache-2.0 · Imports: 60 · Imported by: 0

Documentation

Index

Constants

View Source
// Data source formats accepted by LOAD DATA.
const (
	// LoadDataFormatDelimitedData represents the data source file of LOAD DATA is delimited data.
	LoadDataFormatDelimitedData = "delimited data"
	// LoadDataFormatSQLDump represents the data source file of LOAD DATA is a mydumper-format DML file.
	LoadDataFormatSQLDump = "sql file"
	// LoadDataFormatParquet represents the data source file of LOAD DATA is parquet.
	LoadDataFormatParquet = "parquet"
)

// Import modes of LOAD DATA.
const (
	// LogicalImportMode represents the import mode is SQL-like.
	LogicalImportMode = "logical"
	// PhysicalImportMode represents the import mode is KV-like.
	PhysicalImportMode = "physical"
)

Variables

View Source
// MinDeliverBytes and MinDeliverRowCnt are logically constants; they are
// declared as variables so that tests can change them.
var (
	// MinDeliverBytes is 96 KiB (data + index): batch at least this amount of
	// bytes to reduce the number of messages.
	MinDeliverBytes uint64 = 96 * units.KiB
	// MinDeliverRowCnt is the row-count counterpart of MinDeliverBytes
	// (presumably a minimum batch size in rows — confirm at the use site).
	// See the default for tikv-importer.max-kv-pairs.
	MinDeliverRowCnt = 4096
)

MinDeliverBytes and MinDeliverRowCnt are logically constants; they are declared as variables so that tests can change them.

View Source
// GetKVStore returns a kv.Storage; the KV encoder of the physical import mode
// needs it. It is a function-typed package variable declared here without an
// initializer, so it must be assigned before it is called (calling a nil func
// value panics).
var GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error)

GetKVStore returns a kv.Storage. The KV encoder of the physical import mode needs it.

View Source
var (

	// LoadDataReadBlockSize is exposed for test: it is config.ReadBlockSize
	// converted to int64, kept in a variable so tests can change it.
	// Presumably the block size used when reading LOAD DATA input — confirm
	// at the use site.
	LoadDataReadBlockSize = int64(config.ReadBlockSize)
)
View Source
// TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.
// It is an unbuffered channel, so a send blocks until another goroutine
// receives from it.
var TestSyncCh = make(chan struct{})

TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.

Functions

func GetMsgFromBRError

func GetMsgFromBRError(err error) string

GetMsgFromBRError gets the message from a BR error. TODO: add GetMsg() to the errors package to replace this function. See TestGetMsgFromBRError for more details.

func ProcessChunk

func ProcessChunk(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	tableImporter *TableImporter,
	dataEngine,
	indexEngine *backend.OpenedEngine,
	logger *zap.Logger,
) (err error)

ProcessChunk processes a chunk and writes KV pairs to dataEngine and indexEngine.

Types

type FieldMapping

// FieldMapping maps one input field of the data file to either a table column
// or a user variable.
type FieldMapping struct {
	// Column is the target table column; presumably nil when the field is
	// mapped to a user variable — confirm.
	Column  *table.Column
	// UserVar is the target user variable; presumably nil when the field is
	// mapped to a column — confirm.
	UserVar *ast.VariableExpr
}

FieldMapping maps an input field to either a table column or a user variable.

type JobImportParam

// JobImportParam holds the parameters of a job import.
type JobImportParam struct {
	// Job is the job being imported.
	Job      *asyncloaddata.Job
	// Group runs the import routines (JobImporter.Import is expected to run
	// its work through it).
	Group    *errgroup.Group
	// GroupCtx is the context to use during import; presumably the context
	// paired with Group — confirm where the param is built.
	GroupCtx context.Context
	// Done should be closed at the end of the job; JobImporter.Import is
	// expected to close it when the import has finished.
	Done chan struct{}

	// Progress is the job's progress record; presumably updated as the import
	// proceeds — confirm.
	Progress *asyncloaddata.Progress
}

JobImportParam holds the parameters of a job import.

type JobImporter

// JobImporter is the interface for importing a job.
type JobImporter interface {
	// Param returns the param of the job import.
	Param() *JobImportParam
	// Import imports the job.
	// The import should run in routines using param.Group, and param.Done
	// should be closed when the import has finished. During import,
	// param.GroupCtx should be used, which is why this method takes no
	// context parameter.
	Import()
	// Result returns the result of the job import.
	// TODO: return a struct.
	Result() string
	// An importer must also implement io.Closer.
	io.Closer
}

JobImporter is the interface for importing a job.

type LoadDataController

// LoadDataController is the controller for LOAD DATA: it holds the clauses of
// the statement, the target table and the import options.
// TODO: need a better name.
type LoadDataController struct {
	// FileLocRef tells how Path should be resolved; presumably it separates
	// server-local, client-local and remote (cloud storage) files — confirm
	// against ast.FileLocRefTp.
	FileLocRef         ast.FileLocRefTp
	// Path is the location of the data file(s) to load.
	Path               string
	// Format is the data source format; presumably one of the
	// LoadDataFormat* constants — confirm.
	Format             string
	// ColumnsAndUserVars is the column / user-variable list clause of the
	// statement, if any (see FieldMappings).
	ColumnsAndUserVars []*ast.ColumnNameOrUserVar
	// ColumnAssignments are the assignments of the statement's SET clause.
	ColumnAssignments  []*ast.Assignment
	// OnDuplicate selects how rows that conflict on a unique key are handled.
	OnDuplicate        ast.OnDuplicateKeyHandlingType

	// Table is the target table; DBName and DBID identify its database.
	Table  table.Table
	DBName string
	DBID   int64

	// FieldMappings describes how each input field (or input column) from the
	// data file is mapped, either to a column or to a user variable.
	// If there's NO column list clause in the LOAD DATA statement, it follows
	// the table's columns; otherwise it follows the user-defined list.
	FieldMappings []*FieldMapping
	// InsertColumns: see InsertValues.InsertColumns.
	// TODO: our behavior is different from MySQL's. For example, for table t(a,b):
	// - "...(a,a) set a=100" is allowed in MySQL, but not in TiDB.
	// - "...(a,b) set b=100" sets b=100 in MySQL, but in TiDB the SET is ignored.
	// - referencing columns in the SET clause is allowed in MySQL, but not in TiDB.
	InsertColumns []*table.Column
	// Restrictive reports whether data interpretation is restrictive. It is
	// restrictive if the SQL mode is restrictive and neither the IGNORE nor
	// the LOCAL modifier is specified; errors then terminate the load
	// operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	Restrictive bool

	// The following fields are used for the DELIMITED DATA format.
	// IgnoreLines is presumably the number of leading lines to skip
	// (IGNORE n LINES) — confirm.
	FieldNullDef         []string
	NullValueOptEnclosed bool
	plannercore.LineFieldsInfo
	IgnoreLines uint64

	// Import options.
	// ImportMode is presumably LogicalImportMode or PhysicalImportMode — confirm.
	ImportMode string

	// ThreadCnt and BatchSize are import options; presumably the import
	// concurrency and the write batch size — confirm at their use sites.
	ThreadCnt int64
	BatchSize int64

	// Detached reports whether the load runs in DETACHED mode (see UserCtx).
	Detached bool

	// TotalFileSize is the total data file size in bytes; it is only
	// initialized when loading from remote.
	TotalFileSize int64
	// UserCtx is the user session context. DO NOT use it if the load is in
	// DETACHED mode.
	UserCtx sessionctx.Context
	// contains filtered or unexported fields
}

LoadDataController is the controller for LOAD DATA. TODO: find a better name.

func NewLoadDataController

func NewLoadDataController(userCtx sessionctx.Context, plan *Plan, tbl table.Table) (*LoadDataController, error)

NewLoadDataController creates a new controller.

func (*LoadDataController) CheckRequirements

func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error

CheckRequirements checks the requirements for load data.

func (*LoadDataController) GenerateCSVConfig

func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig

GenerateCSVConfig generates a CSV config for the parser from LoadDataWorker.

func (*LoadDataController) GetFieldCount

func (e *LoadDataController) GetFieldCount() int

GetFieldCount returns the field count.

func (*LoadDataController) GetLoadDataReaderInfos

func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo

GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.

func (*LoadDataController) GetParser

func (e *LoadDataController) GetParser(
	ctx context.Context,
	dataFileInfo LoadDataReaderInfo,
) (parser mydump.Parser, err error)

GetParser returns a parser for the data file.

func (*LoadDataController) InitDataFiles

func (e *LoadDataController) InitDataFiles(ctx context.Context) error

InitDataFiles initializes the data store and the data files to load.

type LoadDataReaderInfo

// LoadDataReaderInfo provides information for a data reader of LOAD DATA.
type LoadDataReaderInfo struct {
	// Opener can be called when needed to get an io.ReadSeekCloser. It will
	// only be called once.
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
	// Remote is not nil only if loading from cloud storage.
	Remote *mydump.SourceFileMeta
}

LoadDataReaderInfo provides information for a data reader of LOAD DATA.

type Plan

// Plan describes the plan of LOAD DATA. Several of its fields have
// same-named counterparts in LoadDataController.
type Plan struct {
	// TableName and TableInfo identify the target table.
	TableName *ast.TableName
	TableInfo *model.TableInfo

	// Statement-level settings: the clauses of LOAD DATA plus derived flags
	// such as Restrictive. IgnoreLines is a pointer, presumably so that "not
	// specified" (nil) can be told apart from an explicit value — confirm.
	FileLocRef         ast.FileLocRefTp
	Path               string
	Format             string
	ColumnsAndUserVars []*ast.ColumnNameOrUserVar
	ColumnAssignments  []*ast.Assignment
	OnDuplicate        ast.OnDuplicateKeyHandlingType
	FieldsInfo         *ast.FieldsClause
	LinesInfo          *ast.LinesClause
	Restrictive        bool
	IgnoreLines        *uint64

	// Session-related settings; presumably captured from the user session so
	// the import does not need it later — confirm.
	SQLMode          mysql.SQLMode
	Charset          *string
	ImportantSysVars map[string]string

	// Import options.
	ImportMode        string
	DiskQuota         config.ByteSize
	Checksum          config.PostOpLevel
	AddIndex          bool
	Analyze           config.PostOpLevel
	ThreadCnt         int64
	BatchSize         int64
	MaxWriteSpeed     config.ByteSize
	SplitFile         bool
	MaxRecordedErrors int64
	Detached          bool

	// DistSQLScanConcurrency is presumably the distsql scan concurrency to use
	// during the import — confirm where it is set.
	DistSQLScanConcurrency int
}

Plan describes the plan of LOAD DATA.

func NewPlan

func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error)

NewPlan creates a new load data plan.

type TableImporter

// TableImporter is a table importer. *TableImporter implements JobImporter:
// Param, Import, Result and Close are all defined on it.
type TableImporter struct {
	// Embedded so the job parameters are promoted onto the importer.
	*JobImportParam
	// Embedded so the controller's fields and methods are promoted onto the
	// importer.
	*LoadDataController
	// contains filtered or unexported fields
}

TableImporter is a table importer.

func NewTableImporter

func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error)

NewTableImporter creates a new table importer.

func (*TableImporter) Close

func (ti *TableImporter) Close() error

Close implements the io.Closer interface.

func (*TableImporter) Import

func (ti *TableImporter) Import()

Import implements JobImporter.Import.

func (*TableImporter) ImportAndCleanup

func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) error

ImportAndCleanup imports the engine and cleans up the engine data.

func (*TableImporter) OpenDataEngine

func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)

OpenDataEngine opens a data engine.

func (*TableImporter) OpenIndexEngine

func (ti *TableImporter) OpenIndexEngine(ctx context.Context) (*backend.OpenedEngine, error)

OpenIndexEngine opens an index engine.

func (*TableImporter) Param

func (ti *TableImporter) Param() *JobImportParam

Param implements JobImporter.Param.

func (*TableImporter) PopulateChunks

func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpoints.EngineCheckpoint, error)

PopulateChunks populates chunks from table regions. In the distributed framework, this should be done on the TiDB node that is responsible for splitting the job into subtasks; the table importer then handles the data belonging to each subtask.

func (*TableImporter) Result

func (ti *TableImporter) Result() string

Result implements JobImporter.Result.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL