jobqueue

Version: v0.0.3
Published: Dec 1, 2023 | License: MIT | Imports: 4 | Imported by: 2

README

JobStack

This library provides a stack implementation for long-running or otherwise expensive processing jobs. As a special case, it implements the standard http.Handler in addition to the generic interface.

Mechanism

The stack defines a maximum concurrency limit for executing jobs, and makes jobs wait when this limit is exceeded. The default concurrency limit is 1. Note that the stack doesn't start a goroutine for each job: the jobs must run on their own goroutines, and the stack should be called from those.

Besides limiting the concurrency level, it is also possible to limit the number of pending jobs, either by setting the maximum stack size, or a timeout for the jobs, or both.
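The mechanism described above can be illustrated with a minimal, standard-library-only sketch. It is not this package's implementation: the names limiter, newLimiter, do, occupy, errFull and errTimeout are hypothetical, the sketch rejects the newest caller when too many jobs are pending, and the order in which waiting callers get a slot is left to the Go runtime rather than being LIFO.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Hypothetical errors for this sketch; they are not the package's own values.
var (
	errFull    = errors.New("too many pending jobs")
	errTimeout = errors.New("timed out while pending")
)

// limiter is a hypothetical type: a buffered channel acts as a semaphore
// for running jobs, and an atomic counter bounds the pending ones.
type limiter struct {
	pending    int64 // number of callers currently waiting for a slot
	maxPending int64
	slots      chan struct{}
	timeout    time.Duration
}

func newLimiter(maxConcurrency int, maxPending int64, timeout time.Duration) *limiter {
	return &limiter{
		maxPending: maxPending,
		slots:      make(chan struct{}, maxConcurrency),
		timeout:    timeout,
	}
}

// do runs job on the caller's goroutine once a slot is free. It starts no
// goroutine of its own.
func (l *limiter) do(job func()) error {
	select {
	case l.slots <- struct{}{}: // a slot is free, run immediately
	default:
		// All slots are busy: become a pending job, if there is room.
		if atomic.AddInt64(&l.pending, 1) > l.maxPending {
			atomic.AddInt64(&l.pending, -1)
			return errFull
		}
		timer := time.NewTimer(l.timeout)
		defer timer.Stop()
		select {
		case l.slots <- struct{}{}:
			atomic.AddInt64(&l.pending, -1)
		case <-timer.C:
			atomic.AddInt64(&l.pending, -1)
			return errTimeout
		}
	}
	defer func() { <-l.slots }() // free the slot when the job is done
	job()
	return nil
}

// occupy starts a job that holds a slot until the returned function is called.
func occupy(l *limiter) (release func()) {
	started, stop := make(chan struct{}), make(chan struct{})
	go l.do(func() { close(started); <-stop })
	<-started
	return func() { close(stop) }
}

// demo occupies the only slot of two limiters and collects both rejections.
func demo() (full, timedOut error) {
	a := newLimiter(1, 0, time.Second) // no pending jobs allowed
	release := occupy(a)
	full = a.do(func() {})
	release()

	b := newLimiter(1, 1, 10*time.Millisecond) // one pending job, short timeout
	release = occupy(b)
	timedOut = b.do(func() {})
	release()
	return full, timedOut
}

func main() {
	full, timedOut := demo()
	fmt.Println(full)
	fmt.Println(timedOut)
}
```

The sketch keeps the property stated above: the job always runs on the goroutine that called do, so the caller decides how many goroutines exist.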

Example

func processJobs(jobs []func()) (dropped, timedOut int) {
	stack := jobqueue.With(jobqueue.Options{
		MaxConcurrency: 256,
		MaxStackSize:   256 * 256,
		Timeout:        9 * time.Millisecond,
	})

	defer stack.Close()

	// newCounter is assumed to return a concurrency-safe counter; its
	// definition is not shown here.
	d := newCounter()
	to := newCounter()
	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for _, j := range jobs {
		// Each job runs on its own goroutine; the stack only decides when
		// it may start.
		go func(j func()) {
			defer wg.Done()

			err := stack.Do(j)
			switch err {
			case jobqueue.ErrStackFull:
				d.inc()
			case jobqueue.ErrTimeout:
				to.inc()
			}
		}(j)
	}

	wg.Wait()
	dropped = d.value()
	timedOut = to.value()
	return
}

Two-step example

func processInSharedStack(s *jobqueue.Stack, job func()) error {
	done, err := s.Wait()
	if err != nil {
		return err
	}

	// Release the slot even if job panics.
	defer done()
	job()
	return nil
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrStackFull is returned by the stack when the max stack size is reached.
	ErrStackFull = errors.New("stack is full")

	// ErrTimeout is returned by the stack when a pending job reached the timeout.
	ErrTimeout = errors.New("timeout")

	// ErrClosed is returned by the queue when called after the queue was closed, or when the
	// queue was closed while a job was waiting to be scheduled.
	ErrClosed = errors.New("queue closed")
)

Functions

This section is empty.

Types

type HTTPOptions

type HTTPOptions struct {

	// Options contains the common options for the stack.
	Options

	// StackFullStatusCode is used when a job needs to be dropped from the
	// stack before its processing has been started. Defaults to 503 Service
	// Unavailable.
	StackFullStatusCode int

	// TimeoutStatusCode is used when a job times out before its processing
	// has been started. Defaults to 503 Service Unavailable.
	TimeoutStatusCode int
}

HTTPOptions extends the main stack options with the HTTP related configuration.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a wrapper around Stack that implements the standard http.Handler interface.

func NewHandler

func NewHandler(o HTTPOptions, h http.Handler) *Handler

NewHandler initializes a stack handler wrapping the http.Handler argument. It uses the configured stack to control whether and when the processing of a request can be started. It limits the number of requests that can be processed concurrently to the value of MaxConcurrency.

Instances of Handler need to be closed with the Close method once they are no longer used.
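As a rough illustration of the idea behind such a handler, the following standard-library-only sketch caps the number of requests processed concurrently and answers the rest with a configurable status code. It is a simplification under stated assumptions, not the package's implementation: limit and demo are hypothetical names, and the sketch rejects excess requests immediately instead of keeping them pending in a stack.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// limit is a hypothetical middleware: it lets at most max requests reach
// next at the same time and answers the others with busyStatus right away.
func limit(max, busyStatus int, next http.Handler) http.Handler {
	slots := make(chan struct{}, max)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case slots <- struct{}{}:
			defer func() { <-slots }() // free the slot when the request is done
			next.ServeHTTP(w, r)
		default:
			http.Error(w, http.StatusText(busyStatus), busyStatus)
		}
	})
}

// demo keeps one request in flight and returns the status code that a
// second, concurrent request receives.
func demo() int {
	entered, release := make(chan struct{}), make(chan struct{})
	slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		close(entered)
		<-release
	})
	h := limit(1, http.StatusServiceUnavailable, slow)

	// The first request occupies the only slot until release is closed.
	go h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
	<-entered
	defer close(release)

	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	fmt.Println(demo()) // 503
}
```

The busyStatus parameter plays a role similar to StackFullStatusCode in HTTPOptions, whose documented default is 503 Service Unavailable.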

func (*Handler) Close

func (h *Handler) Close()

Close frees up the resources used by a Handler instance.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the http.Handler interface.

type Options

type Options struct {

	// MaxConcurrency defines how many jobs are allowed to run concurrently.
	// Defaults to 1.
	MaxConcurrency int

	// MaxStackSize defines how many jobs may be waiting in the stack.
	// Defaults to infinite.
	MaxStackSize int

	// Timeout defines how long a job can be waiting in the stack.
	// Defaults to infinite.
	Timeout time.Duration

	// CloseTimeout sets a maximum duration for how long the queue can wait
	// for the active and queued jobs to finish. Defaults to infinite.
	CloseTimeout time.Duration
}

Options allows passing in parameters to the stack.

type Stack

type Stack struct {
	// contains filtered or unexported fields
}

Stack controls how long-running or otherwise expensive jobs are executed. It allows the jobs to proceed with limited concurrency. The incoming jobs are executed in LIFO (last-in-first-out) order.

Jobs can also be dropped or timed out when the MaxStackSize and/or Timeout options are set. When MaxStackSize is reached, the oldest job is dropped.
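The ordering and drop policy described above can be sketched with a small bounded stack of job IDs. This is only an illustration using hypothetical names (boundedStack, push, pop, demo); it ignores concurrency, timeouts and the actual scheduling done by Stack.

```go
package main

import "fmt"

// boundedStack is a hypothetical LIFO container with a maximum size.
type boundedStack struct {
	items []int // job IDs, oldest first
	max   int
}

// push puts id on top. If the stack then exceeds max, the oldest entry
// (at the bottom) is removed and returned with ok set to true.
func (s *boundedStack) push(id int) (dropped int, ok bool) {
	s.items = append(s.items, id)
	if len(s.items) > s.max {
		dropped = s.items[0]
		s.items = s.items[1:]
		return dropped, true
	}
	return 0, false
}

// pop removes and returns the most recently pushed entry.
func (s *boundedStack) pop() (id int, ok bool) {
	if len(s.items) == 0 {
		return 0, false
	}
	id = s.items[len(s.items)-1]
	s.items = s.items[:len(s.items)-1]
	return id, true
}

// demo pushes three jobs onto a stack of size two, then drains it.
func demo() (dropped, order []int) {
	s := &boundedStack{max: 2}
	for _, id := range []int{1, 2, 3} {
		if d, ok := s.push(id); ok {
			dropped = append(dropped, d)
		}
	}
	for {
		id, ok := s.pop()
		if !ok {
			break
		}
		order = append(order, id)
	}
	return dropped, order
}

func main() {
	dropped, order := demo()
	fmt.Println("dropped:", dropped) // dropped: [1]
	fmt.Println("run order:", order) // run order: [3 2]
}
```

Serving the newest job first means that, under a burst, the callers most likely to still be waiting for a result are handled first, while the oldest pending jobs are the ones dropped or timed out.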

Using a stack for job processing can be a good way to protect an application from bursts of chatty clients or temporarily slow job execution.

func New

func New() *Stack

New creates a Stack instance with a concurrency level of 1, and with infinite stack size and timeout. See also With(Options). The Stack needs to be closed once it is no longer used.

func With

func With(o Options) *Stack

With creates a Stack instance configured by the Options parameter. The Stack needs to be closed once it's not used anymore.

func (*Stack) Close

func (s *Stack) Close()

Close frees up the resources used by a Stack instance.

After Close is called, the stack stops accepting new jobs, but it waits until all jobs are done, including those still waiting in the stack.

If the close timeout is set to a value greater than 0, Close forces closing once the timeout has passed, and the jobs still queued at that point receive ErrClosed. The close timeout can be set as an initialization option (CloseTimeout).

func (*Stack) CloseForced

func (s *Stack) CloseForced()

CloseForced frees up the resources used by a Stack instance.

When called, the queued jobs receive ErrClosed.

func (*Stack) Do

func (s *Stack) Do(job func()) error

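Do calls the job as soon as the number of running jobs falls below MaxConcurrency.

If a job is dropped from the stack or times out, Do returns ErrStackFull or ErrTimeout, respectively. Do is documented as returning no errors other than these two, although ErrClosed (see Variables) is described as being returned when the stack is called after it was closed, or when it is closed while a job is waiting.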

Once the job has been started, Do does not return an error.

func (*Stack) Reconfigure

func (s *Stack) Reconfigure(o Options) error

func (*Stack) Status

func (s *Stack) Status() Status

Status returns snapshot information about the state of the queue.

func (*Stack) Wait

func (s *Stack) Wait() (done func(), err error)

Wait returns when a job can be processed, or when it should be cancelled. The notion of the actual 'job' to be processed is completely up to the calling code.

When a job can be processed, Wait returns a non-nil done() function, which must be called after the job is done, in order to free up a slot for the next job.

When the job needs to be dropped, Wait returns ErrStackFull. When the job timed out, Wait returns ErrTimeout. In these cases, done() must not be called, and it may be nil.

Wait is documented as returning no errors other than ErrStackFull or ErrTimeout, although ErrClosed (see Variables) is described as being returned when the stack is called after it was closed, or when it is closed while a job is waiting.

type Status

type Status struct {

	// ActiveJobs contains the number of jobs being executed.
	ActiveJobs int

	// QueuedJobs contains the number of jobs waiting to be scheduled.
	QueuedJobs int

	// Closing indicates that the queue is being closed.
	Closing bool

	// Closed indicates that the queue has been closed.
	Closed bool
}

Status contains snapshot information about the state of the queue.
