streamhelper

v7.0.4

Published: Jan 30, 2023 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Overview

Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckPointsOf

func CheckPointsOf(task string) string

CheckPointsOf returns the checkpoint prefix of some task. Normally it would be <prefix>/checkpoint/<task-name>/.

func CheckRegionConsistency

func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error

func GlobalCheckpointOf

func GlobalCheckpointOf(task string) string

GlobalCheckpointOf returns the path to the "global" checkpoint of some task. Normally it would be <prefix>/checkpoint/<task-name>/central_globa.

func LastErrorPrefixOf

func LastErrorPrefixOf(task string) string

LastErrorPrefixOf makes the prefix for searching the last error of some task.

func NewClusterCollector

func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector

NewClusterCollector creates a new cluster collector. A collector is the structure that transforms region information into checkpoint information, by requesting the checkpoints of the regions in the store.

func OwnerManagerForLogBackup

func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager

func Pause

func Pause(task string) string

Pause returns the path for pausing the task. Normally it would be <prefix>/pause/<task-name>.

func PrefixOfTask

func PrefixOfTask() string

PrefixOfTask is the prefix of all tasks. It would be `<prefix>/info/`.

func RangeKeyOf

func RangeKeyOf(name string, startKey []byte) string

RangeKeyOf returns the path for ranges of some task. Normally it would be <prefix>/ranges/<task-name(string)>/<start-key(binary)> -> <end-key(binary)>

func RangesOf

func RangesOf(name string) string

RangesOf returns the path prefix for the ranges of some task. Normally it would be <prefix>/ranges/<task-name(string)>/. The trailing slash is essential; without it we may scan the ranges of other tasks whose names share the same prefix.
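To illustrate why the trailing slash matters, here is a minimal sketch of a prefix scan over an in-memory key list. The `/demo` prefix, the `rangesOf` and `scan` helpers, and the task names are hypothetical stand-ins, not the package's real values or API.

```go
package main

import (
	"fmt"
	"strings"
)

// rangesOf mimics the documented layout <prefix>/ranges/<task-name>/.
// The "/demo" prefix is a placeholder, not the package's real prefix.
func rangesOf(task string, trailingSlash bool) string {
	p := "/demo/ranges/" + task
	if trailingSlash {
		p += "/"
	}
	return p
}

// scan returns every key that starts with prefix, like a prefix read
// against a key-value store.
func scan(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{
		"/demo/ranges/nightly/a",
		"/demo/ranges/nightly-eu/a",
	}
	// Without the slash, the scan also picks up the "nightly-eu" task.
	fmt.Println(len(scan(keys, rangesOf("nightly", false))))
	// With the slash, only keys of the "nightly" task match.
	fmt.Println(len(scan(keys, rangesOf("nightly", true))))
}
```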

func StorageCheckpointOf

func StorageCheckpointOf(task string) string

StorageCheckpointOf gets the prefix path of the `storage checkpoint status` of a task. Normally it would be <prefix>/storage-checkpoint/<task>.

func TaskOf

func TaskOf(name string) string

TaskOf returns the path of tasks. Normally it would be <prefix>/info/<task-name(string)> -> <task(protobuf)>

Types

type AdvancerExt

type AdvancerExt struct {
	MetaDataClient
}

func (AdvancerExt) Begin

func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error

func (AdvancerExt) ClearV3GlobalCheckpointForTask

func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error

func (AdvancerExt) GetGlobalCheckpointForTask

func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)

func (AdvancerExt) UploadV3GlobalCheckpointForTask

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error

type Checkpoint

type Checkpoint struct {
	ID      uint64 `json:"id,omitempty"`
	Version uint64 `json:"epoch_version,omitempty"`
	TS      uint64 `json:"ts"`

	IsGlobal bool `json:"-"`
}

Checkpoint is the polymorphic checkpoint type. The `ID` and `Version` imply the type of this checkpoint:

When ID == 0 and Version == 0, it is the task start ts.
When ID != 0 and Version == 0, it is the store level checkpoint.
When ID != 0 and Version != 0, it is the region level checkpoint.
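The classification rules above can be sketched as a small switch. This is an illustration under stated assumptions, not the package's `Type` implementation: the `checkpoint` struct and `kind` function are hypothetical, and the "invalid" fallback for ID == 0 with Version != 0 is a guess, since that case is not described here.

```go
package main

import "fmt"

// checkpoint mirrors the ID, Version and TS fields documented above.
type checkpoint struct {
	ID      uint64
	Version uint64
	TS      uint64
}

// kind classifies a checkpoint using only the three documented rules.
// The "invalid" fallback (ID == 0, Version != 0) is an assumption.
func kind(cp checkpoint) string {
	switch {
	case cp.ID == 0 && cp.Version == 0:
		return "task"
	case cp.ID != 0 && cp.Version == 0:
		return "store"
	case cp.ID != 0 && cp.Version != 0:
		return "region"
	default:
		return "invalid"
	}
}

func main() {
	fmt.Println(kind(checkpoint{TS: 42}))
	fmt.Println(kind(checkpoint{ID: 1, TS: 42}))
	fmt.Println(kind(checkpoint{ID: 1, Version: 7, TS: 42}))
}
```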

func ParseCheckpoint

func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error)

ParseCheckpoint parses the checkpoint from a key & value pair.

func (Checkpoint) Type

func (cp Checkpoint) Type() CheckpointType

Type returns the type (provider) of the checkpoint.

type CheckpointAdvancer

type CheckpointAdvancer struct {
	// contains filtered or unexported fields
}

CheckpointAdvancer is the central node for advancing the checkpoint of log backup. It is a part of "checkpoint v3". Generally, it scans the regions in the task range and collects checkpoints from the TiKV stores.

                                        ┌──────┐
                                  ┌────►│ TiKV │
                                  │     └──────┘
                                  │
                                  │
┌──────────┐GetLastFlushTSOfRegion│     ┌──────┐
│ Advancer ├──────────────────────┼────►│ TiKV │
└────┬─────┘                      │     └──────┘
     │                            │
     │                            │
     │                            │     ┌──────┐
     │                            └────►│ TiKV │
     │                                  └──────┘
     │
     │ UploadCheckpointV3   ┌──────────────────┐
     └─────────────────────►│  PD              │
                            └──────────────────┘
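One plausible reading of the diagram is that the advancer gathers one flush timestamp per region and uploads a single value. The sketch below assumes that value is the minimum across all regions, which this page does not state explicitly. `regionCheckpoint` and `globalCheckpoint` are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// regionCheckpoint is a hypothetical pair of region ID and last flush TS.
type regionCheckpoint struct {
	RegionID uint64
	TS       uint64
}

// globalCheckpoint returns the smallest TS among the collected regions.
// Taking the minimum is an assumption of this sketch: a global checkpoint
// cannot safely be ahead of the slowest region.
func globalCheckpoint(cps []regionCheckpoint) (uint64, error) {
	if len(cps) == 0 {
		return 0, errors.New("no region checkpoints collected")
	}
	lowest := cps[0].TS
	for _, cp := range cps[1:] {
		if cp.TS < lowest {
			lowest = cp.TS
		}
	}
	return lowest, nil
}

func main() {
	ts, err := globalCheckpoint([]regionCheckpoint{
		{RegionID: 1, TS: 30},
		{RegionID: 2, TS: 10},
		{RegionID: 3, TS: 20},
	})
	fmt.Println(ts, err)
}
```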

func NewCheckpointAdvancer

func NewCheckpointAdvancer(env Env) *CheckpointAdvancer

NewCheckpointAdvancer creates a checkpoint advancer with the env.

func (*CheckpointAdvancer) CalculateGlobalCheckpointLight

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (uint64, error)

func (*CheckpointAdvancer) Config

func (c *CheckpointAdvancer) Config() config.Config

Config returns the current config.

func (*CheckpointAdvancer) GetCheckpointInRange

func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error

GetCheckpointInRange scans the regions in the range and collects them into the collector.

func (*CheckpointAdvancer) Name

func (c *CheckpointAdvancer) Name() string

Name implements daemon.Interface.

func (*CheckpointAdvancer) OnStart

func (c *CheckpointAdvancer) OnStart(ctx context.Context)

OnStart implements daemon.Interface.

func (*CheckpointAdvancer) OnTick

func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error)

OnTick advances the inner logic clock for the advancer. It is synchronous: it returns only after all events triggered by the clock have been handled. It is generally panic-free, so you may not need to recover from a panic here.

func (*CheckpointAdvancer) StartTaskListener

func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context)

StartTaskListener starts the task listener for the advancer. When no task is detected, the advancer does nothing, so call this before beginning the tick loop.

func (*CheckpointAdvancer) UpdateConfig

func (c *CheckpointAdvancer) UpdateConfig(newConf config.Config)

UpdateConfig updates the config for the advancer. Note that this should be called before starting the loop, because there are no locks. TODO: support updating the config after the advancer starts working (maybe by applying changes at the beginning of each tick and adding locks).

func (*CheckpointAdvancer) UpdateConfigWith

func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config))

UpdateConfigWith updates the config by modifying the current config.

type CheckpointType

type CheckpointType int
const (
	CheckpointTypeStore CheckpointType = iota
	CheckpointTypeRegion
	CheckpointTypeTask
	CheckpointTypeGlobal
	CheckpointTypeInvalid
)

type Env

type Env interface {
	// The region scanner provides the region information.
	TiKVClusterMeta
	// LogBackupService connects to the TiKV, so we can collect the region checkpoints.
	LogBackupService
	// StreamMeta connects to the metadata service (normally PD).
	StreamMeta
}

Env is the interface required by the advancer.

func CliEnv

func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env

CliEnv creates the Env for CLI usage.

func TiDBEnv

func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error)

TiDBEnv creates the Env by TiDB config.

type EventType

type EventType int
const (
	EventAdd EventType = iota
	EventDel
	EventErr
)

func (EventType) String

func (t EventType) String() string

type FlushSubscriber

type FlushSubscriber struct {
	// contains filtered or unexported fields
}

FlushSubscriber maintains the state of subscribing to the cluster.

func NewSubscriber

func NewSubscriber(dialer LogBackupService, cluster TiKVClusterMeta, config ...SubscriberConfig) *FlushSubscriber

NewSubscriber creates a new subscriber via the environment and optional configs.

func (*FlushSubscriber) Clear

func (f *FlushSubscriber) Clear()

Clear clears all the subscriptions.

func (*FlushSubscriber) Drop

func (f *FlushSubscriber) Drop()

Drop terminates the lifetime of the subscriber. The subscriber is no longer usable afterwards.

func (*FlushSubscriber) Events

func (f *FlushSubscriber) Events() <-chan spans.Valued

Events returns the output channel of the events.

func (*FlushSubscriber) HandleErrors

func (f *FlushSubscriber) HandleErrors(ctx context.Context)

HandleErrors executes the handlers over all pending errors. Note that a handler may fail to handle a pending error; in that case, you can fetch the remaining errors via the `PendingErrors` call.

func (*FlushSubscriber) PendingErrors

func (f *FlushSubscriber) PendingErrors() error

func (*FlushSubscriber) UpdateStoreTopology

func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error

UpdateStoreTopology fetches the current store topology and tries to adapt the subscription state to it.

type LogBackupService

type LogBackupService interface {
	// GetLogBackupClient gets the log backup client.
	GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error)
}

type MetaDataClient

type MetaDataClient struct {
	*clientv3.Client
}

MetaDataClient is the client for operations over metadata.

func NewMetaDataClient

func NewMetaDataClient(c *clientv3.Client) *MetaDataClient

func (*MetaDataClient) CleanLastErrorOfTask

func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName string) error

func (*MetaDataClient) DeleteTask

func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error

DeleteTask deletes a task, along with its metadata.

func (*MetaDataClient) GetAllTasks

func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error)

GetAllTasks gets all tasks from the metadata storage.

func (*MetaDataClient) GetAllTasksWithRevision

func (c *MetaDataClient) GetAllTasksWithRevision(ctx context.Context) ([]Task, int64, error)

func (*MetaDataClient) GetTask

func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error)

GetTask gets the basic task handle from the metadata storage.

func (*MetaDataClient) GetTaskCount

func (c *MetaDataClient) GetTaskCount(ctx context.Context) (int, error)

GetTaskCount gets the count of tasks from the metadata storage.

func (*MetaDataClient) GetTaskWithPauseStatus

func (c *MetaDataClient) GetTaskWithPauseStatus(ctx context.Context, taskName string) (*Task, bool, error)

func (*MetaDataClient) PauseTask

func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error

func (*MetaDataClient) PutTask

func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error

PutTask puts a task into the metadata storage.

func (*MetaDataClient) ResumeTask

func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error

func (*MetaDataClient) TaskByInfo

type PDRegionScanner

type PDRegionScanner struct {
	pd.Client
}

PDRegionScanner is a simple wrapper over PD that adapts it to the requirements of `RegionScan`.

func (PDRegionScanner) RegionScan

func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]RegionWithLeader, error)

RegionScan gets a list of regions, starting from the region that contains key. Limit limits the maximum number of regions returned.

func (PDRegionScanner) Stores

func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error)

type Range

type Range = kv.KeyRange

type Ranges

type Ranges = []Range

Ranges is a vector of [start_key, end_key) pairs.
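The `[start_key, end_key)` notation means the start key is included and the end key is excluded. A minimal sketch of a membership test under that convention is below. The `keyRange` type and `contains` method are hypothetical, and treating an empty end key as "unbounded" is an assumption this page does not confirm.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a stand-in for a [StartKey, EndKey) pair.
type keyRange struct {
	StartKey []byte
	EndKey   []byte
}

// contains reports whether key lies in [StartKey, EndKey).
// Treating an empty EndKey as "no upper bound" is an assumption here.
func (r keyRange) contains(key []byte) bool {
	if bytes.Compare(key, r.StartKey) < 0 {
		return false
	}
	return len(r.EndKey) == 0 || bytes.Compare(key, r.EndKey) < 0
}

func main() {
	r := keyRange{StartKey: []byte("a"), EndKey: []byte("c")}
	fmt.Println(r.contains([]byte("a"))) // start key is inside
	fmt.Println(r.contains([]byte("b")))
	fmt.Println(r.contains([]byte("c"))) // end key is outside
}
```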

type RegionIter

type RegionIter struct {

	// The max slice size returned by `Next`.
	// This can be changed before calling `Next` each time,
	// however no thread safety provided.
	PageSize int
	// contains filtered or unexported fields
}

func IterateRegion

func IterateRegion(cli TiKVClusterMeta, startKey, endKey []byte) *RegionIter

IterateRegion creates an iterator over the region range.

func (*RegionIter) Done

func (r *RegionIter) Done() bool

Done checks whether the iteration is done.

func (*RegionIter) Next

func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error)

Next gets the next page of regions.

func (*RegionIter) String

func (r *RegionIter) String() string

type RegionWithLeader

type RegionWithLeader struct {
	Region *metapb.Region
	Leader *metapb.Peer
}

type Store

type Store struct {
	ID     uint64
	BootAt uint64
}

type StoreCheckpoints

type StoreCheckpoints struct {
	HasCheckpoint    bool
	Checkpoint       uint64
	FailureSubRanges []kv.KeyRange
}

func (*StoreCheckpoints) String

func (s *StoreCheckpoints) String() string

type StreamMeta

type StreamMeta interface {
	// Begin begins listening to task event changes.
	Begin(ctx context.Context, ch chan<- TaskEvent) error
	// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
	UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
	// ClearV3GlobalCheckpointForTask clears the global checkpoint from the meta store.
	ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
}

StreamMeta connects to the metadata service (normally PD). It provides the global checkpoint information.

type SubscriberConfig

type SubscriberConfig func(*FlushSubscriber)

SubscriberConfig is a config which could be applied to the subscriber.

func WithMasterContext

func WithMasterContext(ctx context.Context) SubscriberConfig

WithMasterContext sets the "master context" for the subscriber. That context is the "background" context for every subtask created by the subscription manager.

type Task

type Task struct {
	Info backuppb.StreamBackupTaskInfo
	// contains filtered or unexported fields
}

Task represents a remote "task" object returned by a task query. It is associated with the client that created it, and is therefore able to fetch remote fields like `ranges`.

func NewTask

func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task

func (*Task) GetGlobalCheckPointTS

func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error)

GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.

func (*Task) GetStorageCheckpoint

func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error)

func (*Task) IsPaused

func (t *Task) IsPaused(ctx context.Context) (bool, error)

func (*Task) LastError

func (t *Task) LastError(ctx context.Context) (map[uint64]backuppb.StreamBackupError, error)

func (*Task) NextBackupTSList

func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error)

NextBackupTSList lists the backup ts of each store.

func (*Task) Pause

func (t *Task) Pause(ctx context.Context) error

Pause is a shorthand for `metaCli.PauseTask`.

func (*Task) Ranges

func (t *Task) Ranges(ctx context.Context) (Ranges, error)

Ranges tries to fetch the range from the metadata storage.

func (*Task) Resume

func (t *Task) Resume(ctx context.Context) error

Resume is a shorthand for `metaCli.ResumeTask`.

func (*Task) UploadGlobalCheckpoint

func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error

type TaskEvent

type TaskEvent struct {
	Type   EventType
	Name   string
	Info   *backuppb.StreamBackupTaskInfo
	Ranges []kv.KeyRange
	Err    error
}

func (*TaskEvent) String

func (t *TaskEvent) String() string

type TaskInfo

type TaskInfo struct {
	PBInfo  backuppb.StreamBackupTaskInfo
	Ranges  Ranges
	Pausing bool
}

TaskInfo is a task info with extra information.

func NewTaskInfo

func NewTaskInfo(name string) *TaskInfo

NewTaskInfo creates a new task info with the name.

func (*TaskInfo) Check

func (t *TaskInfo) Check() (*TaskInfo, error)

func (*TaskInfo) FromTS

func (t *TaskInfo) FromTS(ts uint64) *TaskInfo

FromTS sets the initial version of the stream backup, and returns itself.

func (*TaskInfo) ToStorage

func (t *TaskInfo) ToStorage(backend *backuppb.StorageBackend) *TaskInfo

ToStorage directs the backup task to the given external storage, and returns itself.

func (*TaskInfo) UntilTS

func (t *TaskInfo) UntilTS(ts uint64) *TaskInfo

UntilTS sets the terminal version of the stream backup, and returns itself.

func (*TaskInfo) WithRange

func (t *TaskInfo) WithRange(startKey, endKey []byte) *TaskInfo

WithRange adds a backup range to the task, and returns itself.

func (*TaskInfo) WithRanges

func (t *TaskInfo) WithRanges(ranges ...Range) *TaskInfo

WithRanges adds some ranges to the task, and returns itself.

func (*TaskInfo) WithTableFilter

func (t *TaskInfo) WithTableFilter(filterChain ...string) *TaskInfo

WithTableFilter adds the table filter of the stream backup, and returns itself. When the schema version changes, TiDB should change the ranges of the task according to the table filter.

func (*TaskInfo) ZapTaskInfo

func (t *TaskInfo) ZapTaskInfo() []zap.Field

type TiKVClusterMeta

type TiKVClusterMeta interface {
	// RegionScan gets a list of regions, starts from the region that contains key.
	// Limit limits the maximum number of regions returned.
	RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)

	// Stores returns the store metadata from the cluster.
	Stores(ctx context.Context) ([]Store, error)
}
