workers

package module
v0.0.0-...-f9540ad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2013 License: MIT Imports: 16 Imported by: 0

README

Build Status

Sidekiq compatible background workers in golang.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

Example usage:

package main

import (
	"github.com/jrallison/go-workers"
)

func myJob(args *workers.Args) {
  // do something with your message
  // args is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func()) {
  // do something before each message is processed
  next()
  // do something after each message is processed
}

func main() {
  workers.Configure(map[string]string{
    // location of redis instance
    "server":  "localhost:6379",
    // number of connections to keep open with redis
    "pool":    "30",
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })
  
  workers.Middleware.Append(&myMiddleware{})

  // pull messages from "myqueue" with concurrency of 10
  workers.Process("myqueue", myJob, 10)
  
  // pull messages from "myqueue2" with concurrency of 20
  workers.Process("myqueue2", myJob, 20)

  // Add a job to a queue
  workers.Enqueue("myqueue3", "Add", []int{1, 2})

  // stats will be available at http://localhost:8080/stats
  go workers.StatsServer(8080)

  // Blocks until process is told to exit via unix signal
  workers.Run()
}

Initial development sponsored by Customer.io

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_RETRY = 25
	LAYOUT            = "2006-01-02 15:04:05 MST"
)
View Source
const (
	POLL_INTERVAL = 15
)
View Source
const (
	RETRY_KEY = "goretry"
)

Variables

View Source
var Config *config
View Source
var Logger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)
View Source
var Middleware = newMiddleware(
	&MiddlewareLogging{},
	&MiddlewareRetry{},
	&MiddlewareStats{},
)

Functions

func Configure

func Configure(options map[string]string)

func Enqueue

func Enqueue(queue, class string, args interface{}) error

func Process

func Process(queue string, job jobFunc, concurrency int)

func Quit

func Quit()

func Run

func Run()

func Stats

func Stats(w http.ResponseWriter, req *http.Request)

func StatsServer

func StatsServer(port int)

Types

type Action

type Action interface {
	Call(queue string, message *Msg, next func())
}

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type EnqueueData

type EnqueueData struct {
	Class string      `json:"class"`
	Args  interface{} `json:"args"`
}

type Fetcher

type Fetcher interface {
	Fetch()
	Acknowledge(*Msg)
	Messages() chan *Msg
	Close()
	Closed() bool
}

type MiddlewareLogging

type MiddlewareLogging struct{}

func (*MiddlewareLogging) Call

func (l *MiddlewareLogging) Call(queue string, message *Msg, next func())

type MiddlewareRetry

type MiddlewareRetry struct{}

func (*MiddlewareRetry) Call

func (r *MiddlewareRetry) Call(queue string, message *Msg, next func())

type MiddlewareStats

type MiddlewareStats struct{}

func (*MiddlewareStats) Call

func (l *MiddlewareStats) Call(queue string, message *Msg, next func())

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL