worker

package module
v0.0.0-...-07b734a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2017 License: MIT Imports: 17 Imported by: 0

README

Worker Travis CI GoDoc

An experimental background jobs processor. The following queues are supported:

Installation

$ go get -u github.com/vitalie/worker

Usage

package main

import (
	"context"
	"log"

	"github.com/vitalie/worker"
)

// addJob represents a simple background job.
// When the job is queued, visible (public) fields
// of the struct are serialized in JSON format along
// with struct's name.
type addJob struct {
	X, Y int
}

// Make implements Factory interface, it initialize
// the struct with data received from the job queue.
func (j *addJob) Make(args *worker.Args) (worker.Job, error) {
	job := &addJob{
		X: args.Get("X").MustInt(-1),
		Y: args.Get("Y").MustInt(-1),
	}
	return job, nil
}

// Run implements Runner interface, this is the
// function which is executed by background processor
// after initialization.
func (j *addJob) Run() error {
	sum := j.X + j.Y
	log.Printf("sum(%d, %d) = %d\n", j.X, j.Y, sum)
	return nil
}

func main() {
	q := worker.NewMemoryQueue()
	q.Put(&addJob{2, 3})

	// Create a worker pool with default settings,
	// common middlewares (Recovery, Logger)
	// using the `q` queue.
	pool := worker.NewPool(
		worker.SetQueue(q),
	)

	// Register the job.
	pool.Add(&addJob{})

	// Starts the workers and processes the jobs
	// from the queue until process exists.
	pool.Run(context.Background())
}

Example output:

vitalie@black:~/tmp$ go run t.go
[worker] addJob {"X":2,"Y":3} ... started
2015/04/24 14:36:21 sum(2, 3) = 5
[worker] addJob {"X":2,"Y":3} ... in 98.945µs ... OK
^C[worker] Quit signal received ...
[worker] Stopping workers ...
[worker] Shutdown completed!

TODO

  • Job scheduler

Credits

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Documentation

Index

Constants

View Source
const (
	BeanstalkHost = "localhost" // Beanstalk default host.
	BeanstalkPort = "11300"     // Beanstalk default port.
	BeanstalkTube = "default"   // Beanstalk default queue.
	BeanstalkPrio = 100         // Beanstalk default job priority.

)
View Source
const (
	DefaultWorkersCount = 10
)

Variables

View Source
var (
	BeanstalkTimeout time.Duration = 1 * time.Second // Beanstalk reserve timeout.
	BeanstalkTTR     time.Duration = 2 * DefaultTTR  // Beanstalk default TTR (time to run).
)
View Source
var (
	DefaultTTR time.Duration = 10 * time.Minute
)

Functions

func NewError

func NewError(msg string) error

func NewErrorFmt

func NewErrorFmt(format string, args ...interface{}) error

func SetQueue

func SetQueue(q Queue) func(*Pool)

SetQueue assigns a custom queue to worker pool.

func SetWorkers

func SetWorkers(n int) func(*Pool)

SetWorkers configures the pool concurrency.

func StructType

func StructType(v interface{}) (string, error)

Types

type Airbrake

type Airbrake struct {
	Logger    *log.Logger
	Airbrake  *gobrake.Notifier
	StackAll  bool
	StackSize int
}

func NewAirbrake

func NewAirbrake(id int64, key, env string) *Airbrake

func (*Airbrake) Exec

func (r *Airbrake) Exec(sw StatusWriter, fact string, args *Args, next JobRunner)

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) String

func (d Args) String() string

type BeanstalkQueue

type BeanstalkQueue struct {
	Host string        // Beanstalk host.
	Port string        // Beanstalk port.
	Name string        // Beanstalk tube name.
	Prio uint32        // Beanstalk priority.
	TTR  time.Duration // Beanstalk time to run.
	// contains filtered or unexported fields
}

BeanstalkQueue represents a Beanstalk queue.

func (*BeanstalkQueue) Delete

func (q *BeanstalkQueue) Delete(m Message) error

Delete deletes a job from the queue.

func (*BeanstalkQueue) Get

func (q *BeanstalkQueue) Get() (Message, error)

Get peeks a job from the queue.

func (*BeanstalkQueue) Put

func (q *BeanstalkQueue) Put(j Job) error

Put puts the job in the queue.

func (*BeanstalkQueue) Reject

func (q *BeanstalkQueue) Reject(m Message) error

Reject rejects the job marking it as failed.

func (*BeanstalkQueue) Size

func (q *BeanstalkQueue) Size() (uint64, uint64, error)

Size returns the queue size, only ready jobs are returned.

type Envelope

type Envelope struct {
	// contains filtered or unexported fields
}

func NewEnvelope

func NewEnvelope(body []byte) (*Envelope, error)

func (*Envelope) Args

func (e *Envelope) Args() *Args

func (*Envelope) String

func (e *Envelope) String() string

func (*Envelope) Type

func (e *Envelope) Type() string

type Error

type Error struct {
	Err       string
	IsTimeout bool
}

func (*Error) Error

func (e *Error) Error() string

func (*Error) Temporary

func (e *Error) Temporary() bool

func (*Error) Timeout

func (e *Error) Timeout() bool

type Factory

type Factory interface {
	Make(*Args) (Job, error)
}

type Handler

type Handler interface {
	Exec(sw StatusWriter, fact string, args *Args, next JobRunner)
}

func AirbreakStack

func AirbreakStack(id int64, key, env string) []Handler

AirbrakeStack is used to configure default middleware using Airbrake error tracking service (middlewares: Airbreak, Logger).

func CommonStack

func CommonStack() []Handler

CommonStack is used to configure default middleware that's common for most applications (middlewares: Recovery, Logger).

type HandlerFunc

type HandlerFunc func(sw StatusWriter, fact string, args *Args, next JobRunner)

func (HandlerFunc) Exec

func (h HandlerFunc) Exec(sw StatusWriter, fact string, args *Args, next JobRunner)

type Job

type Job interface {
	Runner
	Factory
}

type JobRunner

type JobRunner func(sw StatusWriter, fact string, args *Args)

type Logger

type Logger struct {
	*log.Logger
}

func NewLogger

func NewLogger() *Logger

func (*Logger) Exec

func (l *Logger) Exec(sw StatusWriter, fact string, args *Args, next JobRunner)

type MemoryQueue

type MemoryQueue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MemoryQueue represents an ordered queue, this queue is used mainly for unit tests.

func (*MemoryQueue) Delete

func (q *MemoryQueue) Delete(msg Message) error

func (*MemoryQueue) Get

func (q *MemoryQueue) Get() (Message, error)

func (*MemoryQueue) Put

func (q *MemoryQueue) Put(j Job) error

func (*MemoryQueue) Reject

func (q *MemoryQueue) Reject(msg Message) error

func (*MemoryQueue) Size

func (q *MemoryQueue) Size() (uint64, uint64, error)

type Message

type Message interface {
	Type() string
	Args() *Args
}

type Payload

type Payload struct {
	Type string      `json:"type"`
	Args interface{} `json:"args"`
}

Payload represents a queue message payload.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a pool of workers connected to a queue.

func NewPool

func NewPool(opts ...func(*Pool)) *Pool

NewPool returns a new Pool instance.

func (*Pool) Add

func (p *Pool) Add(f Factory) error

Add registers a new job factory.

func (*Pool) Exec

func (p *Pool) Exec(sw StatusWriter, fact string, args *Args)

Exec runs the job passing it through the middleware stack.

func (*Pool) Run

func (p *Pool) Run(ctx context.Context) error

Run starts processing jobs from the queue.

func (*Pool) Use

func (p *Pool) Use(h Handler)

Use appends a new middleware to current stack.

type Priority

type Priority interface {
	Prio() uint32
}

type Queue

type Queue interface {
	Put(Job) error
	Get() (Message, error)
	Delete(Message) error
	Reject(Message) error
	Size() (uint64, uint64, error)
}

func NewBeanstalkQueue

func NewBeanstalkQueue(opts ...func(*BeanstalkQueue)) (Queue, error)

NewBeanstalkQueue returns a queue instance using custom options.

func NewMemoryQueue

func NewMemoryQueue() Queue

type Recovery

type Recovery struct {
	Logger    *log.Logger
	StackAll  bool
	StackSize int
}

func NewRecovery

func NewRecovery() *Recovery

func (*Recovery) Exec

func (r *Recovery) Exec(sw StatusWriter, fact string, args *Args, next JobRunner)

type Runner

type Runner interface {
	Run() error
}

type StatusWriter

type StatusWriter interface {
	Set(interface{})
	Get() error
	OK() bool
}

func NewStatusWriter

func NewStatusWriter() StatusWriter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL