jobs

package module
v2.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2021 License: MIT Imports: 12 Imported by: 1

README

RoadRunner: Background PHP workers, Queue brokers

Latest Stable Version GoDoc CI Go Report Card Codecov

Documentation

Features

  • supports in memory queue, Beanstalk, AMQP, AWS SQS
  • can work as standalone application or as part of RoadRunner server
  • multiple pipelines per application
  • durable (prefetch control, graceful exit, reconnects)
  • automatic queue configuration
  • plug-and-play PHP library (framework agnostic)
  • delayed jobs
  • job level timeouts, retries, retry delays
  • PHP and Golang consumers and producers
  • per pipeline stop/resume
  • interactive stats, events, RPC
  • works on Windows

License:

The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.

Documentation

Index

Constants

View Source
const (
	// EventPushOK thrown when new job has been added. JobEvent is passed as context.
	EventPushOK = iota + 1500

	// EventPushError caused when job can not be registered.
	EventPushError

	// EventJobStart thrown when new job received.
	EventJobStart

	// EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
	EventJobOK

	// EventJobError thrown on all job related errors. See JobError as context.
	EventJobError

	// EventPipeConsume when pipeline pipelines has been requested.
	EventPipeConsume

	// EventPipeActive when pipeline has started.
	EventPipeActive

	// EventPipeStop when pipeline has begun stopping.
	EventPipeStop

	// EventPipeStopped when pipeline has been stopped.
	EventPipeStopped

	// EventPipeError when pipeline specific error happen.
	EventPipeError

	// EventBrokerReady thrown when broken is ready to accept/serve tasks.
	EventBrokerReady
)
View Source
const ID = "jobs"

ID defines public service name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Register broker pipeline.
	Register(pipe *Pipeline) error

	// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
	// the service is started!
	Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

	// Push job into the worker.
	Push(pipe *Pipeline, j *Job) (string, error)

	// Stat must fetch statistics about given pipeline or return error.
	Stat(pipe *Pipeline) (stat *Stat, err error)
}

Broker manages set of pipelines and provides ability to push jobs into them.

type Config

type Config struct {
	// Workers configures roadrunner server and worker busy.
	Workers *roadrunner.ServerConfig

	// Dispatch defines where and how to match jobs.
	Dispatch map[string]*Options

	// Pipelines defines mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]*Pipeline

	// Consuming specifies names of pipelines to be consumed on service start.
	Consume []string
	// contains filtered or unexported fields
}

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) Get

func (c *Config) Get(service string) service.Config

Get underlying broker config.

func (*Config) Hydrate

func (c *Config) Hydrate(cfg service.Config) (err error)

Hydrate populates config values.

func (*Config) MatchPipeline

func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error)

MatchPipeline locates the pipeline associated with the job.

func (*Config) Unmarshal

func (c *Config) Unmarshal(out interface{}) error

Unmarshal is doing nothing.

type Dispatcher

type Dispatcher map[string]*Options

Dispatcher provides ability to automatically locate the pipeline for the specific job and update job options (if none set).

type ErrorHandler

type ErrorHandler func(id string, j *Job, err error)

ErrorHandler handles job execution errors.

type EventProvider

type EventProvider interface {
	// Listen attaches the even listener.
	Listen(lsn func(event int, ctx interface{}))
}

EventProvider defines the ability to throw events for the broker.

type Handler

type Handler func(id string, j *Job) error

Handler handles job execution.

type Job

type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`

	// Payload is string data (usually JSON) passed to Job broker.
	Payload string `json:"payload"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

Job carries information about single job.

func (*Job) Body

func (j *Job) Body() []byte

Body packs job payload into binary payload.

func (*Job) Context

func (j *Job) Context(id string) []byte

Context packs job context (job, id) into binary payload.

type JobError

type JobError struct {
	// String is job id.
	ID string

	// Job is failed job.
	Job *Job

	// Caused contains job specific error.
	Caused error
	// contains filtered or unexported fields
}

JobError represents singular Job error event.

func (*JobError) Elapsed

func (e *JobError) Elapsed() time.Duration

Elapsed returns duration of the invocation.

func (*JobError) Error

func (e *JobError) Error() string

Caused returns error message.

type JobEvent

type JobEvent struct {
	// String is job id.
	ID string

	// Job is failed job.
	Job *Job
	// contains filtered or unexported fields
}

JobEvent represent job event.

func (*JobEvent) Elapsed

func (e *JobEvent) Elapsed() time.Duration

Elapsed returns duration of the invocation.

type Options

type Options struct {
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int `json:"delay,omitempty"`

	// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
	// Minimum valuable value is 2.
	Attempts int `json:"maxAttempts,omitempty"`

	// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
	RetryDelay int `json:"retryDelay,omitempty"`

	// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
	Timeout int `json:"timeout,omitempty"`
}

Options carry information about how to handle given job.

func (*Options) CanRetry

func (o *Options) CanRetry(attempt int) bool

CanRetry must return true if broker is allowed to re-run the job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

func (*Options) Merge

func (o *Options) Merge(from *Options)

Merge merges job options.

func (*Options) RetryDuration

func (o *Options) RetryDuration() time.Duration

RetryDuration returns retry delay duration in a form of time.Duration.

func (*Options) TimeoutDuration

func (o *Options) TimeoutDuration() time.Duration

TimeoutDuration returns timeout duration in a form of time.Duration.

type Pipeline

type Pipeline map[string]interface{}

Pipeline defines pipeline options.

func (Pipeline) Bool

func (p Pipeline) Bool(name string, d bool) bool

Bool must return option value as string or return default value.

func (Pipeline) Broker

func (p Pipeline) Broker() string

Broker associated with the pipeline.

func (Pipeline) Duration

func (p Pipeline) Duration(name string, d time.Duration) time.Duration

Duration must return option value as time.Duration (seconds) or return default value.

func (Pipeline) Has

func (p Pipeline) Has(name string) bool

Has checks if value presented in pipeline.

func (Pipeline) Integer

func (p Pipeline) Integer(name string, d int) int

Integer must return option value as string or return default value.

func (Pipeline) Map

func (p Pipeline) Map(name string) Pipeline

Map must return nested map value or empty config.

func (Pipeline) Name

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) String

func (p Pipeline) String(name string, d string) string

String must return option value as string or return default value.

func (Pipeline) With

func (p Pipeline) With(name string, value interface{}) Pipeline

With pipeline value. Immutable.

type PipelineError

type PipelineError struct {
	// Pipeline is associated pipeline.
	Pipeline *Pipeline

	// Caused send by broker.
	Caused error
}

PipelineError defines pipeline specific errors.

func (*PipelineError) Error

func (e *PipelineError) Error() string

Error returns error message.

type PipelineList

type PipelineList struct {
	// Pipelines is list of pipeline stats.
	Pipelines []*Stat `json:"pipelines"`
}

PipelineList contains list of pipeline stats.

type Pipelines

type Pipelines []*Pipeline

Pipelines is list of Pipeline.

func (Pipelines) Broker

func (ps Pipelines) Broker(broker string) Pipelines

Broker return pipelines associated with specific broker.

func (Pipelines) Get

func (ps Pipelines) Get(name string) *Pipeline

Get returns pipeline by it'svc name.

func (Pipelines) Names

func (ps Pipelines) Names(only ...string) Pipelines

Names returns only pipelines with specified names.

func (Pipelines) Reverse

func (ps Pipelines) Reverse() Pipelines

Reverse returns pipelines in reversed order.

type Service

type Service struct {
	// Associated parent
	Brokers map[string]Broker
	// contains filtered or unexported fields
}

Service wraps roadrunner container and manage set of parent within it.

func (*Service) AddListener

func (svc *Service) AddListener(l func(event int, ctx interface{}))

AddListener attaches event listeners to the service and all underlying brokers.

func (*Service) Attach

func (svc *Service) Attach(ctr roadrunner.Controller)

Attach attaches cr. Currently only one cr is supported.

func (*Service) Consume

func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

Consume enables or disables pipeline pipelines using given handlers.

func (*Service) Init

func (svc *Service) Init(
	cfg service.Config,
	log *logrus.Logger,
	env env.Environment,
	rpc *rpc.Service,
) (ok bool, err error)

Init configures job service.

func (*Service) Push

func (svc *Service) Push(job *Job) (string, error)

Push job to associated broker and return job id.

func (*Service) Serve

func (svc *Service) Serve() error

Serve serves local rr server and creates broker association.

func (*Service) Server

func (svc *Service) Server() *roadrunner.Server

Server returns associated rr server (if any).

func (*Service) Stat

func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error)

Stat returns list of pipelines workers and their stats.

func (*Service) Stop

func (svc *Service) Stop()

Stop all pipelines and rr server.

type Stat

type Stat struct {
	// Pipeline name.
	Pipeline string

	// Broken is name of associated broker.
	Broker string

	// InternalName defines internal broker specific pipeline name.
	InternalName string

	// Consuming indicates that pipeline is pipelines jobs.
	Consuming bool

	// testQueue defines number of pending jobs.
	Queue int64

	// Active defines number of jobs which are currently being processed.
	Active int64

	// Delayed defines number of jobs which are being processed.
	Delayed int64
}

Stat contains information about pipeline.

type WorkerList

type WorkerList struct {
	// Workers is list of workers.
	Workers []*util.State `json:"workers"`
}

WorkerList contains list of workers.

Directories

Path Synopsis
broker
sqs
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL