taskqueue

package
v0.0.0-...-83cf971 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2022 License: LGPL-3.0 Imports: 17 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	ErrEmpty    = errors.New("empty")
	ErrNotFound = errors.New("task not found")
	ErrLocked   = errors.New("task is locked")
)

Functions

This section is empty.

Types

type Handler

type Handler interface {
	// HandleTask is called to process the given task Payload.
	//
	// The provided scheduler can be used to atomically push more tasks into
	// the queue. Tasks are scheduled only if handling of the current task
	// payload was successful and no error was returned.
	//
	// The task payload is passed as an interface, but it is safe to cast
	// it to the pointer type of the payload this handler was registered
	// for. Each handler is passed only the structures that it was
	// registered with.
	HandleTask(context.Context, Scheduler, Payload) error
}

Handler is implemented in order to handle a particular Payload type.

type Payload

type Payload interface {
	// TaskName returns a unique name of a task. This is usually
	// implemented as a static method.
	TaskName() string
}

Payload is implemented by any task that can be scheduled for execution.

type RecordingScheduler

type RecordingScheduler struct {
	Scheduled []Payload
}

RecordingScheduler implements the Scheduler interface and records all scheduled tasks. This implementation does not execute tasks.

func (*RecordingScheduler) Cancel

func (*RecordingScheduler) LoadRecorded

func (rs *RecordingScheduler) LoadRecorded(t testing.TB, position int, dest Payload)

LoadRecorded assigns to dest the payload recorded at the specified position.

func (*RecordingScheduler) Schedule

func (rs *RecordingScheduler) Schedule(ctx context.Context, p Payload, opts ...ScheduleOption) (string, error)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry binds together task payloads and handlers.

func NewRegistry

func NewRegistry(queue *Store) *Registry

NewRegistry returns a task registry that binds together task payloads and handlers.

func (*Registry) Cancel

func (r *Registry) Cancel(ctx context.Context, taskID string) error

Cancel makes a best-effort attempt to remove a scheduled but not yet executed task from the queue.

func (*Registry) MustRegister

func (r *Registry) MustRegister(p Payload, h Handler)

MustRegister is a Register call that will panic on failure.

func (*Registry) ProcessIncoming

func (r *Registry) ProcessIncoming(ctx context.Context, workers uint) error

ProcessIncoming blocks while it monitors the task queue and processes jobs. It returns only on a worker error or when the provided context is cancelled.

When multiple workers fail, only the first error is returned.

func (*Registry) ProcessOne

func (r *Registry) ProcessOne(ctx context.Context) error

ProcessOne pops the first task from the task queue and processes it.

func (*Registry) Register

func (r *Registry) Register(p Payload, h Handler) error

Register registers a task handler. The first argument must be a data structure that represents the payload.

func (*Registry) Schedule

func (r *Registry) Schedule(ctx context.Context, s Payload, opts ...ScheduleOption) (string, error)

Schedule publishes one or more tasks to be processed. Schedule options can be used to tune how tasks are published.

This operation is atomic for all provided tasks: either all or none are published.

type ScheduleOption

type ScheduleOption = func(*scheduleOpts)

ScheduleOption configures how a task should be scheduled.

func Delay

func Delay(executeIn time.Duration) ScheduleOption

Delay postpones task execution by the given duration.

func Retry

func Retry(moveToDeadqueueAfter uint) ScheduleOption

Retry configures how many times a failed task execution is repeated before the task is removed from the queue and pushed into the dead letter queue storage.

func Timeout

func Timeout(cancelExecutionAfter time.Duration) ScheduleOption

Timeout configures how long a task processing can be running before its context is cancelled.

type Scheduler

type Scheduler interface {
	// Schedule adds the specified task to the queue for execution.
	Schedule(context.Context, Payload, ...ScheduleOption) (string, error)

	// Cancel cancels a scheduled task execution. If successful, the task
	// is removed from the queue and will never be executed.
	Cancel(context.Context, string) error
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

func OpenTaskQueue

func OpenTaskQueue(dbpath string) (*Store, error)

OpenTaskQueue returns a task queue store implementation.

func (*Store) Ack

func (s *Store) Ack(ctx context.Context, taskID string) error

func (*Store) Close

func (s *Store) Close() error

Close the store and free all resources.

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, taskID string) error

Delete removes the task with the given ID from the queue if it is present and not locked for processing.

func (*Store) Nack

func (s *Store) Nack(ctx context.Context, taskID string, reason string) error

func (*Store) Pull

func (s *Store) Pull(ctx context.Context) (*Task, error)

func (*Store) Push

func (s *Store) Push(ctx context.Context, tasks []TaskReq) ([]string, error)

Push one or more tasks to the queue. This is an atomic operation.

func (*Store) ServeHTTP

func (s *Store) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Task

type Task struct {
	TaskID  string
	Name    string
	Payload []byte
	Timeout time.Duration
}

Task represents a single task (job) acquired from the queue.

type TaskReq

type TaskReq struct {
	Name      string
	Payload   []byte
	Retry     uint
	ExecuteIn time.Duration
	Timeout   time.Duration
}

TaskReq represents a task creation request. If processed successfully, it results in a task being queued.
