Documentation ¶
Index ¶
- Constants
- func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType, key []byte, value []byte, ttl uint64, err error)
- func InitMetrics(registry *prometheus.Registry)
- func ParseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error)
- func RegisterSink(scheme string, initFunc InitFunc, checkerFunc InitFunc)
- func Validate(ctx context.Context, sinkURIStr string, cfg *config.ReplicaConfig, ...) error
- type InitFunc
- type Manager
- type Sink
- type Statistics
- func (b *Statistics) AddDDLCount()
- func (b *Statistics) AddEntriesCount(count int)
- func (b *Statistics) AddInvalidKeyCount()
- func (b *Statistics) PrintStatus(ctx context.Context)
- func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error
- func (b *Statistics) SubRowsCount(count int)
- func (b *Statistics) TotalRowsCount() uint64
- type TikvBatcher
Constants ¶
const ( OptChangefeedID = "_changefeed_id" OptCaptureAddr = "_capture_addr" )
Sink options keys
Variables ¶
This section is empty.
Functions ¶
func ExtractRawKVEntry ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file
func ParseTiKVUri ¶
func RegisterSink ¶
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages keyspan sinks and maintains the relationship between keyspan sinks and the backend Sink. Manager is thread-safe.
func NewManager ¶
func NewManager( ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts, captureAddr string, changefeedID model.ChangeFeedID, ) *Manager
NewManager creates a new Sink manager
func (*Manager) Close ¶
Close closes the Sink manager and the backend Sink. This method can be called reentrantly.
func (*Manager) CreateKeySpanSink ¶
CreateKeySpanSink creates a keyspan sink
func (*Manager) UpdateChangeFeedCheckpointTs ¶
type Sink ¶
type Sink interface {
	// EmitChangedEvents sends Changed Events to Sink.
	// EmitChangedEvents may write rows to downstream directly.
	//
	// EmitChangedEvents is thread-safe.
	// FIXME: some sink implementations are not thread-safe yet; they should be.
	EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error

	// FlushChangedEvents flushes every row whose commitTs is less than or
	// equal to `resolvedTs` into downstream.
	// TiKV-CDC guarantees that all the Events whose commitTs is less than or
	// equal to `resolvedTs` are sent to Sink through `EmitChangedEvents`.
	//
	// FlushChangedEvents is thread-safe.
	// FIXME: some sink implementations are not thread-safe yet; they should be.
	FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error)

	// EmitCheckpointTs sends CheckpointTs to Sink.
	// TiKV-CDC guarantees that all Events **in the cluster** whose commitTs is
	// less than or equal to `checkpointTs` are sent to downstream successfully.
	//
	// EmitCheckpointTs is thread-safe.
	// FIXME: some sink implementations are not thread-safe yet; they should be.
	EmitCheckpointTs(ctx context.Context, ts uint64) error

	// Close closes the Sink.
	//
	// Close is thread-safe and idempotent.
	Close(ctx context.Context) error

	// Barrier is a synchronous function that waits for all events to be
	// flushed in the underlying sink.
	// Note that once Barrier is called, the resolved ts won't be pushed until
	// the Barrier call returns.
	//
	// Barrier is thread-safe.
	Barrier(ctx context.Context, keyspanID model.KeySpanID) error
}
Sink is an abstraction for anything that a changefeed may emit into.
type Statistics ¶
type Statistics struct {
// contains filtered or unexported fields
}
Statistics maintains some status and metrics of the Sink
func NewStatistics ¶
NewStatistics creates a new Statistics.
func (*Statistics) AddDDLCount ¶
func (b *Statistics) AddDDLCount()
AddDDLCount records the total number of DDLs that need to be flushed.
func (*Statistics) AddEntriesCount ¶
func (b *Statistics) AddEntriesCount(count int)
AddEntriesCount records the total number of rows that need to be flushed.
func (*Statistics) AddInvalidKeyCount ¶
func (b *Statistics) AddInvalidKeyCount()
AddInvalidKeyCount records total number of invalid keys
func (*Statistics) PrintStatus ¶
func (b *Statistics) PrintStatus(ctx context.Context)
PrintStatus prints the status of the Sink
func (*Statistics) RecordBatchExecution ¶
func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error
RecordBatchExecution records the time cost and the batch size of a batch execution.
func (*Statistics) SubRowsCount ¶
func (b *Statistics) SubRowsCount(count int)
SubRowsCount records the total number of rows that need to be flushed.
func (*Statistics) TotalRowsCount ¶
func (b *Statistics) TotalRowsCount() uint64
TotalRowsCount returns total number of rows
type TikvBatcher ¶
type TikvBatcher struct { Batches []innerBatch // contains filtered or unexported fields }
func NewTiKVBatcher ¶
func NewTiKVBatcher(statistics *Statistics) *TikvBatcher
func (*TikvBatcher) Append ¶
func (b *TikvBatcher) Append(entry *model.RawKVEntry) error
func (*TikvBatcher) ByteSize ¶
func (b *TikvBatcher) ByteSize() uint64
func (*TikvBatcher) Count ¶
func (b *TikvBatcher) Count() int
func (*TikvBatcher) IsEmpty ¶
func (b *TikvBatcher) IsEmpty() bool
func (*TikvBatcher) Reset ¶
func (b *TikvBatcher) Reset()