jobs: Index | Files | Directories

package jobs

import ""


Package Files

broker.go config.go dispatcher.go event.go job.go job_options.go pipeline.go rpc.go service.go


const (
    // EventPushOK thrown when new job has been added. JobEvent is passed as context.
    EventPushOK = iota + 1500

    // EventPushError caused when job can not be registered.

    // EventJobStart thrown when new job received.

    // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.

    // EventJobError thrown on all job related errors. See JobError as context.

    // EventPipeConsume when pipeline consuming has been requested.

    // EventPipeActive when pipeline has started.

    // EventPipeStop when pipeline has begun stopping.

    // EventPipeStopped when pipeline has been stopped.

    // EventPipeError when pipeline specific error happen.

    // EventBrokerReady thrown when broker is ready to accept/serve tasks.
const ID = "jobs"

ID defines public service name.

type Broker Uses

type Broker interface {
    // Register broker pipeline.
    Register(pipe *Pipeline) error

    // Consume configures pipeline to be consumed. Set execPool to nil to disable consuming. Method can be called before
    // the service is started!
    Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

    // Push job into the worker.
    Push(pipe *Pipeline, j *Job) (string, error)

    // Stat must fetch statistics about given pipeline or return error.
    Stat(pipe *Pipeline) (stat *Stat, err error)

Broker manages set of pipelines and provides ability to push jobs into them.

type Config Uses

type Config struct {
    // Workers configures roadrunner server and worker pool.
    Workers *roadrunner.ServerConfig

    // Dispatch defines where and how to match jobs.
    Dispatch map[string]*Options

    // Pipelines defines mapping between PHP job pipeline and associated job broker.
    Pipelines map[string]*Pipeline

    // Consume specifies names of pipelines to be consumed on service start.
    Consume []string
    // contains filtered or unexported fields

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) Get Uses

func (c *Config) Get(service string) service.Config

Get underlying broker config.

func (*Config) Hydrate Uses

func (c *Config) Hydrate(cfg service.Config) (err error)

Hydrate populates config values.

func (*Config) MatchPipeline Uses

func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error)

MatchPipeline locates the pipeline associated with the job.

func (*Config) Unmarshal Uses

func (c *Config) Unmarshal(out interface{}) error

Unmarshal is doing nothing.

type Dispatcher Uses

type Dispatcher map[string]*Options

Dispatcher provides ability to automatically locate the pipeline for the specific job and update job options (if none set).

type ErrorHandler Uses

type ErrorHandler func(id string, j *Job, err error)

ErrorHandler handles job execution errors.

type EventProvider Uses

type EventProvider interface {
    // Listen attaches the event listener.
    Listen(lsn func(event int, ctx interface{}))

EventProvider defines the ability to throw events for the broker.

type Handler Uses

type Handler func(id string, j *Job) error

Handler handles job execution.

type Job Uses

type Job struct {
    // Job contains name of job broker (usually PHP class).
    Job string `json:"job"`

    // Payload is string data (usually JSON) passed to Job broker.
    Payload string `json:"payload"`

    // Options contains set of PipelineOptions specific to job execution. Can be empty.
    Options *Options `json:"options,omitempty"`

Job carries information about single job.

func (*Job) Body Uses

func (j *Job) Body() []byte

Body packs job payload into binary payload.

func (*Job) Context Uses

func (j *Job) Context(id string) []byte

Context packs job context (job, id) into binary payload.

type JobError Uses

type JobError struct {
    // ID is job id.
    ID  string

    // Job is failed job.
    Job *Job

    // Caused contains job specific error.
    Caused error
    // contains filtered or unexported fields

JobError represents singular Job error event.

func (*JobError) Elapsed Uses

func (e *JobError) Elapsed() time.Duration

Elapsed returns duration of the invocation.

func (*JobError) Error Uses

func (e *JobError) Error() string

Error returns error message.

type JobEvent Uses

type JobEvent struct {
    // ID is job id.
    ID  string

    // Job is the job associated with the event.
    Job *Job
    // contains filtered or unexported fields

JobEvent represents job event.

func (*JobEvent) Elapsed Uses

func (e *JobEvent) Elapsed() time.Duration

Elapsed returns duration of the invocation.

type Options Uses

type Options struct {
    // Pipeline manually specified pipeline.
    Pipeline string `json:"pipeline,omitempty"`

    // Delay defines time duration to delay execution for. Defaults to none.
    Delay int `json:"delay,omitempty"`

    // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
    // Minimum meaningful value is 2.
    Attempts int `json:"maxAttempts,omitempty"`

    // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
    RetryDelay int `json:"retryDelay,omitempty"`

    // Timeout defines for how long broker should wait until treating job as failed. Defaults to 30 min.
    Timeout int `json:"timeout,omitempty"`

Options carry information about how to handle given job.

func (*Options) CanRetry Uses

func (o *Options) CanRetry(attempt int) bool

CanRetry must return true if broker is allowed to re-run the job.

func (*Options) DelayDuration Uses

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

func (*Options) Merge Uses

func (o *Options) Merge(from *Options)

Merge merges job options.

func (*Options) RetryDuration Uses

func (o *Options) RetryDuration() time.Duration

RetryDuration returns retry delay duration in a form of time.Duration.

func (*Options) TimeoutDuration Uses

func (o *Options) TimeoutDuration() time.Duration

TimeoutDuration returns timeout duration in a form of time.Duration.

type Pipeline Uses

type Pipeline map[string]interface{}

Pipeline defines pipeline options.

func (Pipeline) Bool Uses

func (p Pipeline) Bool(name string, d bool) bool

Bool must return option value as bool or return default value.

func (Pipeline) Broker Uses

func (p Pipeline) Broker() string

Broker associated with the pipeline.

func (Pipeline) Duration Uses

func (p Pipeline) Duration(name string, d time.Duration) time.Duration

Duration must return option value as time.Duration (seconds) or return default value.

func (Pipeline) Has Uses

func (p Pipeline) Has(name string) bool

Has checks if value is present in pipeline.

func (Pipeline) Integer Uses

func (p Pipeline) Integer(name string, d int) int

Integer must return option value as int or return default value.

func (Pipeline) Map Uses

func (p Pipeline) Map(name string) Pipeline

Map must return nested map value or empty config.

func (Pipeline) Name Uses

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) String Uses

func (p Pipeline) String(name string, d string) string

String must return option value as string or return default value.

func (Pipeline) With Uses

func (p Pipeline) With(name string, value interface{}) Pipeline

With pipeline value. Immutable.

type PipelineError Uses

type PipelineError struct {
    // Pipeline is associated pipeline.
    Pipeline *Pipeline

    // Caused is the error sent by broker.
    Caused error

PipelineError defines pipeline specific errors.

func (*PipelineError) Error Uses

func (e *PipelineError) Error() string

Error returns error message.

type PipelineList Uses

type PipelineList struct {
    // Pipelines is list of pipeline stats.
    Pipelines []*Stat `json:"pipelines"`

PipelineList contains list of pipeline stats.

type Pipelines Uses

type Pipelines []*Pipeline

Pipelines is list of Pipeline.

func (Pipelines) Broker Uses

func (ps Pipelines) Broker(broker string) Pipelines

Broker returns pipelines associated with specific broker.

func (Pipelines) Get Uses

func (ps Pipelines) Get(name string) *Pipeline

Get returns pipeline by its name.

func (Pipelines) Names Uses

func (ps Pipelines) Names(only ...string) Pipelines

Names returns only pipelines with specified names.

func (Pipelines) Reverse Uses

func (ps Pipelines) Reverse() Pipelines

Reverse returns pipelines in reversed order.

type Service Uses

type Service struct {
    // Associated brokers.
    Brokers map[string]Broker
    // contains filtered or unexported fields

Service wraps roadrunner container and manages set of brokers within it.

func (*Service) AddListener Uses

func (svc *Service) AddListener(l func(event int, ctx interface{}))

AddListener attaches event listeners to the service and all underlying brokers.

func (*Service) Attach Uses

func (svc *Service) Attach(ctr roadrunner.Controller)

Attach attaches controller. Currently only one controller is supported.

func (*Service) Consume Uses

func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error

Consume enables or disables pipeline consuming using given handlers.

func (*Service) Init Uses

func (svc *Service) Init(
    cfg service.Config,
    log *logrus.Logger,
    env env.Environment,
    rpc *rpc.Service,
) (ok bool, err error)

Init configures job service.

func (*Service) Push Uses

func (svc *Service) Push(job *Job) (string, error)

Push job to associated broker and return job id.

func (*Service) Serve Uses

func (svc *Service) Serve() error

Serve serves local rr server and creates broker association.

func (*Service) Server Uses

func (svc *Service) Server() *roadrunner.Server

Server returns associated rr server (if any).

func (*Service) Stat Uses

func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error)

Stat returns statistics for the given pipeline.

func (*Service) Stop Uses

func (svc *Service) Stop()

Stop all pipelines and rr server.

type Stat Uses

type Stat struct {
    // Pipeline name.
    Pipeline string

    // Broker is name of associated broker.
    Broker string

    // InternalName defines internal broker specific pipeline name.
    InternalName string

    // Consuming indicates that pipeline is consuming jobs.
    Consuming bool

    // Queue defines number of pending jobs.
    Queue int64

    // Active defines number of jobs which are currently being processed.
    Active int64

    // Delayed defines number of jobs which are delayed.
    Delayed int64

Stat contains information about pipeline.

type WorkerList Uses

type WorkerList struct {
    // Workers is list of workers.
    Workers []*util.State `json:"workers"`

WorkerList contains list of workers.



Package jobs imports 12 packages (graph) and is imported by 3 packages. Updated 2020-08-12. Refresh now. Tools for package owners.