storage

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const SequenceLeaseSize = 10_000

SequenceLeaseSize is the number of indexes leased at a time.

If the application does not gracefully shut down, this number represents the maximum number of indexes that will be "lost". Lost indexes are just numbers we will not be able to assign to new tasks; they don't represent lost data.

A higher number will permit faster index assignment but at the cost of more lost indexes during application crashes.

Variables

This section is empty.

Functions

This section is empty.

Types

type FifoRange added in v0.2.0

type FifoRange struct {
	// Queue restricts the search to only tasks in a given queue.
	Queue string

	StartIndex uint64
	EndIndex   uint64
}

FifoRange is a query for tasks within an index range

func (*FifoRange) GetEnd added in v0.2.0

func (fr *FifoRange) GetEnd() []byte

func (*FifoRange) GetPrefix added in v0.2.0

func (fr *FifoRange) GetPrefix() []byte

func (*FifoRange) GetQueue added in v0.2.0

func (fr *FifoRange) GetQueue() string

func (*FifoRange) GetStart added in v0.2.0

func (fr *FifoRange) GetStart() []byte

type IDGenerator added in v0.2.0

type IDGenerator struct {
	// contains filtered or unexported fields
}

func NewGenerator added in v0.2.0

func NewGenerator(indexes *IndexStore) *IDGenerator

func (*IDGenerator) ID added in v0.2.0

func (g *IDGenerator) ID(queue string, ts time.Time) (iden.TaskID, error)

type IndexStore added in v0.2.0

type IndexStore struct {
	// contains filtered or unexported fields
}

func NewIndexStore added in v0.2.0

func NewIndexStore(settings *SettingStore) *IndexStore

func (*IndexStore) Close added in v0.2.0

func (c *IndexStore) Close() error

func (*IndexStore) Next added in v0.2.0

func (c *IndexStore) Next(queue string) (uint64, error)

type PebbleClient added in v0.2.0

type PebbleClient struct {
	DB *pebble.DB
}

PebbleClient manages a connection to a pebble file store.

func NewPebbleClient added in v0.2.0

func NewPebbleClient(dataDir string) (*PebbleClient, error)

func (*PebbleClient) Close added in v0.2.0

func (bc *PebbleClient) Close() error

Close flushes writes to disk.

It must be called when the client is no longer needed or else pending writes may be canceled when the application terminates.

type ScheduledRange added in v0.2.0

type ScheduledRange struct {
	// Queue restricts the search to only tasks in a given queue.
	Queue string

	// StartID restricts the search to all task IDs that are equal to it
	// or occur after it in ascending sorted order.
	StartID iden.TaskID

	// EndID restricts the search to all task IDs that are equal to it
	// or occur before it in ascending sorted order.
	EndID iden.TaskID
}

ScheduledRange is a query for tasks within a time range.

func (*ScheduledRange) GetEnd added in v0.2.0

func (tr *ScheduledRange) GetEnd() []byte

func (*ScheduledRange) GetPrefix added in v0.2.0

func (tr *ScheduledRange) GetPrefix() []byte

func (*ScheduledRange) GetQueue added in v0.2.0

func (tr *ScheduledRange) GetQueue() string

func (*ScheduledRange) GetStart added in v0.2.0

func (tr *ScheduledRange) GetStart() []byte

type Sequence added in v0.2.0

type Sequence struct {
	// contains filtered or unexported fields
}

func NewSequence added in v0.2.0

func NewSequence(db *pebble.DB, key []byte, lease uint64) (*Sequence, error)

func (*Sequence) Close added in v0.2.0

func (s *Sequence) Close() error

func (*Sequence) Next added in v0.2.0

func (s *Sequence) Next() (uint64, error)

type SettingStore added in v0.2.0

type SettingStore struct {
	Client *PebbleClient
	// contains filtered or unexported fields
}

SettingStore manages persistent storage for queue settings.

func NewSettingStore added in v0.2.0

func NewSettingStore(client *PebbleClient) *SettingStore

func (*SettingStore) Get added in v0.2.0

func (s *SettingStore) Get(queue string) (*proto.QueueConfig, error)

func (*SettingStore) GetAll added in v0.2.0

func (s *SettingStore) GetAll() ([]*proto.QueueConfig, error)

func (*SettingStore) Save added in v0.2.0

func (s *SettingStore) Save(settings *proto.QueueConfig) error

func (*SettingStore) Sequence added in v0.2.0

func (s *SettingStore) Sequence(queue string) (*Sequence, error)

type SyncedSettings added in v0.2.0

type SyncedSettings struct {
	// S is the queue settings that will be synced to disk.
	// It is safe to make changes directly to this field.
	S *proto.QueueConfig
	// contains filtered or unexported fields
}

SyncedSettings are queue settings that are synchronized to disk.

func NewSyncedSettings added in v0.2.0

func NewSyncedSettings(
	store *SettingStore,
	queue string,
	raft *raft.Raft,
) *SyncedSettings

func (*SyncedSettings) Close added in v0.2.0

func (s *SyncedSettings) Close() error

Close flushes config changes to disk and stops the sync thread. The object should not be used after a call to Close.

func (*SyncedSettings) StartSync added in v0.2.0

func (s *SyncedSettings) StartSync() error

type TaskIterator

type TaskIterator struct {
	// contains filtered or unexported fields
}

TaskIterator iterates over a range of tasks in the database.

func NewTaskIterator

func NewTaskIterator(client *PebbleClient, query TaskRange) *TaskIterator

func (*TaskIterator) Close

func (ti *TaskIterator) Close() error

func (*TaskIterator) ForEach

func (ti *TaskIterator) ForEach(handle func(task *proto.Task) error) error

ForEach calls handle on the result of Next until an error occurs or no tasks remain.

func (*TaskIterator) Next

func (ti *TaskIterator) Next() (*proto.Task, error)

func (*TaskIterator) Peek

func (ti *TaskIterator) Peek() (*proto.Task, error)

type TaskRange

type TaskRange interface {
	GetQueue() string
	GetStart() []byte
	GetEnd() []byte
	GetPrefix() []byte
}

type TaskStore

type TaskStore struct {
	Client *PebbleClient
}

TaskStore manages persistent storage for tasks.

func NewTaskStore

func NewTaskStore(db *PebbleClient) *TaskStore

func (*TaskStore) DeleteAll

func (ts *TaskStore) DeleteAll(acks []*proto.Ack) error

func (*TaskStore) Get

func (ts *TaskStore) Get(queue string, id iden.TaskID) (*proto.Task, error)

func (*TaskStore) IterateRange

func (ts *TaskStore) IterateRange(query TaskRange) (*TaskIterator, error)

func (*TaskStore) SaveAll

func (ts *TaskStore) SaveAll(tasks []*proto.Task, saveIndex bool) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL