Documentation ¶
Index ¶
- Variables
- type Handler
- type Payload
- type RecordingScheduler
- type Registry
- func (r *Registry) Cancel(ctx context.Context, taskID string) error
- func (r *Registry) MustRegister(p Payload, h Handler)
- func (r *Registry) ProcessIncoming(ctx context.Context, workers uint) error
- func (r *Registry) ProcessOne(ctx context.Context) error
- func (r *Registry) Register(p Payload, h Handler) error
- func (r *Registry) Schedule(ctx context.Context, s Payload, opts ...ScheduleOption) (string, error)
- type ScheduleOption
- type Scheduler
- type Store
- func (s *Store) Ack(ctx context.Context, taskID string) error
- func (s *Store) Close() error
- func (s *Store) Delete(ctx context.Context, taskID string) error
- func (s *Store) Nack(ctx context.Context, taskID string, reason string) error
- func (s *Store) Pull(ctx context.Context) (*Task, error)
- func (s *Store) Push(ctx context.Context, tasks []TaskReq) ([]string, error)
- func (s *Store) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type Task
- type TaskReq
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Handler ¶
type Handler interface {
	// HandleTask is called to process the given task Payload.
	//
	// The provided scheduler can be used to atomically push more tasks into
	// the queue. Tasks are scheduled only if the current task payload
	// handling was successful and no error was returned.
	//
	// The provided task payload is an interface value, but it is safe to
	// cast it to the pointer of the payload type specific to that handler.
	// Each handler is passed only the structures that it was registered with.
	HandleTask(context.Context, Scheduler, Payload) error
}
Handler is implemented in order to handle a particular Payload type.
type Payload ¶
type Payload interface {
	// TaskName returns a unique name of a task. This is usually
	// implemented as a static method.
	TaskName() string
}
Payload is implemented by any task that can be scheduled for execution.
type RecordingScheduler ¶
type RecordingScheduler struct {
Scheduled []Payload
}
RecordingScheduler implements the Scheduler interface and records all scheduled tasks. This implementation does not execute tasks.
func (*RecordingScheduler) Cancel ¶
func (rs *RecordingScheduler) Cancel(context.Context, string) error
func (*RecordingScheduler) LoadRecorded ¶
func (rs *RecordingScheduler) LoadRecorded(t testing.TB, position int, dest Payload)
LoadRecorded assigns to dest the payload recorded at the specified position.
func (*RecordingScheduler) Schedule ¶
func (rs *RecordingScheduler) Schedule(ctx context.Context, p Payload, opts ...ScheduleOption) (string, error)
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry binds together task payloads and handlers.
func NewRegistry ¶
NewRegistry returns a task registry that binds together task payloads and handlers.
func (*Registry) Cancel ¶
Cancel makes a best-effort attempt to remove a scheduled but not yet executed task from the queue.
func (*Registry) MustRegister ¶
MustRegister is a Register call that will panic on failure.
func (*Registry) ProcessIncoming ¶
ProcessIncoming is a blocking function that monitors the task queue and processes jobs. It returns only on a worker error or when the provided context is cancelled.
When multiple workers fail, only the first error is returned.
func (*Registry) ProcessOne ¶
ProcessOne pops the first task from the task queue and processes it.
func (*Registry) Register ¶
Register registers a task handler. The first argument must be a data structure that represents the payload.
type ScheduleOption ¶
type ScheduleOption = func(*scheduleOpts)
ScheduleOption configures how a task should be scheduled.
func Delay ¶
func Delay(executeIn time.Duration) ScheduleOption
Delay configures task execution to be postponed by the given duration.
func Retry ¶
func Retry(moveToDeadqueueAfter uint) ScheduleOption
Retry configures how many times a failed task execution is repeated before the task is removed from the queue and pushed into the dead letter queue storage.
func Timeout ¶
func Timeout(cancelExecutionAfter time.Duration) ScheduleOption
Timeout configures how long task processing may run before its context is cancelled.
type Scheduler ¶
type Scheduler interface {
	// Schedule adds the specified job to the queue for execution.
	Schedule(context.Context, Payload, ...ScheduleOption) (string, error)

	// Cancel cancels a scheduled task execution. If successful, the task is
	// removed from the queue and will never be executed.
	Cancel(context.Context, string) error
}
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func OpenTaskQueue ¶
OpenTaskQueue returns a task queue store implementation.
func (*Store) Delete ¶
Delete removes the task with the given ID from the queue if it is present and not locked for processing.