jobs

package module
v1.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2019 License: MIT Imports: 12 Imported by: 3

README

RoadRunner: Async Job Pipeline

Latest Stable Version GoDoc Build Status Go Report Card Codecov

Features

  • supports in memory queue, Beanstalk, AMQP, AWS SQS
  • can work as standalone application or as part of RoadRunner server
  • multiple pipelines per application
  • durable (prefetch control, graceful exit, reconnects)
  • automatic queue configuration
  • plug-and-play PHP library (framework agnostic)
  • delayed jobs
  • job level timeouts, retries, retry delays
  • PHP and Golang consumers and producers
  • per pipeline stop/resume
  • interactive stats, events, RPC
  • works on Windows

Documentation

Index

Constants

View Source
const (
	// EventPushOK thrown when new job has been added. JobEvent is passed as context.
	EventPushOK = iota + 1500

	// EventPushError caused when job can not be registered.
	EventPushError

	// EventJobStart thrown when new job received.
	EventJobStart

	// EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
	EventJobOK

	// EventJobError thrown on all job related errors. See JobError as context.
	EventJobError

	// EventPipeConsume when pipeline pipelines has been requested.
	EventPipeConsume

	// EventPipeActive when pipeline has started.
	EventPipeActive

	// EventPipeStop when pipeline has begun stopping.
	EventPipeStop

	// EventPipeStopped when pipeline has been stopped.
	EventPipeStopped

	// EventPipeError when pipeline specific error happen.
	EventPipeError

	// EventBrokerReady thrown when broken is ready to accept/serve tasks.
	EventBrokerReady
)
View Source
const ID = "jobs"

ID defines public service name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker added in v0.2.0

type Broker interface {
	// Register broker pipeline.
	Register(pipe *Pipeline) error

	// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
	// the service is started!
	Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

	// Push job into the worker.
	Push(pipe *Pipeline, j *Job) (string, error)

	// Stat must fetch statistics about given pipeline or return error.
	Stat(pipe *Pipeline) (stat *Stat, err error)
}

Broker manages set of pipelines and provides ability to push jobs into them.

type Config

type Config struct {
	// Workers configures roadrunner server and worker busy.
	Workers *roadrunner.ServerConfig

	// Dispatch defines where and how to match jobs.
	Dispatch map[string]*Options

	// Pipelines defines mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]*Pipeline

	// Consuming specifies names of pipelines to be consumed on service start.
	Consume []string
	// contains filtered or unexported fields
}

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) Get added in v1.0.0

func (c *Config) Get(service string) service.Config

Get underlying broker config.

func (*Config) Hydrate

func (c *Config) Hydrate(cfg service.Config) (err error)

Hydrate populates config values.

func (*Config) MatchPipeline added in v1.0.0

func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error)

MatchPipeline locates the pipeline associated with the job.

func (*Config) Unmarshal added in v1.0.0

func (c *Config) Unmarshal(out interface{}) error

Unmarshal is doing nothing.

type Dispatcher added in v1.0.0

type Dispatcher map[string]*Options

Dispatcher provides ability to automatically locate the pipeline for the specific job and update job options (if none set).

type ErrorHandler added in v0.2.0

type ErrorHandler func(id string, j *Job, err error)

ErrorHandler handles job execution errors.

type EventProvider added in v1.0.0

type EventProvider interface {
	// Listen attaches the even listener.
	Listen(lsn func(event int, ctx interface{}))
}

EventProvider defines the ability to throw events for the broker.

type Handler added in v0.2.0

type Handler func(id string, j *Job) error

Handler handles job execution.

type Job added in v0.2.0

type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`

	// Payload is string data (usually JSON) passed to Job broker.
	Payload string `json:"payload"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

Job carries information about single job.

func (*Job) Body added in v0.2.0

func (j *Job) Body() []byte

Body packs job payload into binary payload.

func (*Job) Context added in v0.2.0

func (j *Job) Context(id string) []byte

Context packs job context (job, id) into binary payload.

type JobError added in v1.0.0

type JobError struct {
	// String is job id.
	ID string

	// Job is failed job.
	Job *Job

	// Caused contains job specific error.
	Caused error
	// contains filtered or unexported fields
}

JobError represents singular Job error event.

func (*JobError) Elapsed added in v1.0.0

func (e *JobError) Elapsed() time.Duration

Elapsed returns duration of the invocation.

func (*JobError) Error added in v1.0.0

func (e *JobError) Error() string

Caused returns error message.

type JobEvent added in v1.0.0

type JobEvent struct {
	// String is job id.
	ID string

	// Job is failed job.
	Job *Job
	// contains filtered or unexported fields
}

JobEvent represent job event.

func (*JobEvent) Elapsed added in v1.0.0

func (e *JobEvent) Elapsed() time.Duration

Elapsed returns duration of the invocation.

type Options added in v0.2.0

type Options struct {
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int `json:"delay,omitempty"`

	// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
	// Minimum valuable value is 2.
	Attempts int `json:"maxAttempts,omitempty"`

	// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
	RetryDelay int `json:"retryDelay,omitempty"`

	// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
	Timeout int `json:"timeout,omitempty"`
}

Options carry information about how to handle given job.

func (*Options) CanRetry added in v1.0.0

func (o *Options) CanRetry(attempt int) bool

CanRetry must return true if broker is allowed to re-run the job.

func (*Options) DelayDuration added in v0.2.0

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

func (*Options) Merge added in v1.0.0

func (o *Options) Merge(from *Options)

Merge merges job options.

func (*Options) RetryDuration added in v0.2.0

func (o *Options) RetryDuration() time.Duration

RetryDuration returns retry delay duration in a form of time.Duration.

func (*Options) TimeoutDuration added in v0.2.0

func (o *Options) TimeoutDuration() time.Duration

TimeoutDuration returns timeout duration in a form of time.Duration.

type Pipeline added in v0.2.0

type Pipeline map[string]interface{}

Pipeline defines pipeline options.

func (Pipeline) Bool added in v1.0.0

func (p Pipeline) Bool(name string, d bool) bool

Bool must return option value as string or return default value.

func (Pipeline) Broker added in v0.2.0

func (p Pipeline) Broker() string

Broker associated with the pipeline.

func (Pipeline) Duration added in v1.0.0

func (p Pipeline) Duration(name string, d time.Duration) time.Duration

Duration must return option value as time.Duration (seconds) or return default value.

func (Pipeline) Has added in v0.2.0

func (p Pipeline) Has(name string) bool

Has checks if value presented in pipeline.

func (Pipeline) Integer added in v1.0.0

func (p Pipeline) Integer(name string, d int) int

Integer must return option value as string or return default value.

func (Pipeline) Map added in v1.0.0

func (p Pipeline) Map(name string) Pipeline

Map must return nested map value or empty config.

func (Pipeline) Name added in v1.0.0

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) String added in v1.0.0

func (p Pipeline) String(name string, d string) string

String must return option value as string or return default value.

func (Pipeline) With added in v1.0.0

func (p Pipeline) With(name string, value interface{}) Pipeline

With pipeline value. Immutable.

type PipelineError added in v1.0.0

type PipelineError struct {
	// Pipeline is associated pipeline.
	Pipeline *Pipeline

	// Caused send by broker.
	Caused error
}

PipelineError defines pipeline specific errors.

func (*PipelineError) Error added in v1.0.0

func (e *PipelineError) Error() string

Error returns error message.

type PipelineList added in v1.0.0

type PipelineList struct {
	// Pipelines is list of pipeline stats.
	Pipelines []*Stat `json:"pipelines"`
}

PipelineList contains list of pipeline stats.

type Pipelines added in v1.0.0

type Pipelines []*Pipeline

Pipelines is list of Pipeline.

func (Pipelines) Broker added in v1.0.0

func (ps Pipelines) Broker(broker string) Pipelines

Broker return pipelines associated with specific broker.

func (Pipelines) Get added in v1.0.0

func (ps Pipelines) Get(name string) *Pipeline

Get returns pipeline by it'svc name.

func (Pipelines) Names added in v1.0.0

func (ps Pipelines) Names(only ...string) Pipelines

Names returns only pipelines with specified names.

func (Pipelines) Reverse added in v1.0.0

func (ps Pipelines) Reverse() Pipelines

Reverse returns pipelines in reversed order.

type Service

type Service struct {
	// Associated parent
	Brokers map[string]Broker
	// contains filtered or unexported fields
}

Service wraps roadrunner container and manage set of parent within it.

func (*Service) AddListener added in v1.0.0

func (svc *Service) AddListener(l func(event int, ctx interface{}))

AddListener attaches event listeners to the service and all underlying brokers.

func (*Service) Attach added in v1.1.0

func (svc *Service) Attach(ctr roadrunner.Controller)

Attach attaches cr. Currently only one cr is supported.

func (*Service) Consume added in v1.0.0

func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

Consume enables or disables pipeline pipelines using given handlers.

func (*Service) Init

func (svc *Service) Init(
	cfg service.Config,
	log *logrus.Logger,
	env env.Environment,
	rpc *rpc.Service,
) (ok bool, err error)

Init configures job service.

func (*Service) Push added in v0.2.0

func (svc *Service) Push(job *Job) (string, error)

Push job to associated broker and return job id.

func (*Service) Serve

func (svc *Service) Serve() error

Serve serves local rr server and creates broker association.

func (*Service) Server added in v1.1.4

func (svc *Service) Server() *roadrunner.Server

Server returns associated rr server (if any).

func (*Service) Stat added in v1.0.0

func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error)

Stat returns list of pipelines workers and their stats.

func (*Service) Stop

func (svc *Service) Stop()

Stop all pipelines and rr server.

type Stat added in v1.0.0

type Stat struct {
	// Pipeline name.
	Pipeline string

	// Broken is name of associated broker.
	Broker string

	// InternalName defines internal broker specific pipeline name.
	InternalName string

	// Consuming indicates that pipeline is pipelines jobs.
	Consuming bool

	// testQueue defines number of pending jobs.
	Queue int64

	// Active defines number of jobs which are currently being processed.
	Active int64

	// Delayed defines number of jobs which are being processed.
	Delayed int64
}

Stat contains information about pipeline.

type WorkerList added in v1.0.0

type WorkerList struct {
	// Workers is list of workers.
	Workers []*util.State `json:"workers"`
}

WorkerList contains list of workers.

Directories

Path Synopsis
broker
sqs
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL