workers

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: MIT Imports: 21 Imported by: 23

README

Build Status GoDoc

Sidekiq-compatible background workers in Go.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • supports custom middleware
  • customizable concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

Example usage:

package main

import (
	"github.com/topfreegames/go-workers"
	workers "go-workers"
)

// myJob is the example job handler; main registers it for both "myqueue"
// and "myqueue2" via workers.Process.
func myJob(message *workers.Msg) {
	// do something with your message, for example:
	// message.Jid() returns a string
	// message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

// myMiddleware is an example middleware; main appends an instance of it to
// workers.Middleware.
type myMiddleware struct{}

// Call invokes next and returns whatever next reported as the acknowledge
// value. Custom logic can be placed before and after the next() call.
func (m *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
	// do something before each message is processed
	acked := next()
	// do something after each message is processed
	return acked
}

// main wires up the example: it configures the Redis connection, registers a
// middleware and two queue processors, enqueues a few jobs, starts the stats
// server in a goroutine and finally blocks in workers.Run.
func main() {
	workers.Configure(workers.Options{
		// location of redis instance
		Address: "localhost:6379",
		// instance of the database
		Database: "0",
		// number of connections to keep open with redis
		// (Options.PoolSize is an int, so it must not be a quoted string)
		PoolSize: 30,
		// unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
		ProcessID: "1",
	})

	workers.Middleware.Append(&myMiddleware{})

	// pull messages from "myqueue" with concurrency of 10
	workers.Process("myqueue", myJob, 10)

	// pull messages from "myqueue2" with concurrency of 20
	workers.Process("myqueue2", myJob, 20)

	// Enqueue and EnqueueWithOptions return (string, error); the results are
	// discarded below only to keep the example short.

	// Add a job to a queue
	workers.Enqueue("myqueue3", "Add", []int{1, 2})

	// Add a job to a queue with retry
	workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

	// Add a job to a queue in a different redis instance
	workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2},
		workers.EnqueueOptions{
			Retry: true,
			ConnectionOptions: workers.Options{
				Address:  "localhost:6378",
				Database: "my-database",
				PoolSize: 10,
				Password: "pass",
			},
		},
	)

	// stats will be available at http://localhost:8080/stats
	go workers.StatsServer(8080)

	// Blocks until process is told to exit via unix signal
	workers.Run()
}

Initial development sponsored by Customer.io

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_RETRY = 25
	LAYOUT            = "2006-01-02 15:04:05 MST"
)
View Source
const (
	RETRY_KEY          = "goretry"
	SCHEDULED_JOBS_KEY = "schedule"
)
View Source
const (
	NanoSecondPrecision = 1000000000.0
)

Variables

View Source
var Logger = initLogger()

Logger is the default logger

Functions

func BeforeStart

func BeforeStart(f func())

func Configure

func Configure(options Options)

func DuringDrain

func DuringDrain(f func())

func Enqueue

func Enqueue(queue, class string, args interface{}) (string, error)

func EnqueueAt

func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)

func EnqueueIn

func EnqueueIn(queue, class string, in float64, args interface{}) (string, error)

func EnqueueWithOptions

func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

func GetConnectionPool

func GetConnectionPool(options Options) *redis.Pool

func Process

func Process(queue string, job jobFunc, concurrency int, mids ...Action)

func Quit

func Quit()

func ResetManagers

func ResetManagers() error

func Run

func Run()

func SetLogger added in v1.0.0

func SetLogger(l WorkersLogger)

SetLogger rewrites the default logger

func Start

func Start()

func Stats

func Stats(w http.ResponseWriter, req *http.Request)

Stats writes the worker stats to the response writer.

func StatsServer

func StatsServer(port int)

Types

type Action

// Action is the middleware interface accepted by NewMiddleware,
// Middlewares.Append, Middlewares.Prepend and Process. Call receives the
// queue name, the message and a next callback; the README example calls
// next() and returns its result.
type Action interface {
	Call(queue string, message *Msg, next func() bool) bool
}

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type EnqueueData

// EnqueueData carries queue, class, args, job id and enqueue time alongside
// the embedded EnqueueOptions.
// NOTE(review): judging by the name and JSON tags this is the payload written
// for an enqueued job — confirm against the enqueue implementation.
type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	// Embedded without a tag: when marshaled with encoding/json, the
	// EnqueueOptions fields appear at the top level of the encoded object.
	EnqueueOptions
}

type EnqueueOptions

// EnqueueOptions are the per-job options accepted by EnqueueWithOptions and
// embedded in EnqueueData.
//
// NOTE(review): encoding/json's omitempty never omits struct values, so if
// this type is marshaled with encoding/json, retry_options and
// connection_options are always emitted — including Options.Password.
// Confirm that this is intended.
type EnqueueOptions struct {
	RetryCount        int          `json:"retry_count,omitempty"`
	Retry             bool         `json:"retry,omitempty"`
	RetryMax          int          `json:"retry_max,omitempty"`
	At                float64      `json:"at,omitempty"`
	RetryOptions      RetryOptions `json:"retry_options,omitempty"`
	ConnectionOptions Options      `json:"connection_options,omitempty"`
}

type Fetcher

type Fetcher interface {
	Queue() string
	Fetch()
	Acknowledge(*Msg)
	Ready() chan bool
	FinishedWork() chan bool
	Messages() chan *Msg
	Close()
	Closed() bool
}

func NewFetch

func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher

type MiddlewareLogging

type MiddlewareLogging struct{}

func (*MiddlewareLogging) Call

func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) (acknowledge bool)

type MiddlewareRetry

type MiddlewareRetry struct{}

func (*MiddlewareRetry) Call

func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (acknowledge bool)

type MiddlewareStats

type MiddlewareStats struct{}

func (*MiddlewareStats) Call

func (l *MiddlewareStats) Call(queue string, message *Msg, next func() bool) (acknowledge bool)

type Middlewares

type Middlewares struct {
	// contains filtered or unexported fields
}

func NewMiddleware

func NewMiddleware(actions ...Action) *Middlewares

func (*Middlewares) Append

func (m *Middlewares) Append(action Action)

func (*Middlewares) Prepend

func (m *Middlewares) Prepend(action Action)

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

type Options added in v1.2.1

// Options is the configuration passed to Configure and GetConnectionPool,
// and used as EnqueueOptions.ConnectionOptions. Field notes below are taken
// from the README example.
type Options struct {
	// location of the redis instance, e.g. "localhost:6379"
	Address      string
	Password     string
	// instance of the database, given as a string (e.g. "0")
	Database     string
	// unique process id for this instance of workers (for proper recovery
	// of in-progress jobs on crash)
	ProcessID    string
	Namespace    string
	// number of connections to keep open with redis
	PoolSize     int
	PoolInterval int
	DialOptions  []redis.DialOption
}

type RetryOptions

type RetryOptions struct {
	Exp      int `json:"exp"`
	MinDelay int `json:"min_delay"`
	MaxDelay int `json:"max_delay"`
	MaxRand  int `json:"max_rand"`
}

type WorkerConfig added in v1.2.1

type WorkerConfig struct {
	Namespace    string
	PoolInterval int
	Pool         *redis.Pool
	Fetch        func(queue string) Fetcher
	// contains filtered or unexported fields
}
var Config *WorkerConfig

type WorkerStats

type WorkerStats struct {
	Processed int               `json:"processed"`
	Failed    int               `json:"failed"`
	Enqueued  map[string]string `json:"enqueued"`
	Retries   int64             `json:"retries"`
}

WorkerStats holds workers stats

func GetStats

func GetStats() *WorkerStats

GetStats returns workers stats

type WorkersLogger

// WorkersLogger is the logging interface accepted by SetLogger. Each of the
// five levels (Fatal, Debug, Error, Info, Warn) comes in three variants: a
// plain variadic form, a format-string form with the "f" suffix and an "ln"
// form.
type WorkersLogger interface {
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatalln(args ...interface{})

	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Debugln(args ...interface{})

	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Errorln(args ...interface{})

	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Infoln(args ...interface{})

	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Warnln(args ...interface{})
}

WorkersLogger represents the log interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL