storage

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoLock is returned when no lock is passed to a try-lock or release-lock function.
	ErrNoLock = errors.New("no lock provided")
	// ErrAlreadyLocked is returned when the lock already exists in the repo, i.e. it is held by someone else.
	ErrAlreadyLocked = errors.New("lock already attained by someone else")
)
View Source
var (
	// ErrDuplicateMessageIDForChannel represents the case where a message with the same ID already exists for the channel.
	ErrDuplicateMessageIDForChannel = errors.New("duplicate message id for channel")
	// ErrNoTxInContext represents the case where the transaction is not passed in the context.
	// NOTE(review): the message text says "content"; presumably "context" was intended — confirm
	// before changing it, since callers may match on the string.
	ErrNoTxInContext = errors.New("no tx value in content")
)
View Source
var (
	// ErrOptimisticAppInit represents the error when the optimistic update fails to start app init.
	ErrOptimisticAppInit = errors.New(optimisticLockInitAppErrMsg)
	// ErrOptimisticAppComplete represents the error when app complete is attempted from a state other than initializing.
	ErrOptimisticAppComplete = errors.New(optimisticLockCompleteAppErrMsg)
	// ErrAppInitializing is returned when the app is being initialized by another thread.
	ErrAppInitializing = errors.New("App is in initializing")
	// ErrNoDataChangeFromInitialized is returned when initialization is attempted without any seed data change while the app has already been initialized.
	ErrNoDataChangeFromInitialized = errors.New("No data change on initialized App")
	// ErrCompleteWhileNotBeingInitialized is returned when complete is called while the app is not being initialized.
	ErrCompleteWhileNotBeingInitialized = errors.New("App not initializing to complete initializing")
	// ErrNoRowsUpdated is returned when an UPDATE query unexpectedly does not change any row.
	ErrNoRowsUpdated = errors.New("No rows updated on UPDATE query")
	// ErrInvalidStateToSave is returned when data is not in a state in which it can be sent to the repo.
	ErrInvalidStateToSave = errors.New("Data model in invalid state to be stored")
	// ErrPaginationDeadlock is returned if both after and before are provided in pagination, or if pagination is nil.
	ErrPaginationDeadlock = errors.New("Can not decide on pagination direction! Both after and before provided or pagination is nil")
)
View Source
var (

	// ErrDBConnectionNeverInitialized is returned when the first NewDataAccessor call failed to connect to the DB; in all subsequent calls the accessor will remain nil.
	ErrDBConnectionNeverInitialized = errors.New("DB Connection never initialized")
	// RDBMSStorageInternalInjector is the wire provider set for the data storage implementation: it lists
	// GetConnectionPool and every repository constructor, fills the named RelationalDBDataAccessor fields,
	// and binds the DataAccessor interface to *RelationalDBDataAccessor.
	RDBMSStorageInternalInjector = wire.NewSet(GetConnectionPool, NewLockRepository, NewAppRepository, NewProducerRepository, NewChannelRepository, NewConsumerRepository, NewMessageRepository, NewDeliveryJobRepository, wire.Struct(new(RelationalDBDataAccessor), "db", "appRepository", "producerRepository", "channelRepository", "consumerRepository", "messageRepository", "deliveryJobRepository", "lockRepository"), wire.Bind(new(DataAccessor), new(*RelationalDBDataAccessor)))
)

Functions

func GetConnectionPool

func GetConnectionPool(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (*sql.DB, error)

GetConnectionPool Gets the DB Connection Pool for the App

Types

type AppDBRepository

type AppDBRepository struct {
	// contains filtered or unexported fields
}

AppDBRepository is the repository to access App data

func (*AppDBRepository) CompleteAppInit

func (appRep *AppDBRepository) CompleteAppInit() error

CompleteAppInit stores that App initialization completed; it will return error if app is not in initializing state before the update is made

func (*AppDBRepository) GetApp

func (appRep *AppDBRepository) GetApp() (*data.App, error)

GetApp retrieves the App from storage, it will never return nil

func (*AppDBRepository) InitAppData

func (appRep *AppDBRepository) InitAppData(seedData *config.SeedData) error

InitAppData initializes if and only if none is present in the DB with status NotInitialized. It returns an error if insertion fails.

func (*AppDBRepository) StartAppInit

func (appRep *AppDBRepository) StartAppInit(seedData *config.SeedData) error

StartAppInit stores state that App initialization started. It will return error if App is in Initializing state or if data hash is equal and app in initialized state

type AppRepository

type AppRepository interface {
	// GetApp retrieves the App from storage; the RDBMS implementation documents that it never returns nil.
	GetApp() (*data.App, error)
	// StartAppInit stores the state that App initialization started. The RDBMS implementation returns an
	// error if the App is already in Initializing state, or if the data hash is equal and the App is initialized.
	StartAppInit(data *config.SeedData) error
	// CompleteAppInit stores that App initialization completed. The RDBMS implementation returns an error
	// if the App is not in initializing state before the update is made.
	CompleteAppInit() error
}

AppRepository allows storage operation interaction for App

func NewAppRepository

func NewAppRepository(db *sql.DB) AppRepository

NewAppRepository retrieves App Repository

type ChannelDBRepository

type ChannelDBRepository struct {
	// contains filtered or unexported fields
}

ChannelDBRepository channel repository implementation for RDBMS

func (*ChannelDBRepository) Get

func (repo *ChannelDBRepository) Get(channelID string) (*data.Channel, error)

Get retrieves the channel with matching channel id

func (*ChannelDBRepository) GetList

func (repo *ChannelDBRepository) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)

GetList retrieves the list of channels based on the pagination params supplied. It will return an error if both after and before are present at the same time

func (*ChannelDBRepository) Store

func (repo *ChannelDBRepository) Store(channel *data.Channel) (*data.Channel, error)

Store either creates or updates the channel information

type ChannelRepository

type ChannelRepository interface {
	// Store either creates or updates the channel information.
	Store(channel *data.Channel) (*data.Channel, error)
	// Get retrieves the channel with the matching channel ID.
	Get(channelID string) (*data.Channel, error)
	// GetList retrieves a page of channels based on the pagination params supplied, along with a
	// *data.Pagination value. The RDBMS implementation returns an error if both after and before
	// are present at the same time.
	GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)
}

ChannelRepository allows storage operation interaction for Channel

func NewChannelRepository

func NewChannelRepository(db *sql.DB) ChannelRepository

NewChannelRepository retrieves new instance of channel repository

type ConsumerDBRepository

type ConsumerDBRepository struct {
	// contains filtered or unexported fields
}

ConsumerDBRepository is the RDBMS implementation for ConsumerRepository

func (*ConsumerDBRepository) Delete

func (consumerRepo *ConsumerDBRepository) Delete(consumer *data.Consumer) error

Delete deletes consumer from DB

func (*ConsumerDBRepository) Get

func (consumerRepo *ConsumerDBRepository) Get(channelID string, consumerID string) (consumer *data.Consumer, err error)

Get retrieves the consumer with the given consumer ID in the given channel; it returns an error if either the consumer or the channel does not exist

func (*ConsumerDBRepository) GetByID

func (consumerRepo *ConsumerDBRepository) GetByID(id string) (consumer *data.Consumer, err error)

GetByID retrieves a consumer by its ID

func (*ConsumerDBRepository) GetList

func (consumerRepo *ConsumerDBRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)

GetList retrieves consumers for a specific channel; it returns an error if the channel does not exist

func (*ConsumerDBRepository) Store

func (consumerRepo *ConsumerDBRepository) Store(consumer *data.Consumer) (*data.Consumer, error)

Store stores consumer with either update or insert

type ConsumerRepository

type ConsumerRepository interface {
	// Store stores the consumer with either an update or an insert.
	Store(consumer *data.Consumer) (*data.Consumer, error)
	// Delete deletes the consumer from storage.
	Delete(consumer *data.Consumer) error
	// Get retrieves the consumer identified by consumerID within the channel identified by channelID.
	// The RDBMS implementation returns an error if either the consumer or the channel does not exist.
	Get(channelID string, consumerID string) (*data.Consumer, error)
	// GetList retrieves a page of consumers for the channel identified by channelID. The RDBMS
	// implementation returns an error if the channel does not exist.
	GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
	// GetByID retrieves a consumer by its ID.
	GetByID(id string) (*data.Consumer, error)
}

ConsumerRepository allows storage operation interaction for Consumer

func NewConsumerRepository

func NewConsumerRepository(db *sql.DB, channelRepo ChannelRepository) ConsumerRepository

NewConsumerRepository initializes new consumer repository

type ContextKey

// ContextKey is a string type used for context keys.
// NOTE(review): presumably this keys the transaction value carried in a context.Context
// (see ErrNoTxInContext and SetDispatched) — confirm against the implementation.
type ContextKey string

ContextKey represents context key

type DataAccessor

type DataAccessor interface {
	// GetAppRepository returns the AppRepository to be used for App ops.
	GetAppRepository() AppRepository
	// GetProducerRepository returns the ProducerRepository to be used for Producer ops.
	GetProducerRepository() ProducerRepository
	// GetChannelRepository returns the ChannelRepository to be used for Channel ops.
	GetChannelRepository() ChannelRepository
	// GetConsumerRepository returns the ConsumerRepository to be used for Consumer ops.
	GetConsumerRepository() ConsumerRepository
	// GetMessageRepository returns the MessageRepository to be used for Message ops.
	GetMessageRepository() MessageRepository
	// GetDeliveryJobRepository returns the DeliveryJobRepository to be used for DeliveryJob ops.
	GetDeliveryJobRepository() DeliveryJobRepository
	// GetLockRepository returns the LockRepository to be used for Lock ops.
	GetLockRepository() LockRepository
	// Close closes the connection to the DB (as documented for the RDBMS implementation).
	Close()
}

DataAccessor is the facade to all the data repository

func GetNewDataAccessor

func GetNewDataAccessor(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (DataAccessor, error)

type DeliveryJobDBRepository

type DeliveryJobDBRepository struct {
	// contains filtered or unexported fields
}

DeliveryJobDBRepository is the DeliveryJobRepository's RDBMS implementation

func (*DeliveryJobDBRepository) DispatchMessage

func (djRepo *DeliveryJobDBRepository) DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) (err error)

DispatchMessage saves the delivery jobs and updates the message status in one atomic operation

func (*DeliveryJobDBRepository) GetByID

func (djRepo *DeliveryJobDBRepository) GetByID(id string) (job *data.DeliveryJob, err error)

GetByID loads the delivery job with specified id if it exists, else returns an error

func (*DeliveryJobDBRepository) GetJobsForConsumer

func (djRepo *DeliveryJobDBRepository) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)

GetJobsForConsumer retrieves the DeliveryJobs created for delivery to a consumer, filtered by a specific status

func (*DeliveryJobDBRepository) GetJobsForMessage

func (djRepo *DeliveryJobDBRepository) GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)

GetJobsForMessage retrieves jobs created for a specific message

func (*DeliveryJobDBRepository) GetJobsInflightSince

func (djRepo *DeliveryJobDBRepository) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob

GetJobsInflightSince retrieves jobs in inflight status since the delta duration

func (*DeliveryJobDBRepository) GetJobsReadyForInflightSince

func (djRepo *DeliveryJobDBRepository) GetJobsReadyForInflightSince(delta time.Duration) []*data.DeliveryJob

GetJobsReadyForInflightSince retrieves jobs in queued status and earliestNextAttemptAt < `now`-delta

func (*DeliveryJobDBRepository) MarkJobDead

func (djRepo *DeliveryJobDBRepository) MarkJobDead(deliveryJob *data.DeliveryJob) error

MarkJobDead sets the status of the job to Dead if the job's current status is Inflight in the object and DB; else returns error

func (*DeliveryJobDBRepository) MarkJobDelivered

func (djRepo *DeliveryJobDBRepository) MarkJobDelivered(deliveryJob *data.DeliveryJob) error

MarkJobDelivered sets the status of the job to Delivered if the job's current status is Inflight in the object and DB; else returns error

func (*DeliveryJobDBRepository) MarkJobInflight

func (djRepo *DeliveryJobDBRepository) MarkJobInflight(deliveryJob *data.DeliveryJob) error

MarkJobInflight sets the status of the job to Inflight if job's current state in the object and DB is Queued; else returns error

func (*DeliveryJobDBRepository) MarkJobRetry

func (djRepo *DeliveryJobDBRepository) MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) (err error)

MarkJobRetry increases the retry attempt count and sets the status of the job to Queued if the job's current status is Inflight in the object and DB; else returns error

func (*DeliveryJobDBRepository) RequeueDeadJobsForConsumer

func (djRepo *DeliveryJobDBRepository) RequeueDeadJobsForConsumer(consumer *data.Consumer) (err error)

RequeueDeadJobsForConsumer queues up dead jobs for a specific consumer

type DeliveryJobRepository

type DeliveryJobRepository interface {
	// DispatchMessage saves the delivery jobs and updates the message status atomically.
	DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) error
	// MarkJobInflight sets the job's status to Inflight if its current state, both in the object and
	// in the DB, is Queued; otherwise it returns an error.
	MarkJobInflight(deliveryJob *data.DeliveryJob) error
	// MarkJobDelivered sets the job's status to Delivered if its current status, both in the object
	// and in the DB, is Inflight; otherwise it returns an error.
	MarkJobDelivered(deliveryJob *data.DeliveryJob) error
	// MarkJobDead sets the job's status to Dead if its current status, both in the object and in the
	// DB, is Inflight; otherwise it returns an error.
	MarkJobDead(deliveryJob *data.DeliveryJob) error
	// MarkJobRetry increases the retry attempt count and sets the job's status to Queued if its
	// current status, both in the object and in the DB, is Inflight; otherwise it returns an error.
	// NOTE(review): earliestDelta presumably offsets the earliest next attempt time — confirm.
	MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) error
	// RequeueDeadJobsForConsumer queues up dead jobs for a specific consumer.
	RequeueDeadJobsForConsumer(consumer *data.Consumer) error
	// GetJobsForMessage retrieves a page of jobs created for a specific message.
	GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
	// GetJobsForConsumer retrieves a page of jobs created for delivery to a consumer, filtered by jobStatus.
	GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
	// GetByID loads the delivery job with the specified id if it exists, else returns an error.
	GetByID(id string) (*data.DeliveryJob, error)
	// GetJobsInflightSince retrieves jobs that have been in inflight status since the delta duration.
	// No error is returned; how failures are reported is not visible from this signature.
	GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob
	// GetJobsReadyForInflightSince retrieves jobs in queued status with earliestNextAttemptAt < `now`-delta.
	GetJobsReadyForInflightSince(delta time.Duration) []*data.DeliveryJob
}

DeliveryJobRepository allows storage operations over DeliveryJob

func NewDeliveryJobRepository

func NewDeliveryJobRepository(db *sql.DB, msgRepo MessageRepository, consumerRepo ConsumerRepository) DeliveryJobRepository

NewDeliveryJobRepository creates a new instance of DeliveryJobRepository

type LockDBRepository

type LockDBRepository struct {
	// contains filtered or unexported fields
}

LockDBRepository represents the RDBMS implementation of LockRepository

func (*LockDBRepository) ReleaseLock

func (lockRepo *LockDBRepository) ReleaseLock(lock *data.Lock) (err error)

ReleaseLock tries to release the lock, will return error if no such lock or any error in releasing

func (*LockDBRepository) TimeoutLocks

func (lockRepo *LockDBRepository) TimeoutLocks(threshold time.Duration) (err error)

TimeoutLocks will force-release locks that are older than the specified duration from now. It returns an error if the DB call failed

func (*LockDBRepository) TryLock

func (lockRepo *LockDBRepository) TryLock(lock *data.Lock) (err error)

TryLock tries to attain the lock; it returns an error if the lock could not be attained, else nil

type LockRepository

type LockRepository interface {
	// TryLock tries to attain the lock; it returns an error if the lock could not be attained, else nil.
	TryLock(lock *data.Lock) error
	// ReleaseLock tries to release the lock; it returns an error if there is no such lock or releasing fails.
	ReleaseLock(lock *data.Lock) error
	// TimeoutLocks force-releases locks that are older than threshold measured from now; it returns
	// an error if the DB call failed.
	TimeoutLocks(threshold time.Duration) error
}

LockRepository allows storage operations over Lock

func NewLockRepository

func NewLockRepository(db *sql.DB) LockRepository

NewLockRepository creates a new instance of LockRepository

type MessageDBRepository

type MessageDBRepository struct {
	// contains filtered or unexported fields
}

MessageDBRepository is the MessageRepository implementation

func (*MessageDBRepository) Create

func (msgRepo *MessageDBRepository) Create(message *data.Message) (err error)

Create creates a new message if message.MessageID does not already exist; please ensure QuickFix is called before repo is called

func (*MessageDBRepository) Get

func (msgRepo *MessageDBRepository) Get(channelID string, messageID string) (*data.Message, error)

Get retrieves a message for a channel if it exists

func (*MessageDBRepository) GetByID

func (msgRepo *MessageDBRepository) GetByID(id string) (*data.Message, error)

GetByID retrieves a message by its ID

func (*MessageDBRepository) GetMessagesForChannel

func (msgRepo *MessageDBRepository) GetMessagesForChannel(channelID string, page *data.Pagination) ([]*data.Message, *data.Pagination, error)

GetMessagesForChannel retrieves messages broadcasted to a specific channel

func (*MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod

func (msgRepo *MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message

GetMessagesNotDispatchedForCertainPeriod retrieves messages that are still in acknowledged state even though the `delta` duration has passed.

func (*MessageDBRepository) SetDispatched

func (msgRepo *MessageDBRepository) SetDispatched(txContext context.Context, message *data.Message) error

SetDispatched sets the status of the message to dispatched within the transaction passed via txContext

type MessageRepository

type MessageRepository interface {
	// Create creates a new message if message.MessageID does not already exist. The implementation
	// asks callers to ensure QuickFix is called on the message before the repo is called.
	Create(message *data.Message) error
	// Get retrieves a message for a channel if it exists.
	Get(channelID string, messageID string) (*data.Message, error)
	// GetByID retrieves a message by its ID.
	GetByID(id string) (*data.Message, error)
	// SetDispatched sets the status of the message to dispatched within the transaction passed via
	// txContext. The transaction travels in the context rather than as a direct parameter to keep
	// this API independent of the storage class.
	SetDispatched(txContext context.Context, message *data.Message) error
	// GetMessagesNotDispatchedForCertainPeriod retrieves messages still in acknowledged state after
	// the delta duration has passed.
	GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message
	// GetMessagesForChannel retrieves a page of messages broadcast to a specific channel.
	GetMessagesForChannel(channelID string, page *data.Pagination) ([]*data.Message, *data.Pagination, error)
}

MessageRepository allows storage operations over Message. SetDispatched does not accept TX directly to keep the API storage class independent

func NewMessageRepository

func NewMessageRepository(db *sql.DB, channelRepo ChannelRepository, producerRepo ProducerRepository) MessageRepository

NewMessageRepository creates a new instance of MessageRepository

type MigrationConfig

type MigrationConfig struct {
	// MigrationEnabled presumably toggles whether DB migrations are run — TODO confirm against GetConnectionPool.
	MigrationEnabled bool
	// MigrationSource presumably identifies where migration files are read from — TODO confirm the expected format.
	MigrationSource  string
}

MigrationConfig represents the DB migration config

type ProducerDBRepository

type ProducerDBRepository struct {
	// contains filtered or unexported fields
}

ProducerDBRepository is the producer repository implementation for RDBMS

func (*ProducerDBRepository) Get

func (repo *ProducerDBRepository) Get(producerID string) (*data.Producer, error)

Get retrieves the producer with matching producer id

func (*ProducerDBRepository) GetList

func (repo *ProducerDBRepository) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)

GetList retrieves the list of producers based on the pagination params supplied. It will return an error if both after and before are present at the same time

func (*ProducerDBRepository) Store

func (repo *ProducerDBRepository) Store(producer *data.Producer) (*data.Producer, error)

Store either creates or updates the producer information

type ProducerRepository

type ProducerRepository interface {
	// Store either creates or updates the producer information.
	Store(producer *data.Producer) (*data.Producer, error)
	// Get retrieves the producer with the matching producer ID.
	Get(producerID string) (*data.Producer, error)
	// GetList retrieves a page of producers based on the pagination params supplied, along with a
	// *data.Pagination value. The RDBMS implementation returns an error if both after and before
	// are present at the same time.
	GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)
}

ProducerRepository allows storage operation interaction for Producer

func NewProducerRepository

func NewProducerRepository(db *sql.DB) ProducerRepository

NewProducerRepository returns a new producer repository

type RelationalDBDataAccessor

type RelationalDBDataAccessor struct {
	// contains filtered or unexported fields
}

RelationalDBDataAccessor represents the DataAccessor implementation for RDBMS

func (*RelationalDBDataAccessor) Close

func (rdbmsDataAccessor *RelationalDBDataAccessor) Close()

Close closes the connection to DB

func (*RelationalDBDataAccessor) GetAppRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetAppRepository() AppRepository

GetAppRepository returns the AppRepository to be used for App ops

func (*RelationalDBDataAccessor) GetChannelRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetChannelRepository() ChannelRepository

GetChannelRepository returns the ChannelRepository to be used for Channel ops

func (*RelationalDBDataAccessor) GetConsumerRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetConsumerRepository() ConsumerRepository

GetConsumerRepository returns the ConsumerRepository to be used for Consumer ops

func (*RelationalDBDataAccessor) GetDeliveryJobRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetDeliveryJobRepository() DeliveryJobRepository

GetDeliveryJobRepository retrieves the DeliveryJobRepository to be used for DeliveryJob ops

func (*RelationalDBDataAccessor) GetLockRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetLockRepository() LockRepository

GetLockRepository retrieves the LockRepository to be used for Lock ops

func (*RelationalDBDataAccessor) GetMessageRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetMessageRepository() MessageRepository

GetMessageRepository retrieves the MessageRepository to be used for Message ops

func (*RelationalDBDataAccessor) GetProducerRepository

func (rdbmsDataAccessor *RelationalDBDataAccessor) GetProducerRepository() ProducerRepository

GetProducerRepository returns the ProducerRepository to be used for Producer ops

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL