rdb

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
	ErrDuplicateTask = errors.New("task already exists")
)

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	Processed int
	Failed    int
	Time      time.Time
}

DailyStats holds aggregate data for a given day.

type DeadTask

type DeadTask struct {
	ID           xid.ID
	Type         string
	Payload      map[string]interface{}
	LastFailedAt time.Time
	ErrorMsg     string
	Score        int64
	Queue        string
}

DeadTask is a task that has exhausted all retries.

type EnqueuedTask

type EnqueuedTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
	Queue   string
}

EnqueuedTask is a task in a queue and is ready to be processed.

type ErrQueueNotEmpty

type ErrQueueNotEmpty struct {
	// contains filtered or unexported fields
}

ErrQueueNotEmpty indicates that the specified queue is not empty.

func (*ErrQueueNotEmpty) Error

func (e *ErrQueueNotEmpty) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	// contains filtered or unexported fields
}

ErrQueueNotFound indicates that the specified queue does not exist.

func (*ErrQueueNotFound) Error

func (e *ErrQueueNotFound) Error() string

type InProgressTask

type InProgressTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
}

InProgressTask is a task that's currently being processed.

type Pagination

type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

Pagination specifies the page size and page number for the list operation.

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client *redis.Client) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) CancelationPubSub

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) CheckAndEnqueue

func (r *RDB) CheckAndEnqueue(qnames ...string) error

CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that have to be processed.

qnames specifies to which queues to send tasks.

func (*RDB) ClearProcessState

func (r *RDB) ClearProcessState(ps *base.ProcessState) error

ClearProcessState deletes process state data from redis.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats() (*Stats, error)

CurrentStats returns the current state of the queues.

func (*RDB) DeleteAllDeadTasks

func (r *RDB) DeleteAllDeadTasks() error

DeleteAllDeadTasks deletes all tasks from the dead queue.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks() error

DeleteAllRetryTasks deletes all tasks from the retry queue.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks() error

DeleteAllScheduledTasks deletes all tasks from the scheduled queue.

func (*RDB) DeleteDeadTask

func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error

DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteRetryTask

func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error

DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteScheduledTask

func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error

DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error)

Dequeue queries the given queues in order, pops a task message if there is one, and returns it. If all queues are empty, ErrNoProcessableTask error is returned.

func (*RDB) Done

func (r *RDB) Done(msg *base.TaskMessage) error

Done removes the task from in-progress queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(msg *base.TaskMessage) error

Enqueue inserts the given task to the tail of the queue.

func (*RDB) EnqueueAllDeadTasks

func (r *RDB) EnqueueAllDeadTasks() (int64, error)

EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllRetryTasks

func (r *RDB) EnqueueAllRetryTasks() (int64, error)

EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllScheduledTasks

func (r *RDB) EnqueueAllScheduledTasks() (int64, error)

EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.

func (*RDB) EnqueueDeadTask

func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error

EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueRetryTask

func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error

EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueScheduledTask

func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error

EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueUnique

func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days.

func (*RDB) Kill

func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error

Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.

func (*RDB) KillAllRetryTasks

func (r *RDB) KillAllRetryTasks() (int64, error)

KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillAllScheduledTasks

func (r *RDB) KillAllScheduledTasks() (int64, error)

KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillRetryTask

func (r *RDB) KillRetryTask(id xid.ID, score int64) error

KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) KillScheduledTask

func (r *RDB) KillScheduledTask(id xid.ID, score int64) error

KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) ListDead

func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error)

ListDead returns all tasks that have exhausted their retry limit.

func (*RDB) ListEnqueued

func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error)

ListEnqueued returns enqueued tasks that are ready to be processed.

func (*RDB) ListInProgress

func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error)

ListInProgress returns all tasks that are currently being processed.

func (*RDB) ListProcesses

func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error)

ListProcesses returns the list of process statuses.

func (*RDB) ListRetry

func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error)

ListRetry returns all tasks that have failed before and will be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error)

ListScheduled returns all tasks that are scheduled to be processed in the future.

func (*RDB) ListWorkers

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) PublishCancelation

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publishes a cancelation message to all subscribers. The message is the ID for the task to be canceled.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it will remove the queue regardless of whether the queue is empty. If force is set to false, it will only remove the queue if it is empty.

func (*RDB) Requeue

func (r *RDB) Requeue(msg *base.TaskMessage) error

Requeue moves the task from in-progress queue to the specified queue.

func (*RDB) RequeueAll

func (r *RDB) RequeueAll() (int64, error)

RequeueAll moves all tasks from in-progress list to the queue and reports the number of tasks restored.

func (*RDB) Retry

func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error

Retry moves the task from in-progress to retry queue, incrementing retry count and assigning error message to the task message.

func (*RDB) Schedule

func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the backlog queue to be processed in the future.

func (*RDB) ScheduleUnique

func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) WriteProcessState

func (r *RDB) WriteProcessState(ps *base.ProcessState, ttl time.Duration) error

WriteProcessState writes process state data to redis with expiration set to the given ttl.

type RetryTask

type RetryTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
	// TODO(hibiken): add LastFailedAt time.Time
	ProcessAt time.Time
	ErrorMsg  string
	Retried   int
	Retry     int
	Score     int64
	Queue     string
}

RetryTask is a task that's in retry queue because worker failed to process the task.

type ScheduledTask

type ScheduledTask struct {
	ID        xid.ID
	Type      string
	Payload   map[string]interface{}
	ProcessAt time.Time
	Score     int64
	Queue     string
}

ScheduledTask is a task that's scheduled to be processed in the future.

type Stats

type Stats struct {
	Enqueued   int
	InProgress int
	Scheduled  int
	Retry      int
	Dead       int
	Processed  int
	Failed     int
	Queues     map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20)
	Timestamp  time.Time
}

Stats represents a state of queues at a certain time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL