dequeuer

package
v0.0.0-...-7ed8402
This package is not in the latest version of its module.
Published: Nov 7, 2023 License: MIT Imports: 15 Imported by: 1

Documentation

Overview

Package dequeuer retrieves jobs from the database and does some work.

Index

Constants

const DefaultStuckJobTimeout = 7 * time.Minute

How long to wait before marking a job as "stuck".

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Database connector, for example db.DatabaseURLConnector. If nil,
	// db.DefaultConnection is used.
	Connector db.Connector
	// Number of open connections to the database
	NumConns        int
	Processor       *services.JobProcessor
	StuckJobTimeout time.Duration

	// Enqueueing a job with name "meta.shutdown" will shutdown the dequeuer (so
	// it can be restarted with a job type added or removed).
	//
	// Enable this flag if you have long running jobs that could be interfered
	// with if the dequeuer restarted.
	DisableMetaShutdown bool

	Logger log.Logger
}
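
A minimal Config might be populated like the following sketch. Only the field names and the fallback behavior come from the type above; the concrete values, and the `processor` and `logger` variables, are illustrative assumptions:

```go
cfg := dequeuer.Config{
	Connector:       nil, // nil falls back to db.DefaultConnection (per the field docs)
	NumConns:        10,  // example value: open DB connections
	Processor:       processor, // a *services.JobProcessor, constructed elsewhere
	StuckJobTimeout: dequeuer.DefaultStuckJobTimeout, // 7 minutes
	Logger:          logger, // any log.Logger
}
```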

type Dequeuer

type Dequeuer struct {
	log.Logger
	ID int
	W  Worker
	// contains filtered or unexported fields
}

func (*Dequeuer) Work

func (d *Dequeuer) Work(name string, wg *sync.WaitGroup)

type Pool

type Pool struct {
	Dequeuers []*Dequeuer
	Name      string
	// contains filtered or unexported fields
}

A Pool contains a slice of dequeuers, all of which perform work for the same models.Job.

func NewPool

func NewPool(ctx context.Context, name string) *Pool

func (*Pool) AddDequeuer

func (p *Pool) AddDequeuer(ctx context.Context, w Worker) error

AddDequeuer adds a Dequeuer to the Pool and starts running it in a separate goroutine. w should be the work that the Dequeuer will do with a dequeued job.

func (*Pool) Len

func (p *Pool) Len() int

func (*Pool) RemoveDequeuer

func (p *Pool) RemoveDequeuer() error

RemoveDequeuer removes a dequeuer from the pool and sends that dequeuer a shutdown signal.

func (*Pool) Shutdown

func (p *Pool) Shutdown(ctx context.Context) error

Shutdown shuts down all workers in the pool.

type Pools

type Pools []*Pool

func CreatePools

func CreatePools(ctx context.Context, w Worker, maxInitialJitter time.Duration) (Pools, error)

CreatePools creates job pools for all jobs in the database. The provided Worker w will be shared between all dequeuers, so it must be thread safe.
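
Usage might look like the following sketch, where `myWorker` is a hypothetical Worker implementation and the jitter value is arbitrary:

```go
pools, err := dequeuer.CreatePools(ctx, myWorker, 2*time.Second)
if err != nil {
	// handle error
}
fmt.Println("dequeuers running:", pools.NumDequeuers())
```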

func (Pools) NumDequeuers

func (ps Pools) NumDequeuers() int

NumDequeuers returns the total number of dequeuers across all pools.

type WorkServer

type WorkServer struct {
	log.Logger
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, cfg Config) (WorkServer, error)

New creates a new WorkServer.

func (*WorkServer) Run

func (w *WorkServer) Run(ctx context.Context) error

Run starts the WorkServer and several background daemons (to measure queue depth and to process "stuck" jobs).
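
Putting New and Run together, startup might look like this sketch (the cfg value and error handling are assumptions):

```go
ctx := context.Background()
srv, err := dequeuer.New(ctx, cfg)
if err != nil {
	// handle error
}
// Run blocks, dequeuing jobs and running the queue-depth and
// stuck-job daemons, until ctx is canceled or an error occurs.
if err := srv.Run(ctx); err != nil {
	// handle error
}
```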

type Worker

type Worker interface {
	log.Logger
	// DoWork is responsible for performing work and either updating the job
	// status in the database or waiting for the status to be updated by
	// another thread. Success and failure for the job are marked by hitting
	// services.HandleStatusCallback, or POST /v1/jobs/:job-name/:job-id (over
	// HTTP).
	//
	// A good pattern is for DoWork to make an HTTP request to a downstream
	// service, and then for that service to make an HTTP callback to report
	// success or failure.
	//
	// The Worker is responsible for returning an error if the ExpiresAt
	// deadline is exceeded while the work is in progress.
	//
	// If DoWork is unable to get the work to be done, it should call
	// HandleStatusCallback with a failed callback; errors are logged, but
	// otherwise nothing else is done with them.
	DoWork(context.Context, *newmodels.QueuedJob) error

	// Sleep returns the amount of time to sleep between failed attempts to
	// acquire a queued job. The default implementation sleeps for 20, 40, 80,
	// 160, ..., up to a maximum of 10 seconds between attempts.
	Sleep(failedAttempts int32) time.Duration
}

A Worker does some work with a QueuedJob. Worker implementations may be shared and should be threadsafe.
