Documentation ¶
Index ¶
- Constants
- Variables
- func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)
- func Open(conf config.Config) (*sql.DB, error)
- type AdaptersFactoryFn
- type BackoffManager
- type ContactRepository
- func (r *ContactRepository) CountFollowees(ctx context.Context, publicKey domain.PublicKey) (int, error)
- func (r *ContactRepository) CountFollowers(ctx context.Context, publicKey domain.PublicKey) (int, error)
- func (r *ContactRepository) GetCurrentContactsEvent(ctx context.Context, author domain.PublicKey) (domain.Event, error)
- func (r *ContactRepository) GetFollowees(ctx context.Context, publicKey domain.PublicKey) ([]domain.PublicKey, error)
- func (r *ContactRepository) IsFolloweeOfMonitoredPublicKey(ctx context.Context, publicKey domain.PublicKey) (bool, error)
- func (r *ContactRepository) SetContacts(ctx context.Context, event domain.Event, contacts []domain.PublicKey) error
- type DefaultBackoffManager
- type EventRepository
- func (r *EventRepository) Count(ctx context.Context) (int, error)
- func (r *EventRepository) Exists(ctx context.Context, eventID domain.EventId) (bool, error)
- func (r *EventRepository) Get(ctx context.Context, eventID domain.EventId) (domain.Event, error)
- func (r *EventRepository) List(ctx context.Context, after *domain.EventId, limit int) ([]domain.Event, error)
- func (r *EventRepository) Save(ctx context.Context, event domain.Event) error
- type EventSavedEventTransport
- type GenericAdaptersFactoryFn
- type GenericTransactionProvider
- type Message
- type MigrationFns
- type MigrationsStorage
- func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)
- func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)
- func (b *MigrationsStorage) SaveState(name string, state migrations.State) error
- func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error
- type PubSub
- func (p *PubSub) InitializingQueries() []string
- func (p *PubSub) OldestMessageAge(ctx context.Context, topic string) (time.Duration, error)
- func (p *PubSub) Publish(ctx context.Context, topic string, msg Message) error
- func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error)
- func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMessage
- type PubSubTxTransactionProvider
- type PublicKeysToMonitorRepository
- func (r *PublicKeysToMonitorRepository) Get(ctx context.Context, publicKey domain.PublicKey) (domain.PublicKeyToMonitor, error)
- func (r *PublicKeysToMonitorRepository) List(ctx context.Context) ([]domain.PublicKeyToMonitor, error)
- func (r *PublicKeysToMonitorRepository) Save(ctx context.Context, publicKeyToMonitor domain.PublicKeyToMonitor) error
- type Publisher
- type PubsubTransactionProvider
- type ReceivedMessage
- type RelayRepository
- type Subscriber
- type TestAdapters
- type TestAdaptersFactoryFn
- type TestTransactionProvider
- type TestedItems
- type TransactionFunc
- type TransactionProvider
- type TransactionRunner
- type TxPubSub
Constants ¶
View Source
const EventSavedTopic = "event_saved"
Variables ¶
View Source
var ErrQueueEmpty = errors.New("queue is empty")
Functions ¶
func NewMigrations ¶
func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)
Types ¶
type AdaptersFactoryFn ¶
type AdaptersFactoryFn = GenericAdaptersFactoryFn[app.Adapters]
type BackoffManager ¶
type BackoffManager interface {
	// GetMessageErrorBackoff is used to backoff reprocessing of a single
	// specific message if its processing fails. The first time message
	// processing fails 1 is passed to this function, the second time 2 etc.
	GetMessageErrorBackoff(nackCount int) time.Duration

	// GetNoMessagesBackoff is used to backoff querying for new messages on the
	// queue. The first time in a row where the query returns no messages 1 is
	// passed to this function, then 2 is passed etc.
	GetNoMessagesBackoff(tick int) time.Duration
}
type ContactRepository ¶
type ContactRepository struct {
// contains filtered or unexported fields
}
func NewContactRepository ¶
func NewContactRepository(tx *sql.Tx) *ContactRepository
func (*ContactRepository) CountFollowees ¶
func (*ContactRepository) CountFollowers ¶
func (*ContactRepository) GetCurrentContactsEvent ¶
func (*ContactRepository) GetFollowees ¶
func (*ContactRepository) IsFolloweeOfMonitoredPublicKey ¶
func (*ContactRepository) SetContacts ¶
type DefaultBackoffManager ¶
type DefaultBackoffManager struct { }
func NewDefaultBackoffManager ¶
func NewDefaultBackoffManager() DefaultBackoffManager
func (DefaultBackoffManager) GetMessageErrorBackoff ¶
func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration
func (DefaultBackoffManager) GetNoMessagesBackoff ¶
func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration
type EventRepository ¶
type EventRepository struct {
// contains filtered or unexported fields
}
func NewEventRepository ¶
func NewEventRepository(tx *sql.Tx) (*EventRepository, error)
type EventSavedEventTransport ¶
type EventSavedEventTransport struct {
EventID string `json:"eventID"`
}
type GenericTransactionProvider ¶
type GenericTransactionProvider[T any] struct {
	// contains filtered or unexported fields
}
type MigrationFns ¶
type MigrationFns struct {
// contains filtered or unexported fields
}
func NewMigrationFns ¶
func NewMigrationFns(db *sql.DB, pubsub *PubSub) *MigrationFns
func (*MigrationFns) CreatePubsubTables ¶
func (m *MigrationFns) CreatePubsubTables(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error
func (*MigrationFns) Initial ¶
func (m *MigrationFns) Initial(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error
type MigrationsStorage ¶
type MigrationsStorage struct {
// contains filtered or unexported fields
}
func NewMigrationsStorage ¶
func NewMigrationsStorage(db *sql.DB) (*MigrationsStorage, error)
func (*MigrationsStorage) LoadState ¶
func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)
func (*MigrationsStorage) LoadStatus ¶
func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)
func (*MigrationsStorage) SaveState ¶
func (b *MigrationsStorage) SaveState(name string, state migrations.State) error
func (*MigrationsStorage) SaveStatus ¶
func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
func NewPubSub ¶
func NewPubSub(
	transactionProvider PubsubTransactionProvider,
	logger logging.Logger,
) *PubSub
func (*PubSub) InitializingQueries ¶
func (*PubSub) OldestMessageAge ¶
OldestMessageAge returns ErrQueueEmpty if the queue is empty.
func (*PubSub) QueueLength ¶
type PubSubTxTransactionProvider ¶
type PubSubTxTransactionProvider = GenericTransactionProvider[*sql.Tx]
func NewPubSubTxTransactionProvider ¶
func NewPubSubTxTransactionProvider(
	db *sql.DB,
	runner *TransactionRunner,
) *PubSubTxTransactionProvider
type PublicKeysToMonitorRepository ¶
type PublicKeysToMonitorRepository struct {
// contains filtered or unexported fields
}
func NewPublicKeysToMonitorRepository ¶
func NewPublicKeysToMonitorRepository(tx *sql.Tx) (*PublicKeysToMonitorRepository, error)
func (*PublicKeysToMonitorRepository) Get ¶
func (r *PublicKeysToMonitorRepository) Get(ctx context.Context, publicKey domain.PublicKey) (domain.PublicKeyToMonitor, error)
func (*PublicKeysToMonitorRepository) List ¶
func (r *PublicKeysToMonitorRepository) List(ctx context.Context) ([]domain.PublicKeyToMonitor, error)
func (*PublicKeysToMonitorRepository) Save ¶
func (r *PublicKeysToMonitorRepository) Save(ctx context.Context, publicKeyToMonitor domain.PublicKeyToMonitor) error
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type ReceivedMessage ¶
type ReceivedMessage struct {
	Message
	// contains filtered or unexported fields
}
func NewReceivedMessage ¶
func NewReceivedMessage(message Message) *ReceivedMessage
func (*ReceivedMessage) Ack ¶
func (m *ReceivedMessage) Ack() error
func (*ReceivedMessage) Nack ¶
func (m *ReceivedMessage) Nack() error
type RelayRepository ¶
type RelayRepository struct {
// contains filtered or unexported fields
}
func NewRelayRepository ¶
func NewRelayRepository(tx *sql.Tx) *RelayRepository
func (*RelayRepository) List ¶
func (r *RelayRepository) List(ctx context.Context) ([]domain.MaybeRelayAddress, error)
func (*RelayRepository) Save ¶
func (r *RelayRepository) Save(ctx context.Context, eventID domain.EventId, relayAddress domain.MaybeRelayAddress) error
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(
	pubsub *PubSub,
	db *sql.DB,
) *Subscriber
func (*Subscriber) EventSavedOldestMessageAge ¶
func (*Subscriber) EventSavedQueueLength ¶
func (s *Subscriber) EventSavedQueueLength(ctx context.Context) (int, error)
func (*Subscriber) SubscribeToEventSaved ¶
func (s *Subscriber) SubscribeToEventSaved(ctx context.Context) <-chan *ReceivedMessage
type TestAdapters ¶
type TestAdapters struct {
	EventRepository               *EventRepository
	RelayRepository               *RelayRepository
	ContactRepository             *ContactRepository
	PublicKeysToMonitorRepository *PublicKeysToMonitorRepository
	Publisher                     *Publisher
}
type TestAdaptersFactoryFn ¶
type TestAdaptersFactoryFn = GenericAdaptersFactoryFn[TestAdapters]
type TestTransactionProvider ¶
type TestTransactionProvider = GenericTransactionProvider[TestAdapters]
func NewTestTransactionProvider ¶
func NewTestTransactionProvider(
	db *sql.DB,
	fn TestAdaptersFactoryFn,
	runner *TransactionRunner,
) *TestTransactionProvider
type TestedItems ¶
type TestedItems struct {
	TransactionProvider        *TestTransactionProvider
	Subscriber                 *Subscriber
	MigrationsStorage          *MigrationsStorage
	PubSub                     *PubSub
	MigrationsRunner           *migrations.Runner
	Migrations                 migrations.Migrations
	MigrationsProgressCallback migrations.ProgressCallback
	TransactionRunner          *TransactionRunner
}
type TransactionProvider ¶
type TransactionProvider = GenericTransactionProvider[app.Adapters]
func NewTransactionProvider ¶
func NewTransactionProvider(
	db *sql.DB,
	fn AdaptersFactoryFn,
	runner *TransactionRunner,
) *TransactionProvider
type TransactionRunner ¶
type TransactionRunner struct {
// contains filtered or unexported fields
}
func NewTransactionRunner ¶
func NewTransactionRunner(db *sql.DB) *TransactionRunner
func (*TransactionRunner) TryRun ¶
func (t *TransactionRunner) TryRun(ctx context.Context, fn TransactionFunc) error
Click to show internal directories.
Click to hide internal directories.