model

package
v0.0.0-...-1b33b2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OperDispatched uint64 = iota
	OperProcessed
	OperFinished
)

All KeySpanOperation status

View Source
const (

	// ErrorHistoryThreshold represents failure upper limit in time window.
	// Before a changefeed is initialized, check the failure count of this
	// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
	ErrorHistoryThreshold = 3
)
View Source
const (
	// Move means after the delete operation, the keyspan will be re-added.
	// This field is necessary since we must persist enough information to
	// restore complete keyspan operation in case of processor or owner crashes.
	OperFlagMoveKeySpan uint64 = 1 << iota
)

All KeySpanOperation flags

Variables

This section is empty.

Functions

func ComparePolymorphicEvents

func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool

ComparePolymorphicEvents compares two events by CRTs, Resolved order. It returns true if and only if i should precede j.

func ExtractKeySuffix

func ExtractKeySuffix(key string) (string, error)

ExtractKeySuffix extracts the suffix of an etcd key, such as extracting "6a6c6dd290bc8732" from /tidb/cdc/changefeed/config/6a6c6dd290bc8732

func HolderString

func HolderString(n int) string

HolderString returns a string of placeholders separated by commas. n must be greater than or equal to 1, or the function will panic.

func ListVersionsFromCaptureInfos

func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string

ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.

func ValidateChangefeedID

func ValidateChangefeedID(changefeedID string) error

ValidateChangefeedID checks that the changefeed ID matches the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$" and is no longer than "changeFeedIDMaxLen", e.g. "simple-changefeed-task". Per its signature, it reports the result as an error rather than a boolean.

Types

type AdminJob

type AdminJob struct {
	CfID  string
	Type  AdminJobType
	Opts  *AdminJobOption
	Error *RunningError
}

AdminJob holds an admin job

type AdminJobOption

type AdminJobOption struct {
	ForceRemove bool
}

AdminJobOption records additional options of an admin job

type AdminJobType

type AdminJobType int

AdminJobType represents the admin job type; it is used in both the owner and the processor

const (
	AdminNone AdminJobType = iota
	AdminStop
	AdminResume
	AdminRemove
	AdminFinish
)

All AdminJob types

func (AdminJobType) IsStopState

func (t AdminJobType) IsStopState() bool

IsStopState returns whether the changefeed is in a stop state with the given admin job

func (AdminJobType) String

func (t AdminJobType) String() string

String implements fmt.Stringer interface.

type Capture

type Capture struct {
	ID            string `json:"id"`
	IsOwner       bool   `json:"is_owner"`
	AdvertiseAddr string `json:"address"`
}

Capture holds common information of a capture in cdc

type CaptureID

type CaptureID = string

CaptureID is the type for capture ID

type CaptureInfo

type CaptureInfo struct {
	ID            CaptureID `json:"id"`
	AdvertiseAddr string    `json:"address"`
	Version       string    `json:"version"`
}

CaptureInfo is stored in etcd.

func (*CaptureInfo) Marshal

func (c *CaptureInfo) Marshal() ([]byte, error)

Marshal using json.Marshal.

func (*CaptureInfo) Unmarshal

func (c *CaptureInfo) Unmarshal(data []byte) error

Unmarshal from binary data.

type CaptureTaskStatus

type CaptureTaskStatus struct {
	CaptureID string `json:"capture_id"`
	// KeySpan list, containing keyspans that processor should process
	KeySpans  []uint64                        `json:"keyspan_ids"`
	Operation map[KeySpanID]*KeySpanOperation `json:"keyspan_operations"`
}

CaptureTaskStatus holds TaskStatus of a capture

type ChangeFeedDDLState

type ChangeFeedDDLState int

ChangeFeedDDLState is the type for change feed status

const (
	// ChangeFeedUnknown stands for all unknown status
	ChangeFeedUnknown ChangeFeedDDLState = iota
	// ChangeFeedSyncDML means DMLs are being processed
	ChangeFeedSyncDML
	// ChangeFeedWaitToExecDDL means we are waiting to execute a DDL
	ChangeFeedWaitToExecDDL
	// ChangeFeedExecDDL means a DDL is being executed
	ChangeFeedExecDDL
	// ChangeFeedDDLExecuteFailed means that an error occurred when executing a DDL
	ChangeFeedDDLExecuteFailed
)

func (ChangeFeedDDLState) String

func (s ChangeFeedDDLState) String() string

String implements fmt.Stringer interface.

type ChangeFeedID

type ChangeFeedID = string

ChangeFeedID is the type for change feed ID

type ChangeFeedInfo

type ChangeFeedInfo struct {
	SinkURI    string            `json:"sink-uri"`
	Opts       map[string]string `json:"opts"`
	CreateTime time.Time         `json:"create-time"`
	// Start sync at this commit ts if `StartTs` is specified; otherwise use the CreateTime of the changefeed.
	StartTs uint64 `json:"start-ts"`
	// The ChangeFeed will exit once it has synced to timestamp TargetTs
	TargetTs uint64 `json:"target-ts"`
	// The Start Key of changefeed, inclusive
	StartKey string `json:"start-key"`
	// The End Key of changefeed, exclusive
	EndKey string `json:"end-key"`
	// Format of StartKey and EndKey, "raw", "escaped", "hex"
	// Persist format to show exact same start/end key input when query changefeed.
	Format string `json:"format"`
	// used for admin job notification, trigger watch event in capture
	AdminJobType AdminJobType `json:"admin-job-type"`
	Engine       SortEngine   `json:"sort-engine"`
	// SortDir is deprecated
	// it cannot be set by user in changefeed level, any assignment to it should be ignored.
	// but can be fetched for backward compatibility
	SortDir string `json:"sort-dir"`

	Config   *config.ReplicaConfig `json:"config"`
	State    FeedState             `json:"state"`
	ErrorHis []int64               `json:"history"`
	Error    *RunningError         `json:"error"`

	CreatorVersion string `json:"creator-version"`
}

ChangeFeedInfo describes the detail of a ChangeFeed

func (*ChangeFeedInfo) CheckErrorHistory

func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool)

CheckErrorHistory checks the error history of a changefeed. If there is an error record older than the GC interval, needSave is set to true. If the error count reaches the threshold, canInit is set to false.

func (*ChangeFeedInfo) CleanUpOutdatedErrorHistory

func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool

CleanUpOutdatedErrorHistory cleans up the outdated error history. It returns true if ErrorHis changed.

func (*ChangeFeedInfo) Clone

func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error)

Clone returns a cloned ChangeFeedInfo

func (*ChangeFeedInfo) ErrorsReachedThreshold

func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool

ErrorsReachedThreshold checks the error history of a changefeed. It returns true if the error count reaches the threshold.

func (*ChangeFeedInfo) FixIncompatible

func (info *ChangeFeedInfo) FixIncompatible()

FixIncompatible fixes incompatible changefeed meta info.

func (*ChangeFeedInfo) GetCheckpointTs

func (info *ChangeFeedInfo) GetCheckpointTs(status *ChangeFeedStatus) uint64

GetCheckpointTs returns CheckpointTs if it's specified in ChangeFeedStatus, otherwise StartTs is returned.

func (*ChangeFeedInfo) GetStartTs

func (info *ChangeFeedInfo) GetStartTs() uint64

GetStartTs returns StartTs if it's specified; otherwise it uses the CreateTime of the changefeed.

func (*ChangeFeedInfo) GetTargetTs

func (info *ChangeFeedInfo) GetTargetTs() uint64

GetTargetTs returns TargetTs if it's specified, otherwise MaxUint64 is returned.

func (*ChangeFeedInfo) HasFastFailError

func (info *ChangeFeedInfo) HasFastFailError() bool

HasFastFailError returns true if the error in changefeed is fast-fail

func (*ChangeFeedInfo) Marshal

func (info *ChangeFeedInfo) Marshal() (string, error)

Marshal returns the json marshal format of a ChangeFeedInfo

func (*ChangeFeedInfo) String

func (info *ChangeFeedInfo) String() (str string)

String implements the fmt.Stringer interface, but hides some sensitive information

func (*ChangeFeedInfo) Unmarshal

func (info *ChangeFeedInfo) Unmarshal(data []byte) error

Unmarshal unmarshals into *ChangeFeedInfo from json marshal byte slice

func (*ChangeFeedInfo) VerifyAndComplete

func (info *ChangeFeedInfo) VerifyAndComplete() error

VerifyAndComplete verifies changefeed info and may fill in some fields. If a required field is not provided, it returns an error. If a necessary field is missing but can use a default value, it is filled in.

type ChangeFeedStatus

type ChangeFeedStatus struct {
	ResolvedTs   uint64       `json:"resolved-ts"`
	CheckpointTs uint64       `json:"checkpoint-ts"`
	AdminJobType AdminJobType `json:"admin-job-type"`
}

ChangeFeedStatus stores information about a ChangeFeed

func (*ChangeFeedStatus) Marshal

func (status *ChangeFeedStatus) Marshal() (string, error)

Marshal returns json encoded string of ChangeFeedStatus, only contains necessary fields stored in storage

func (*ChangeFeedStatus) Unmarshal

func (status *ChangeFeedStatus) Unmarshal(data []byte) error

Unmarshal unmarshals into *ChangeFeedStatus from json marshal byte slice

type ChangefeedCommonInfo

type ChangefeedCommonInfo struct {
	ID             string        `json:"id"`
	FeedState      FeedState     `json:"state"`
	CheckpointTSO  uint64        `json:"checkpoint_tso"`
	CheckpointTime JSONTime      `json:"checkpoint_time"`
	RunningError   *RunningError `json:"error"`
}

ChangefeedCommonInfo holds some common usage information of a changefeed

func (ChangefeedCommonInfo) MarshalJSON

func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error)

MarshalJSON is used to marshal ChangefeedCommonInfo

type ChangefeedConfig

type ChangefeedConfig struct {
	ID         string `json:"changefeed_id"`
	StartTS    uint64 `json:"start_ts"`
	TargetTS   uint64 `json:"target_ts"`
	SinkURI    string `json:"sink_uri"`
	Format     string `json:"format"`
	StartKey   string `json:"start_key"`
	EndKey     string `json:"end_key"`
	SortEngine string `json:"sort_engine"`
	// timezone used when checking sink uri
	TimeZone   string             `json:"timezone" default:"system"`
	SinkConfig *config.SinkConfig `json:"sink_config"`
}

ChangefeedConfig is used to create a changefeed

type ChangefeedDetail

type ChangefeedDetail struct {
	ID             string              `json:"id"`
	SinkURI        string              `json:"sink_uri"`
	CreateTime     JSONTime            `json:"create_time"`
	StartTs        uint64              `json:"start_ts"`
	ResolvedTs     uint64              `json:"resolved_ts"`
	TargetTs       uint64              `json:"target_ts"`
	CheckpointTSO  uint64              `json:"checkpoint_tso"`
	CheckpointTime JSONTime            `json:"checkpoint_time"`
	Engine         SortEngine          `json:"sort_engine"`
	FeedState      FeedState           `json:"state"`
	RunningError   *RunningError       `json:"error"`
	ErrorHis       []int64             `json:"error_history"`
	CreatorVersion string              `json:"creator_version"`
	TaskStatus     []CaptureTaskStatus `json:"task_status"`
}

ChangefeedDetail holds detail info of a changefeed

func (ChangefeedDetail) MarshalJSON

func (c ChangefeedDetail) MarshalJSON() ([]byte, error)

MarshalJSON is used to marshal ChangefeedDetail

type FeedState

type FeedState string

FeedState represents the running state of a changefeed

const (
	StateNormal   FeedState = "normal"
	StateError    FeedState = "error"
	StateFailed   FeedState = "failed"
	StateStopped  FeedState = "stopped"
	StateRemoved  FeedState = "removed"
	StateFinished FeedState = "finished"
)

All FeedStates

func (FeedState) IsNeeded

func (s FeedState) IsNeeded(need string) bool

IsNeeded returns true if the given feedState matches the listState.

func (FeedState) ToInt

func (s FeedState) ToInt() int

ToInt returns an int for each `FeedState`; only use this for metrics.

type HTTPError

type HTTPError struct {
	Error string `json:"error_msg"`
	Code  string `json:"error_code"`
}

HTTPError of cdc http api

func NewHTTPError

func NewHTTPError(err error) HTTPError

NewHTTPError wraps an error into an HTTPError

type JSONTime

type JSONTime time.Time

JSONTime is used to wrap time into json format

func (JSONTime) MarshalJSON

func (t JSONTime) MarshalJSON() ([]byte, error)

MarshalJSON is used to specify the time format

func (*JSONTime) UnmarshalJSON

func (t *JSONTime) UnmarshalJSON(data []byte) error

UnmarshalJSON is used to parse time.Time from bytes. The builtin json.Unmarshal function cannot unmarshal a date string formatted as "2006-01-02 15:04:05.000", so we must implement a customized unmarshal function.

type KeySpanID

type KeySpanID = uint64

KeySpanID is the ID of the KeySpan

type KeySpanLocation

type KeySpanLocation struct {
	CaptureID string    `json:"capture_id"`
	KeySpanID KeySpanID `json:"keyspan_id"`
}

KeySpanLocation records which capture a keyspan is in

type KeySpanOperation

type KeySpanOperation struct {
	Delete bool   `json:"delete"`
	Flag   uint64 `json:"flag,omitempty"`
	// if the operation is a delete operation, BoundaryTs is checkpoint ts
	// if the operation is a add operation, BoundaryTs is start ts
	BoundaryTs uint64 `json:"boundary_ts"`
	Status     uint64 `json:"status,omitempty"`

	RelatedKeySpans []KeySpanLocation `json:"related_key_spans"`
}

KeySpanOperation records the current information of a keyspan migration

func (*KeySpanOperation) Clone

func (o *KeySpanOperation) Clone() *KeySpanOperation

Clone returns a deep-clone of the struct

func (*KeySpanOperation) KeySpanApplied

func (o *KeySpanOperation) KeySpanApplied() bool

KeySpanApplied returns whether the keyspan has finished the startup procedure. Returns true if keyspan has been processed by processor and resolved ts reaches global resolved ts.

func (*KeySpanOperation) KeySpanProcessed

func (o *KeySpanOperation) KeySpanProcessed() bool

KeySpanProcessed returns whether the keyspan has been processed by processor

type KeySpanReplicaInfo

type KeySpanReplicaInfo struct {
	StartTs Ts     `json:"start-ts"`
	Start   []byte `json:"start"`
	End     []byte `json:"end"`
}

KeySpanReplicaInfo records the keyspan replica info

func (*KeySpanReplicaInfo) Clone

Clone clones a KeySpanReplicaInfo

type MoveKeySpanJob

type MoveKeySpanJob struct {
	From               CaptureID
	To                 CaptureID
	KeySpanID          KeySpanID
	KeySpanReplicaInfo *KeySpanReplicaInfo
	Status             MoveKeySpanStatus
}

MoveKeySpanJob records a move operation of a keyspan

type MoveKeySpanStatus

type MoveKeySpanStatus int

MoveKeySpanStatus represents the status of a MoveKeySpanJob

const (
	MoveKeySpanStatusNone MoveKeySpanStatus = iota
	MoveKeySpanStatusDeleted
	MoveKeySpanStatusFinished
)

All MoveKeySpan status

type MqMessageType

type MqMessageType int

MqMessageType is the type of message

const (
	// MqMessageTypeUnknown is unknown type of message key
	MqMessageTypeUnknown MqMessageType = 0

	// MqMessageTypeResolved is resolved type of message key
	MqMessageTypeResolved MqMessageType = 3
	// MqMessageTypeKv is RawKV entry type of message key
	MqMessageTypeKv MqMessageType = 4
)

Use explicit values to avoid compatibility issues.

func (*MqMessageType) DecodeMsg

func (z *MqMessageType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (MqMessageType) EncodeMsg

func (z MqMessageType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (MqMessageType) MarshalMsg

func (z MqMessageType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (MqMessageType) Msgsize

func (z MqMessageType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*MqMessageType) UnmarshalMsg

func (z *MqMessageType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type OpType

type OpType int

OpType for the kv, delete or put

const (
	OpTypeUnknown  OpType = 0
	OpTypePut      OpType = 1
	OpTypeDelete   OpType = 2
	OpTypeResolved OpType = 3
)

OpType values for kv. Use explicit values to avoid compatibility issues.

func (*OpType) DecodeMsg

func (z *OpType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (OpType) EncodeMsg

func (z OpType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (OpType) MarshalMsg

func (z OpType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (OpType) Msgsize

func (z OpType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*OpType) UnmarshalMsg

func (z *OpType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type PolymorphicEvent

type PolymorphicEvent struct {
	StartTs uint64
	// Commit or resolved TS
	CRTs uint64

	RawKV *RawKVEntry
	// contains filtered or unexported fields
}

PolymorphicEvent describes an event that can be in multiple states

func NewPolymorphicEvent

func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent

NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV

func NewResolvedPolymorphicEvent

func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64, keyspanID uint64) *PolymorphicEvent

NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts

func (*PolymorphicEvent) IsResolved

func (e *PolymorphicEvent) IsResolved() bool

IsResolved returns true if the event is resolved. Note that this function can only be called when `RawKV != nil`.

func (*PolymorphicEvent) PrepareFinished

func (e *PolymorphicEvent) PrepareFinished()

PrepareFinished marks the prepare process as finished. In the prepare process, Mounter will translate raw KV to row data.

func (*PolymorphicEvent) RegionID

func (e *PolymorphicEvent) RegionID() uint64

RegionID returns the region ID where the event comes from.

func (*PolymorphicEvent) Sequence

func (e *PolymorphicEvent) Sequence() uint64

Sequence returns the sequence of the event.

func (*PolymorphicEvent) SetUpFinishedChan

func (e *PolymorphicEvent) SetUpFinishedChan()

SetUpFinishedChan creates an internal channel to support PrepareFinished and WaitPrepare

func (*PolymorphicEvent) WaitPrepare

func (e *PolymorphicEvent) WaitPrepare(ctx context.Context) error

WaitPrepare waits for the prepare process to finish

type ProcInfoSnap

type ProcInfoSnap struct {
	CfID      string                            `json:"changefeed-id"`
	CaptureID string                            `json:"capture-id"`
	KeySpans  map[KeySpanID]*KeySpanReplicaInfo `json:"-"`
}

ProcInfoSnap holds the most important replication information of a processor

type ProcessorCommonInfo

type ProcessorCommonInfo struct {
	CfID      string `json:"changefeed_id"`
	CaptureID string `json:"capture_id"`
}

ProcessorCommonInfo holds the common info of a processor

type ProcessorDetail

type ProcessorDetail struct {
	// The maximum event CommitTs that has been synchronized.
	CheckPointTs uint64 `json:"checkpoint_ts"`
	// The event that satisfies CommitTs <= ResolvedTs can be synchronized.
	ResolvedTs uint64 `json:"resolved_ts"`
	// all keyspans that this processor is replicating
	KeySpans map[KeySpanID]*KeySpanReplicaInfo `json:"keyspans"`
	// The count of events that have been replicated.
	Count uint64 `json:"count"`
	// Error code when error happens
	Error *RunningError `json:"error"`
}

ProcessorDetail holds the detail info of a processor

type ProcessorsInfos

type ProcessorsInfos map[CaptureID]*TaskStatus

ProcessorsInfos maps from capture IDs to TaskStatus

func (ProcessorsInfos) String

func (p ProcessorsInfos) String() string

String implements fmt.Stringer interface.

type RawKVEntry

type RawKVEntry struct {
	OpType OpType `msg:"op_type"`
	Key    []byte `msg:"key"`
	// nil for delete type
	Value []byte `msg:"value"`
	// nil for insert type
	OldValue []byte `msg:"old_value"`
	StartTs  uint64 `msg:"start_ts"`
	// Commit or resolved TS
	CRTs      uint64 `msg:"crts"`
	ExpiredTs uint64 `msg:"expired_ts"`

	// Additional debug info
	RegionID  uint64 `msg:"region_id"`
	KeySpanID uint64 `msg:"keyspan_id"`
	// For providing additional sequence number.
	// To keep `RawKVEntries` from the same region in order during unstable sorting of `sorter`.
	// The sequence number is generated by auto-increment in `puller` node of `processor.pipeline`.
	Sequence uint64 `msg:"sequence"`
}

RawKVEntry notifies the KV operator

func (*RawKVEntry) ApproximateDataSize

func (v *RawKVEntry) ApproximateDataSize() int64

ApproximateDataSize calculates the approximate size of protobuf binary representation of this event.

func (*RawKVEntry) DecodeMsg

func (z *RawKVEntry) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RawKVEntry) EncodeMsg

func (z *RawKVEntry) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RawKVEntry) MarshalMsg

func (z *RawKVEntry) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RawKVEntry) Msgsize

func (z *RawKVEntry) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RawKVEntry) String

func (v *RawKVEntry) String() string

func (*RawKVEntry) UnmarshalMsg

func (z *RawKVEntry) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RegionFeedEvent

type RegionFeedEvent struct {
	Val      *RawKVEntry
	Resolved *ResolvedSpan

	// Additional debug info
	RegionID uint64
}

RegionFeedEvent from the kv layer. Only one of the events will be set.

func (*RegionFeedEvent) GetValue

func (e *RegionFeedEvent) GetValue() interface{}

GetValue returns the underlying value

func (RegionFeedEvent) String

func (e RegionFeedEvent) String() string

type ResolvedSpan

type ResolvedSpan struct {
	Span       regionspan.ComparableSpan
	ResolvedTs uint64
}

ResolvedSpan guarantees that all KV value events with commit ts less than ResolvedTs have been emitted.

func (*ResolvedSpan) String

func (rs *ResolvedSpan) String() string

String implements fmt.Stringer interface.

type RunningError

type RunningError struct {
	Addr    string `json:"addr"`
	Code    string `json:"code"`
	Message string `json:"message"`
}

RunningError represents some running error from cdc components, such as processor.

type SchemaID

type SchemaID = int64

SchemaID is the ID of the schema

type ServerStatus

type ServerStatus struct {
	Version string `json:"version"`
	GitHash string `json:"git_hash"`
	ID      string `json:"id"`
	Pid     int    `json:"pid"`
	IsOwner bool   `json:"is_owner"`
}

ServerStatus holds some common information of a server

type SortEngine

type SortEngine = string

SortEngine is the sorter engine

const (
	SortInMemory SortEngine = "memory"
	SortInFile   SortEngine = "file"
	SortUnified  SortEngine = "unified"
)

sort engines

type SorterStatus

type SorterStatus = int32

SorterStatus is the state of the puller sorter

const (
	SorterStatusWorking SorterStatus = iota
	SorterStatusStopping
	SorterStatusStopped
	SorterStatusFinished
)

SorterStatus of the puller sorter

type TaskPosition

type TaskPosition struct {
	// The maximum event CommitTs that has been synchronized. This is updated by corresponding processor.
	CheckPointTs uint64 `json:"checkpoint-ts"` // Deprecated
	// The event that satisfies CommitTs <= ResolvedTs can be synchronized. This is updated by corresponding processor.
	ResolvedTs uint64 `json:"resolved-ts"` // Deprecated
	// The count of events that were synchronized. This is updated by corresponding processor.
	Count uint64 `json:"count"`
	// Error when error happens
	Error *RunningError `json:"error"`
}

TaskPosition records the process information of a capture

func (*TaskPosition) Clone

func (tp *TaskPosition) Clone() *TaskPosition

Clone returns a deep clone of TaskPosition

func (*TaskPosition) Marshal

func (tp *TaskPosition) Marshal() (string, error)

Marshal returns the json marshal format of a TaskPosition

func (*TaskPosition) String

func (tp *TaskPosition) String() string

String implements fmt.Stringer interface.

func (*TaskPosition) Unmarshal

func (tp *TaskPosition) Unmarshal(data []byte) error

Unmarshal unmarshals into *TaskPosition from json marshal byte slice

type TaskStatus

type TaskStatus struct {
	// KeySpan information list, containing keyspans that processor should process, updated by owner, processor is read only.
	KeySpans     map[KeySpanID]*KeySpanReplicaInfo `json:"keyspans"`
	Operation    map[KeySpanID]*KeySpanOperation   `json:"operation"` // Deprecated
	AdminJobType AdminJobType                      `json:"admin-job-type"`
	ModRevision  int64                             `json:"-"`
}

TaskStatus records the task information of a capture

func (*TaskStatus) AddKeySpan

func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts, relatedKeySpans []KeySpanLocation)

AddKeySpan adds the keyspan to KeySpanInfos and adds an add-keyspan operation.

func (*TaskStatus) AppliedTs

func (ts *TaskStatus) AppliedTs() Ts

AppliedTs returns a Ts which is less than or equal to the ts boundary of any unapplied operation

func (*TaskStatus) Clone

func (ts *TaskStatus) Clone() *TaskStatus

Clone returns a deep-clone of the struct

func (*TaskStatus) Marshal

func (ts *TaskStatus) Marshal() (string, error)

Marshal returns the json marshal format of a TaskStatus

func (*TaskStatus) RemoveKeySpan

func (ts *TaskStatus) RemoveKeySpan(id KeySpanID, boundaryTs Ts, isMoveKeySpan bool) (*KeySpanReplicaInfo, bool)

RemoveKeySpan removes the keyspan from KeySpanInfos and adds a remove-keyspan operation.

func (*TaskStatus) Snapshot

func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs Ts) *ProcInfoSnap

Snapshot takes a snapshot of `*TaskStatus` and returns a new `*ProcInfoSnap`

func (*TaskStatus) SomeOperationsUnapplied

func (ts *TaskStatus) SomeOperationsUnapplied() bool

SomeOperationsUnapplied returns true if there are some operations not applied

func (*TaskStatus) String

func (ts *TaskStatus) String() string

String implements fmt.Stringer interface.

func (*TaskStatus) Unmarshal

func (ts *TaskStatus) Unmarshal(data []byte) error

Unmarshal unmarshals into *TaskStatus from json marshal byte slice

type TaskWorkload

type TaskWorkload map[KeySpanID]WorkloadInfo

TaskWorkload records the workloads of a task. The value of the struct is the workload.

func (*TaskWorkload) Marshal

func (w *TaskWorkload) Marshal() (string, error)

Marshal returns the json marshal format of a TaskWorkload

func (*TaskWorkload) Unmarshal

func (w *TaskWorkload) Unmarshal(data []byte) error

Unmarshal unmarshals into *TaskWorkload from json marshal byte slice

type Ts

type Ts = uint64

Ts is the timestamp with a logical count

type WorkloadInfo

type WorkloadInfo struct {
	Workload uint64 `json:"workload"`
}

WorkloadInfo records the workload info of a keyspan

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL