asyncloaddata

Version: v0.0.0-...-503c688
Warning: This package is not in the latest version of its module.

Published: Nov 8, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.
	TestSyncCh = make(chan struct{})
	// TestLastLoadDataJobID is the ID of the most recently created job, used in unit tests.
	TestLastLoadDataJobID atomic.Int64
)

Variables used for testing.

var (
	// HeartBeatInSec is the interval of heartbeat.
	HeartBeatInSec = 5
	// OfflineThresholdInSec means that after failing to update the heartbeat 3 times,
	// we treat the worker of the job as offline.
	OfflineThresholdInSec = HeartBeatInSec * 3
)

Functions

func UpdateJobExpectedStatus

func UpdateJobExpectedStatus(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	jobID int64,
	status JobExpectedStatus,
) error

UpdateJobExpectedStatus updates the expected status of a load data job. TODO: remove it?

Types

type Job

type Job struct {
	ID int64
	// Job doesn't manage the life cycle of the connection.
	Conn sqlexec.SQLExecutor
	User string
}

Job is an import job.

func CreateLoadDataJob

func CreateLoadDataJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	dataSource, db, table string,
	importMode string,
	user string,
) (*Job, error)

CreateLoadDataJob creates a load data job by inserting a record into the system table. The AUTO_INCREMENT value is returned as the jobID.

func NewJob

func NewJob(ID int64, conn sqlexec.SQLExecutor, user string) *Job

NewJob returns a new Job.

func (*Job) CancelJob

func (j *Job) CancelJob(ctx context.Context) (err error)

CancelJob cancels a load data job. Only a running/paused job can be canceled.

func (*Job) DropJob

func (j *Job) DropJob(ctx context.Context) error

DropJob drops a load data job.

func (*Job) FailJob

func (j *Job) FailJob(ctx context.Context, result string) error

FailJob fails a load data job. A job can only be failed once.

func (*Job) FinishJob

func (j *Job) FinishJob(ctx context.Context, result string) error

FinishJob finishes a load data job. A job can only be finished once.

func (*Job) GetJobInfo

func (j *Job) GetJobInfo(ctx context.Context) (*JobInfo, error)

GetJobInfo gets all needed information of a load data job.

func (*Job) GetJobStatus

func (j *Job) GetJobStatus(ctx context.Context) (JobStatus, string, error)

GetJobStatus gets the status of a load data job. A non-nil returned error means something went wrong when querying the database. Other business logic errors are returned as JobFailed with a message.

func (*Job) OnComplete

func (j *Job) OnComplete(inErr error, msg string)

OnComplete is called when a job is finished or failed.

func (*Job) ProgressUpdateRoutineFn

func (j *Job) ProgressUpdateRoutineFn(ctx context.Context, finishCh chan struct{}, errCh <-chan struct{}, progress *Progress) error

ProgressUpdateRoutineFn is the job progress update routine.

func (*Job) StartJob

func (j *Job) StartJob(ctx context.Context) error

StartJob tries to start a not-yet-started job with jobID. It does not return an error when there is no matching job.

func (*Job) UpdateJobProgress

func (j *Job) UpdateJobProgress(ctx context.Context, progress string) (bool, error)

UpdateJobProgress updates the progress of a load data job. It should be called periodically as a heartbeat after StartJob. The returned bool indicates whether the keepalive succeeded; if it did not, the caller should call FailJob soon. TODO: Currently, if the node crashes after CreateLoadDataJob and before StartJob, the job stays in the pending status forever. Maybe we should unify CreateLoadDataJob and StartJob.

type JobExpectedStatus

type JobExpectedStatus int

JobExpectedStatus is the expected status of a load data job. A user can set the expected status of a job, and the worker will respect it.

const (
	// JobExpectedRunning means the job is expected to be running.
	JobExpectedRunning JobExpectedStatus = iota
	// JobExpectedPaused means the job is expected to be paused.
	JobExpectedPaused
	// JobExpectedCanceled means the job is expected to be canceled.
	JobExpectedCanceled
)
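Because the constants are declared with iota, JobExpectedRunning, JobExpectedPaused and JobExpectedCanceled have the values 0, 1 and 2. As a minimal sketch of "the worker will respect it", a worker could map each expected status to an action. The `workerAction` helper and its action strings are hypothetical and do not describe the package's actual worker logic.

```go
package main

import "fmt"

// Local copy of the documented enum.
type JobExpectedStatus int

const (
	JobExpectedRunning JobExpectedStatus = iota
	JobExpectedPaused
	JobExpectedCanceled
)

// workerAction is a hypothetical mapping from the user-set expected status
// to what a worker would do next.
func workerAction(s JobExpectedStatus) string {
	switch s {
	case JobExpectedRunning:
		return "continue"
	case JobExpectedPaused:
		return "pause"
	case JobExpectedCanceled:
		return "stop"
	default:
		return "unknown"
	}
}

func main() {
	for _, s := range []JobExpectedStatus{JobExpectedRunning, JobExpectedPaused, JobExpectedCanceled} {
		fmt.Println(int(s), workerAction(s))
	}
	// Output:
	// 0 continue
	// 1 pause
	// 2 stop
}
```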

type JobInfo

type JobInfo struct {
	JobID         int64
	User          string
	DataSource    string
	TableSchema   string
	TableName     string
	ImportMode    string
	Progress      string
	Status        JobStatus
	StatusMessage string
	CreateTime    types.Time
	StartTime     types.Time
	EndTime       types.Time
}

JobInfo is the information of a load data job.

func GetAllJobInfo

func GetAllJobInfo(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	user string,
) ([]*JobInfo, error)

GetAllJobInfo gets the status of all jobs of a user.

type JobStatus

type JobStatus int

JobStatus represents the status of a load data job.

const (
	// JobFailed means the job has failed and can't be resumed.
	JobFailed JobStatus = iota
	// JobCanceled means the job has been canceled by the user and can't be
	// resumed. It is eventually converted to JobFailed with a message
	// indicating that the reason is cancellation.
	JobCanceled
	// JobPaused means the job has been paused by the user and can be resumed.
	JobPaused
	// JobFinished means the job has finished.
	JobFinished
	// JobPending means the job is waiting to be started.
	JobPending
	// JobRunning means the job is running.
	JobRunning
)
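The documented rules about these statuses can be expressed as small predicates. The sketch below uses a local copy of the enum (so JobFailed is 0 and JobRunning is 5, following the iota order). `canCancel` encodes the documented CancelJob rule that only running or paused jobs can be canceled; `hasEnded` is a hypothetical helper grouping the statuses that, per the comments above, can't be resumed or have finished.

```go
package main

import "fmt"

// Local copy of the documented enum; values follow the iota order above.
type JobStatus int

const (
	JobFailed JobStatus = iota
	JobCanceled
	JobPaused
	JobFinished
	JobPending
	JobRunning
)

// canCancel mirrors the documented CancelJob rule: only a running or paused
// job can be canceled.
func canCancel(s JobStatus) bool {
	return s == JobRunning || s == JobPaused
}

// hasEnded is a hypothetical helper: it reports whether the job will make no
// further progress (failed, canceled, or finished).
func hasEnded(s JobStatus) bool {
	switch s {
	case JobFailed, JobCanceled, JobFinished:
		return true
	default:
		return false
	}
}

func main() {
	for s := JobFailed; s <= JobRunning; s++ {
		fmt.Printf("status=%d cancelable=%t ended=%t\n", int(s), canCancel(s), hasEnded(s))
	}
}
```

Note that JobPending is neither cancelable nor ended under these rules, which matches the TODO on UpdateJobProgress about jobs that can remain pending indefinitely.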

func (JobStatus) String

func (s JobStatus) String() string

type LogicalImportProgress

type LogicalImportProgress struct {
	// LoadedFileSize is the size of the data that's loaded in bytes. It's
	// larger than the actual loaded data size: reading is done one block at a
	// time, and a block may generate multiple tasks that execute concurrently,
	// so we can't easily know the actual loaded data size.
	LoadedFileSize atomic.Int64
}

LogicalImportProgress is the progress info of the logical import mode.

type PhysicalImportProgress

type PhysicalImportProgress struct {
	// ReadRowCnt is the number of rows read from data files.
	// Lines ignored by the IGNORE N LINES clause are not included.
	ReadRowCnt atomic.Uint64
	// EncodeFileSize is the size, in bytes, of the file data that has finished KV encoding.
	// It should eventually equal SourceFileSize.
	EncodeFileSize atomic.Int64
}

PhysicalImportProgress is the progress info of the physical import mode.

type Progress

type Progress struct {
	// SourceFileSize is the size of the source file in bytes. When we can't get
	// the size of the source file, it is set to -1.
	// Currently, the value is obtained by seek(0, end). For LOAD DATA LOCAL, we
	// wrap the MySQL client connection in SimpleSeekerOnReadCloser, which
	// doesn't support seeking to the end.
	SourceFileSize          int64
	*LogicalImportProgress  `json:",inline"`
	*PhysicalImportProgress `json:",inline"`
	// LoadedRowCnt is the number of rows that have been loaded.
	// For physical mode, it's the number of rows that have been imported into TiKV.
	// SHOW LOAD JOB calls it Imported_Rows; the variable name is kept unchanged
	// for compatibility with 7.0.
	LoadedRowCnt atomic.Uint64
}

Progress is the progress of the LOAD DATA task.
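Since SourceFileSize can be -1 and only one of the two embedded progress structs is relevant for a given import mode, deriving a completion fraction needs a little care. The sketch below uses local copies of the documented types (requires Go 1.19+ for `atomic.Int64`/`atomic.Uint64`). The helper `fileFraction` is hypothetical; choosing LoadedFileSize for logical mode and EncodeFileSize for physical mode, and clamping to 1 because LoadedFileSize can overestimate, are assumptions rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local copies of the documented progress types.
type LogicalImportProgress struct {
	LoadedFileSize atomic.Int64
}

type PhysicalImportProgress struct {
	ReadRowCnt     atomic.Uint64
	EncodeFileSize atomic.Int64
}

type Progress struct {
	SourceFileSize int64
	*LogicalImportProgress
	*PhysicalImportProgress
	LoadedRowCnt atomic.Uint64
}

// fileFraction is a hypothetical helper returning the processed fraction of
// the source file, or ok=false when the size is unknown (-1) or unusable.
func fileFraction(p *Progress) (frac float64, ok bool) {
	if p.SourceFileSize <= 0 {
		return 0, false
	}
	var done int64
	switch {
	case p.LogicalImportProgress != nil:
		done = p.LoadedFileSize.Load()
	case p.PhysicalImportProgress != nil:
		done = p.EncodeFileSize.Load()
	default:
		return 0, false
	}
	frac = float64(done) / float64(p.SourceFileSize)
	if frac > 1 {
		frac = 1 // defensive clamp: LoadedFileSize can overestimate
	}
	return frac, true
}

func main() {
	p := &Progress{SourceFileSize: 200, LogicalImportProgress: &LogicalImportProgress{}}
	p.LoadedFileSize.Add(50)
	frac, ok := fileFraction(p)
	fmt.Println(frac, ok) // 0.25 true

	unknown := &Progress{SourceFileSize: -1, LogicalImportProgress: &LogicalImportProgress{}}
	_, ok = fileFraction(unknown)
	fmt.Println(ok) // false
}
```

The counters are atomics because, as the LoadedFileSize comment explains, multiple tasks update progress concurrently; reading them with `Load` avoids data races with those writers.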

func NewProgress

func NewProgress(logicalImport bool) *Progress

NewProgress creates a new Progress. TODO: it would be better to pass the import mode, but that causes an import cycle.

func ProgressFromJSON

func ProgressFromJSON(bs []byte) (*Progress, error)

ProgressFromJSON creates Progress from a JSON string.

func (*Progress) String

func (p *Progress) String() string

String implements the fmt.Stringer interface.
