queue

package
v0.0.173 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package queue contains methods and structures for manipulating and persisting data that should be repeatedly processed in a prioritized (but changing) order.

Esepcially useful for combining with systems like pubsub to receive events that change the order of queue processing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Fixer

type Fixer func(context.Context, *Queue) error

Fixer will adjust the queue until the context expires.

func FixPersistent

func FixPersistent(logr logrus.FieldLogger, client PersistClient, path gcs.Path, tick <-chan time.Time) Fixer

FixPersistent persists the queue to the remote path every tick.

The first time it will load the state. Thereafter it will save the state. This includes restarts due to expiring contexts -- it will just load once.

type PersistClient

type PersistClient interface {
	gcs.Uploader
	gcs.Opener
}

PersistClient contains interfaces for reading from and writing to a Path.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue can send names to receivers at a specific frequency.

Also contains the ability to modify the next time to send names. First call must be to Init(). Exported methods are safe to call concurrently.

func (*Queue) Current

func (q *Queue) Current() map[string]time.Time

Current status for each item in the queue.

func (*Queue) Fix

func (q *Queue) Fix(name string, when time.Time, later bool) error

Fix the next time to send the group to receivers.

If later is set then it will move out the next update time, otherwise it will only reduce it.

func (*Queue) FixAll

func (q *Queue) FixAll(whens map[string]time.Time, later bool) error

FixAll will fix multiple groups inside a single critical section.

If later is set then it will move out the next update time, otherwise it will only reduce it.

func (*Queue) Init

func (q *Queue) Init(log logrus.FieldLogger, names []string, when time.Time)

Init (or reinit) the queue with the specified groups, which should be updated at frequency.

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, receivers chan<- string, frequency time.Duration) error

Send test groups to receivers until the context expires.

Pops items off the queue when frequency is zero. Otherwise reschedules the item after the specified frequency has elapsed.

func (*Queue) Status

func (q *Queue) Status() (int, *string, time.Time)

Status of the queue: depth, next item and when the next item is ready.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL