persistence

package
v0.0.0-...-76b3090 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: MIT Imports: 40 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// NOTE: transaction ID is *= -1 in DB
	MinTxnID int64 = math.MaxInt64
	MaxTxnID int64 = math.MinInt64 + 1 // int overflow
)
View Source
const (

	// YDBSeeds env
	YDBSeeds = "YDB_SEEDS"
	// YDBPort env
	YDBPort = "YDB_PORT"
	// YDBDefaultPort YDB default port
	YDBDefaultPort = 2136
	Localhost      = "127.0.0.1"
)

Variables

View Source
var (
	NumHistoryShards = 1024
)

Functions

func FromYDBDateTime

func FromYDBDateTime(t time.Time) time.Time

FromYDBDateTime converts a YDB datetime to a Go time.

func GetYDBAddress

func GetYDBAddress() string

GetYDBAddress returns the YDB address.

func GetYDBPort

func GetYDBPort() int

GetYDBPort returns the YDB port.

func NewClusterMetadataStore

func NewClusterMetadataStore(
	client *xydb.Client,
	logger log.Logger,
) (p.ClusterMetadataStore, error)

NewClusterMetadataStore is used to create an instance of ClusterMetadataStore implementation

func NewMetadataStore

func NewMetadataStore(
	currentClusterName string,
	client *xydb.Client,
	logger log.Logger,
) (p.MetadataStore, error)

NewMetadataStore is used to create an instance of the Namespace MetadataStore implementation

func NewQueueStore

func NewQueueStore(
	queueType persistence.QueueType,
	client *xydb.Client,
	logger log.Logger,
) (persistence.Queue, error)

func NewYDBAbstractDataStoreFactory

func NewYDBAbstractDataStoreFactory() client.AbstractDataStoreFactory

func OptionsToYDBConfig

func OptionsToYDBConfig(options map[string]any) (xydb.Config, error)

func ToShardIDColumnValue

func ToShardIDColumnValue(shardID int32) uint32

func ToYDBDateTime

func ToYDBDateTime(t time.Time) time.Time

ToYDBDateTime converts a Go time to a YDB datetime.

Types

type ClusterMetadataStore

type ClusterMetadataStore struct {
	// contains filtered or unexported fields
}

func (*ClusterMetadataStore) Close

func (m *ClusterMetadataStore) Close()

func (*ClusterMetadataStore) DeleteClusterMetadata

func (m *ClusterMetadataStore) DeleteClusterMetadata(
	ctx context.Context,
	request *p.InternalDeleteClusterMetadataRequest,
) error

func (*ClusterMetadataStore) GetClusterMembers

func (m *ClusterMetadataStore) GetClusterMembers(
	ctx context.Context,
	request *p.GetClusterMembersRequest,
) (resp *p.GetClusterMembersResponse, err error)

func (*ClusterMetadataStore) GetClusterMetadata

func (*ClusterMetadataStore) GetName

func (m *ClusterMetadataStore) GetName() string

func (*ClusterMetadataStore) ListClusterMetadata

func (*ClusterMetadataStore) PruneClusterMembership

func (m *ClusterMetadataStore) PruneClusterMembership(
	ctx context.Context,
	request *p.PruneClusterMembershipRequest,
) error

func (*ClusterMetadataStore) SaveClusterMetadata

func (m *ClusterMetadataStore) SaveClusterMetadata(
	ctx context.Context,
	request *p.InternalSaveClusterMetadataRequest,
) (rv bool, err error)

func (*ClusterMetadataStore) UpsertClusterMembership

func (m *ClusterMetadataStore) UpsertClusterMembership(
	ctx context.Context,
	request *p.UpsertClusterMembershipRequest,
) error

type ExecutionStore

type ExecutionStore struct {
	*HistoryStore
	*MutableStateStore
	*MutableStateTaskStore
	// contains filtered or unexported fields
}

func NewExecutionStore

func NewExecutionStore(
	client *xydb.Client,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *ExecutionStore

func (*ExecutionStore) Close

func (d *ExecutionStore) Close()

func (*ExecutionStore) ConflictResolveWorkflowExecution

func (d *ExecutionStore) ConflictResolveWorkflowExecution(
	ctx context.Context,
	request *p.InternalConflictResolveWorkflowExecutionRequest,
) error

func (*ExecutionStore) GetName

func (d *ExecutionStore) GetName() string

func (*ExecutionStore) UpdateWorkflowExecution

func (d *ExecutionStore) UpdateWorkflowExecution(
	ctx context.Context,
	request *p.InternalUpdateWorkflowExecutionRequest,
) error

type Factory

type Factory struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Factory vends datastore implementations backed by YDB

func NewFactory

func NewFactory(
	cfg config.CustomDatastoreConfig,
	r resolver.ServiceResolver,
	clusterName string,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *Factory

NewFactory returns an instance of a factory object which can be used to create data stores that are backed by YDB

func NewFactoryFromYDBConfig

func NewFactoryFromYDBConfig(
	clusterName string,
	ydbCfg xydb.Config,
	r resolver.ServiceResolver,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *Factory

func (*Factory) Close

func (f *Factory) Close()

Close closes the factory

func (*Factory) NewClusterMetadataStore

func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)

NewClusterMetadataStore returns a cluster metadata store

func (*Factory) NewExecutionStore

func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)

NewExecutionStore returns a new ExecutionStore.

func (*Factory) NewMetadataStore

func (f *Factory) NewMetadataStore() (p.MetadataStore, error)

NewMetadataStore returns a metadata store

func (*Factory) NewQueue

func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error)

NewQueue returns a new queue backed by YDB

func (*Factory) NewShardStore

func (f *Factory) NewShardStore() (p.ShardStore, error)

NewShardStore returns a new shard store

func (*Factory) NewTaskStore

func (f *Factory) NewTaskStore() (p.TaskStore, error)

NewTaskStore returns a new task store

type HistoryStore

type HistoryStore struct {
	Client *xydb.Client
	Logger log.Logger
	p.HistoryBranchUtilImpl
}

func NewHistoryStore

func NewHistoryStore(
	client *xydb.Client,
	logger log.Logger,
) *HistoryStore

func (*HistoryStore) AppendHistoryNodes

func (h *HistoryStore) AppendHistoryNodes(
	ctx context.Context,
	request *p.InternalAppendHistoryNodesRequest,
) error

AppendHistoryNodes upserts a batch of events as a single node to a history branch

func (*HistoryStore) DeleteHistoryBranch

func (h *HistoryStore) DeleteHistoryBranch(
	ctx context.Context,
	request *p.InternalDeleteHistoryBranchRequest,
) error

DeleteHistoryBranch removes a branch

func (*HistoryStore) DeleteHistoryNodes

func (h *HistoryStore) DeleteHistoryNodes(
	ctx context.Context,
	request *p.InternalDeleteHistoryNodesRequest,
) (err error)

DeleteHistoryNodes deletes a history node

func (*HistoryStore) ForkHistoryBranch

func (h *HistoryStore) ForkHistoryBranch(
	ctx context.Context,
	request *p.InternalForkHistoryBranchRequest,
) (err error)

ForkHistoryBranch forks a new branch from an existing branch

func (*HistoryStore) GetAllHistoryTreeBranches

func (h *HistoryStore) GetAllHistoryTreeBranches(
	ctx context.Context,
	request *p.GetAllHistoryTreeBranchesRequest,
) (resp *p.InternalGetAllHistoryTreeBranchesResponse, err error)

func (*HistoryStore) GetHistoryTree

func (h *HistoryStore) GetHistoryTree(
	ctx context.Context,
	request *p.GetHistoryTreeRequest,
) (resp *p.InternalGetHistoryTreeResponse, err error)

GetHistoryTree returns all branch information of a tree

func (*HistoryStore) ReadHistoryBranch

func (h *HistoryStore) ReadHistoryBranch(
	ctx context.Context,
	request *p.InternalReadHistoryBranchRequest,
) (resp *p.InternalReadHistoryBranchResponse, err error)

ReadHistoryBranch returns history node data for a branch

type MatchingTaskStore

type MatchingTaskStore struct {
	// contains filtered or unexported fields
}

func NewMatchingTaskStore

func NewMatchingTaskStore(
	client *xydb.Client,
	logger log.Logger,
) *MatchingTaskStore

func (*MatchingTaskStore) Close

func (d *MatchingTaskStore) Close()

func (*MatchingTaskStore) CompleteTask

func (d *MatchingTaskStore) CompleteTask(
	ctx context.Context,
	request *p.CompleteTaskRequest,
) error

CompleteTask deletes a task

func (*MatchingTaskStore) CompleteTasksLessThan

func (d *MatchingTaskStore) CompleteTasksLessThan(
	ctx context.Context,
	request *p.CompleteTasksLessThanRequest,
) (int, error)

CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the Limit request parameter, i.e., either all tasks leq the task_id will be deleted or an error will be returned to the caller.

func (*MatchingTaskStore) CountTaskQueuesByBuildId

func (d *MatchingTaskStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (count int, err error)

func (*MatchingTaskStore) CreateTaskQueue

func (d *MatchingTaskStore) CreateTaskQueue(
	ctx context.Context,
	request *p.InternalCreateTaskQueueRequest,
) error

func (*MatchingTaskStore) CreateTasks

CreateTasks adds tasks

func (*MatchingTaskStore) DeleteTaskQueue

func (d *MatchingTaskStore) DeleteTaskQueue(
	ctx context.Context,
	request *p.DeleteTaskQueueRequest,
) error

func (*MatchingTaskStore) GetName

func (d *MatchingTaskStore) GetName() string

func (*MatchingTaskStore) GetTaskQueue

func (d *MatchingTaskStore) GetTaskQueue(
	ctx context.Context,
	request *p.InternalGetTaskQueueRequest,
) (resp *p.InternalGetTaskQueueResponse, err error)

func (*MatchingTaskStore) GetTaskQueueUserData

func (d *MatchingTaskStore) GetTaskQueueUserData(
	ctx context.Context,
	request *p.GetTaskQueueUserDataRequest,
) (resp *p.InternalGetTaskQueueUserDataResponse, err error)

func (*MatchingTaskStore) GetTaskQueuesByBuildId

func (d *MatchingTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) (rv []string, err error)

func (*MatchingTaskStore) GetTasks

func (d *MatchingTaskStore) GetTasks(
	ctx context.Context,
	request *p.GetTasksRequest,
) (resp *p.InternalGetTasksResponse, err error)

GetTasks gets a task

func (*MatchingTaskStore) ListTaskQueue

func (*MatchingTaskStore) ListTaskQueueUserDataEntries

func (*MatchingTaskStore) UpdateTaskQueue

UpdateTaskQueue updates a task queue

func (*MatchingTaskStore) UpdateTaskQueueUserData

func (d *MatchingTaskStore) UpdateTaskQueueUserData(
	ctx context.Context,
	request *p.InternalUpdateTaskQueueUserDataRequest,
) (err error)

type MetadataStore

type MetadataStore struct {
	// contains filtered or unexported fields
}

func (*MetadataStore) Close

func (m *MetadataStore) Close()

func (*MetadataStore) CreateNamespace

CreateNamespace creates a namespace

func (*MetadataStore) DeleteNamespace

func (m *MetadataStore) DeleteNamespace(ctx context.Context, request *p.DeleteNamespaceRequest) (err error)

func (*MetadataStore) DeleteNamespaceByName

func (m *MetadataStore) DeleteNamespaceByName(ctx context.Context, request *p.DeleteNamespaceByNameRequest) error

func (*MetadataStore) GetMetadata

func (m *MetadataStore) GetMetadata(ctx context.Context) (resp *p.GetMetadataResponse, err error)

func (*MetadataStore) GetName

func (m *MetadataStore) GetName() string

func (*MetadataStore) GetNamespace

func (m *MetadataStore) GetNamespace(ctx context.Context, request *p.GetNamespaceRequest) (resp *p.InternalGetNamespaceResponse, err error)

func (*MetadataStore) ListNamespaces

func (m *MetadataStore) ListNamespaces(ctx context.Context, request *p.InternalListNamespacesRequest) (resp *p.InternalListNamespacesResponse, err error)

func (*MetadataStore) RenameNamespace

func (m *MetadataStore) RenameNamespace(ctx context.Context, request *p.InternalRenameNamespaceRequest) error

func (*MetadataStore) UpdateNamespace

func (m *MetadataStore) UpdateNamespace(
	ctx context.Context,
	request *p.InternalUpdateNamespaceRequest,
) error

type MutableStateStore

type MutableStateStore struct {
	// contains filtered or unexported fields
}

func NewMutableStateStore

func NewMutableStateStore(
	client *xydb.Client,
	logger log.Logger,
) *MutableStateStore

func (*MutableStateStore) DeleteCurrentWorkflowExecution

func (d *MutableStateStore) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *p.DeleteCurrentWorkflowExecutionRequest,
) error

func (*MutableStateStore) DeleteWorkflowExecution

func (d *MutableStateStore) DeleteWorkflowExecution(
	ctx context.Context,
	request *p.DeleteWorkflowExecutionRequest,
) error

func (*MutableStateStore) GetCurrentExecution

func (d *MutableStateStore) GetCurrentExecution(
	ctx context.Context,
	request *p.GetCurrentExecutionRequest,
) (resp *p.InternalGetCurrentExecutionResponse, err error)

func (*MutableStateStore) GetWorkflowExecution

func (d *MutableStateStore) GetWorkflowExecution(
	ctx context.Context,
	request *p.GetWorkflowExecutionRequest,
) (resp *p.InternalGetWorkflowExecutionResponse, err error)

func (*MutableStateStore) ListConcreteExecutions

func (*MutableStateStore) SetWorkflowExecution

func (d *MutableStateStore) SetWorkflowExecution(ctx context.Context, request *p.InternalSetWorkflowExecutionRequest) (err error)

type MutableStateTaskStore

type MutableStateTaskStore struct {
	// contains filtered or unexported fields
}

func NewMutableStateTaskStore

func NewMutableStateTaskStore(
	client *xydb.Client,
	logger log.Logger,
) *MutableStateTaskStore

func (*MutableStateTaskStore) AddHistoryTasks

func (d *MutableStateTaskStore) AddHistoryTasks(
	ctx context.Context,
	request *p.InternalAddHistoryTasksRequest,
) error

func (*MutableStateTaskStore) CompleteHistoryTask

func (d *MutableStateTaskStore) CompleteHistoryTask(
	ctx context.Context,
	request *p.CompleteHistoryTaskRequest,
) (err error)

func (*MutableStateTaskStore) DeleteReplicationTaskFromDLQ

func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *p.DeleteReplicationTaskFromDLQRequest,
) error

func (*MutableStateTaskStore) GetHistoryTasks

func (d *MutableStateTaskStore) GetHistoryTasks(
	ctx context.Context,
	request *p.GetHistoryTasksRequest,
) (resp *p.InternalGetHistoryTasksResponse, err error)

func (*MutableStateTaskStore) GetReplicationTasksFromDLQ

func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ(
	ctx context.Context,
	request *p.GetReplicationTasksFromDLQRequest,
) (resp *p.InternalGetHistoryTasksResponse, err error)

func (*MutableStateTaskStore) IsReplicationDLQEmpty

func (d *MutableStateTaskStore) IsReplicationDLQEmpty(
	ctx context.Context,
	request *p.GetReplicationTasksFromDLQRequest,
) (empty bool, err error)

func (*MutableStateTaskStore) PutReplicationTaskToDLQ

func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *p.PutReplicationTaskToDLQRequest,
) (err error)

func (*MutableStateTaskStore) RangeCompleteHistoryTasks

func (d *MutableStateTaskStore) RangeCompleteHistoryTasks(
	ctx context.Context,
	request *p.RangeCompleteHistoryTasksRequest,
) (err error)

func (*MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ

func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *p.RangeDeleteReplicationTaskFromDLQRequest,
) error

func (*MutableStateTaskStore) RegisterHistoryTaskReader

func (d *MutableStateTaskStore) RegisterHistoryTaskReader(
	_ context.Context,
	_ *p.RegisterHistoryTaskReaderRequest,
) error

func (*MutableStateTaskStore) UnregisterHistoryTaskReader

func (d *MutableStateTaskStore) UnregisterHistoryTaskReader(
	_ context.Context,
	_ *p.UnregisterHistoryTaskReaderRequest,
)

func (*MutableStateTaskStore) UpdateHistoryTaskReaderProgress

func (d *MutableStateTaskStore) UpdateHistoryTaskReaderProgress(
	_ context.Context,
	_ *p.UpdateHistoryTaskReaderProgressRequest,
)

type QueueStore

type QueueStore struct {
	// contains filtered or unexported fields
}

func (*QueueStore) Close

func (q *QueueStore) Close()

func (*QueueStore) DeleteMessageFromDLQ

func (q *QueueStore) DeleteMessageFromDLQ(
	ctx context.Context,
	messageID int64,
) error

func (*QueueStore) DeleteMessagesBefore

func (q *QueueStore) DeleteMessagesBefore(
	ctx context.Context,
	messageID int64,
) error

func (*QueueStore) EnqueueMessage

func (q *QueueStore) EnqueueMessage(
	ctx context.Context,
	blob commonpb.DataBlob,
) error

func (*QueueStore) EnqueueMessageToDLQ

func (q *QueueStore) EnqueueMessageToDLQ(
	ctx context.Context,
	blob commonpb.DataBlob,
) (int64, error)

func (*QueueStore) GetAckLevels

func (q *QueueStore) GetAckLevels(
	ctx context.Context,
) (*persistence.InternalQueueMetadata, error)

func (*QueueStore) GetDLQAckLevels

func (q *QueueStore) GetDLQAckLevels(
	ctx context.Context,
) (*persistence.InternalQueueMetadata, error)

func (*QueueStore) Init

func (q *QueueStore) Init(
	ctx context.Context,
	blob *commonpb.DataBlob,
) error

func (*QueueStore) RangeDeleteMessagesFromDLQ

func (q *QueueStore) RangeDeleteMessagesFromDLQ(
	ctx context.Context,
	firstMessageID int64,
	lastMessageID int64,
) error

func (*QueueStore) ReadMessages

func (q *QueueStore) ReadMessages(
	ctx context.Context,
	lastMessageID int64,
	maxCount int,
) (rv []*persistence.QueueMessage, err error)

func (*QueueStore) ReadMessagesFromDLQ

func (q *QueueStore) ReadMessagesFromDLQ(
	ctx context.Context,
	firstMessageID int64,
	lastMessageID int64,
	pageSize int,
	pageToken []byte,
) (m []*persistence.QueueMessage, nextPageToken []byte, err error)

func (*QueueStore) UpdateAckLevel

func (q *QueueStore) UpdateAckLevel(
	ctx context.Context,
	metadata *persistence.InternalQueueMetadata,
) (err error)

func (*QueueStore) UpdateDLQAckLevel

func (q *QueueStore) UpdateDLQAckLevel(
	ctx context.Context,
	metadata *persistence.InternalQueueMetadata,
) (err error)

type ShardStore

type ShardStore struct {
	// contains filtered or unexported fields
}

func NewShardStore

func NewShardStore(
	clusterName string,
	client *xydb.Client,
	logger log.Logger,
) *ShardStore

func (*ShardStore) AssertShardOwnership

func (d *ShardStore) AssertShardOwnership(ctx context.Context, request *p.AssertShardOwnershipRequest) error

func (*ShardStore) Close

func (d *ShardStore) Close()

func (*ShardStore) GetClusterName

func (d *ShardStore) GetClusterName() string

func (*ShardStore) GetName

func (d *ShardStore) GetName() string

func (*ShardStore) GetOrCreateShard

func (d *ShardStore) GetOrCreateShard(
	ctx context.Context,
	request *p.InternalGetOrCreateShardRequest,
) (resp *p.InternalGetOrCreateShardResponse, err error)

func (*ShardStore) UpdateShard

func (d *ShardStore) UpdateShard(ctx context.Context, request *p.InternalUpdateShardRequest) (err error)

type TestCluster

type TestCluster struct {
	Cfg    xydb.Config
	Logger log.Logger
	Client *xydb.Client
	// contains filtered or unexported fields
}

TestCluster allows executing YDB operations in testing.

func NewTestCluster

func NewTestCluster(database, username, password, host string, port int, schemaDir string, faultInjection *config.FaultInjection, logger log.Logger) *TestCluster

NewTestCluster returns a new YDB test cluster

func (*TestCluster) Config

func (s *TestCluster) Config() config.Persistence

Config returns the persistence config for connecting to this test cluster

func (*TestCluster) DatabaseName

func (s *TestCluster) DatabaseName() string

func (*TestCluster) SetupTestDatabase

func (s *TestCluster) SetupTestDatabase()

func (*TestCluster) TearDownTestDatabase

func (s *TestCluster) TearDownTestDatabase()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL