store

package
v0.0.0-...-9cf9cea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInsuficientChannelAccounts = fmt.Errorf("there are no channel accounts available to process transactions")
View Source
var ErrRecordNotFound = errors.New("record not found")

Functions

func CreateChannelAccountFixturesEncryptedKPs

func CreateChannelAccountFixturesEncryptedKPs(
	t *testing.T,
	ctx context.Context,
	dbConnectionPool db.DBConnectionPool,
	encrypter utils.PrivateKeyEncrypter,
	encryptionPassphrase string,
	count int,
) []*keypair.Full

CreateChannelAccountFixturesEncryptedKPs creates 'count' number of channel accounts, and store them in the DB with the private keys encrypted, returning the Keypairs.

func CreateDBVaultFixturesEncryptedKPs

func CreateDBVaultFixturesEncryptedKPs(
	t *testing.T,
	ctx context.Context,
	dbConnectionPool db.DBConnectionPool,
	encrypter utils.PrivateKeyEncrypter,
	encryptionPassphrase string,
	count int,
) []*keypair.Full

CreateDBVaultFixturesEncryptedKPs creates 'count' number of dbVaultEntries, and store them in the DB with the private keys encrypted, returning the slice of Keypairs.

func DeleteAllFromChannelAccounts

func DeleteAllFromChannelAccounts(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter)

func DeleteAllFromDBVaultEntries

func DeleteAllFromDBVaultEntries(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter)

func DeleteAllTransactionFixtures

func DeleteAllTransactionFixtures(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter)

DeleteAllTransactionFixtures deletes all submitter transactions in the database

Types

type ChannelAccount

type ChannelAccount struct {
	PublicKey  string       `db:"public_key"`
	PrivateKey string       `db:"private_key"` // TODO: remove this from the model, since we now rely on a Signer interface.
	UpdatedAt  *time.Time   `db:"updated_at"`
	CreatedAt  *time.Time   `db:"created_at"`
	LockedAt   sql.NullTime `db:"locked_at"`
	// LockedUntilLedgerNumber is the ledger number after which the lock expires. It should be synched with the
	// expiration ledger bound of the transaction submitted by this Stellar channel account.
	LockedUntilLedgerNumber sql.NullInt32 `db:"locked_until_ledger_number"`
}

func CreateChannelAccountFixtures

func CreateChannelAccountFixtures(t *testing.T, ctx context.Context, dbConnectionPool db.DBConnectionPool, count int) []*ChannelAccount

CreateChannelAccountFixtures creates 'count' number of channel accounts and store them in the DB, returning the channel accounts.

func CreateChannelAccountFixturesEncrypted

func CreateChannelAccountFixturesEncrypted(
	t *testing.T,
	ctx context.Context,
	dbConnectionPool db.DBConnectionPool,
	encrypter utils.PrivateKeyEncrypter,
	encryptionPassphrase string,
	count int,
) []*ChannelAccount

CreateChannelAccountFixturesEncrypted creates 'count' number of channel accounts, and store them in the DB with the private keys encrypted, returning the channel accounts.

func (*ChannelAccount) IsLocked

func (ca *ChannelAccount) IsLocked(currentLedgerNumber int32) bool

type ChannelAccountModel

type ChannelAccountModel struct {
	DBConnectionPool db.DBConnectionPool
}

func NewChannelAccountModel

func NewChannelAccountModel(dbConnectionPool db.DBConnectionPool) *ChannelAccountModel

func (*ChannelAccountModel) BatchInsert

func (ca *ChannelAccountModel) BatchInsert(ctx context.Context, sqlExec db.SQLExecuter, channelAccounts []*ChannelAccount) error

BatchInsert inserts a a batch of (publicKey, privateKey) pairs into the database.

func (*ChannelAccountModel) BatchInsertAndLock

func (ca *ChannelAccountModel) BatchInsertAndLock(ctx context.Context, channelAccounts []*ChannelAccount, currentLedger, nextLedgerLock int) error

BatchInsertAndLock inserts a batch of account keypairs into the database and locks them until some future ledger.

func (*ChannelAccountModel) Count

func (ca *ChannelAccountModel) Count(ctx context.Context) (int, error)

Count retrieves the current count of channel accounts in the database.

func (*ChannelAccountModel) Delete

func (ca *ChannelAccountModel) Delete(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) error

Delete deletes a channel account with the provided publicKey from the database.

func (*ChannelAccountModel) DeleteIfLockedUntil

func (ca *ChannelAccountModel) DeleteIfLockedUntil(ctx context.Context, publicKey string, lockedUntilLedgerNumber int) error

DeleteIfLockedUntil deletes a channel account with the provided publicKey from the database only if the provided `lockedUntilLedgerNumber` matches the value of the same field on the channel account. Also, if the account has not been locked previously, does not proceed with the deletion.

func (*ChannelAccountModel) Get

func (ca *ChannelAccountModel) Get(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, currentLedgerNumber int) (*ChannelAccount, error)

Get retrieves the channel account with the given public key from the database if account is not locked or `currentLedgerNumber` is ahead of the ledger number the account has been locked to.

func (*ChannelAccountModel) GetAll

func (ca *ChannelAccountModel) GetAll(ctx context.Context, sqlExec db.SQLExecuter, currentLedgerNumber, limit int) ([]*ChannelAccount, error)

GetAll all channel accounts from the database, respecting the limit provided for accounts that are not locked or `currentLedgerNumber` is ahead of the ledger number each account has been locked to.

func (*ChannelAccountModel) GetAndLock

func (ca *ChannelAccountModel) GetAndLock(ctx context.Context, publicKey string, currentLedger, nextLedgerLock int) (*ChannelAccount, error)

GetAndLock retrieves the channel account with the given public key from the database and locks the account until some future ledger.

func (*ChannelAccountModel) GetAndLockAll

func (ca *ChannelAccountModel) GetAndLockAll(ctx context.Context, currentLedger, nextLedgerLock, limit int) ([]*ChannelAccount, error)

GetAndLockAll retrieves all channel account that are not already locked from the database and locks them until some future ledger.

func (*ChannelAccountModel) Insert

func (ca *ChannelAccountModel) Insert(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, privateKey string) error

Insert inserts a (publicKey, privateKey) pair to the database.

func (*ChannelAccountModel) InsertAndLock

func (ca *ChannelAccountModel) InsertAndLock(ctx context.Context, publicKey string, privateKey string, currentLedger, nextLedgerLock int) error

InsertAndLock insert an account keypair into the database and locks it until some future ledger.

func (*ChannelAccountModel) Lock

func (ca *ChannelAccountModel) Lock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, currentLedger, nextLedgerLock int32) (*ChannelAccount, error)

Lock locks the channel account with the provided publicKey. It returns a ErrRecordNotFound error if you try to lock a channel account that is already locked.

func (*ChannelAccountModel) Unlock

func (ca *ChannelAccountModel) Unlock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) (*ChannelAccount, error)

Unlock lifts the lock from the channel account with the provided publicKey.

type ChannelAccountStore

type ChannelAccountStore interface {
	Delete(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) (err error)
	DeleteIfLockedUntil(ctx context.Context, publicKey string, lockedUntilLedgerNumber int) (err error)
	Get(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, currentLedgerNumber int) (ca *ChannelAccount, err error)
	GetAndLock(ctx context.Context, publicKey string, currentLedger, nextLedgerLock int) (*ChannelAccount, error)
	Count(ctx context.Context) (count int, err error)
	GetAll(ctx context.Context, sqlExec db.SQLExecuter, currentLedger, limit int) ([]*ChannelAccount, error)
	GetAndLockAll(ctx context.Context, currentLedger, nextLedgerLock, limit int) ([]*ChannelAccount, error)
	Insert(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, privateKey string) error
	InsertAndLock(ctx context.Context, publicKey string, privateKey string, currentLedger, nextLedgerLock int) error
	BatchInsert(ctx context.Context, sqlExec db.SQLExecuter, channelAccounts []*ChannelAccount) error
	BatchInsertAndLock(ctx context.Context, channelAccounts []*ChannelAccount, currentLedger, nextLedgerLock int) error
	// Lock management:
	Lock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string, currentLedger, nextLedgerLock int32) (*ChannelAccount, error)
	Unlock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) (*ChannelAccount, error)
}

type ChannelTransactionBundle

type ChannelTransactionBundle struct {
	// ChannelAccount is the resource needed to process the Transaction.
	ChannelAccount ChannelAccount `db:"channel_account"`
	// Transaction is the job that would be handled by the worker.
	Transaction Transaction `db:"transaction"`
	// LockedUntilLedgerNumber is the ledger number until which both the transaction and channel account are locked.
	LockedUntilLedgerNumber int `db:"locked_until_ledger_number"`
}

ChannelTransactionBundle is an abstraction that aggregates a bundle of a ChannelAccount and a Transaction. It is used to prepare the resources for the workers, locking both the Transaction (the job) and the ChannelAccount (the resource), and then updating the lock according with the parameters provided.

type ChannelTransactionBundleModel

type ChannelTransactionBundleModel struct {
	// contains filtered or unexported fields
}

func NewChannelTransactionBundleModel

func NewChannelTransactionBundleModel(dbConnectionPool db.DBConnectionPool) (*ChannelTransactionBundleModel, error)

func (*ChannelTransactionBundleModel) LoadAndLockTuples

func (m *ChannelTransactionBundleModel) LoadAndLockTuples(ctx context.Context, currentLedgerNumber, lockToLedgerNumber, limit int) ([]*ChannelTransactionBundle, error)

LoadAndLockTuples loads a slice of ChannelTransactionBundle from the database, and locks them until the given ledger number, up to the amount of transactions specified by the {limit} parameter. It returns the ErrInsuficientChannelAccounts error if there are transactions to process but no channel accounts available.

type DBVault

type DBVault interface {
	BatchInsert(ctx context.Context, dbVaultEntries []*DBVaultEntry) error
	Get(ctx context.Context, publicKey string) (*DBVaultEntry, error)
	Delete(ctx context.Context, publicKey string) error
}

type DBVaultEntry

type DBVaultEntry struct {
	PublicKey           string    `db:"public_key"`
	EncryptedPrivateKey string    `db:"encrypted_private_key"`
	UpdatedAt           time.Time `db:"updated_at"`
	CreatedAt           time.Time `db:"created_at"`
}

func CreateDBVaultFixturesEncrypted

func CreateDBVaultFixturesEncrypted(
	t *testing.T,
	ctx context.Context,
	dbConnectionPool db.DBConnectionPool,
	encrypter utils.PrivateKeyEncrypter,
	encryptionPassphrase string,
	count int,
) []*DBVaultEntry

CreateDBVaultFixturesEncrypted creates 'count' number of dbVaultEntries, and store them in the DB with the private keys encrypted, returning the slice of dbVaultEntries.

func (DBVaultEntry) String

func (e DBVaultEntry) String() string

type DBVaultModel

type DBVaultModel struct {
	DBConnectionPool db.DBConnectionPool
}

func NewDBVaultModel

func NewDBVaultModel(dbConnectionPool db.DBConnectionPool) *DBVaultModel

func (*DBVaultModel) BatchInsert

func (m *DBVaultModel) BatchInsert(ctx context.Context, dbVaultEntries []*DBVaultEntry) error

BatchInsert inserts a batch of (publicKey, encryptedPrivateKey) pairs into the database vault.

func (*DBVaultModel) Delete

func (m *DBVaultModel) Delete(ctx context.Context, publicKey string) error

Delete deletes an entry with the provided publicKey from the database vault.

func (*DBVaultModel) Get

func (m *DBVaultModel) Get(ctx context.Context, publicKey string) (*DBVaultEntry, error)

Get returns a DBVaultEntry with the provided publicKey from the database vault.

type Transaction

type Transaction struct {
	ID string `db:"id"`
	// ExternalID contains an external ID for the transaction. This is used for reconciliation.
	ExternalID string `db:"external_id"`
	// Status is the status of the transaction. Don't change it directly and use the internal methods of the model instead.
	Status        TransactionStatus        `db:"status"`
	StatusMessage sql.NullString           `db:"status_message"`
	StatusHistory TransactionStatusHistory `db:"status_history"`
	AssetCode     string                   `db:"asset_code"`
	AssetIssuer   string                   `db:"asset_issuer"`
	Amount        float64                  `db:"amount"`
	Destination   string                   `db:"destination"`

	TenantID            string         `db:"tenant_id"`
	DistributionAccount sql.NullString `db:"distribution_account"`

	CreatedAt *time.Time `db:"created_at"`
	UpdatedAt *time.Time `db:"updated_at"`
	// StartedAt is when the transaction was read from the queue into memory.
	StartedAt *time.Time `db:"started_at"`
	// SentAt is when the transaction was sent to the Stellar network.
	SentAt *time.Time `db:"sent_at"`
	// CompletedAt is when the transaction reached a terminal state, either SUCCESS or ERROR.
	CompletedAt *time.Time `db:"completed_at"`
	// SyncedAt is when the transaction was synced with SDP.
	SyncedAt *time.Time `db:"synced_at"`

	AttemptsCount          int            `db:"attempts_count"`
	StellarTransactionHash sql.NullString `db:"stellar_transaction_hash"`
	// XDRSent is the EnvelopeXDR submitted when creating a Stellar transaction in the network.
	XDRSent sql.NullString `db:"xdr_sent"`
	// XDRReceived is the ResultXDR received from the Stellar network when attempting to create a transaction.
	XDRReceived sql.NullString `db:"xdr_received"`
	LockedAt    *time.Time     `db:"locked_at"`
	// LockedUntilLedgerNumber is the ledger number after which the lock expires. It should be synched with the
	// expiration ledger bound set in the Stellar transaction submitted to the blockchain, and the same value in the
	// namesake column of the channel account model.
	LockedUntilLedgerNumber sql.NullInt32 `db:"locked_until_ledger_number"`
}

func CreateTransactionFixtureNew

func CreateTransactionFixtureNew(
	t *testing.T,
	ctx context.Context,
	sqlExec db.SQLExecuter,
	txFixture TransactionFixture,
) *Transaction

CreateTransactionFixture creates a submitter transaction in the database

func CreateTransactionFixturesNew

func CreateTransactionFixturesNew(t *testing.T,
	ctx context.Context,
	sqlExec db.SQLExecuter,
	count int,
	txFixture TransactionFixture,
) []*Transaction

CreateTransactionFixtures creates count number submitter transactions

func (*Transaction) IsLocked

func (tx *Transaction) IsLocked(currentLedgerNumber int32) bool

type TransactionFixture

type TransactionFixture struct {
	ExternalID          string
	AssetCode           string
	AssetIssuer         string
	DestinationAddress  string
	Status              TransactionStatus
	Amount              float64
	TenantID            string
	DistributionAccount string
}

type TransactionModel

type TransactionModel struct {
	DBConnectionPool db.DBConnectionPool
}

func NewTransactionModel

func NewTransactionModel(dbConnectionPool db.DBConnectionPool) *TransactionModel

func (*TransactionModel) BulkInsert

func (t *TransactionModel) BulkInsert(ctx context.Context, sqlExec db.SQLExecuter, transactions []Transaction) ([]Transaction, error)

BulkInsert adds a batch of Transactions to the database and returns the inserted transactions.

func (*TransactionModel) Get

func (t *TransactionModel) Get(ctx context.Context, txID string) (*Transaction, error)

Get gets a Transaction from the database.

func (*TransactionModel) GetAllByPaymentIDs

func (t *TransactionModel) GetAllByPaymentIDs(ctx context.Context, paymentIDs []string) ([]*Transaction, error)

func (*TransactionModel) GetTransactionBatchForUpdate

func (t *TransactionModel) GetTransactionBatchForUpdate(ctx context.Context, dbTx db.DBTransaction, batchSize int, tenantID string) ([]*Transaction, error)

GetTransactionBatchForUpdate returns a batch of transactions that are ready to be synced. Locks the rows for update.

func (*TransactionModel) GetTransactionPendingUpdateByID

func (t *TransactionModel) GetTransactionPendingUpdateByID(ctx context.Context, dbTx db.SQLExecuter, txID string) (*Transaction, error)

func (*TransactionModel) Insert

Insert adds a new Transaction to the database.

func (*TransactionModel) Lock

func (ca *TransactionModel) Lock(ctx context.Context, sqlExec db.SQLExecuter, transactionID string, currentLedger, nextLedgerLock int32) (*Transaction, error)

Lock locks the transaction with the provided transactionID. It returns a ErrRecordNotFound error if you try to lock a transaction that is already locked.

func (*TransactionModel) PrepareTransactionForReprocessing

func (ca *TransactionModel) PrepareTransactionForReprocessing(ctx context.Context, sqlExec db.SQLExecuter, transactionID string) (*Transaction, error)

PrepareTransactionForReprocessing pushes the transaction with the provided transactionID back to the queue.

func (*TransactionModel) Unlock

func (ca *TransactionModel) Unlock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) (*Transaction, error)

Unlock lifts the lock from the transactionID with the provided publicKey.

func (*TransactionModel) UpdateStatusToError

func (t *TransactionModel) UpdateStatusToError(ctx context.Context, tx Transaction, message string) (*Transaction, error)

UpdateStatusToError updates a Transaction's status to ERROR. Only succeeds if the current status is PROCESSING.

func (*TransactionModel) UpdateStatusToSuccess

func (t *TransactionModel) UpdateStatusToSuccess(ctx context.Context, tx Transaction) (*Transaction, error)

UpdateStatusToSuccess updates a Transaction's status to SUCCESS. Only succeeds if the current status is PROCESSING.

func (*TransactionModel) UpdateStellarTransactionHashAndXDRSent

func (t *TransactionModel) UpdateStellarTransactionHashAndXDRSent(ctx context.Context, txID string, txHash, txXDRSent string) (*Transaction, error)

func (*TransactionModel) UpdateStellarTransactionXDRReceived

func (t *TransactionModel) UpdateStellarTransactionXDRReceived(ctx context.Context, txID string, xdrReceived string) (*Transaction, error)

UpdateStellarTransactionXDRReceived updates a Transaction's XDR received.

func (*TransactionModel) UpdateSyncedTransactions

func (t *TransactionModel) UpdateSyncedTransactions(ctx context.Context, dbTx db.SQLExecuter, txIDs []string) error

UpdateSyncedTransactions updates the synced_at field for the given transaction IDs. Returns an error if the number of updated rows is not equal to the number of provided transaction IDs.

type TransactionStatus

type TransactionStatus string
const (
	// TransactionStatusPending indicates that a transaction has been created and added to the queue.
	TransactionStatusPending TransactionStatus = "PENDING" // TODO: rename to TransactionStatusQueued
	// TransactionStatusProcessing indicates that a transaction has been read from the queue and is being processed.
	TransactionStatusProcessing TransactionStatus = "PROCESSING"
	// TransactionStatusSuccess indicates that the transaction was successfully sent and included in the ledger.
	TransactionStatusSuccess TransactionStatus = "SUCCESS"
	// TransactionStatusError indicates that there was an error when trying to send this transaction.
	TransactionStatusError TransactionStatus = "ERROR"
)

func (TransactionStatus) All

func (status TransactionStatus) All() []TransactionStatus

func (TransactionStatus) CanTransitionTo

func (status TransactionStatus) CanTransitionTo(targetState TransactionStatus) error

CanTransitionTo verifies if the transition is allowed.

func (TransactionStatus) State

func (status TransactionStatus) State() data.State

State will parse the TransactionState into a data.State.

func (TransactionStatus) Validate

func (status TransactionStatus) Validate() error

Validate validates the disbursement status

type TransactionStatusHistory

type TransactionStatusHistory []TransactionStatusHistoryEntry

func (*TransactionStatusHistory) Scan

func (tsh *TransactionStatusHistory) Scan(src interface{}) error

Scan implements the sql.Scanner interface.

func (TransactionStatusHistory) Value

func (tsh TransactionStatusHistory) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

type TransactionStatusHistoryEntry

type TransactionStatusHistoryEntry struct {
	Status                 string    `json:"status"`
	StatusMessage          string    `json:"status_message"`
	Timestamp              time.Time `json:"timestamp"`
	StellarTransactionHash string    `json:"stellar_transaction_hash"`
	XDRSent                string    `json:"xdr_sent"`
	XDRReceived            string    `json:"xdr_received"`
}

type TransactionStore

type TransactionStore interface {
	// CRUD:
	Insert(ctx context.Context, tx Transaction) (*Transaction, error)
	BulkInsert(ctx context.Context, sqlExec db.SQLExecuter, transactions []Transaction) ([]Transaction, error)
	Get(ctx context.Context, txID string) (tx *Transaction, err error)
	GetAllByPaymentIDs(ctx context.Context, paymentIDs []string) (transactions []*Transaction, err error)
	// Status & Lock management:
	UpdateStatusToSuccess(ctx context.Context, tx Transaction) (updatedTx *Transaction, err error)
	UpdateStatusToError(ctx context.Context, tx Transaction, message string) (updatedTx *Transaction, err error)
	UpdateStellarTransactionXDRReceived(ctx context.Context, txID string, xdrReceived string) (*Transaction, error)
	UpdateStellarTransactionHashAndXDRSent(ctx context.Context, txID string, txHash, txXDRSent string) (*Transaction, error)
	Lock(ctx context.Context, sqlExec db.SQLExecuter, transactionID string, currentLedger, nextLedgerLock int32) (*Transaction, error)
	Unlock(ctx context.Context, sqlExec db.SQLExecuter, publicKey string) (*Transaction, error)
	// Queue management:
	PrepareTransactionForReprocessing(ctx context.Context, sqlExec db.SQLExecuter, transactionID string) (*Transaction, error)
	GetTransactionBatchForUpdate(ctx context.Context, dbTx db.DBTransaction, batchSize int, tenantID string) (transactions []*Transaction, err error)
	GetTransactionPendingUpdateByID(ctx context.Context, sqlExec db.SQLExecuter, txID string) (transaction *Transaction, err error)
	UpdateSyncedTransactions(ctx context.Context, sqlExec db.SQLExecuter, txIDs []string) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL