work

package module
v0.3.0 Latest
Published: Jan 16, 2024 License: MIT Imports: 14 Imported by: 9

README

gocraft/work v2

Please see cmd/ for enqueuer and worker demo.

Improvements

  • queue backend abstraction
    • redis is still the default, but the new design allows custom queue implementation.
  • simplify the keyspace design of redis queue backend
    • The new design uses 1 redis hash per job, and 1 redis sorted set for queue.
    • Interesting read
  • modular
    • The core only catches panics, retries on failure, and waits if a queue is empty.
    • All other functionalities are either removed or moved to separate middlewares.
  • support binary payload/args with MessagePack.
  • replace built-in UI with prometheus metrics (use grafana if you want a dashboard).
  • additional optimizations (alloc + bulk queue ops)
    BenchmarkWorkerRunJob/work_v1_1-8         	    3000	    515957 ns/op
    BenchmarkWorkerRunJob/work_v2_1-8         	    5000	    284516 ns/op
    BenchmarkWorkerRunJob/work_v1_10-8        	    1000	   2136546 ns/op
    BenchmarkWorkerRunJob/work_v2_10-8        	    5000	    367997 ns/op
    BenchmarkWorkerRunJob/work_v1_100-8       	     100	  18234023 ns/op
    BenchmarkWorkerRunJob/work_v2_100-8       	    1000	   1759186 ns/op
    BenchmarkWorkerRunJob/work_v1_1000-8      	      10	 162110100 ns/op
    BenchmarkWorkerRunJob/work_v2_1000-8      	     100	  12646080 ns/op
    BenchmarkWorkerRunJob/work_v1_10000-8     	       1	1691287122 ns/op
    BenchmarkWorkerRunJob/work_v2_10000-8     	      10	 144923087 ns/op
    BenchmarkWorkerRunJob/work_v1_100000-8    	       1	17515722574 ns/op
    BenchmarkWorkerRunJob/work_v2_100000-8    	       1	1502468637 ns/op
    PASS
    ok  	github.com/taylorchu/work	87.901s
    
  • http server
    • delete job
    • create job
    • get job status
    • get queue metrics (kubernetes autoscaler integration with keda metrics api scaler)
    • OpenAPI spec

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)

Errors returned when queue options fail validation.

var (
	ErrMaxExecutionTime = errors.New("work: max execution time should be > 0")
	ErrNumGoroutines    = errors.New("work: number of goroutines should be > 0")
	ErrIdleWait         = errors.New("work: idle wait should be > 0")
)

Errors returned when job options fail validation.

var (
	// ErrDoNotRetry is returned if the job should not be retried;
	// this may be because the job is unrecoverable, or because
	// the handler has already rescheduled it.
	ErrDoNotRetry = errors.New("work: do not retry")

	// ErrUnrecoverable is returned if the error is unrecoverable.
	// The job will be discarded.
	ErrUnrecoverable = fmt.Errorf("work: permanent error: %w", ErrDoNotRetry)
)

Errors that control retry behavior.
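
Because ErrUnrecoverable wraps ErrDoNotRetry with %w, a single errors.Is check against ErrDoNotRetry covers both. The sketch below redeclares the two variables locally so it compiles on its own; shouldRetry and errTimeout are hypothetical names used only for illustration, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two retry errors documented above, so the sketch
// compiles without importing the package.
var (
	ErrDoNotRetry    = errors.New("work: do not retry")
	ErrUnrecoverable = fmt.Errorf("work: permanent error: %w", ErrDoNotRetry)
)

// errTimeout stands in for any ordinary, retryable failure (hypothetical).
var errTimeout = errors.New("timeout")

// shouldRetry is a hypothetical helper: it reports whether a handler error
// leaves the job eligible for another attempt.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, ErrDoNotRetry)
}

func main() {
	// Wrapping ErrUnrecoverable again still matches ErrDoNotRetry.
	wrapped := fmt.Errorf("parse payload: %w", ErrUnrecoverable)

	fmt.Println(shouldRetry(errTimeout))    // true
	fmt.Println(shouldRetry(ErrDoNotRetry)) // false
	fmt.Println(shouldRetry(wrapped))       // false
}
```

A handler that wants to add context to a permanent failure can therefore wrap ErrUnrecoverable with %w and still be recognized by any check against ErrDoNotRetry.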

var (
	// ErrEmptyQueue is returned if Dequeue() is called on an empty queue.
	ErrEmptyQueue = errors.New("work: no job is found")
)

Functions

This section is empty.

Types

type AckOptions

type AckOptions struct {
	Namespace string
	QueueID   string
}

AckOptions specifies how a job is deleted from a queue.

func (*AckOptions) Validate

func (opt *AckOptions) Validate() error

Validate validates AckOptions.

type BackoffFunc added in v0.1.12

type BackoffFunc func(*Job, *DequeueOptions) time.Duration

BackoffFunc computes when to retry this job from now.
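
As an illustration of the BackoffFunc shape, here is a minimal sketch of a capped exponential backoff. Job and DequeueOptions are trimmed local stand-ins for the package types, and exponentialBackoff is a hypothetical helper; the package may ship its own backoff strategy, which this does not claim to reproduce.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-ins for the package types; only the fields used here.
type Job struct{ Retries int64 }
type DequeueOptions struct{ InvisibleSec int64 }

// BackoffFunc has the same signature as the type documented above.
type BackoffFunc func(*Job, *DequeueOptions) time.Duration

// exponentialBackoff is a hypothetical helper: the delay doubles with each
// recorded retry, starting at base and never exceeding max.
func exponentialBackoff(base, max time.Duration) BackoffFunc {
	return func(job *Job, _ *DequeueOptions) time.Duration {
		d := base
		for i := int64(0); i < job.Retries && d < max; i++ {
			d *= 2
		}
		if d > max {
			d = max
		}
		return d
	}
}

func main() {
	backoff := exponentialBackoff(time.Second, time.Minute)
	for _, retries := range []int64{0, 1, 3, 10} {
		fmt.Println(retries, backoff(&Job{Retries: retries}, &DequeueOptions{}))
	}
	// Prints 1s, 2s, 8s and 1m0s for 0, 1, 3 and 10 retries.
}
```

A function like this could be assigned to the Backoff field of JobOptions or OnceJobOptions.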

type BulkDequeuer

type BulkDequeuer interface {
	BulkDequeue(int64, *DequeueOptions) ([]*Job, error)
	BulkAck([]*Job, *AckOptions) error
}

BulkDequeuer dequeues jobs in a batch.

type BulkEnqueuer

type BulkEnqueuer interface {
	BulkEnqueue([]*Job, *EnqueueOptions) error
}

BulkEnqueuer enqueues jobs in a batch.

type BulkJobFinder added in v0.1.4

type BulkJobFinder interface {
	BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
}

BulkJobFinder finds jobs by their ids. It allows third-party tools to get job status, or to modify a job by re-enqueueing it. The entry for a job is nil if that job is no longer in the queue, so the returned job list always has the same length as jobIDs.
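
The positional contract (one result per requested id, nil for a missing job) can be sketched with an in-memory finder. memoryFinder and its key scheme are hypothetical, and Job and FindOptions are trimmed local stand-ins for the package types.

```go
package main

import "fmt"

// Trimmed stand-ins for the package types.
type Job struct{ ID string }
type FindOptions struct{ Namespace string }

// memoryFinder is a hypothetical BulkJobFinder backed by a map whose keys
// are "<namespace>:<job id>".
type memoryFinder struct{ jobs map[string]*Job }

// BulkFind returns one entry per requested id, in the same order.
// A missing job leaves a nil entry instead of shortening the slice.
func (f *memoryFinder) BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error) {
	found := make([]*Job, len(jobIDs))
	for i, id := range jobIDs {
		found[i] = f.jobs[opts.Namespace+":"+id] // nil when absent
	}
	return found, nil
}

func main() {
	f := &memoryFinder{jobs: map[string]*Job{
		"ns:a": {ID: "a"},
		"ns:c": {ID: "c"},
	}}
	jobs, _ := f.BulkFind([]string{"a", "b", "c"}, &FindOptions{Namespace: "ns"})
	for i, job := range jobs {
		fmt.Println(i, job != nil) // 0 true, 1 false, 2 true
	}
}
```

Callers can index the result by the position of the id they asked for, as long as they check each entry for nil first.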

type ContextHandleFunc added in v0.1.4

type ContextHandleFunc func(context.Context, *Job, *DequeueOptions) error

ContextHandleFunc runs a job.

type DequeueFunc

type DequeueFunc func(*DequeueOptions) (*Job, error)

DequeueFunc generates a job.

type DequeueMiddleware

type DequeueMiddleware func(DequeueFunc) DequeueFunc

DequeueMiddleware modifies DequeueFunc behavior.

type DequeueOptions

type DequeueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
	// At is the current time of the dequeuer.
	// Any job that is scheduled before this can be executed.
	At time.Time
	// After the job is dequeued, no other dequeuer can see this job for a while.
	// InvisibleSec controls how long this period is.
	InvisibleSec int64
}

DequeueOptions specifies how a job is dequeued.

func (*DequeueOptions) Validate

func (opt *DequeueOptions) Validate() error

Validate validates DequeueOptions.
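
The documentation does not spell out the validation rules, but the exported error variables suggest them. The following sketch is an assumption inferred from those error messages, not the package's actual implementation; the types and variables are redeclared locally so it compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented validation errors.
var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)

// DequeueOptions mirrors the struct documented above.
type DequeueOptions struct {
	Namespace    string
	QueueID      string
	At           time.Time
	InvisibleSec int64
}

// Validate is an assumed implementation: each check maps one field to the
// error whose message describes it. The real order of checks may differ.
func (opt *DequeueOptions) Validate() error {
	switch {
	case opt.Namespace == "":
		return ErrEmptyNamespace
	case opt.QueueID == "":
		return ErrEmptyQueueID
	case opt.At.IsZero():
		return ErrAt
	case opt.InvisibleSec < 0:
		return ErrInvisibleSec
	}
	return nil
}

func main() {
	opt := &DequeueOptions{Namespace: "ns", QueueID: "emails"}
	fmt.Println(opt.Validate()) // work: at should not be zero

	opt.At = time.Now()
	opt.InvisibleSec = 60
	fmt.Println(opt.Validate()) // <nil>
}
```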

type Dequeuer

type Dequeuer interface {
	Dequeue(*DequeueOptions) (*Job, error)
	Ack(*Job, *AckOptions) error
}

Dequeuer dequeues a job. If a job is processed successfully, call Ack() to delete the job.
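
To make the Dequeue/Ack contract concrete, here is a sketch of a toy in-memory backend. memoryQueue, key, and demo are hypothetical, the types are trimmed local stand-ins, and modeling invisibility by pushing EnqueuedAt forward is an assumption about one possible design rather than a description of the redis backend. It is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copy of the documented error.
var ErrEmptyQueue = errors.New("work: no job is found")

// Trimmed stand-ins for the package types.
type Job struct {
	ID         string
	EnqueuedAt time.Time
}
type EnqueueOptions struct{ Namespace, QueueID string }
type AckOptions struct{ Namespace, QueueID string }
type DequeueOptions struct {
	Namespace, QueueID string
	At                 time.Time
	InvisibleSec       int64
}

// memoryQueue is a hypothetical single-goroutine backend.
type memoryQueue struct{ jobs map[string][]*Job }

func key(namespace, queueID string) string { return namespace + ":" + queueID }

func (q *memoryQueue) Enqueue(job *Job, opt *EnqueueOptions) error {
	k := key(opt.Namespace, opt.QueueID)
	q.jobs[k] = append(q.jobs[k], job)
	return nil
}

// Dequeue returns the first job scheduled at or before opt.At and hides it
// from later Dequeue calls until the invisibility window has passed.
func (q *memoryQueue) Dequeue(opt *DequeueOptions) (*Job, error) {
	for _, job := range q.jobs[key(opt.Namespace, opt.QueueID)] {
		if !job.EnqueuedAt.After(opt.At) {
			job.EnqueuedAt = opt.At.Add(time.Duration(opt.InvisibleSec) * time.Second)
			return job, nil
		}
	}
	return nil, ErrEmptyQueue
}

// Ack deletes a successfully processed job.
func (q *memoryQueue) Ack(job *Job, opt *AckOptions) error {
	k := key(opt.Namespace, opt.QueueID)
	for i, j := range q.jobs[k] {
		if j.ID == job.ID {
			q.jobs[k] = append(q.jobs[k][:i], q.jobs[k][i+1:]...)
			return nil
		}
	}
	return nil
}

// demo enqueues one job, dequeues it twice, then acks it.
func demo() (firstID string, secondErr error, remaining int) {
	q := &memoryQueue{jobs: map[string][]*Job{}}
	now := time.Now()
	q.Enqueue(&Job{ID: "a", EnqueuedAt: now}, &EnqueueOptions{Namespace: "ns", QueueID: "q"})

	opt := &DequeueOptions{Namespace: "ns", QueueID: "q", At: now, InvisibleSec: 60}
	job, _ := q.Dequeue(opt)
	_, secondErr = q.Dequeue(opt) // the job is still invisible
	q.Ack(job, &AckOptions{Namespace: "ns", QueueID: "q"})
	return job.ID, secondErr, len(q.jobs[key("ns", "q")])
}

func main() {
	fmt.Println(demo()) // a work: no job is found 0
}
```

If a worker crashes before calling Ack, a design like this lets the job become visible again once InvisibleSec has elapsed, so another dequeuer can pick it up.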

type EnqueueFunc

type EnqueueFunc func(*Job, *EnqueueOptions) error

EnqueueFunc takes in a job for processing.

type EnqueueMiddleware

type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc

EnqueueMiddleware modifies EnqueueFunc behavior.

type EnqueueOptions

type EnqueueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
}

EnqueueOptions specifies how a job is enqueued.

func (*EnqueueOptions) Validate

func (opt *EnqueueOptions) Validate() error

Validate validates EnqueueOptions.

type Enqueuer

type Enqueuer interface {
	Enqueue(*Job, *EnqueueOptions) error
}

Enqueuer enqueues a job.

type ExternalBulkEnqueuer added in v0.1.10

type ExternalBulkEnqueuer interface {
	ExternalBulkEnqueue([]*Job, *EnqueueOptions) error
}

ExternalBulkEnqueuer enqueues jobs in a batch using another queue protocol. A queue adaptor that implements this can publish jobs directly to other types of queue systems.

type ExternalEnqueuer added in v0.1.10

type ExternalEnqueuer interface {
	ExternalEnqueue(*Job, *EnqueueOptions) error
}

ExternalEnqueuer enqueues a job using another queue protocol. A queue adaptor that implements this can publish jobs directly to other types of queue systems.

type FindOptions added in v0.1.4

type FindOptions struct {
	Namespace string
}

FindOptions specifies how a job is looked up in a queue.

func (*FindOptions) Validate added in v0.1.4

func (opt *FindOptions) Validate() error

Validate validates FindOptions.

type HandleFunc

type HandleFunc func(*Job, *DequeueOptions) error

HandleFunc runs a job.

type HandleMiddleware

type HandleMiddleware func(HandleFunc) HandleFunc

HandleMiddleware modifies HandleFunc behavior.
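
A middleware is a function that takes the next HandleFunc and returns a wrapped one. The sketch below uses trimmed local stand-ins for the package types; trace and chain are hypothetical helpers, and the order in which Worker applies JobOptions.HandleMiddleware is not stated here, so the "first is outermost" rule in chain is an assumption.

```go
package main

import "fmt"

// Trimmed stand-ins for the package types.
type Job struct{ ID string }
type DequeueOptions struct{ QueueID string }
type HandleFunc func(*Job, *DequeueOptions) error
type HandleMiddleware func(HandleFunc) HandleFunc

// trace is a hypothetical middleware that records its name before
// delegating to the next handler.
func trace(name string, log *[]string) HandleMiddleware {
	return func(next HandleFunc) HandleFunc {
		return func(job *Job, opt *DequeueOptions) error {
			*log = append(*log, name)
			return next(job, opt)
		}
	}
}

// chain wraps h so that the first middleware in mws runs first.
// This ordering is an assumption made for the example.
func chain(h HandleFunc, mws ...HandleMiddleware) HandleFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// run executes a handler wrapped by two middlewares and returns the call order.
func run() []string {
	var log []string
	handler := func(*Job, *DequeueOptions) error {
		log = append(log, "handler")
		return nil
	}
	h := chain(handler, trace("outer", &log), trace("inner", &log))
	_ = h(&Job{ID: "a"}, &DequeueOptions{QueueID: "emails"})
	return log
}

func main() {
	fmt.Println(run()) // [outer inner handler]
}
```

DequeueMiddleware and EnqueueMiddleware follow the same wrapping pattern around DequeueFunc and EnqueueFunc.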

type InvalidJobPayloadError added in v0.1.3

type InvalidJobPayloadError struct {
	Err error
}

InvalidJobPayloadError wraps a JSON or msgpack decoding error.

func (*InvalidJobPayloadError) Error added in v0.1.3

func (e *InvalidJobPayloadError) Error() string

type Job

type Job struct {
	// ID is the unique id of a job.
	ID string `msgpack:"id"`
	// CreatedAt is set to the time when NewJob() is called.
	CreatedAt time.Time `msgpack:"created_at"`
	// UpdatedAt is when the job is last executed.
	// UpdatedAt is set to the time when NewJob() is called initially.
	UpdatedAt time.Time `msgpack:"updated_at"`
	// EnqueuedAt is when the job will be executed next.
	// EnqueuedAt is set to the time when NewJob() is called initially.
	EnqueuedAt time.Time `msgpack:"enqueued_at"`

	// Payload is raw bytes.
	Payload []byte `msgpack:"payload"`

	// If the job previously fails, Retries will be incremented.
	Retries int64 `msgpack:"retries"`
	// If the job previously fails, LastError will be populated with error string.
	LastError string `msgpack:"last_error"`
}

Job is a single unit of work.

func NewJob

func NewJob() *Job

NewJob creates a job.

func (Job) Delay

func (j Job) Delay(d time.Duration) *Job

Delay creates a job that can be executed in the future.
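
Delay has a value receiver and returns a new *Job, which suggests the original job is left unchanged. The sketch below shows one implementation consistent with that signature; the method body is an assumption, Job is a trimmed local stand-in, and delayedBy is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-in for the package type.
type Job struct {
	ID         string
	EnqueuedAt time.Time
}

// Delay is an assumed implementation: because the receiver is a value,
// j is already a copy, so shifting EnqueuedAt does not touch the caller's job.
func (j Job) Delay(d time.Duration) *Job {
	j.EnqueuedAt = j.EnqueuedAt.Add(d)
	return &j
}

// delayedBy reports how far Delay moved EnqueuedAt and whether the
// original job kept its own EnqueuedAt.
func delayedBy(d time.Duration) (time.Duration, bool) {
	now := time.Now()
	job := &Job{ID: "a", EnqueuedAt: now}
	later := job.Delay(d)
	return later.EnqueuedAt.Sub(job.EnqueuedAt), job.EnqueuedAt.Equal(now)
}

func main() {
	shift, unchanged := delayedBy(time.Hour)
	fmt.Println(shift, unchanged) // 1h0m0s true
}
```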

func (*Job) MarshalJSONPayload

func (j *Job) MarshalJSONPayload(v interface{}) error

MarshalJSONPayload encodes a variable into the JSON payload.

func (*Job) MarshalPayload

func (j *Job) MarshalPayload(v interface{}) error

MarshalPayload encodes a variable into the msgpack payload.

func (*Job) UnmarshalJSONPayload

func (j *Job) UnmarshalJSONPayload(v interface{}) error

UnmarshalJSONPayload decodes the JSON payload into a variable.

func (*Job) UnmarshalPayload

func (j *Job) UnmarshalPayload(v interface{}) error

UnmarshalPayload decodes the msgpack payload into a variable.
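
A payload round trip through the JSON helpers can be sketched with the standard library alone. The method bodies here are assumptions built on encoding/json, the use of InvalidJobPayloadError for decode failures is inferred from its description rather than confirmed, and emailArgs and roundTrip are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-ins for the package types.
type Job struct{ Payload []byte }
type InvalidJobPayloadError struct{ Err error }

func (e *InvalidJobPayloadError) Error() string {
	return "invalid job payload: " + e.Err.Error() // message format is made up
}

// MarshalJSONPayload stores v as JSON in the payload (assumed behavior).
func (j *Job) MarshalJSONPayload(v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	j.Payload = b
	return nil
}

// UnmarshalJSONPayload decodes the payload into v and wraps decode
// failures in InvalidJobPayloadError (assumed behavior).
func (j *Job) UnmarshalJSONPayload(v interface{}) error {
	if err := json.Unmarshal(j.Payload, v); err != nil {
		return &InvalidJobPayloadError{Err: err}
	}
	return nil
}

// emailArgs is a hypothetical job argument type.
type emailArgs struct {
	To      string `json:"to"`
	Attempt int    `json:"attempt"`
}

// roundTrip encodes args into a job, as an enqueuer would, then decodes
// them again, as a handler would.
func roundTrip(in emailArgs) (emailArgs, error) {
	job := &Job{}
	if err := job.MarshalJSONPayload(in); err != nil {
		return emailArgs{}, err
	}
	var out emailArgs
	err := job.UnmarshalJSONPayload(&out)
	return out, err
}

func main() {
	out, err := roundTrip(emailArgs{To: "a@example.com", Attempt: 2})
	fmt.Println(out.To, out.Attempt, err) // a@example.com 2 <nil>
}
```

MarshalPayload and UnmarshalPayload have the same shape but use msgpack, which also handles binary data compactly.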

type JobOptions

type JobOptions struct {
	MaxExecutionTime time.Duration
	IdleWait         time.Duration
	NumGoroutines    int64
	Backoff          BackoffFunc

	DequeueMiddleware []DequeueMiddleware
	HandleMiddleware  []HandleMiddleware
}

JobOptions specifies how a job is executed.

func (*JobOptions) Validate

func (opt *JobOptions) Validate() error

Validate validates JobOptions.

type Metrics

type Metrics struct {
	Queue []*QueueMetrics
}

Metrics wraps metrics reported by MetricsExporter.

type MetricsExporter

type MetricsExporter interface {
	GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error)
}

MetricsExporter can be implemented by Queue to report metrics.

type OnceJobOptions added in v0.1.12

type OnceJobOptions struct {
	MaxExecutionTime time.Duration
	Backoff          BackoffFunc

	DequeueMiddleware []DequeueMiddleware
	HandleMiddleware  []HandleMiddleware
}

OnceJobOptions specifies how a job is executed.

func (*OnceJobOptions) Validate added in v0.1.12

func (opt *OnceJobOptions) Validate() error

Validate validates OnceJobOptions.

type Queue

type Queue interface {
	Enqueuer
	Dequeuer
}

Queue can enqueue and dequeue jobs.

type QueueMetrics

type QueueMetrics struct {
	Namespace string
	QueueID   string
	// Total number of jobs that can be executed right now.
	ReadyTotal int64
	// Total number of jobs that are scheduled to run in the future.
	ScheduledTotal int64
	// Processing delay from oldest ready job
	Latency time.Duration
}

QueueMetrics contains metrics from a queue.

type QueueMetricsOptions

type QueueMetricsOptions struct {
	Namespace string
	QueueID   string
	At        time.Time
}

QueueMetricsOptions specifies how to fetch queue metrics.

func (*QueueMetricsOptions) Validate

func (opt *QueueMetricsOptions) Validate() error

Validate validates QueueMetricsOptions.

type RedisQueue added in v0.2.0

RedisQueue implements Queue with additional capabilities.

func NewRedisQueue

func NewRedisQueue(client redis.UniversalClient) RedisQueue

NewRedisQueue creates a new queue stored in redis.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs jobs.

func NewWorker

func NewWorker(opt *WorkerOptions) *Worker

NewWorker creates a new worker.

func (*Worker) ExportMetrics

func (w *Worker) ExportMetrics() (*Metrics, error)

ExportMetrics dumps queue stats if the queue implements MetricsExporter.

func (*Worker) Register

func (w *Worker) Register(queueID string, h HandleFunc, opt *JobOptions) error

Register adds a handler for a queue. queueID and namespace should be the same as the ones used to enqueue.

func (*Worker) RegisterWithContext added in v0.1.4

func (w *Worker) RegisterWithContext(queueID string, h ContextHandleFunc, opt *JobOptions) error

RegisterWithContext adds a handler for a queue with context.Context. queueID and namespace should be the same as the ones used to enqueue. The context is created with context.WithTimeout set from MaxExecutionTime.
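
The timeout behavior can be illustrated with a small adapter that turns a ContextHandleFunc into a HandleFunc. This is a sketch of the idea, not the worker's internals: withTimeout, slowHandler, and timedOut are hypothetical, and the types are trimmed local stand-ins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Trimmed stand-ins for the package types.
type Job struct{ ID string }
type DequeueOptions struct{ QueueID string }
type HandleFunc func(*Job, *DequeueOptions) error
type ContextHandleFunc func(context.Context, *Job, *DequeueOptions) error

// withTimeout is a hypothetical adapter: each call gets a fresh context
// that is cancelled once maxExecutionTime has elapsed.
func withTimeout(h ContextHandleFunc, maxExecutionTime time.Duration) HandleFunc {
	return func(job *Job, opt *DequeueOptions) error {
		ctx, cancel := context.WithTimeout(context.Background(), maxExecutionTime)
		defer cancel()
		return h(ctx, job, opt)
	}
}

// slowHandler simulates one second of work but gives up as soon as the
// context is done.
func slowHandler(ctx context.Context, _ *Job, _ *DequeueOptions) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether a 10ms budget cuts the slow handler short.
func timedOut() bool {
	h := withTimeout(slowHandler, 10*time.Millisecond)
	err := h(&Job{ID: "a"}, &DequeueOptions{QueueID: "emails"})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut()) // true
}
```

A handler only benefits from the deadline if it watches ctx.Done() or passes ctx on to the calls it makes.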

func (*Worker) RunOnce added in v0.1.12

func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFunc, opt *OnceJobOptions) error

RunOnce simply runs one job from a queue. The context is created with context.WithTimeout set from MaxExecutionTime.

This is used with Kubernetes, where a pod is created directly to run a job.

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker.

type WorkerOptions

type WorkerOptions struct {
	Namespace string
	Queue     Queue
	ErrorFunc func(error)
}

WorkerOptions is used to create a worker.

Directories

Path Synopsis
cmd
middleware
