persistencesql

package module
v0.0.0-...-0f70582 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

README

ProtoActor Persistence SQL

Codacy grade GitHub Workflow Status (branch) Codecov

An implementation of the ProtoActor persistence plugin APIs using RDBMS. It writes journal and snapshot to a configured SQL datastore. At the moment the following data stores are supported out of the box:

The events and state snapshots are protocol buffer bytes array persisted respectively in the journal and snapshot tables.

Note: The developer does not need to create the database tables. They are created by default by the library. One can have a look at them in the constants.go code.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DBConfig

type DBConfig struct {
	// contains filtered or unexported fields
}

DBConfig represents the database configuration

func NewDBConfig

func NewDBConfig(dbUser, dbPassword, dbName, dbSchema, dbHost string, dbPort int, opts ...PoolOpt) *DBConfig

NewDBConfig creates an instance of DBConfig

type Driver

type Driver string

Driver defines a type of SQL driver accepted. This will be used by the golang sql library to load a specific driver

const (
	// POSTGRES driver type
	POSTGRES Driver = "postgres"
	// MYSQL driver type
	MYSQL Driver = "mysql"
)

func (Driver) ConnStr

func (d Driver) ConnStr(dbHost string, dbPort int, dbName, dbUser, dbPassword, dbSchema string) string

ConnStr returns the connection string provided by the driver

func (Driver) IsValid

func (d Driver) IsValid() error

IsValid checks whether the given driver is valid or not

func (Driver) SQLFile

func (d Driver) SQLFile() string

SQLFile returns the sql file to create schema for a given driver

func (Driver) String

func (d Driver) String() string

String returns the actual value

type Journal

type Journal struct {
	// the unique id of the journal row
	Ordering int64
	// Persistent ID that journals a persistent message.
	PersistenceID string
	// This persistent message's sequence number
	SequenceNumber int
	// The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC.
	Timestamp int64
	// This persistent message's payload (the event).
	Payload []byte
	// A type hint for the event. This will be the proto message name of the event
	EventManifest Manifest
	// Unique identifier of the writing persistent actor.
	WriterID string
	// Flag to indicate the event has been deleted when logical deletion is set.
	Deleted bool
}

Journal defines the journal row

func NewJournal

func NewJournal(persistenceID string, message proto.Message, sequenceNumber int, writerID string) *Journal

NewJournal creates a new instance of Snapshot

type Manifest

type Manifest protoreflect.FullName

func (*Manifest) Scan

func (m *Manifest) Scan(value interface{}) error

Scan - Implement the database/sql scanner interface

func (Manifest) Value

func (m Manifest) Value() (driver.Value, error)

Value - Implementation of valuer for database/sql

type OptFunc

type OptFunc = func(provider *SQLProvider)

func WithLogicalDeletion

func WithLogicalDeletion() OptFunc

WithLogicalDeletion enables logical deletion

func WithSnapshotInterval

func WithSnapshotInterval(interval int) OptFunc

WithSnapshotInterval sets the snapshot interval

type PoolOpt

type PoolOpt = func(*DBConfig)

PoolOpt defines the connection pool options

func WithConnectionMaxLife

func WithConnectionMaxLife(connectionMaxLife int) PoolOpt

WithConnectionMaxLife sets the database connection max life

func WithMaxIdleConnections

func WithMaxIdleConnections(maxIdleConnections int) PoolOpt

WithMaxIdleConnections sets the max idle connections

func WithMaxOpenConnections

func WithMaxOpenConnections(maxOpenConnection int) PoolOpt

WithMaxOpenConnections sets the max open connections

type SQLDialect

type SQLDialect interface {
	CreateSchemasIfNotExist(ctx context.Context) error

	Connect(ctx context.Context) error
	Close() error

	PersistJournal(ctx context.Context, journal *Journal) error
	PersistSnapshot(ctx context.Context, snapshot *Snapshot) error

	GetLatestSnapshot(ctx context.Context, persistenceID string) (*Snapshot, error)
	GetJournals(ctx context.Context, persistenceID string, fromSequenceNumber int, toSequenceNumber int) (
		[]*Journal, error,
	)

	DeleteSnapshots(ctx context.Context, persistenceID string, toSequenceNumber int) error
	DeleteJournals(ctx context.Context, persistenceID string, toSequenceNumber int, logical bool) error
}

SQLDialect will be implemented any database dialect

func NewDialect

func NewDialect(config *DBConfig, driver Driver) (SQLDialect, error)

NewDialect creates a new instance of SQLDialect

func NewMySQLDialect

func NewMySQLDialect(dbConfig *DBConfig) (SQLDialect, error)

NewMySQLDialect creates a new instance of SQLDialect

func NewPostgresDialect

func NewPostgresDialect(dbConfig *DBConfig) (SQLDialect, error)

NewPostgresDialect creates a new instance of SQLDialect

type SQLProvider

type SQLProvider struct {
	// contains filtered or unexported fields
}

SQLProvider defines a generic persistence provider. The type of provider is determined by the type of SQLDialect defined

func NewMySQLProvider

func NewMySQLProvider(ctx context.Context, actorSystem *actor.ActorSystem, dbConfig *DBConfig, opts ...OptFunc) (*SQLProvider, error)

NewMySQLProvider creates an instance postgres base SQLProvider

func NewPostgresProvider

func NewPostgresProvider(ctx context.Context, actorSystem *actor.ActorSystem, dbConfig *DBConfig, opts ...OptFunc) (*SQLProvider, error)

NewPostgresProvider creates an instance postgres base SQLProvider

func NewSQLProvider

func NewSQLProvider(
	ctx context.Context, actorSystem *actor.ActorSystem, dialect SQLDialect, opts ...OptFunc,
) *SQLProvider

NewSQLProvider creates a new instance of the SQLProvider

func (*SQLProvider) GetState

func (p *SQLProvider) GetState() persistence.ProviderState

GetState returns an instance of the ProviderState

type SQLProviderState

type SQLProviderState struct {
	*SQLProvider
	// contains filtered or unexported fields
}

SQLProviderState is an implementation of the proto-actor ProviderState interface

func (*SQLProviderState) DeleteEvents

func (s *SQLProviderState) DeleteEvents(actorName string, inclusiveToIndex int)

DeleteEvents deletes events from journal to a given index actorName is the persistenceID inclusiveToIndex is the sequence Number

func (*SQLProviderState) DeleteSnapshots

func (s *SQLProviderState) DeleteSnapshots(actorName string, inclusiveToIndex int)

DeleteSnapshots deletes snapshots for a given persistenceID from the store to a given sequenceNumber. actorName is the persistenceID inclusiveToIndex is the sequenceNumber

func (*SQLProviderState) GetEvents

func (s *SQLProviderState) GetEvents(
	actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{}),
)

GetEvents list events from the journal store within a range of sequenceNumber for a given persistence ID actorName is the persistenceID eventIndexStart is the from sequenceNumber eventIndexEnd is the to sequenceNumber

func (*SQLProviderState) GetSnapshot

func (s *SQLProviderState) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)

GetSnapshot fetches the latest snapshot of a given persistenceID represented by the actorName actorName is the persistenceID

func (*SQLProviderState) GetSnapshotInterval

func (s *SQLProviderState) GetSnapshotInterval() int

GetSnapshotInterval return the snapshot interval

func (*SQLProviderState) PersistEvent

func (s *SQLProviderState) PersistEvent(actorName string, eventIndex int, event proto.Message)

PersistEvent persists an event for a given persistence ID actorName is the persistenceID eventIndex is the event to persist sequenceNumber event is the event payload

func (*SQLProviderState) PersistSnapshot

func (s *SQLProviderState) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message)

PersistSnapshot saves the snapshot of a given persistenceID. actorName is the persistenceID snapshotIndex is the sequenceNumber of the snapshot data snapshot is the payload to persist

func (*SQLProviderState) Restart

func (s *SQLProviderState) Restart()

Restart executes task to run before the provider state is up

type Snapshot

type Snapshot struct {
	// Persistent ID that journals a persistent message.
	PersistenceID string
	// This persistent message's sequence number
	SequenceNumber int
	// The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC.
	Timestamp int64
	// This snapshot message's payload.
	Snapshot []byte
	// A type hint for the snapshot. This will be the proto message name of the snapshot
	SnapshotManifest Manifest
	// Unique identifier of the writing persistent actor.
	WriterID string
}

Snapshot defines the snapshot row

func NewSnapshot

func NewSnapshot(persistenceID string, message proto.Message, sequenceNumber int, writerID string) *Snapshot

NewSnapshot creates a new instance of Snapshot

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL