Documentation ¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Variables
- type DailyStats
- type DeadTask
- type EnqueuedTask
- type InProgressTask
- type RDB
- func (r *RDB) CheckAndEnqueue() error
- func (r *RDB) Close() error
- func (r *RDB) CurrentStats() (*Stats, error)
- func (r *RDB) DeleteAllDeadTasks() error
- func (r *RDB) DeleteAllRetryTasks() error
- func (r *RDB) DeleteAllScheduledTasks() error
- func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error
- func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error
- func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error
- func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error)
- func (r *RDB) Done(msg *base.TaskMessage) error
- func (r *RDB) Enqueue(msg *base.TaskMessage) error
- func (r *RDB) EnqueueAllDeadTasks() (int64, error)
- func (r *RDB) EnqueueAllRetryTasks() (int64, error)
- func (r *RDB) EnqueueAllScheduledTasks() (int64, error)
- func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error
- func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error
- func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error
- func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
- func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
- func (r *RDB) KillAllRetryTasks() (int64, error)
- func (r *RDB) KillAllScheduledTasks() (int64, error)
- func (r *RDB) KillRetryTask(id xid.ID, score int64) error
- func (r *RDB) KillScheduledTask(id xid.ID, score int64) error
- func (r *RDB) ListDead() ([]*DeadTask, error)
- func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error)
- func (r *RDB) ListInProgress() ([]*InProgressTask, error)
- func (r *RDB) ListRetry() ([]*RetryTask, error)
- func (r *RDB) ListScheduled() ([]*ScheduledTask, error)
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) Requeue(msg *base.TaskMessage) error
- func (r *RDB) RestoreUnfinished() (int64, error)
- func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
- func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error
- type RetryTask
- type ScheduledTask
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
	ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")
)
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
DailyStats holds aggregate data for a given day.
type DeadTask ¶
type DeadTask struct { ID xid.ID Type string Payload map[string]interface{} LastFailedAt time.Time ErrorMsg string Score int64 }
DeadTask is a task that has exhausted all retries.
type EnqueuedTask ¶
EnqueuedTask is a task in a queue and is ready to be processed.
type InProgressTask ¶
InProgressTask is a task that's currently being processed.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) CheckAndEnqueue ¶
CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that have to be processed.
func (*RDB) CurrentStats ¶
CurrentStats returns the current state of the queues.
func (*RDB) DeleteAllDeadTasks ¶
DeleteAllDeadTasks deletes all tasks from the dead queue.
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all tasks from the retry queue.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all tasks from the scheduled queue.
func (*RDB) DeleteDeadTask ¶
DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteRetryTask ¶
DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteScheduledTask ¶
DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) Dequeue ¶
Dequeue blocks until there is a task available to be processed. Once a task is available, it adds the task to the "in progress" queue and returns the task. If there are no tasks for the entire timeout duration, it returns ErrDequeueTimeout.
func (*RDB) Done ¶
func (r *RDB) Done(msg *base.TaskMessage) error
Done removes the task from in-progress queue to mark the task as done.
func (*RDB) Enqueue ¶
func (r *RDB) Enqueue(msg *base.TaskMessage) error
Enqueue inserts the given task to the tail of the queue.
func (*RDB) EnqueueAllDeadTasks ¶
EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllRetryTasks ¶
EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllScheduledTasks ¶
EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.
func (*RDB) EnqueueDeadTask ¶
EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueRetryTask ¶
EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueScheduledTask ¶
EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days.
func (*RDB) Kill ¶
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.
func (*RDB) KillAllRetryTasks ¶
KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillAllScheduledTasks ¶
KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillRetryTask ¶
KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) KillScheduledTask ¶
KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ListEnqueued ¶
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error)
ListEnqueued returns all enqueued tasks that are ready to be processed.
func (*RDB) ListInProgress ¶
func (r *RDB) ListInProgress() ([]*InProgressTask, error)
ListInProgress returns all tasks that are currently being processed.
func (*RDB) ListRetry ¶
ListRetry returns all tasks that have failed before and will be retried in the future.
func (*RDB) ListScheduled ¶
func (r *RDB) ListScheduled() ([]*ScheduledTask, error)
ListScheduled returns all tasks that are scheduled to be processed in the future.
func (*RDB) Requeue ¶
func (r *RDB) Requeue(msg *base.TaskMessage) error
Requeue moves the task from in-progress queue to the default queue.
func (*RDB) RestoreUnfinished ¶
RestoreUnfinished moves all tasks from in-progress list to the queue and reports the number of tasks restored.
type RetryTask ¶
type RetryTask struct {
	ID      xid.ID
	Type    string
	Payload map[string]interface{}
	// TODO(brianbinbin): add LastFailedAt time.Time
	ProcessAt time.Time
	ErrorMsg  string
	Retried   int
	Retry     int
	Score     int64
}
RetryTask is a task that's in retry queue because worker failed to process the task.