worker

package
v0.0.0-...-4263410 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrHandlerNotFound = errors.New("handler not registered")
View Source
var ErrJobNotFound = errors.New("job not found")

Functions

This section is empty.

Types

type Job

type Job struct {
	// Random UUID for logging and tracing. It is generated randomly by Enqueue function when blank.
	ID uuid.UUID

	// Job type or "queue".
	Type JobType

	// Associated account.
	AccountID int64

	// Associated identity
	Identity identity.Principal

	// For logging purposes
	TraceContext propagation.MapCarrier

	// For logging purposes
	EdgeID string

	// Job arguments.
	Args any
}

type JobEnqueuer

type JobEnqueuer interface {
	// Enqueue delivers a job to one of the backend workers.
	Enqueue(context.Context, *Job) error
}

JobEnqueuer sends Job messages into worker queue.

type JobHandler

type JobHandler func(ctx context.Context, job *Job)

type JobType

type JobType string

func (JobType) String

func (jt JobType) String() string

type JobWorker

type JobWorker interface {
	// RegisterHandler registers an event listener for a particular type with an associated handler.
	RegisterHandler(JobType, JobHandler, any)

	// DequeueLoop starts one or more goroutines to dispatch incoming jobs.
	DequeueLoop(ctx context.Context)

	// Stop let's background workers to finish all jobs and terminates them. It blocks until workers are done.
	Stop(ctx context.Context)

	// Stats returns statistics. Not all implementations supports stats, some may return zero values.
	Stats(ctx context.Context) (Stats, error)
}

JobWorker receives and handles Job messages.

type MemoryWorker

type MemoryWorker struct {
	// contains filtered or unexported fields
}

func NewMemoryClient

func NewMemoryClient() *MemoryWorker

func (*MemoryWorker) DequeueLoop

func (w *MemoryWorker) DequeueLoop(ctx context.Context)

func (*MemoryWorker) Enqueue

func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error

func (*MemoryWorker) RegisterHandler

func (w *MemoryWorker) RegisterHandler(jtype JobType, handler JobHandler, _ any)

func (*MemoryWorker) Stats

func (w *MemoryWorker) Stats(_ context.Context) (Stats, error)

func (*MemoryWorker) Stop

func (w *MemoryWorker) Stop(_ context.Context)

type RedisWorker

type RedisWorker struct {
	// contains filtered or unexported fields
}

func NewRedisWorker

func NewRedisWorker(address, username, password string, db int, queueName string, pollInterval time.Duration, concurrency int) (*RedisWorker, error)

NewRedisWorker creates new worker that keeps all jobs in a single queue (list), starts N polling goroutines which fetch jobs from the queue and process them in the same goroutine. Use the Stats function to track number of in-flight jobs.

func (*RedisWorker) DequeueLoop

func (w *RedisWorker) DequeueLoop(ctx context.Context)

func (*RedisWorker) Enqueue

func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error

func (*RedisWorker) RegisterHandler

func (w *RedisWorker) RegisterHandler(jtype JobType, handler JobHandler, args any)

func (*RedisWorker) Stats

func (w *RedisWorker) Stats(ctx context.Context) (Stats, error)

func (*RedisWorker) Stop

func (w *RedisWorker) Stop(ctx context.Context)

type Stats

type Stats struct {
	// Number of jobs currently in the queue. This is a global value - all clients see the same value.
	EnqueuedJobs uint64

	// Number of jobs currently being processed. Local value - each client has its own number.
	InFlight int64
}

Stats provides monitoring statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL