sink

package
v0.0.0-...-1b33b2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OptChangefeedID = "_changefeed_id"
	OptCaptureAddr  = "_capture_addr"
)

Sink options keys

Variables

This section is empty.

Functions

func ExtractRawKVEntry

func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType,
	key []byte, value []byte, ttl uint64, err error,
)

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file

func ParseTiKVUri

func ParseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error)

func RegisterSink

func RegisterSink(scheme string, initFunc InitFunc, checkerFunc InitFunc)

func Validate

func Validate(ctx context.Context, sinkURIStr string, cfg *config.ReplicaConfig, opts map[string]string) error

Validate sink if given valid parameters.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages keyspan sinks, maintains the relationship between keyspan sinks and backendSink. Manager is thread-safe.

func NewManager

func NewManager(
	ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts,
	captureAddr string, changefeedID model.ChangeFeedID,
) *Manager

NewManager creates a new Sink manager

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

Close closes the Sink manager and backend Sink, this method can be reentrantly called

func (*Manager) CreateKeySpanSink

func (m *Manager) CreateKeySpanSink(keyspanID model.KeySpanID, checkpointTs model.Ts) Sink

CreateKeySpanSink creates a keyspan sink

func (*Manager) UpdateChangeFeedCheckpointTs

func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64)

type Sink

type Sink interface {
	// EmitChangedEvents sends Changed Event to Sink
	// EmitChangedEvents may write rows to downstream directly;
	//
	// EmitChangedEvents is thread-safe.
	// FIXME: some sink implementation, they should be.
	EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error

	// FlushChangedEvents flushes each row which of commitTs less than or
	// equal to `resolvedTs` into downstream.
	// TiKV-CDC guarantees that all the Events whose commitTs is less than or
	// equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents`
	//
	// FlushChangedEvents is thread-safe.
	// FIXME: some sink implementation, they should be.
	FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error)

	// EmitCheckpointTs sends CheckpointTs to Sink.
	// TiCDC guarantees that all Events **in the cluster** which of commitTs
	// less than or equal `checkpointTs` are sent to downstream successfully.
	//
	// EmitCheckpointTs is thread-safe.
	// FIXME: some sink implementation, they should be.
	EmitCheckpointTs(ctx context.Context, ts uint64) error

	// Close closes the Sink.
	//
	// Close is thread-safe and idempotent.
	Close(ctx context.Context) error

	// Barrier is a synchronous function to wait all events to be flushed
	// in underlying sink.
	// Note once Barrier is called, the resolved ts won't be pushed until
	// the Barrier call returns.
	//
	// Barrier is thread-safe.
	Barrier(ctx context.Context, keyspanID model.KeySpanID) error
}

Sink is an abstraction for anything that a changefeed may emit into.

func New

func New(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error)

New creates a new sink with the sink-uri

type Statistics

type Statistics struct {
	// contains filtered or unexported fields
}

Statistics maintains some status and metrics of the Sink

func NewStatistics

func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics

NewStatistics creates a statistics

func (*Statistics) AddDDLCount

func (b *Statistics) AddDDLCount()

AddDDLCount records total number of ddl needs to flush

func (*Statistics) AddEntriesCount

func (b *Statistics) AddEntriesCount(count int)

AddEntriesCount records total number of rows needs to flush

func (*Statistics) AddInvalidKeyCount

func (b *Statistics) AddInvalidKeyCount()

AddInvalidKeyCount records total number of invalid keys

func (*Statistics) PrintStatus

func (b *Statistics) PrintStatus(ctx context.Context)

PrintStatus prints the status of the Sink

func (*Statistics) RecordBatchExecution

func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error

RecordBatchExecution records the cost time of batch execution and batch size

func (*Statistics) SubRowsCount

func (b *Statistics) SubRowsCount(count int)

SubRowsCount records total number of rows needs to flush

func (*Statistics) TotalRowsCount

func (b *Statistics) TotalRowsCount() uint64

TotalRowsCount returns total number of rows

type TikvBatcher

type TikvBatcher struct {
	Batches []innerBatch
	// contains filtered or unexported fields
}

func NewTiKVBatcher

func NewTiKVBatcher(statistics *Statistics) *TikvBatcher

func (*TikvBatcher) Append

func (b *TikvBatcher) Append(entry *model.RawKVEntry) error

func (*TikvBatcher) ByteSize

func (b *TikvBatcher) ByteSize() uint64

func (*TikvBatcher) Count

func (b *TikvBatcher) Count() int

func (*TikvBatcher) IsEmpty

func (b *TikvBatcher) IsEmpty() bool

func (*TikvBatcher) Reset

func (b *TikvBatcher) Reset()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL