Documentation ¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Variables
- type DailyStats
- type DeadTask
- type EnqueuedTask
- type ErrQueueNotEmpty
- type ErrQueueNotFound
- type InProgressTask
- type Pagination
- type RDB
- func (r *RDB) CancelationPubSub() (*redis.PubSub, error)
- func (r *RDB) CheckAndEnqueue(qnames ...string) error
- func (r *RDB) ClearProcessState(ps *base.ProcessState) error
- func (r *RDB) Close() error
- func (r *RDB) CurrentStats() (*Stats, error)
- func (r *RDB) DeleteAllDeadTasks() error
- func (r *RDB) DeleteAllRetryTasks() error
- func (r *RDB) DeleteAllScheduledTasks() error
- func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error
- func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error
- func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error
- func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error)
- func (r *RDB) Done(msg *base.TaskMessage) error
- func (r *RDB) Enqueue(msg *base.TaskMessage) error
- func (r *RDB) EnqueueAllDeadTasks() (int64, error)
- func (r *RDB) EnqueueAllRetryTasks() (int64, error)
- func (r *RDB) EnqueueAllScheduledTasks() (int64, error)
- func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error
- func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error
- func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error
- func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
- func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
- func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
- func (r *RDB) KillAllRetryTasks() (int64, error)
- func (r *RDB) KillAllScheduledTasks() (int64, error)
- func (r *RDB) KillRetryTask(id xid.ID, score int64) error
- func (r *RDB) KillScheduledTask(id xid.ID, score int64) error
- func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error)
- func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error)
- func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error)
- func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error)
- func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error)
- func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error)
- func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
- func (r *RDB) PublishCancelation(id string) error
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) RemoveQueue(qname string, force bool) error
- func (r *RDB) Requeue(msg *base.TaskMessage) error
- func (r *RDB) RequeueAll() (int64, error)
- func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
- func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error
- func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
- func (r *RDB) WriteProcessState(ps *base.ProcessState, ttl time.Duration) error
- type RetryTask
- type ScheduledTask
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoProcessableTask indicates that there are no tasks ready to be processed. ErrNoProcessableTask = errors.New("no tasks are ready for processing") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. ErrDuplicateTask = errors.New("task already exists") )
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
DailyStats holds aggregate data for a given day.
type DeadTask ¶
type DeadTask struct { ID xid.ID Type string Payload map[string]interface{} LastFailedAt time.Time ErrorMsg string Score int64 Queue string }
DeadTask is a task that has exhausted all retries.
type EnqueuedTask ¶
EnqueuedTask is a task in a queue and is ready to be processed.
type ErrQueueNotEmpty ¶
type ErrQueueNotEmpty struct {
// contains filtered or unexported fields
}
ErrQueueNotEmpty indicates specified queue is not empty.
func (*ErrQueueNotEmpty) Error ¶
func (e *ErrQueueNotEmpty) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
// contains filtered or unexported fields
}
ErrQueueNotFound indicates specified queue does not exist.
func (*ErrQueueNotFound) Error ¶
func (e *ErrQueueNotFound) Error() string
type InProgressTask ¶
InProgressTask is a task that's currently being processed.
type Pagination ¶
type Pagination struct { // Number of items in the page. Size int // Page number starting from zero. Page int }
Pagination specifies the page size and page number for the list operation.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) CancelationPubSub ¶
CancelationPubSub returns a pubsub for cancelation messages.
func (*RDB) CheckAndEnqueue ¶
CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that have to be processed.
qnames specifies to which queues to send tasks.
func (*RDB) ClearProcessState ¶
func (r *RDB) ClearProcessState(ps *base.ProcessState) error
ClearProcessState deletes process state data from redis.
func (*RDB) CurrentStats ¶
CurrentStats returns the current state of the queues.
func (*RDB) DeleteAllDeadTasks ¶
DeleteAllDeadTasks deletes all tasks from the dead queue.
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all tasks from the retry queue.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all tasks from the scheduled queue.
func (*RDB) DeleteDeadTask ¶
DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteRetryTask ¶
DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteScheduledTask ¶
DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) Dequeue ¶
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error)
Dequeue queries given queues in order and pops a task message if there is one and returns it. If all queues are empty, ErrNoProcessableTask error is returned.
func (*RDB) Done ¶
func (r *RDB) Done(msg *base.TaskMessage) error
Done removes the task from in-progress queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) Enqueue ¶
func (r *RDB) Enqueue(msg *base.TaskMessage) error
Enqueue inserts the given task to the tail of the queue.
func (*RDB) EnqueueAllDeadTasks ¶
EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllRetryTasks ¶
EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllScheduledTasks ¶
EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.
func (*RDB) EnqueueDeadTask ¶
EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueRetryTask ¶
EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueScheduledTask ¶
EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueUnique ¶
EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days.
func (*RDB) Kill ¶
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.
func (*RDB) KillAllRetryTasks ¶
KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillAllScheduledTasks ¶
KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillRetryTask ¶
KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) KillScheduledTask ¶
KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ListDead ¶
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error)
ListDead returns all tasks that have exhausted their retry limit.
func (*RDB) ListEnqueued ¶
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error)
ListEnqueued returns enqueued tasks that are ready to be processed.
func (*RDB) ListInProgress ¶
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error)
ListInProgress returns all tasks that are currently being processed.
func (*RDB) ListProcesses ¶
func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error)
ListProcesses returns the list of process statuses.
func (*RDB) ListRetry ¶
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error)
ListRetry returns all tasks that have failed before and will be retried in the future.
func (*RDB) ListScheduled ¶
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error)
ListScheduled returns all tasks that are scheduled to be processed in the future.
func (*RDB) ListWorkers ¶
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
ListWorkers returns the list of worker stats.
func (*RDB) PublishCancelation ¶
PublishCancelation publishes a cancelation message to all subscribers. The message is the ID of the task to be canceled.
func (*RDB) RemoveQueue ¶
RemoveQueue removes the specified queue.
If force is set to true, it will remove the queue regardless of whether the queue is empty. If force is set to false, it will only remove the queue if it is empty.
func (*RDB) Requeue ¶
func (r *RDB) Requeue(msg *base.TaskMessage) error
Requeue moves the task from in-progress queue to the specified queue.
func (*RDB) RequeueAll ¶
RequeueAll moves all tasks from in-progress list to the queue and reports the number of tasks restored.
func (*RDB) Retry ¶
Retry moves the task from in-progress to retry queue, incrementing retry count and assigning error message to the task message.
func (*RDB) ScheduleUnique ¶
ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) WriteProcessState ¶
WriteProcessState writes process state data to redis with expiration set to the value ttl.
type RetryTask ¶
type RetryTask struct { ID xid.ID Type string Payload map[string]interface{} // TODO(hibiken): add LastFailedAt time.Time ProcessAt time.Time ErrorMsg string Retried int Retry int Score int64 Queue string }
RetryTask is a task that's in retry queue because worker failed to process the task.
type ScheduledTask ¶
type ScheduledTask struct { ID xid.ID Type string Payload map[string]interface{} ProcessAt time.Time Score int64 Queue string }
ScheduledTask is a task that's scheduled to be processed in the future.
type Stats ¶
type Stats struct { Enqueued int InProgress int Scheduled int Retry int Dead int Processed int Failed int Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20) Timestamp time.Time }
Stats represents a state of queues at a certain time.