ingest

package
v7.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2023 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// Message texts used by the ingest package. Every message starts with the
// "[ddl-ingest] " prefix, and each constant name carries a LitErr, LitWarn,
// or LitInfo prefix.
// NOTE(review): the name prefixes presumably mirror the log level at which
// each message is emitted — confirm against the call sites.
const (
	LitErrAllocMemFail      string = "[ddl-ingest] allocate memory failed"
	LitErrCreateDirFail     string = "[ddl-ingest] create ingest sort path error"
	LitErrStatDirFail       string = "[ddl-ingest] stat ingest sort path error"
	LitErrDeleteDirFail     string = "[ddl-ingest] delete ingest sort path error"
	LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed"
	LitErrGetBackendFail    string = "[ddl-ingest] cannot get ingest backend"
	LitErrCreateEngineFail  string = "[ddl-ingest] build ingest engine failed"
	LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed"
	LitErrGetEngineFail     string = "[ddl-ingest] can not get ingest engine info"
	LitErrGetStorageQuota   string = "[ddl-ingest] get storage quota error"
	LitErrCloseEngineErr    string = "[ddl-ingest] close engine error"
	LitErrCleanEngineErr    string = "[ddl-ingest] clean engine error"
	LitErrFlushEngineErr    string = "[ddl-ingest] flush engine data err"
	LitErrIngestDataErr     string = "[ddl-ingest] ingest data into storage error"
	LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist"
	LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit"
	LitErrUpdateDiskStats   string = "[ddl-ingest] update disk usage error"
	LitWarnEnvInitFail      string = "[ddl-ingest] initialize environment failed"
	LitWarnConfigError      string = "[ddl-ingest] build config for backend failed"
	LitInfoEnvInitSucc      string = "[ddl-ingest] init global ingest backend environment finished"
	LitInfoSortDir          string = "[ddl-ingest] the ingest sorted directory"
	LitInfoCreateBackend    string = "[ddl-ingest] create one backend for an DDL job"
	LitInfoCloseBackend     string = "[ddl-ingest] close one backend for DDL job"
	LitInfoOpenEngine       string = "[ddl-ingest] open an engine for index reorg task"
	LitInfoAddWriter        string = "[ddl-ingest] reuse engine and add a writer for index reorg task"
	LitInfoCreateWrite      string = "[ddl-ingest] create one local writer for index reorg task"
	LitInfoCloseEngine      string = "[ddl-ingest] flush all writer and get closed engine"
	LitInfoRemoteDupCheck   string = "[ddl-ingest] start remote duplicate checking"
	LitInfoStartImport      string = "[ddl-ingest] start to import data"
	LitInfoChgMemSetting    string = "[ddl-ingest] change memory setting for ingest"
	LitInfoInitMemSetting   string = "[ddl-ingest] initial memory setting for ingest"
	LitInfoUnsafeImport     string = "[ddl-ingest] do a partial import data into the storage"
	LitErrCloseWriterErr    string = "[ddl-ingest] close writer error"
)

Message text constants used by the ingest package; every message is prefixed with "[ddl-ingest]".

Variables

View Source
// Package-level state for the lightning backfill process.
// NOTE(review): these are mutable globals; presumably they are populated by
// InitGlobalLightningEnv — confirm against its implementation.
var (
	// LitBackCtxMgr is the entry for the lightning backfill process.
	LitBackCtxMgr backendCtxManager
	// LitMemRoot is used to track the memory usage of the lightning backfill process.
	LitMemRoot MemRoot
	// LitDiskRoot is used to track the disk usage of the lightning backfill process.
	LitDiskRoot DiskRoot
	// LitRLimit is the maximum number of open files for the lightning backfill process.
	LitRLimit uint64
	// LitSortPath is the sort path for the lightning backfill process.
	LitSortPath string
	// LitInitialized indicates whether the lightning backfill process has been initialized.
	LitInitialized bool
)
View Source
// Sizes of the package's main structs, held as int64 values.
// NOTE(review): presumably measured in bytes and used for memory accounting —
// confirm where these variables are assigned and consumed.
var (
	// StructSizeBackendCtx is the size of BackendContext.
	StructSizeBackendCtx int64
	// StructSizeEngineInfo is the size of EngineInfo.
	StructSizeEngineInfo int64
	// StructSizeWriterCtx is the size of WriterContext.
	StructSizeWriterCtx int64
)
View Source
// GenLightningDataDirForTest exposes the unexported genLightningDataDir
// function under an exported name. It is only used for test.
var GenLightningDataDirForTest = genLightningDataDir

GenLightningDataDirForTest is only used for test.

View Source
// ImporterRangeConcurrencyForTest is only used for test.
// It is declared without an initializer, so it is a nil pointer until a
// caller assigns it.
var ImporterRangeConcurrencyForTest *atomic.Int32

ImporterRangeConcurrencyForTest is only used for test.

Functions

func InitGlobalLightningEnv

func InitGlobalLightningEnv()

InitGlobalLightningEnv initializes the Lightning backfill environment.

func NewEngineInfo

func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
	en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo

NewEngineInfo creates a new EngineInfo struct.

func NewMemRootImpl

func NewMemRootImpl(maxQuota int64, bcCtxMgr *backendCtxManager) *memRootImpl

NewMemRootImpl creates a new memRootImpl.

Types

type BackendContext

// BackendContext stores the backend info for an add index reorg task.
type BackendContext struct {
	// EngMgr is the engineManager held by this backend context.
	// NOTE(review): presumably it tracks the engines opened for this
	// backend — confirm against engineManager.
	EngMgr engineManager
	// contains filtered or unexported fields
}

BackendContext stores the backend info for an add index reorg task.

func (*BackendContext) Done

func (bc *BackendContext) Done() bool

Done returns true if the lightning backfill is done.

func (*BackendContext) FinishImport

func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error

FinishImport imports all the key-values in engine into the storage, collects the duplicate errors if any, and removes the engine from the backend context.

func (*BackendContext) Flush

func (bc *BackendContext) Flush(indexID int64) error

Flush checks the disk quota and imports the current key-values in engine to the storage.

func (*BackendContext) SetDone

func (bc *BackendContext) SetDone()

SetDone sets the done flag.

type DiskRoot

// DiskRoot is used to track the disk usage for the lightning backfill process.
type DiskRoot interface {
	// CurrentUsage reports the tracked disk usage as a uint64.
	// NOTE(review): unit is not shown here; presumably bytes — confirm.
	CurrentUsage() uint64
	// MaxQuota reports the disk quota as a uint64.
	// NOTE(review): presumably in the same unit as CurrentUsage — confirm.
	MaxQuota() uint64
	// UpdateUsageAndQuota returns an error on failure.
	// NOTE(review): presumably it refreshes the values reported by
	// CurrentUsage and MaxQuota — confirm against the implementation.
	UpdateUsageAndQuota() error
}

DiskRoot is used to track the disk usage for the lightning backfill process.

func NewDiskRootImpl

func NewDiskRootImpl(path string, bcCtx *backendCtxManager) DiskRoot

NewDiskRootImpl creates a new DiskRoot.

type MemRoot

// MemRoot is used to track the memory usage for the lightning backfill process.
// NOTE(review): the per-method notes below are inferred from the method names
// and signatures only; confirm them against the implementation.
type MemRoot interface {
	// Consume presumably adds size to the tracked usage.
	Consume(size int64)
	// Release presumably subtracts size from the tracked usage.
	Release(size int64)
	// CheckConsume reports a bool for the given size; presumably whether
	// consuming size would stay within the quota — confirm.
	CheckConsume(size int64) bool
	// ConsumeWithTag presumably records size under the given tag.
	ConsumeWithTag(tag string, size int64)
	// ReleaseWithTag takes only a tag; presumably it releases whatever was
	// recorded under that tag.
	ReleaseWithTag(tag string)

	// SetMaxMemoryQuota presumably sets the value returned by MaxMemoryQuota.
	SetMaxMemoryQuota(quota int64)
	// MaxMemoryQuota reports the memory quota as an int64.
	MaxMemoryQuota() int64
	// CurrentUsage reports the tracked memory usage as an int64.
	CurrentUsage() int64
	// CurrentUsageWithTag reports an int64 usage for the given tag.
	CurrentUsageWithTag(tag string) int64
	// RefreshConsumption takes no arguments and returns nothing; its effect
	// is not visible from the interface — see the implementation.
	RefreshConsumption()
}

MemRoot is used to track the memory usage for the lightning backfill process.

type WriterContext

// WriterContext is used to keep a lightning local writer for each backfill worker.
type WriterContext struct {
	// contains filtered or unexported fields
}

WriterContext is used to keep a lightning local writer for each backfill worker.

func (*WriterContext) WriteRow

func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error

WriteRow writes one row into the local writer buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL