cassandra

package
v1.8.7 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 10, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package Cassandra contains code for integration or inter-operation with Cassandra DB.

Constants

const DateHourLayout = "2006-01-02T15"

DateHourLayout format mask string.

Variables

Marshaler lets you specify a custom marshaler if needed. It defaults to the SOP default marshaler.

var NilUUID = gocql.UUID(sop.NilUUID)

NilUUID with gocql.UUID type.

var Now = time.Now

Now is a function variable that lets unit tests inject a replayable time.Now.

Functions

func CloseConnection

func CloseConnection()

Close the singleton connection if open.

func GetBlobPayloadCount

func GetBlobPayloadCount[T sop.UUID](payloads []BlobsPayload[T]) int

Returns the total number of UUIDs across a set of blob (ID) payloads.
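A count like this amounts to summing the length of each payload's `Blobs` slice. The sketch below uses simplified stand-in types (`UUID`, `BlobsPayload`) and a hypothetical `countBlobs` function so that it runs without the sop or gocql modules; it illustrates the idea and is not the package's implementation.

```go
package main

import "fmt"

// UUID is a 16-byte stand-in for sop.UUID.
type UUID [16]byte

// BlobsPayload is a simplified stand-in for the package's generic BlobsPayload type.
type BlobsPayload[T any] struct {
	BlobTable string
	Blobs     []T
}

// countBlobs (hypothetical) sums the number of entries across all payloads.
func countBlobs[T any](payloads []BlobsPayload[T]) int {
	total := 0
	for _, p := range payloads {
		total += len(p.Blobs)
	}
	return total
}

func main() {
	payloads := []BlobsPayload[UUID]{
		{BlobTable: "t1", Blobs: []UUID{{1}, {2}}},
		{BlobTable: "t2", Blobs: []UUID{{3}}},
	}
	fmt.Println(countBlobs(payloads)) // 3
}
```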

func IsConnectionInstantiated

func IsConnectionInstantiated() bool

Returns true if connection instance is valid.

func IsNil added in v1.8.7

func IsNil(id gocql.UUID) bool

Returns true if id is nil or empty UUID, otherwise false.
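Since a UUID is a fixed 16-byte array, a nil check can be expressed as a comparison against the all-zero value. The sketch below assumes that reading of "nil or empty" and uses a local `UUID` stand-in and a hypothetical `isNil` function instead of the gocql types.

```go
package main

import "fmt"

// UUID is a 16-byte stand-in for gocql.UUID.
type UUID [16]byte

// NilUUID is the all-zero UUID, mirroring the package variable of the same name.
var NilUUID UUID

// isNil (hypothetical) reports whether id equals the all-zero UUID.
// Arrays are comparable in Go, so == compares all 16 bytes.
func isNil(id UUID) bool {
	return id == NilUUID
}

func main() {
	var empty UUID
	fmt.Println(isNil(empty))            // true
	fmt.Println(isNil(UUID{0xde, 0xad})) // false
}
```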

func SetRegistryCacheDuration

func SetRegistryCacheDuration(duration time.Duration)

SetRegistryCacheDuration sets the registry cache duration globally.

func SetStoreCacheDuration

func SetStoreCacheDuration(duration time.Duration)

SetStoreCacheDuration sets the store repository cache duration globally.

Types

type BlobStore

type BlobStore interface {
	// Get or fetch a blob given an ID.
	GetOne(ctx context.Context, blobTable string, blobID sop.UUID, target interface{}) error
	// Add blobs to store.
	Add(ctx context.Context, blobs ...BlobsPayload[sop.KeyValuePair[sop.UUID, interface{}]]) error
	// Update blobs in store.
	Update(ctx context.Context, blobs ...BlobsPayload[sop.KeyValuePair[sop.UUID, interface{}]]) error
	// Remove blobs in store with given IDs.
	Remove(ctx context.Context, blobsIDs ...BlobsPayload[sop.UUID]) error
}

BlobStore specifies the backend blob store interface used for storing and managing data blobs. Blobs are data that vary in size and are large enough that storing them in a database would impose performance penalties. This kind of data is typically stored in blob stores such as AWS S3 or a file system.

func NewBlobStore

func NewBlobStore() BlobStore

NewBlobStore instantiates a new BlobStore instance.

func NewMockBlobStore

func NewMockBlobStore() BlobStore

NewMockBlobStore instantiates a new (mocked) BlobStore.

type BlobsPayload

type BlobsPayload[T sop.UUID | sop.KeyValuePair[sop.UUID, interface{}]] struct {
	// Blob store table name.
	BlobTable string
	// Blobs contains the blobs IDs and blobs data for upsert to the store or the blobs IDs to be removed.
	Blobs []T
}

BlobsPayload is the request/response payload for managing or fetching node blobs.

type Config

type Config struct {
	// Cassandra hosts cluster.
	ClusterHosts []string
	// Keyspace to be used when doing I/O to cassandra.
	Keyspace string
	// Default Consistency level.
	Consistency gocql.Consistency
	// Connection Timeout.
	ConnectionTimeout time.Duration
	// Authenticator.
	Authenticator gocql.Authenticator
	// Defaults to "simple strategy & replication factor of 1".
	ReplicationClause string

	// ConsistencyBook should be used to specify consistency level to use for a given
	// API, e.g. one for RegistryAdd, another for StoreAdd, etc... if you so choose to.
	//
	// You can leave it default and the API will use the default Consistency level
	// for the cluster (defaults to local quorum).
	ConsistencyBook ConsistencyBook
}

Config contains this Cassandra package's configuration, i.e. its configurable variables.

type Connection

type Connection struct {
	Session *gocql.Session
	Config
}

Connection has the Session and the config used to open/create a session.

func OpenConnection added in v1.6.4

func OpenConnection(config Config) (*Connection, error)

OpenConnection creates and returns a new Connection to Cassandra if one does not exist yet; otherwise, it returns the existing singleton connection.
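The singleton behavior described here can be sketched with a mutex-guarded package variable. The types below are trimmed stand-ins (no gocql session is created), and the lowercase function names are hypothetical counterparts of `OpenConnection`, `CloseConnection` and `IsConnectionInstantiated`; whether the real package guards the singleton with a mutex is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Config and Connection are trimmed stand-ins for the package types.
type Config struct {
	ClusterHosts []string
	Keyspace     string
}

type Connection struct {
	Config
}

var (
	mu         sync.Mutex
	connection *Connection
)

// openConnection returns the existing singleton if there is one,
// otherwise it creates the connection from config.
func openConnection(config Config) (*Connection, error) {
	mu.Lock()
	defer mu.Unlock()
	if connection != nil {
		return connection, nil
	}
	// A real implementation would create the Cassandra session here.
	connection = &Connection{Config: config}
	return connection, nil
}

// closeConnection drops the singleton if it is open.
func closeConnection() {
	mu.Lock()
	defer mu.Unlock()
	connection = nil
}

// isConnectionInstantiated reports whether the singleton currently exists.
func isConnectionInstantiated() bool {
	mu.Lock()
	defer mu.Unlock()
	return connection != nil
}

func main() {
	c1, _ := openConnection(Config{Keyspace: "first"})
	c2, _ := openConnection(Config{Keyspace: "second"})
	fmt.Println(c1 == c2)    // true
	fmt.Println(c2.Keyspace) // first
	closeConnection()
	fmt.Println(isConnectionInstantiated()) // false
}
```

One consequence of this pattern is that a second call with a different Config silently returns the first connection, so configuration should be settled before the first call.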

type ConsistencyBook added in v1.6.4

type ConsistencyBook struct {
	RegistryAdd     gocql.Consistency
	RegistryUpdate  gocql.Consistency
	RegistryGet     gocql.Consistency
	RegistryRemove  gocql.Consistency
	StoreAdd        gocql.Consistency
	StoreUpdate     gocql.Consistency
	StoreGet        gocql.Consistency
	StoreRemove     gocql.Consistency
	BlobStoreAdd    gocql.Consistency
	BlobStoreGet    gocql.Consistency
	BlobStoreUpdate gocql.Consistency
	BlobStoreRemove gocql.Consistency
}

ConsistencyBook lists the per-API consistency levels that can be set in this package.

type MockTransactionLog added in v1.8.7

type MockTransactionLog struct {
	// contains filtered or unexported fields
}

func (*MockTransactionLog) Add added in v1.8.7

func (tl *MockTransactionLog) Add(ctx context.Context, tid gocql.UUID, commitFunction int, payload []byte) error

Add adds a transaction log.

func (*MockTransactionLog) GetLogsDetails added in v1.8.7

func (tl *MockTransactionLog) GetLogsDetails(ctx context.Context, hour string) (gocql.UUID, []sop.KeyValuePair[int, []byte], error)

func (*MockTransactionLog) GetOne added in v1.8.7

GetOne returns the oldest transaction ID.

func (*MockTransactionLog) GetTIDLogs added in v1.8.7

func (tl *MockTransactionLog) GetTIDLogs(tid gocql.UUID) []sop.KeyValuePair[int, []byte]

func (*MockTransactionLog) Remove added in v1.8.7

func (tl *MockTransactionLog) Remove(ctx context.Context, tid gocql.UUID) error

Remove deletes all logs of the given transaction.

type Mock_vid_registry added in v1.7.0

type Mock_vid_registry struct {
	Lookup                           map[sop.UUID]sop.Handle
	InducedErrorOnUpdateAllOrNothing bool
}

func (*Mock_vid_registry) Add added in v1.7.0

func (v *Mock_vid_registry) Add(ctx context.Context, storesHandles ...RegistryPayload[sop.Handle]) error

func (*Mock_vid_registry) Get added in v1.7.0

func (*Mock_vid_registry) Remove added in v1.7.0

func (v *Mock_vid_registry) Remove(ctx context.Context, storesLids ...RegistryPayload[sop.UUID]) error

func (*Mock_vid_registry) Update added in v1.7.0

func (v *Mock_vid_registry) Update(ctx context.Context, allOrNothing bool, storesHandles ...RegistryPayload[sop.Handle]) error

type Registry

type Registry interface {
	// Get will fetch handles(given their IDs) from registry table(s).
	Get(context.Context, ...RegistryPayload[sop.UUID]) ([]RegistryPayload[sop.Handle], error)
	// Add will insert handles to registry table(s).
	Add(context.Context, ...RegistryPayload[sop.Handle]) error
	// Update will update handles potentially spanning across registry table(s).
	// Set allOrNothing to true if the Update operation is crucial for data consistency and
	// the entire batch of handles must be updated on an all-or-nothing basis.
	// False is recommended if such consistency is not significant.
	Update(ctx context.Context, allOrNothing bool, handles ...RegistryPayload[sop.Handle]) error
	// Remove will delete handles(given their IDs) from registry table(s).
	Remove(context.Context, ...RegistryPayload[sop.UUID]) error
}

The Virtual ID registry underpins the package's all-or-nothing (sub)feature, which is essential for fault tolerance.

All methods take a set of items.

func NewMockRegistry

func NewMockRegistry(inducedErrorOnUpdateAllOrNothing bool) Registry

NewMockRegistry manages the Handle in memory for mocking.

func NewRegistry

func NewRegistry() Registry

NewRegistry manages the Handle in the store's Cassandra registry table.

type RegistryPayload

type RegistryPayload[T sop.Handle | sop.UUID] struct {
	// Registry table (name) where the Virtual IDs will be stored or fetched from.
	RegistryTable string
	// IDs is an array containing the Virtual IDs details to be stored or to be fetched.
	IDs []T
}

Manage or fetch Virtual ID request/response payload.

type StoreRepository

type StoreRepository interface {
	// Fetch store info with name.
	Get(context.Context, ...string) ([]btree.StoreInfo, error)
	// Add store info & create related tables like for registry & for node blob.
	Add(context.Context, ...btree.StoreInfo) error
	// Update store info. Update should also merge the Count of items between the incoming store info
	// and the target store info on the backend, as they may differ. It should use StoreInfo.CountDelta to reconcile the two.
	Update(context.Context, ...btree.StoreInfo) error
	// Remove store info with name & drop related tables like for registry & for node blob.
	Remove(context.Context, ...string) error
}

StoreRepository interface specifies the store repository.

func NewMockStoreRepository

func NewMockStoreRepository() StoreRepository

NewMockStoreRepository instantiates a mocked StoreRepository that manages the StoreInfo for testing.

func NewStoreRepository

func NewStoreRepository() StoreRepository

NewStoreRepository manages the StoreInfo in Cassandra table.

type TransactionLog added in v1.8.7

type TransactionLog interface {
	// Add a transaction log.
	Add(ctx context.Context, tid gocql.UUID, commitFunction int, payload []byte) error
	// Remove all logs of a given transaction.
	Remove(ctx context.Context, tid gocql.UUID) error

	// GetOne fetches the oldest transaction logs from the backend that are more than one hour old
	// and marks them, so that each succeeding call returns the next hour, and so on, until none
	// remain on reaching the current hour.
	//
	// GetOne behaves like an hourly job distributor. SOP uses it to distribute the task of cleaning up
	// resources left over by unfinished transactions. Whether caused by a crash or a host reboot, any
	// transaction's temporary resources age, reach their expiration limit, and then get cleaned up.
	//
	// The cap is one hour because anything newer may still belong to an in-flight transaction.
	GetOne(ctx context.Context) (gocql.UUID, string, []sop.KeyValuePair[int, []byte], error)

	// Given a date hour, returns an available for cleanup set of transaction logs with their Transaction ID.
	// Or nils if there is no more needing cleanup for this date hour.
	GetLogsDetails(ctx context.Context, hour string) (gocql.UUID, []sop.KeyValuePair[int, []byte], error)
}

func NewMockTransactionLog added in v1.8.7

func NewMockTransactionLog() TransactionLog

func NewTransactionLog added in v1.8.7

func NewTransactionLog() TransactionLog

NewTransactionLog instantiates a new TransactionLog instance.

type UpdateAllOrNothingError added in v1.8.7

type UpdateAllOrNothingError struct {
	Err error
}

UpdateAllOrNothingError is a special error type that lets the caller handle it differently from ordinary errors.

func (*UpdateAllOrNothingError) Error added in v1.8.7

func (r *UpdateAllOrNothingError) Error() string
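A caller would typically tell this error apart from others with `errors.As`. The sketch below redeclares the type locally so that it runs on its own. The body of `Error`, the hypothetical `update` function, and the wrapping with `%w` are assumptions for illustration, not behavior confirmed by this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// UpdateAllOrNothingError is redeclared locally with the documented shape.
type UpdateAllOrNothingError struct {
	Err error
}

// Error delegates to the wrapped error (assumed behavior).
func (r *UpdateAllOrNothingError) Error() string {
	return r.Err.Error()
}

// update is a hypothetical stand-in for a registry Update call.
func update(failBatch bool) error {
	if failBatch {
		return fmt.Errorf("registry update: %w",
			&UpdateAllOrNothingError{Err: errors.New("batch rejected")})
	}
	return errors.New("connection lost")
}

// isAllOrNothing reports whether err is, or wraps, an *UpdateAllOrNothingError.
func isAllOrNothing(err error) bool {
	var target *UpdateAllOrNothingError
	return errors.As(err, &target)
}

func main() {
	fmt.Println(isAllOrNothing(update(true)))  // true
	fmt.Println(isAllOrNothing(update(false))) // false
}
```

Because `Error` has a pointer receiver, the target passed to `errors.As` must be a `*UpdateAllOrNothingError`, as shown.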
