storage

package
v0.0.0-...-c43c499 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2024 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultMaxRetries int32 = 3

Variables

View Source
var ErrFatalError = errors.New("fatal storage error")
View Source
var ErrMissingPayload = errors.New("there was no payload in the job")
View Source
var ErrNoJob = errors.New("the job given was nil")
View Source
var ErrNoQueue = errors.New("the queue in the job is blank, there must be a queue")
View Source
var ErrNoType = errors.New("there was no type given in the job")
View Source
var ErrParentIsMissing = errors.New("a child job's parent is no longer set")

Functions

func RetryIn

func RetryIn(job *wire.Job) time.Duration

Types

type Handler

type Handler interface {
	Heartbeat(ctx context.Context, workerID string) error

	GetActiveJob(ctx context.Context, uuid string) (*wire.Job, error)
	AddJob(ctx context.Context, job *wire.Job) error
	Pop(ctx context.Context, queues ...string) (*wire.Job, error)
	PopScheduledJobs(ctx context.Context) error

	RemoveJob(ctx context.Context, job *wire.Job) error
	RemoveScheduledJob(ctx context.Context, job *wire.Job) error

	Notify(ctx context.Context, job *wire.Job, status wire.JobStatus) error
	Subscribe(ctx context.Context, onStatusChange OnStatusChange) error

	CloseJob(ctx context.Context, job *wire.Job) (bool, error)

	FailJob(ctx context.Context, uuid string) error

	Ping(ctx context.Context) error
	Close() error

	Stats
}

func NewRedisHandler

func NewRedisHandler(cfg config.RedisConfig) (Handler, error)

type OnStatusChange

type OnStatusChange func(job *wire.Job)

type RedisHandler

type RedisHandler struct {
	Namespace string
	// contains filtered or unexported fields
}

func (*RedisHandler) ActiveJobCount

func (s *RedisHandler) ActiveJobCount(ctx context.Context) (int64, error)

func (RedisHandler) ActiveJobsKey

func (r RedisHandler) ActiveJobsKey() string

func (*RedisHandler) ActiveQueueCount

func (s *RedisHandler) ActiveQueueCount(ctx context.Context) (int64, error)

func (*RedisHandler) ActiveQueues

func (s *RedisHandler) ActiveQueues(ctx context.Context) ([]string, error)

func (RedisHandler) ActiveQueuesKey

func (r RedisHandler) ActiveQueuesKey() string

func (*RedisHandler) ActiveWorkerCount

func (s *RedisHandler) ActiveWorkerCount(ctx context.Context) (int64, error)

func (RedisHandler) ActiveWorkersKey

func (r RedisHandler) ActiveWorkersKey() string

func (*RedisHandler) AddJob

func (s *RedisHandler) AddJob(ctx context.Context, job *wire.Job) error

func (RedisHandler) ChildenListKey

func (r RedisHandler) ChildenListKey(parentUuid string) string

func (RedisHandler) ChildenSetKey

func (r RedisHandler) ChildenSetKey(parentUuid string) string

func (*RedisHandler) Close

func (s *RedisHandler) Close() error

func (*RedisHandler) CloseJob

func (s *RedisHandler) CloseJob(ctx context.Context, job *wire.Job) (bool, error)

func (*RedisHandler) CountQueueJobs

func (s *RedisHandler) CountQueueJobs(ctx context.Context, queue string) (int64, error)

func (*RedisHandler) CountScheduledJobs

func (s *RedisHandler) CountScheduledJobs(ctx context.Context) (int64, error)

func (*RedisHandler) FailJob

func (s *RedisHandler) FailJob(ctx context.Context, uuid string) error

func (RedisHandler) FailedJobsKey

func (r RedisHandler) FailedJobsKey() string

func (*RedisHandler) GetActiveJob

func (s *RedisHandler) GetActiveJob(ctx context.Context, uuid string) (*wire.Job, error)

func (*RedisHandler) Heartbeat

func (s *RedisHandler) Heartbeat(ctx context.Context, workerId string) error

func (*RedisHandler) HistoricalJobCount

func (s *RedisHandler) HistoricalJobCount(ctx context.Context, t time.Time, result Result) (int64, error)

func (RedisHandler) HistoricalResultKey

func (r RedisHandler) HistoricalResultKey(t time.Time, result Result) string

func (RedisHandler) JobEventsChannelKey

func (r RedisHandler) JobEventsChannelKey() string

func (RedisHandler) JobKey

func (r RedisHandler) JobKey(uuid string) string

func (*RedisHandler) ListQueueJobs

func (s *RedisHandler) ListQueueJobs(ctx context.Context, queue string, start, stop int64) ([]*wire.Job, error)

func (*RedisHandler) ListScheduledJobs

func (s *RedisHandler) ListScheduledJobs(ctx context.Context, start, stop int64) ([]*wire.Job, error)

func (*RedisHandler) Notify

func (s *RedisHandler) Notify(ctx context.Context, job *wire.Job, status wire.JobStatus) error

func (RedisHandler) OnCompleteListKey

func (r RedisHandler) OnCompleteListKey(predecessorUuid string) string

func (*RedisHandler) Ping

func (s *RedisHandler) Ping(ctx context.Context) error

func (*RedisHandler) Pop

func (s *RedisHandler) Pop(ctx context.Context, queues ...string) (*wire.Job, error)

func (*RedisHandler) PopScheduledJobs

func (s *RedisHandler) PopScheduledJobs(ctx context.Context) error

func (*RedisHandler) PruneActiveQueues

func (s *RedisHandler) PruneActiveQueues(ctx context.Context) error

PruneActiveQueues will remove queues that haven't pinned a job in the last 30 seconds

func (*RedisHandler) PruneActiveWorkers

func (s *RedisHandler) PruneActiveWorkers(ctx context.Context) error

func (RedisHandler) QueueKey

func (r RedisHandler) QueueKey(queue string) string

func (*RedisHandler) RemoveJob

func (s *RedisHandler) RemoveJob(ctx context.Context, job *wire.Job) error

func (*RedisHandler) RemoveScheduledJob

func (s *RedisHandler) RemoveScheduledJob(ctx context.Context, job *wire.Job) error

func (RedisHandler) ScheduledJobsKey

func (r RedisHandler) ScheduledJobsKey() string

func (*RedisHandler) Subscribe

func (s *RedisHandler) Subscribe(ctx context.Context, onStatusChange OnStatusChange) error

type Result

type Result int
const (
	ResultSuccess Result = iota
	ResultError
	ResultFailure
)

func (Result) String

func (r Result) String() string

type Stats

type Stats interface {
	// Queues
	ActiveQueues(ctx context.Context) ([]string, error)
	ActiveQueueCount(ctx context.Context) (int64, error)
	PruneActiveQueues(ctx context.Context) error
	CountQueueJobs(ctx context.Context, queue string) (int64, error)
	CountScheduledJobs(ctx context.Context) (int64, error)
	ListQueueJobs(ctx context.Context, queue string, start, stop int64) ([]*wire.Job, error)
	ListScheduledJobs(ctx context.Context, start, stop int64) ([]*wire.Job, error)

	// Jobs
	ActiveJobCount(ctx context.Context) (int64, error)
	HistoricalJobCount(context.Context, time.Time, Result) (int64, error)

	// Workers
	ActiveWorkerCount(ctx context.Context) (int64, error)
	PruneActiveWorkers(ctx context.Context) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL