workers

package module
v0.0.0-...-43922cd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2017 License: MIT Imports: 20 Imported by: 0

README

Build Status GoDoc

Sidekiq compatible background workers in golang.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

Example usage:

package main

import (
	"github.com/jrallison/go-workers"
)

func myJob(message *workers.Msg) {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // do something before each message is processed
  acknowledge = next()
  // do something after each message is processed
  return
} 

func main() {
  workers.Configure(map[string]string{
    // location of redis instance
    "server":  "localhost:6379",
    // instance of the database
    "database":  "0",
    // number of connections to keep open with redis
    "pool":    "30",
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })

  workers.Middleware.Append(&myMiddleware{})

  // pull messages from "myqueue" with concurrency of 10
  workers.Process("myqueue", myJob, 10)

  // pull messages from "myqueue2" with concurrency of 20
  workers.Process("myqueue2", myJob, 20)

  // Add a job to a queue
  workers.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // stats will be available at http://localhost:8080/stats
  go workers.StatsServer(8080)

  // Blocks until process is told to exit via unix signal
  workers.Run()
}

Initial development sponsored by Customer.io

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_RETRY = 25
	LAYOUT            = "2006-01-02 15:04:05 MST"
)
View Source
const (
	NanoSecondPrecision = 1000000000.0
)

Variables

This section is empty.

Functions

func Configure

func Configure(cfg ConfigureOpts) (configObj *config, err error)

func Stats

func Stats(workers *Workers, w http.ResponseWriter, req *http.Request)

func StatsServer

func StatsServer(workers *Workers, port int)

Types

type Action

type Action interface {
	Call(queue string, message *Msg, next func() error) error
}

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type ConfigureOpts

type ConfigureOpts struct {
	// RedisURL is a redis schemed URL as understood by redigo:
	// https://godoc.org/github.com/garyburd/redigo/redis#DialURL
	RedisURL string

	// ProcessID uniquely identifies this process. Used for uncoordinated reliable processing of messages.
	ProcessID string

	// MaxIdle is the maximum number of idle connections to keep in the redis connection pool.
	MaxIdle int

	// PoolSize is the maximum number of connections allowed by the redis conneciton pool.
	PoolSize int

	// PollInterval is how often we should poll for scheduled jobs.
	PollInterval int

	// Namespace is the namespace to use for redis keys.
	Namespace string

	RedisPool *redis.Pool
}

type EnqueueData

type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

type EnqueueOptions

type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}

type Fetcher

type Fetcher interface {
	Queue() string
	Fetch()
	Acknowledge(*Msg)
	Ready() chan bool
	FinishedWork() chan bool
	Messages() chan *Msg
	Close()
	Closed() bool
	InprogressQueue() string
}

func NewFetch

func NewFetch(config *config, queue string, messages chan *Msg, ready chan bool) Fetcher

type GoWorkers

type GoWorkers interface {
	Run()
	Start()
	Quit()
	WaitForExit()
	ResetManagers() error

	Process(queue string, job jobFunc, concurrency int, mids ...Action)

	Enqueue(queue, class string, args interface{}) (string, error)
	EnqueueIn(queue, class string, in float64, args interface{}) (string, error)
	EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)
	EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

	BeforeStart(f func())
	DuringDrain(f func())

	Ping() error
	QueueStats() (queueStats *QueueStats, err error)

	Namespace() string
	NamespacedKey(keys ...string) string
	TrimKeyNamespace(key string) string
}

type MiddlewareLogging

type MiddlewareLogging struct{}

func (*MiddlewareLogging) Call

func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() error) (err error)

type MiddlewareRetry

type MiddlewareRetry struct {
	// contains filtered or unexported fields
}

func (*MiddlewareRetry) Call

func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() error) (err error)

type MiddlewareStats

type MiddlewareStats struct {
	// contains filtered or unexported fields
}

func (*MiddlewareStats) Call

func (l *MiddlewareStats) Call(queue string, message *Msg, next func() error) (err error)

type Middlewares

type Middlewares struct {
	// contains filtered or unexported fields
}

func NewMiddleware

func NewMiddleware(actions ...Action) *Middlewares

func (*Middlewares) Append

func (m *Middlewares) Append(action Action)

func (*Middlewares) AppendToCopy

func (m *Middlewares) AppendToCopy(mids []Action) *Middlewares

func (*Middlewares) Equals

func (m *Middlewares) Equals(other *Middlewares) bool

func (*Middlewares) Prepend

func (m *Middlewares) Prepend(action Action)

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

type QueueDepth

type QueueDepth struct {
	Name       string
	InProgress int
	Queued     int
}

type QueueStats

type QueueStats struct {
	Queues     []*QueueDepth
	RetryDepth int
}

type Workers

type Workers struct {
	// contains filtered or unexported fields
}

func NewWorkers

func NewWorkers(config *config) *Workers

func (*Workers) BeforeStart

func (w *Workers) BeforeStart(f func())

func (*Workers) DuringDrain

func (w *Workers) DuringDrain(f func())

func (*Workers) Enqueue

func (w *Workers) Enqueue(queue, class string, args interface{}) (string, error)

func (*Workers) EnqueueAt

func (w *Workers) EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)

func (*Workers) EnqueueIn

func (w *Workers) EnqueueIn(queue, class string, in float64, args interface{}) (string, error)

func (*Workers) EnqueueWithOptions

func (w *Workers) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

func (*Workers) Namespace

func (w *Workers) Namespace() string

func (*Workers) NamespacedKey

func (w *Workers) NamespacedKey(keys ...string) string

func (*Workers) Ping

func (w *Workers) Ping() error

func (*Workers) Process

func (w *Workers) Process(queue string, job jobFunc, concurrency int, mids ...Action)

func (*Workers) QueueStats

func (w *Workers) QueueStats() (queueStats *QueueStats, err error)

func (*Workers) Quit

func (w *Workers) Quit()

func (*Workers) RedisPool

func (w *Workers) RedisPool() *redis.Pool

func (*Workers) ResetManagers

func (w *Workers) ResetManagers() error

func (*Workers) Run

func (w *Workers) Run()

func (*Workers) Start

func (w *Workers) Start()

func (*Workers) TrimKeyNamespace

func (w *Workers) TrimKeyNamespace(key string) string

func (*Workers) WaitForExit

func (w *Workers) WaitForExit()

type WorkersLogger

type WorkersLogger interface {
	Println(...interface{})
	Printf(string, ...interface{})
}
var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL