sqlite

package
v0.0.0-...-6b53760 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2024 License: MPL-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const EventSavedTopic = "event_saved"

Variables

View Source
var ErrQueueEmpty = errors.New("queue is empty")

Functions

func NewMigrations

func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)

func Open

func Open(conf config.Config) (*sql.DB, error)

Types

type AdaptersFactoryFn

type AdaptersFactoryFn = GenericAdaptersFactoryFn[app.Adapters]

type BackoffManager

type BackoffManager interface {
	// GetMessageErrorBackoff is used to back off reprocessing of a single
	// specific message if its processing fails. The first time processing of
	// the message fails, 1 is passed to this function, the second time 2, etc.
	GetMessageErrorBackoff(nackCount int) time.Duration

	// GetNoMessagesBackoff is used to back off querying for new messages on the
	// queue. The first time in a row that the query returns no messages, 1 is
	// passed to this function, then 2, etc.
	GetNoMessagesBackoff(tick int) time.Duration
}

type ContactRepository

type ContactRepository struct {
	// contains filtered or unexported fields
}

func NewContactRepository

func NewContactRepository(tx *sql.Tx) *ContactRepository

func (*ContactRepository) CountFollowees

func (r *ContactRepository) CountFollowees(ctx context.Context, publicKey domain.PublicKey) (int, error)

func (*ContactRepository) CountFollowers

func (r *ContactRepository) CountFollowers(ctx context.Context, publicKey domain.PublicKey) (int, error)

func (*ContactRepository) GetCurrentContactsEvent

func (r *ContactRepository) GetCurrentContactsEvent(ctx context.Context, author domain.PublicKey) (domain.Event, error)

func (*ContactRepository) GetFollowees

func (r *ContactRepository) GetFollowees(ctx context.Context, publicKey domain.PublicKey) ([]domain.PublicKey, error)

func (*ContactRepository) IsFolloweeOfMonitoredPublicKey

func (r *ContactRepository) IsFolloweeOfMonitoredPublicKey(ctx context.Context, publicKey domain.PublicKey) (bool, error)

func (*ContactRepository) SetContacts

func (r *ContactRepository) SetContacts(ctx context.Context, event domain.Event, contacts []domain.PublicKey) error

type DefaultBackoffManager

type DefaultBackoffManager struct {
}

func NewDefaultBackoffManager

func NewDefaultBackoffManager() DefaultBackoffManager

func (DefaultBackoffManager) GetMessageErrorBackoff

func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration

func (DefaultBackoffManager) GetNoMessagesBackoff

func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration

type EventRepository

type EventRepository struct {
	// contains filtered or unexported fields
}

func NewEventRepository

func NewEventRepository(tx *sql.Tx) (*EventRepository, error)

func (*EventRepository) Count

func (r *EventRepository) Count(ctx context.Context) (int, error)

func (*EventRepository) Exists

func (r *EventRepository) Exists(ctx context.Context, eventID domain.EventId) (bool, error)

func (*EventRepository) Get

func (r *EventRepository) Get(ctx context.Context, eventID domain.EventId) (domain.Event, error)

func (*EventRepository) List

func (r *EventRepository) List(ctx context.Context, after *domain.EventId, limit int) ([]domain.Event, error)

func (*EventRepository) Save

func (r *EventRepository) Save(ctx context.Context, event domain.Event) error

type EventSavedEventTransport

type EventSavedEventTransport struct {
	EventID string `json:"eventID"`
}

type GenericAdaptersFactoryFn

type GenericAdaptersFactoryFn[T any] func(*sql.DB, *sql.Tx) (T, error)

type GenericTransactionProvider

type GenericTransactionProvider[T any] struct {
	// contains filtered or unexported fields
}

func (*GenericTransactionProvider[T]) Transact

func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, fn func(context.Context, T) error) error

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(uuid string, payload []byte) (Message, error)

func (Message) Payload

func (m Message) Payload() []byte

func (Message) UUID

func (m Message) UUID() string

type MigrationFns

type MigrationFns struct {
	// contains filtered or unexported fields
}

func NewMigrationFns

func NewMigrationFns(db *sql.DB, pubsub *PubSub) *MigrationFns

func (*MigrationFns) CreatePubsubTables

func (m *MigrationFns) CreatePubsubTables(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error

func (*MigrationFns) Initial

func (m *MigrationFns) Initial(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error

type MigrationsStorage

type MigrationsStorage struct {
	// contains filtered or unexported fields
}

func NewMigrationsStorage

func NewMigrationsStorage(db *sql.DB) (*MigrationsStorage, error)

func (*MigrationsStorage) LoadState

func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)

func (*MigrationsStorage) LoadStatus

func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)

func (*MigrationsStorage) SaveState

func (b *MigrationsStorage) SaveState(name string, state migrations.State) error

func (*MigrationsStorage) SaveStatus

func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func NewPubSub

func NewPubSub(
	transactionProvider PubsubTransactionProvider,
	logger logging.Logger,
) *PubSub

func (*PubSub) InitializingQueries

func (p *PubSub) InitializingQueries() []string

func (*PubSub) OldestMessageAge

func (p *PubSub) OldestMessageAge(ctx context.Context, topic string) (time.Duration, error)

OldestMessageAge returns ErrQueueEmpty if the queue is empty.

func (*PubSub) Publish

func (p *PubSub) Publish(ctx context.Context, topic string, msg Message) error

func (*PubSub) QueueLength

func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error)

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMessage

type PubSubTxTransactionProvider

type PubSubTxTransactionProvider = GenericTransactionProvider[*sql.Tx]

func NewPubSubTxTransactionProvider

func NewPubSubTxTransactionProvider(
	db *sql.DB,
	runner *TransactionRunner,
) *PubSubTxTransactionProvider

type PublicKeysToMonitorRepository

type PublicKeysToMonitorRepository struct {
	// contains filtered or unexported fields
}

func NewPublicKeysToMonitorRepository

func NewPublicKeysToMonitorRepository(tx *sql.Tx) (*PublicKeysToMonitorRepository, error)

func (*PublicKeysToMonitorRepository) Get

func (*PublicKeysToMonitorRepository) List

func (*PublicKeysToMonitorRepository) Save

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(pubsub *TxPubSub) *Publisher

func (*Publisher) PublishEventSaved

func (p *Publisher) PublishEventSaved(ctx context.Context, id domain.EventId) error

type PubsubTransactionProvider

type PubsubTransactionProvider interface {
	Transact(context.Context, func(context.Context, *sql.Tx) error) error
}

type ReceivedMessage

type ReceivedMessage struct {
	Message
	// contains filtered or unexported fields
}

func NewReceivedMessage

func NewReceivedMessage(message Message) *ReceivedMessage

func (*ReceivedMessage) Ack

func (m *ReceivedMessage) Ack() error

func (*ReceivedMessage) Nack

func (m *ReceivedMessage) Nack() error

type RelayRepository

type RelayRepository struct {
	// contains filtered or unexported fields
}

func NewRelayRepository

func NewRelayRepository(tx *sql.Tx) *RelayRepository

func (*RelayRepository) Count

func (r *RelayRepository) Count(ctx context.Context) (int, error)

func (*RelayRepository) List

func (*RelayRepository) Save

func (r *RelayRepository) Save(ctx context.Context, eventID domain.EventId, relayAddress domain.MaybeRelayAddress) error

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	pubsub *PubSub,
	db *sql.DB,
) *Subscriber

func (*Subscriber) EventSavedOldestMessageAge

func (s *Subscriber) EventSavedOldestMessageAge(ctx context.Context) (time.Duration, error)

func (*Subscriber) EventSavedQueueLength

func (s *Subscriber) EventSavedQueueLength(ctx context.Context) (int, error)

func (*Subscriber) SubscribeToEventSaved

func (s *Subscriber) SubscribeToEventSaved(ctx context.Context) <-chan *ReceivedMessage

type TestAdapters

type TestAdapters struct {
	EventRepository               *EventRepository
	RelayRepository               *RelayRepository
	ContactRepository             *ContactRepository
	PublicKeysToMonitorRepository *PublicKeysToMonitorRepository
	Publisher                     *Publisher
}

type TestAdaptersFactoryFn

type TestAdaptersFactoryFn = GenericAdaptersFactoryFn[TestAdapters]

type TestTransactionProvider

type TestTransactionProvider = GenericTransactionProvider[TestAdapters]

func NewTestTransactionProvider

func NewTestTransactionProvider(
	db *sql.DB,
	fn TestAdaptersFactoryFn,
	runner *TransactionRunner,
) *TestTransactionProvider

type TestedItems

type TestedItems struct {
	TransactionProvider *TestTransactionProvider
	Subscriber          *Subscriber
	MigrationsStorage   *MigrationsStorage
	PubSub              *PubSub

	MigrationsRunner           *migrations.Runner
	Migrations                 migrations.Migrations
	MigrationsProgressCallback migrations.ProgressCallback
	TransactionRunner          *TransactionRunner
}

type TransactionFunc

type TransactionFunc func(context.Context, *sql.DB, *sql.Tx) error

type TransactionProvider

type TransactionProvider = GenericTransactionProvider[app.Adapters]

func NewTransactionProvider

func NewTransactionProvider(
	db *sql.DB,
	fn AdaptersFactoryFn,
	runner *TransactionRunner,
) *TransactionProvider

type TransactionRunner

type TransactionRunner struct {
	// contains filtered or unexported fields
}

func NewTransactionRunner

func NewTransactionRunner(db *sql.DB) *TransactionRunner

func (*TransactionRunner) Run

func (t *TransactionRunner) Run(ctx context.Context) error

func (*TransactionRunner) TryRun

type TxPubSub

type TxPubSub struct {
	// contains filtered or unexported fields
}

func NewTxPubSub

func NewTxPubSub(
	tx *sql.Tx,
	logger logging.Logger,
) *TxPubSub

func (*TxPubSub) PublishTx

func (p *TxPubSub) PublishTx(topic string, msg Message) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL