replication

package
v1.2.9
Warning

This package is not in the latest version of its module.

Published: Apr 12, 2024 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var ErrUnknownCluster = errors.New("unknown cluster")

ErrUnknownCluster is returned when the given cluster is not defined in the cluster metadata.

var (
	// ErrUnknownReplicationTask is the error to indicate unknown replication task type
	ErrUnknownReplicationTask = &types.BadRequestError{Message: "unknown replication task"}
)
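
As a minimal sketch of how callers might tell these two errors apart, the example below mirrors them with local stand-ins. The names badRequestError, errUnknownCluster, errUnknownReplicationTask, wrap and classify are hypothetical and exist only for illustration; the sketch assumes the real types.BadRequestError implements error on a pointer receiver, which this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// badRequestError is a hypothetical stand-in for types.BadRequestError.
type badRequestError struct{ Message string }

func (e *badRequestError) Error() string { return e.Message }

// Local mirrors of the two package-level error values (assumptions).
var (
	errUnknownCluster         = errors.New("unknown cluster")
	errUnknownReplicationTask = &badRequestError{Message: "unknown replication task"}
)

// wrap adds context while keeping the original error reachable via %w.
func wrap(err error) error { return fmt.Errorf("replication failed: %w", err) }

// classify matches the sentinel by identity and the bad request by type.
func classify(err error) string {
	var bad *badRequestError
	switch {
	case errors.Is(err, errUnknownCluster):
		return "unknown cluster"
	case errors.As(err, &bad):
		return "bad request: " + bad.Message
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(wrap(errUnknownCluster)))
	fmt.Println(classify(wrap(errUnknownReplicationTask)))
}
```

Using errors.Is and errors.As rather than == keeps the checks working when an intermediate layer wraps the error.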

Functions

This section is empty.

Types

type Cache added in v0.25.0

type Cache struct {
	// contains filtered or unexported fields
}

Cache is an in-memory implementation of a cache for storing hydrated replication messages. Messages can arrive out of order as long as their task ID is higher than that of the last acknowledged message. Out-of-order arrival is expected, as different source clusters will share hydrated replication messages.

Cache uses a heap to keep replication messages in order. This is needed for efficient acknowledgements in O(log N).

Cache capacity can be increased dynamically. A decrease requires a restart: after a decrease new tasks will not be accepted, but memory will not be reclaimed either.

Cache methods are thread safe. Writers and readers are expected to run in different goroutines.
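
The behavior described above can be sketched with the standard container/heap package. This is not the package's implementation: the types task, taskHeap and sketchCache are hypothetical, the error values are local mirrors of the unexported ones, and treating an ID equal to the ack level as out of order is an assumption.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for *types.ReplicationTask.
type task struct{ ID int64 }

// taskHeap is a min-heap of tasks ordered by ID.
type taskHeap []*task

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].ID < h[j].ID }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(*task)) }
func (h *taskHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// Local mirrors of the unexported errors; the messages are assumptions.
var (
	errCacheFull  = errors.New("cache is full")
	errOutOfOrder = errors.New("task is already acknowledged")
)

type sketchCache struct {
	mu       sync.Mutex
	capacity func() int // read on every Put, so it can grow at runtime
	lastAck  int64
	order    taskHeap
	byID     map[int64]*task
}

func newSketchCache(capacity func() int) *sketchCache {
	return &sketchCache{capacity: capacity, byID: make(map[int64]*task)}
}

// Put accepts tasks in any order as long as their ID is above the ack level.
func (c *sketchCache) Put(t *task) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if t.ID <= c.lastAck {
		return errOutOfOrder
	}
	if _, ok := c.byID[t.ID]; ok {
		return nil // already cached
	}
	if len(c.byID) >= c.capacity() {
		return errCacheFull
	}
	c.byID[t.ID] = t
	heap.Push(&c.order, t)
	return nil
}

// Get returns the cached task, or nil if it is not cached.
func (c *sketchCache) Get(id int64) *task {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.byID[id]
}

// Ack pops every task with ID <= level; each pop costs O(log N).
func (c *sketchCache) Ack(level int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.order.Len() > 0 && c.order[0].ID <= level {
		t := heap.Pop(&c.order).(*task)
		delete(c.byID, t.ID)
	}
	if level > c.lastAck {
		c.lastAck = level
	}
}

func main() {
	c := newSketchCache(func() int { return 10 })
	for _, id := range []int64{3, 1, 2} { // out-of-order arrival
		_ = c.Put(&task{ID: id})
	}
	c.Ack(2)
	fmt.Println(c.Get(1) == nil, c.Get(3) != nil, c.Put(&task{ID: 2}))
}
```

The map gives constant-time lookups for Get, while the heap keeps the smallest task ID on top so that Ack only touches the entries it removes.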

func NewCache added in v0.25.0

func NewCache(capacity dynamicconfig.IntPropertyFn) *Cache

NewCache creates a new instance of the replication cache.

func (*Cache) Ack added in v0.25.0

func (c *Cache) Ack(level int64)

Ack acknowledges replication messages, meaning they will be removed from the cache.

func (*Cache) Get added in v0.25.0

func (c *Cache) Get(taskID int64) *types.ReplicationTask

Get will return the stored task with the given taskID. If the task is not in the cache, nil is returned.

func (*Cache) Put added in v0.25.0

func (c *Cache) Put(task *types.ReplicationTask) error

Put stores a replication task in the cache.
- If the cache is full, it will return errCacheFull.
- If the given task has an ID lower than the previously acknowledged task, it will return errOutOfOrder.

func (*Cache) Size added in v0.25.0

func (c *Cache) Size() int

Size returns current size of the cache

type DLQHandler

type DLQHandler interface {
	common.Daemon

	GetMessageCount(
		ctx context.Context,
		forceFetch bool,
	) (map[string]int64, error)
	ReadMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error)
	PurgeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
	) error
	MergeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]byte, error)
}

DLQHandler is the interface that handles replication DLQ messages.

func NewDLQHandler

func NewDLQHandler(
	shard shard.Context,
	taskExecutors map[string]TaskExecutor,
) DLQHandler

NewDLQHandler initializes the replication message DLQ handler.

type DynamicTaskReader added in v0.25.0

type DynamicTaskReader struct {
	// contains filtered or unexported fields
}

DynamicTaskReader will read replication tasks from database using dynamic batch sizing depending on replication lag.

func NewDynamicTaskReader added in v0.25.0

func NewDynamicTaskReader(
	shardID int,
	executionManager persistence.ExecutionManager,
	timeSource clock.TimeSource,
	config *config.Config,
) *DynamicTaskReader

NewDynamicTaskReader creates a new DynamicTaskReader.

func (*DynamicTaskReader) Read added in v0.25.0

func (r *DynamicTaskReader) Read(ctx context.Context, readLevel int64, maxReadLevel int64) ([]*persistence.ReplicationTaskInfo, bool, error)

Read reads and returns replication tasks from readLevel to maxReadLevel. Batch size is determined dynamically: if replication lag is less than config.ReplicatorUpperLatency, the batch size will be proportionally smaller; otherwise the default batch size of config.ReplicatorProcessorFetchTasksBatchSize will be used.

type MetricsEmitterImpl added in v0.25.0

type MetricsEmitterImpl struct {
	// contains filtered or unexported fields
}

MetricsEmitterImpl is responsible for emitting source side replication metrics occasionally.

func NewMetricsEmitter added in v0.25.0

func NewMetricsEmitter(
	shardID int,
	shardData metricsEmitterShardData,
	reader taskReader,
	metricsClient metrics.Client,
) *MetricsEmitterImpl

NewMetricsEmitter creates a new metrics emitter, which starts a goroutine to emit replication metrics occasionally.

func (*MetricsEmitterImpl) Start added in v0.25.0

func (m *MetricsEmitterImpl) Start()

func (*MetricsEmitterImpl) Stop added in v0.25.0

func (m *MetricsEmitterImpl) Stop()

type TaskAckManager added in v0.15.0

type TaskAckManager struct {
	// contains filtered or unexported fields
}

TaskAckManager is the ack manager for replication tasks

func NewTaskAckManager added in v0.15.0

func NewTaskAckManager(
	shardID int,
	ackLevels ackLevelStore,
	metricsClient metrics.Client,
	logger log.Logger,
	reader taskReader,
	store *TaskStore,
) TaskAckManager

NewTaskAckManager initializes a new replication task ack manager

func (*TaskAckManager) GetTasks added in v0.15.0

func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, lastReadTaskID int64) (*types.ReplicationMessages, error)

type TaskExecutor

type TaskExecutor interface {
	// contains filtered or unexported methods
}

TaskExecutor is the executor for replication task

func NewTaskExecutor

func NewTaskExecutor(
	shard shard.Context,
	domainCache cache.DomainCache,
	historyResender ndc.HistoryResender,
	historyEngine engine.Engine,
	metricsClient metrics.Client,
	logger log.Logger,
) TaskExecutor

NewTaskExecutor creates a replication task executor. The executor is used by 1) the DLQ replication task handler and 2) the history replication task processor.

type TaskFetcher

type TaskFetcher interface {
	common.Daemon

	GetSourceCluster() string
	GetRequestChan() chan<- *request
	GetRateLimiter() *quotas.DynamicRateLimiter
}

TaskFetcher is responsible for fetching replication messages from remote DC.

type TaskFetchers

type TaskFetchers interface {
	common.Daemon

	GetFetchers() []TaskFetcher
}

TaskFetchers is a group of fetchers, one per source DC.

func NewTaskFetchers

func NewTaskFetchers(
	logger log.Logger,
	config *config.Config,
	clusterMetadata cluster.Metadata,
	clientBean client.Bean,
) TaskFetchers

NewTaskFetchers creates an instance of ReplicationTaskFetchers with given configs.

type TaskHydrator added in v0.25.0

type TaskHydrator struct {
	// contains filtered or unexported fields
}

TaskHydrator will enrich a replication task with additional information from mutable state and history events. Mutable state and history providers can be either in-memory or persistence-based implementations, depending on whether the data is already available or needs to be loaded.

func NewDeferredTaskHydrator added in v0.25.0

func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache *execution.Cache, domains domainCache) TaskHydrator

NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, but is rather loaded in a deferred way later from a database and cache.

func NewImmediateTaskHydrator added in v0.25.0

func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, blob, nextBlob *persistence.DataBlob) TaskHydrator

NewImmediateTaskHydrator will enrich replication tasks with additional information that is immediately available.

func (TaskHydrator) Hydrate added in v0.25.0

func (h TaskHydrator) Hydrate(ctx context.Context, task persistence.ReplicationTaskInfo) (retTask *types.ReplicationTask, retErr error)

Hydrate will enrich replication task with additional information from mutable state and history events.

type TaskProcessor

type TaskProcessor interface {
	common.Daemon
}

TaskProcessor is responsible for processing replication tasks for a shard.

func NewTaskProcessor

func NewTaskProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	config *config.Config,
	metricsClient metrics.Client,
	taskFetcher TaskFetcher,
	taskExecutor TaskExecutor,
) TaskProcessor

NewTaskProcessor creates a new replication task processor.

type TaskStore added in v0.25.0

type TaskStore struct {
	// contains filtered or unexported fields
}

TaskStore is a component that hydrates and caches replication messages so that they can be reused across several polling source clusters. It also exposes a public Put method, which allows already hydrated messages to be pre-stored at the end of a successful transaction, saving a DB call to fetch history events.

TaskStore uses a separate cache for each source cluster, allowing messages to be fetched at different rates. Once a cache becomes full, it will not accept further messages for that cluster; those messages will later be fetched from the DB and hydrated again. A cache stores only a pointer to the message, which is hydrated once and shared across caches. A cluster acknowledging the message removes it from its corresponding cache. Once all clusters acknowledge it, no more references are held, and the GC will eventually collect it.
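
The pointer-sharing design can be sketched as one map per cluster holding the same message pointer. The types message and sketchStore are hypothetical, capacity limits and locking are left out, and sharing a freshly hydrated message with every cluster on a cache miss is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *types.ReplicationTask.
type message struct{ ID int64 }

var errUnknownCluster = errors.New("unknown cluster")

// sketchStore keeps one cache per polling cluster. Capacity and locking
// are omitted to keep the sketch short.
type sketchStore struct {
	caches  map[string]map[int64]*message
	hydrate func(id int64) *message
}

func newSketchStore(clusters []string, hydrate func(int64) *message) *sketchStore {
	s := &sketchStore{caches: make(map[string]map[int64]*message), hydrate: hydrate}
	for _, name := range clusters {
		s.caches[name] = make(map[int64]*message)
	}
	return s
}

// Put stores the same pointer in every cluster cache; nothing is copied.
func (s *sketchStore) Put(m *message) {
	for _, cache := range s.caches {
		cache[m.ID] = m
	}
}

// Get serves from the cluster's cache, hydrating only on a miss.
func (s *sketchStore) Get(cluster string, id int64) (*message, error) {
	cache, ok := s.caches[cluster]
	if !ok {
		return nil, errUnknownCluster
	}
	if m, ok := cache[id]; ok {
		return m, nil
	}
	m := s.hydrate(id)
	if m != nil {
		s.Put(m) // assumption: share the hydrated message with all clusters
	}
	return m, nil
}

// Ack drops this cluster's references to messages up to lastTaskID.
func (s *sketchStore) Ack(cluster string, lastTaskID int64) error {
	cache, ok := s.caches[cluster]
	if !ok {
		return errUnknownCluster
	}
	for id := range cache {
		if id <= lastTaskID {
			delete(cache, id)
		}
	}
	return nil
}

func main() {
	hydrations := 0
	s := newSketchStore([]string{"a", "b"}, func(id int64) *message {
		hydrations++
		return &message{ID: id}
	})
	m1, _ := s.Get("a", 7)
	m2, _ := s.Get("b", 7)
	_ = s.Ack("a", 7)
	fmt.Println(m1 == m2, hydrations, len(s.caches["a"]), len(s.caches["b"]))
}
```

Because each cache holds only a pointer, acknowledging in one cluster does not affect the others, and the message becomes collectable once the last cluster drops its reference.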

func NewTaskStore added in v0.25.0

func NewTaskStore(
	config *config.Config,
	clusterMetadata cluster.Metadata,
	domains domainCache,
	metricsClient metrics.Client,
	logger log.Logger,
	hydrator taskHydrator,
) *TaskStore

NewTaskStore creates a new instance of TaskStore.

func (*TaskStore) Ack added in v0.25.0

func (m *TaskStore) Ack(cluster string, lastTaskID int64) error

Ack will acknowledge replication messages for a given cluster. This will result in all messages up to the given lastTaskID being removed from the cache.

func (*TaskStore) Get added in v0.25.0

Get will return a hydrated replication message for a given cluster based on raw task info. It will either return it immediately from the cache or hydrate it, store it in the cache and then return it.

The returned task may be nil. This may be due to the domain not existing in a given cluster or the replication message no longer being relevant. Either case is valid, and such a replication message should be ignored and not returned in the response.

func (*TaskStore) Put added in v0.25.0

func (m *TaskStore) Put(task *types.ReplicationTask)

Put will try to store a hydrated replication task in all cluster caches. The task may not be relevant for some clusters because the domain is not enabled there; the task is ignored for those clusters. Some clusters may already have a full cache; the task is ignored and will be fetched and hydrated again later. Some clusters may have already acknowledged the task; the task is ignored because it is no longer relevant for them.
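
The three skip conditions can be sketched as a best-effort fan-out. Everything here is hypothetical: replicationTask, clusterCache, putAll and the enabled callback are illustration-only names, and the real Put returns nothing, whereas this sketch returns the clusters that stored the task so the outcome is visible.

```go
package main

import (
	"fmt"
	"sort"
)

// replicationTask is a hypothetical stand-in for *types.ReplicationTask.
type replicationTask struct {
	ID     int64
	Domain string
}

// clusterCache is a hypothetical, simplified per-cluster cache.
type clusterCache struct {
	capacity int
	lastAck  int64
	tasks    map[int64]*replicationTask
}

// put reports whether the task was stored.
func (c *clusterCache) put(t *replicationTask) bool {
	if t.ID <= c.lastAck {
		return false // already acknowledged by this cluster
	}
	if len(c.tasks) >= c.capacity {
		return false // cache full; the task will be re-hydrated later
	}
	c.tasks[t.ID] = t
	return true
}

// putAll offers the task to every cluster and never fails: a cluster that
// cannot take the task is skipped. It returns the clusters that stored it.
func putAll(caches map[string]*clusterCache,
	enabled func(domain, cluster string) bool, t *replicationTask) []string {
	var stored []string
	for name, c := range caches {
		if !enabled(t.Domain, name) {
			continue // domain is not replicated to this cluster
		}
		if c.put(t) {
			stored = append(stored, name)
		}
	}
	sort.Strings(stored)
	return stored
}

func main() {
	caches := map[string]*clusterCache{
		"ok":     {capacity: 1, tasks: map[int64]*replicationTask{}},
		"remote": {capacity: 1, tasks: map[int64]*replicationTask{}},
		"full":   {capacity: 0, tasks: map[int64]*replicationTask{}},
		"acked":  {capacity: 1, lastAck: 10, tasks: map[int64]*replicationTask{}},
	}
	enabled := func(_, cluster string) bool { return cluster != "remote" }
	fmt.Println(putAll(caches, enabled, &replicationTask{ID: 5, Domain: "orders"}))
}
```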
