rdb

package
v0.0.0-...-41df229 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
	ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")
)

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	Processed int
	Failed    int
	Time      time.Time
}

DailyStats holds aggregate data for a given day.

type DeadTask

type DeadTask struct {
	ID           xid.ID
	Type         string
	Payload      map[string]interface{}
	LastFailedAt time.Time
	ErrorMsg     string
	Score        int64
}

DeadTask is a task in that has exhausted all retries.

type EnqueuedTask

type EnqueuedTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
}

EnqueuedTask is a task in a queue and is ready to be processed.

type InProgressTask

type InProgressTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
}

InProgressTask is a task that's currently being processed.

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client *redis.Client) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) CheckAndEnqueue

func (r *RDB) CheckAndEnqueue() error

CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that have to be processed.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats() (*Stats, error)

CurrentStats returns a current state of the queues.

func (*RDB) DeleteAllDeadTasks

func (r *RDB) DeleteAllDeadTasks() error

DeleteAllDeadTasks deletes all tasks from the dead queue.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks() error

DeleteAllRetryTasks deletes all tasks from the dead queue.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks() error

DeleteAllScheduledTasks deletes all tasks from the dead queue.

func (*RDB) DeleteDeadTask

func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error

DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteRetryTask

func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error

DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteScheduledTask

func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error

DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Dequeue

func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error)

Dequeue blocks until there is a task available to be processed, once a task is available, it adds the task to "in progress" queue and returns the task. If there are no tasks for the entire timeout duration, it returns ErrDequeueTimeout.

func (*RDB) Done

func (r *RDB) Done(msg *base.TaskMessage) error

Done removes the task from in-progress queue to mark the task as done.

func (*RDB) Enqueue

func (r *RDB) Enqueue(msg *base.TaskMessage) error

Enqueue inserts the given task to the tail of the queue.

func (*RDB) EnqueueAllDeadTasks

func (r *RDB) EnqueueAllDeadTasks() (int64, error)

EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllRetryTasks

func (r *RDB) EnqueueAllRetryTasks() (int64, error)

EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllScheduledTasks

func (r *RDB) EnqueueAllScheduledTasks() (int64, error)

EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.

func (*RDB) EnqueueDeadTask

func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error

EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueRetryTask

func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error

EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueScheduledTask

func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error

EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days.

func (*RDB) Kill

func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error

Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.

func (*RDB) KillAllRetryTasks

func (r *RDB) KillAllRetryTasks() (int64, error)

KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillAllScheduledTasks

func (r *RDB) KillAllScheduledTasks() (int64, error)

KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillRetryTask

func (r *RDB) KillRetryTask(id xid.ID, score int64) error

KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) KillScheduledTask

func (r *RDB) KillScheduledTask(id xid.ID, score int64) error

KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) ListDead

func (r *RDB) ListDead() ([]*DeadTask, error)

ListDead returns all tasks that have exhausted its retry limit.

func (*RDB) ListEnqueued

func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error)

ListEnqueued returns all enqueued tasks that are ready to be processed.

func (*RDB) ListInProgress

func (r *RDB) ListInProgress() ([]*InProgressTask, error)

ListInProgress returns all tasks that are currently being processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry() ([]*RetryTask, error)

ListRetry returns all tasks that have failed before and willl be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled() ([]*ScheduledTask, error)

ListScheduled returns all tasks that are scheduled to be processed in the future.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) Requeue

func (r *RDB) Requeue(msg *base.TaskMessage) error

Requeue moves the task from in-progress queue to the default queue.

func (*RDB) RestoreUnfinished

func (r *RDB) RestoreUnfinished() (int64, error)

RestoreUnfinished moves all tasks from in-progress list to the queue and reports the number of tasks restored.

func (*RDB) Retry

func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error

Retry moves the task from in-progress to retry queue, incrementing retry count and assigning error message to the task message.

func (*RDB) Schedule

func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the backlog queue to be processed in the future.

type RetryTask

type RetryTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
	// TODO(brianbinbin): add LastFailedAt time.Time
	ProcessAt time.Time
	ErrorMsg  string
	Retried   int
	Retry     int
	Score     int64
}

RetryTask is a task that's in retry queue because worker failed to process the task.

type ScheduledTask

type ScheduledTask struct {
	ID        xid.ID
	Type      string
	Payload   map[string]interface{}
	ProcessAt time.Time
	Score     int64
}

ScheduledTask is a task that's scheduled to be processed in the future.

type Stats

type Stats struct {
	Enqueued   int
	InProgress int
	Scheduled  int
	Retry      int
	Dead       int
	Processed  int
	Failed     int
	Timestamp  time.Time
}

Stats represents a state of queues at a certain time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL