checkpoint

package
v0.0.0-...-503c688 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: Apache-2.0, Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CheckpointBackupDir = CheckpointDir + "/backup"

	CheckpointDataDirForBackup     = CheckpointBackupDir + "/data"
	CheckpointChecksumDirForBackup = CheckpointBackupDir + "/checksum"
	CheckpointMetaPathForBackup    = CheckpointBackupDir + "/checkpoint.meta"
	CheckpointLockPathForBackup    = CheckpointBackupDir + "/checkpoint.lock"
)
View Source
const (
	CheckpointTaskInfoForLogRestorePathFormat = CheckpointDir + "/restore-%d/taskInfo.meta"
	CheckpointIngestIndexRepairSQLPathFormat  = CheckpointDir + "/restore-%s/ingest-repair.meta"
)
View Source
const (
	CheckpointRestoreDirFormat            = CheckpointDir + "/restore-%s"
	CheckpointDataDirForRestoreFormat     = CheckpointRestoreDirFormat + "/data"
	CheckpointChecksumDirForRestoreFormat = CheckpointRestoreDirFormat + "/checksum"
	CheckpointMetaPathForRestoreFormat    = CheckpointRestoreDirFormat + "/checkpoint.meta"
)
View Source
const CheckpointDir = "/checkpoints"
View Source
const MaxChecksumTotalCost float64 = 60.0

Variables

This section is empty.

Functions

func AppendForBackup

func AppendForBackup(
	ctx context.Context,
	r *CheckpointRunner[BackupKeyType, BackupValueType],
	groupKey BackupKeyType,
	startKey []byte,
	endKey []byte,
	files []*backuppb.File,
) error

func AppendRangeForLogRestore

func AppendRangeForLogRestore(
	ctx context.Context,
	r *CheckpointRunner[LogRestoreKeyType, LogRestoreValueType],
	groupKey LogRestoreKeyType,
	tableID int64,
	goff int,
	foff int,
) error

func AppendRangesForRestore

func AppendRangesForRestore(
	ctx context.Context,
	r *CheckpointRunner[RestoreKeyType, RestoreValueType],
	tableID RestoreKeyType,
	rangeKey string,
) error

func ExistsCheckpointIngestIndexRepairSQLs

func ExistsCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
) (bool, error)

func ExistsCheckpointTaskInfo

func ExistsCheckpointTaskInfo(
	ctx context.Context,
	s storage.ExternalStorage,
	clusterID uint64,
) (bool, error)

func ExistsRestoreCheckpoint

func ExistsRestoreCheckpoint(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
) (bool, error)

func LoadCheckpointChecksumForRestore

func LoadCheckpointChecksumForRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
) (map[int64]*ChecksumItem, time.Duration, error)

func RemoveCheckpointDataForBackup

func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error

func RemoveCheckpointDataForLogRestore

func RemoveCheckpointDataForLogRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
	clusterID uint64,
) error

func RemoveCheckpointDataForRestore

func RemoveCheckpointDataForRestore(ctx context.Context, s storage.ExternalStorage, taskName string) error

func SaveCheckpointIngestIndexRepairSQLs

func SaveCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	s storage.ExternalStorage,
	meta *CheckpointIngestIndexRepairSQLs,
	taskName string,
) error

func SaveCheckpointMetadata

func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadataForBackup) error

save the checkpoint metadata into the external storage

func SaveCheckpointMetadataForRestore

func SaveCheckpointMetadataForRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	meta *CheckpointMetadataForRestore,
	taskName string,
) error

func SaveCheckpointTaskInfoForLogRestore

func SaveCheckpointTaskInfoForLogRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	meta *CheckpointTaskInfoForLogRestore,
	clusterID uint64,
) error

func WalkCheckpointFileForBackup

func WalkCheckpointFileForBackup(
	ctx context.Context,
	s storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	fn func(BackupKeyType, BackupValueType),
) (time.Duration, error)

walk the whole checkpoint range files and retrieve the metadata of backed up ranges and return the total time cost in the past executions

func WalkCheckpointFileForRestore

func WalkCheckpointFileForRestore[K KeyType, V ValueType](
	ctx context.Context,
	s storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	taskName string,
	fn func(K, V),
) (time.Duration, error)

walk the whole checkpoint range files and retrieve the metadata of restored ranges and return the total time cost in the past executions

Types

type BackupKeyType

type BackupKeyType = string

type BackupValueType

type BackupValueType = RangeType

type CheckpointData

type CheckpointData struct {
	DureTime        time.Duration     `json:"dure-time"`
	RangeGroupMetas []*RangeGroupData `json:"range-group-metas"`
}

type CheckpointIngestIndexRepairSQL

type CheckpointIngestIndexRepairSQL struct {
	IndexID    int64         `json:"index-id"`
	SchemaName model.CIStr   `json:"schema-name"`
	TableName  model.CIStr   `json:"table-name"`
	IndexName  string        `json:"index-name"`
	AddSQL     string        `json:"add-sql"`
	AddArgs    []interface{} `json:"add-args"`
}

type CheckpointIngestIndexRepairSQLs

type CheckpointIngestIndexRepairSQLs struct {
	SQLs []CheckpointIngestIndexRepairSQL
}

func LoadCheckpointIngestIndexRepairSQLs

func LoadCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
) (*CheckpointIngestIndexRepairSQLs, error)

type CheckpointLock

type CheckpointLock struct {
	LockId   uint64 `json:"lock-id"`
	ExpireAt int64  `json:"expire-at"`
}

type CheckpointMessage

type CheckpointMessage[K KeyType, V ValueType] struct {
	// start-key of the origin range
	GroupKey K

	Group []V
}

type CheckpointMetadataForBackup

type CheckpointMetadataForBackup struct {
	GCServiceId string        `json:"gc-service-id"`
	ConfigHash  []byte        `json:"config-hash"`
	BackupTS    uint64        `json:"backup-ts"`
	Ranges      []rtree.Range `json:"ranges"`

	CheckpointChecksum map[int64]*ChecksumItem    `json:"-"`
	CheckpointDataMap  map[string]rtree.RangeTree `json:"-"`
}

func LoadCheckpointMetadata

func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadataForBackup, error)

load checkpoint metadata from the external storage

type CheckpointMetadataForRestore

type CheckpointMetadataForRestore struct {
	SchedulersConfig *pdutil.ClusterConfig `json:"schedulers-config,omitempty"`
	GcRatio          string                `json:"gc-ratio,omitempty"`
}

func LoadCheckpointMetadataForRestore

func LoadCheckpointMetadataForRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
) (*CheckpointMetadataForRestore, error)

type CheckpointRunner

type CheckpointRunner[K KeyType, V ValueType] struct {
	// contains filtered or unexported fields
}

func StartCheckpointBackupRunnerForTest

func StartCheckpointBackupRunnerForTest(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	tick time.Duration,
	timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error)

only for test

func StartCheckpointLogRestoreRunnerForTest

func StartCheckpointLogRestoreRunnerForTest(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	tick time.Duration,
	taskName string,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)

only for test

func StartCheckpointRestoreRunnerForTest

func StartCheckpointRestoreRunnerForTest(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	tick time.Duration,
	taskName string,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

only for test

func StartCheckpointRunnerForRestore

func StartCheckpointRunnerForRestore(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	taskName string,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

func (*CheckpointRunner[K, V]) Append

func (r *CheckpointRunner[K, V]) Append(
	ctx context.Context,
	message *CheckpointMessage[K, V],
) error

func (*CheckpointRunner[K, V]) FlushChecksum

func (r *CheckpointRunner[K, V]) FlushChecksum(
	ctx context.Context,
	tableID int64,
	crc64xor uint64,
	totalKvs uint64,
	totalBytes uint64,
) error

func (*CheckpointRunner[K, V]) FlushChecksumItem

func (r *CheckpointRunner[K, V]) FlushChecksumItem(
	ctx context.Context,
	checksumItem *ChecksumItem,
) error

func (*CheckpointRunner[K, V]) WaitForFinish

func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)

Note: Cannot be parallel with `Append` function

type CheckpointTaskInfoForLogRestore

type CheckpointTaskInfoForLogRestore struct {
	// the progress for this task
	Progress RestoreProgress `json:"progress"`
	// a task marker to distinguish the different tasks
	StartTS   uint64 `json:"start-ts"`
	RestoreTS uint64 `json:"restore-ts"`
	// updated in the progress of `InLogRestoreAndIdMapPersist`
	RewriteTS uint64 `json:"rewrite-ts"`
	// tiflash recorder items with snapshot restore records
	TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}

CheckpointTaskInfo is unique information within the same cluster id. It represents the last restore task executed for this cluster.

func LoadCheckpointTaskInfoForLogRestore

func LoadCheckpointTaskInfoForLogRestore(
	ctx context.Context,
	s storage.ExternalStorage,
	clusterID uint64,
) (*CheckpointTaskInfoForLogRestore, error)

type ChecksumInfo

type ChecksumInfo struct {
	Content  []byte        `json:"content"`
	Checksum []byte        `json:"checksum"`
	DureTime time.Duration `json:"dure-time"`
}

type ChecksumItem

type ChecksumItem struct {
	TableID    int64  `json:"table-id"`
	Crc64xor   uint64 `json:"crc64-xor"`
	TotalKvs   uint64 `json:"total-kvs"`
	TotalBytes uint64 `json:"total-bytes"`
}

type ChecksumItems

type ChecksumItems struct {
	Items []*ChecksumItem `json:"checksum-items"`
}

type GlobalTimer

type GlobalTimer interface {
	GetTS(context.Context) (int64, int64, error)
}

type KeyType

type KeyType interface {
	~BackupKeyType | ~RestoreKeyType
}

type LogRestoreKeyType

type LogRestoreKeyType = string

type LogRestoreValueMarshaled

type LogRestoreValueMarshaled struct {
	// group index in the metadata
	Goff int `json:"goff"`
	// downstream table id -> file indexes in the group
	Foffs map[int64][]int `json:"foffs"`
}

func (LogRestoreValueMarshaled) IdentKey

func (l LogRestoreValueMarshaled) IdentKey() []byte

type LogRestoreValueType

type LogRestoreValueType struct {
	// downstream table id
	TableID int64
	// group index in the metadata
	Goff int
	// file index in the group
	Foff int
}

func (LogRestoreValueType) IdentKey

func (l LogRestoreValueType) IdentKey() []byte

type RangeGroup

type RangeGroup[K KeyType, V ValueType] struct {
	GroupKey K   `json:"group-key"`
	Group    []V `json:"groups"`
}

type RangeGroupData

type RangeGroupData struct {
	RangeGroupsEncriptedData []byte
	Checksum                 []byte
	CipherIv                 []byte

	Size int
}

type RangeType

type RangeType struct {
	*rtree.Range
}

func (RangeType) IdentKey

func (r RangeType) IdentKey() []byte

type RestoreKeyType

type RestoreKeyType = int64

type RestoreProgress

type RestoreProgress int

A progress type for snapshot + log restore.

Before the id-maps is persist into external storage, the snapshot restore and id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`, it can retry from snapshot restore.

After the id-maps is persist into external storage, there are some meta-kvs has been restored into the cluster, such as `rename ddl`. Where would be a situation:

the first execution:

table A created in snapshot restore is renamed to table B in log restore
     table A (id 80)       -------------->        table B (id 80)
  ( snapshot restore )                            ( log restore )

the second execution if don't skip snasphot restore:

table A is created again in snapshot restore, because there is no table named A
     table A (id 81)       -------------->   [not in id-maps, so ignored]
  ( snapshot restore )                            ( log restore )

Finally, there is a duplicated table A in the cluster. Therefore, need to skip snapshot restore when the progress is `InLogRestoreAndIdMapPersist`.

const (
	InSnapshotRestore RestoreProgress = iota
	// Only when the id-maps is persist, status turns into it.
	InLogRestoreAndIdMapPersist
)

type RestoreValueType

type RestoreValueType struct {
	// the file key of a range
	RangeKey string
}

func (RestoreValueType) IdentKey

func (rv RestoreValueType) IdentKey() []byte

type TimeTicker

type TimeTicker interface {
	Ch() <-chan time.Time
	Stop()
}

type ValueType

type ValueType interface {
	IdentKey() []byte
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL