workers

package module
v0.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2020 License: MIT Imports: 24 Imported by: 0

README

Build Status GoDoc

Sidekiq-compatible background workers in Go.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • redis sentinel support
  • well tested

Example usage:

package main

import (
  "fmt"

  workers "github.com/matheuscscp/go-workers2"
)

// myJob is the job function registered with the workers in main.
func myJob(message *workers.Msg) error {
  // Process the message here. Accessors available on the message:
  //   message.Jid()  - the job ID string
  //   message.Args() - a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
  var err error
  return err
}

// myMiddleware wraps next so that custom logic can run around every job.
// The error produced by next is passed through to the caller unchanged.
func myMiddleware(queue string, mgr *workers.Manager, next workers.JobFunc) workers.JobFunc {
  wrapped := func(message *workers.Msg) error {
    // hook: runs before each message is processed

    result := next(message)

    // hook: runs after each message is processed
    return result
  }
  return wrapped
}

// main builds a manager, registers three workers with different middleware
// chains, enqueues two sample jobs, starts the stats API server, and then
// blocks processing jobs until the process is told to exit.
func main() {
  // Create a manager, which manages workers
  manager, err := workers.NewManager(workers.Options{
    // location of redis instance
    ServerAddr: "localhost:6379",
    // instance of the database
    Database:   0,
    // number of connections to keep open with redis
    PoolSize:   30,
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    ProcessID:  "1",
  })
  if err != nil {
    // The manager must not be used when NewManager reports an error,
    // so report it and stop instead of falling through.
    fmt.Println(err)
    return
  }

  // create a middleware chain with the default middlewares, and append myMiddleware
  mids := workers.DefaultMiddlewares().Append(myMiddleware)

  // pull messages from "myqueue" with concurrency of 10
  // this worker will not run myMiddleware, but will run the default middlewares
  manager.AddWorker("myqueue", 10, myJob)

  // pull messages from "myqueue2" with concurrency of 20
  // this worker will run the default middlewares and myMiddleware
  manager.AddWorker("myqueue2", 20, myJob, mids...)

  // pull messages from "myqueue3" with concurrency of 20
  // this worker will only run myMiddleware
  manager.AddWorker("myqueue3", 20, myJob, myMiddleware)

  // If you already have a manager and want to enqueue
  // to the same place:
  producer := manager.Producer()

  // Alternatively, if you want to create a producer to enqueue messages
  // producer, err := workers.NewProducer(workers.Options{
  //   // location of redis instance
  //   ServerAddr: "localhost:6379",
  //   // instance of the database
  //   Database:   0,
  //   // number of connections to keep open with redis
  //   PoolSize:   30,
  //   // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
  //   ProcessID:  "1",
  // })

  // Add a job to a queue
  if _, err := producer.Enqueue("myqueue3", "Add", []int{1, 2}); err != nil {
    fmt.Println(err)
  }

  // Add a job to a queue with retry
  if _, err := producer.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}); err != nil {
    fmt.Println(err)
  }

  // stats will be available at http://localhost:8080/stats
  // (StartAPIServer is the function this version of the package exports)
  go workers.StartAPIServer(8080)

  // Blocks until process is told to exit via unix signal
  manager.Run()
}

Running the above example produces output like the following at localhost:8080/stats:

[
  {
    "manager_name": "",
    "processed": 5,
    "failed": 57,
    "jobs": {
      "myqueue": null,
      "myqueue2": null,
      "myqueue3": null
    },
    "enqueued": {
      "myqueue": 0,
      "myqueue2": 0,
      "myqueue3": 0
    },
    "retry_count": 4
  }
]

Development sponsored by DigitalOcean. Code forked from github.com/jrallison/go-workers. Initial development sponsored by Customer.io.

Documentation

Index

Constants

View Source
const (
	// DefaultRetryMax is the default maximum number of retries for a job.
	DefaultRetryMax = 25

	// RetryTimeFormat is the default layout for retry timestamps, written
	// as a Go reference-time layout string.
	RetryTimeFormat = "2006-01-02 15:04:05 MST"
)
View Source
const (
	// NanoSecondPrecision is the untyped float constant 1e9.
	// NOTE(review): presumably the number of nanoseconds per second, used to
	// convert between nanosecond and fractional-second timestamps — confirm
	// against its call sites.
	NanoSecondPrecision = 1000000000.0
)

Variables

View Source
var Logger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)

Logger is a stdout logger for workers

Functions

func StartAPIServer

func StartAPIServer(port int)

func StopAPIServer

func StopAPIServer()

Types

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type EnqueueData

// EnqueueData is the JSON-serializable record for a job; each field is
// encoded under the key given in its struct tag, and the embedded
// EnqueueOptions fields are encoded alongside them.
// NOTE(review): field meanings below are inferred from names and tags —
// confirm against the producer code.
type EnqueueData struct {
	// Queue is omitted from the JSON when empty.
	Queue      string      `json:"queue,omitempty"`
	// Class is presumably the job class name passed to Enqueue.
	Class      string      `json:"class"`
	// Args holds the job arguments; any JSON-encodable value is accepted.
	Args       interface{} `json:"args"`
	// Jid is presumably the job ID.
	Jid        string      `json:"jid"`
	// EnqueuedAt is presumably the enqueue time as a fractional timestamp — TODO confirm units.
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

type EnqueueOptions

// EnqueueOptions carries the optional settings for an enqueued job. Every
// field is omitted from the JSON encoding when it holds its zero value.
type EnqueueOptions struct {
	// RetryCount is presumably the number of retries recorded for the job — TODO confirm.
	RetryCount int     `json:"retry_count,omitempty"`
	// Retry enqueues the job with retry when true.
	Retry      bool    `json:"retry,omitempty"`
	// At is presumably the scheduled run time as a fractional timestamp — TODO confirm units.
	At         float64 `json:"at,omitempty"`
}

type Fetcher

// Fetcher is an interface for managing work messages.
// NOTE(review): the per-method notes below are inferred from the method
// names and signatures only — confirm against the implementation.
type Fetcher interface {
	// Queue returns a string, presumably the name of the queue being fetched from.
	Queue() string
	// Fetch presumably runs the fetch loop; it takes no arguments and returns nothing.
	Fetch()
	// Acknowledge takes a message, presumably to mark it as handled.
	Acknowledge(*Msg)
	// Ready returns a bool channel, presumably used to signal readiness for more work.
	Ready() chan bool
	// Messages returns the channel carrying *Msg values.
	Messages() chan *Msg
	// Close presumably stops the fetcher.
	Close()
	// Closed reports a boolean, presumably whether Close has taken effect.
	Closed() bool
}

Fetcher is an interface for managing work messages

type JobFunc

type JobFunc func(message *Msg) error

func LogMiddleware

func LogMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

LogMiddleware is the default logging middleware

func NopMiddleware

func NopMiddleware(queue string, mgr *Manager, final JobFunc) JobFunc

NopMiddleware does nothing

func RetryMiddleware

func RetryMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

RetryMiddleware is a middleware that allows retries for job failures

func StatsMiddleware

func StatsMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

StatsMiddleware is a middleware that collects stats on processed messages

type JobStatus

// JobStatus pairs a message with a start time; it is the element type of
// the per-queue job lists in Stats.
type JobStatus struct {
	// Message is the job message, encoded under "message".
	Message   *Msg  `json:"message"`
	// StartedAt is encoded under "started_at".
	// NOTE(review): presumably a Unix timestamp — units not visible here, confirm.
	StartedAt int64 `json:"started_at"`
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates work, workers, and signaling needed for job processing

func NewManager

func NewManager(options Options) (*Manager, error)

NewManager creates a new manager with the provided options

func NewManagerWithRedisClient

func NewManagerWithRedisClient(options Options, client *redis.Client) (*Manager, error)

NewManagerWithRedisClient creates a new manager with the provided options and a pre-configured Redis client

func (*Manager) AddBeforeStartHooks

func (m *Manager) AddBeforeStartHooks(hooks ...func())

AddBeforeStartHooks adds functions to be executed before the manager starts

func (*Manager) AddDuringDrainHooks

func (m *Manager) AddDuringDrainHooks(hooks ...func())

AddDuringDrainHooks adds functions to be executed during a drain operation

func (*Manager) AddWorker

func (m *Manager) AddWorker(queue string, concurrency int, job JobFunc, mids ...MiddlewareFunc)

AddWorker adds a new job processing worker

func (*Manager) GetRedisClient

func (m *Manager) GetRedisClient() *redis.Client

GetRedisClient returns the Redis client used by the manager

func (*Manager) GetRetries

func (m *Manager) GetRetries(page uint64, page_size int64, match string) (Retries, error)

GetRetries returns the set of retry jobs for the manager

func (*Manager) GetStats

func (m *Manager) GetStats() (Stats, error)

GetStats returns the set of stats for the manager

func (*Manager) Producer

func (m *Manager) Producer() *Producer

Producer creates a new work producer with configuration identical to the manager

func (*Manager) Run

func (m *Manager) Run()

Run starts all workers under this Manager and blocks until they exit.

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops all workers under this Manager and returns immediately.

type MiddlewareFunc

type MiddlewareFunc func(queue string, m *Manager, next JobFunc) JobFunc

type Middlewares

type Middlewares []MiddlewareFunc

func DefaultMiddlewares

func DefaultMiddlewares() Middlewares

func NewMiddlewares

func NewMiddlewares(mids ...MiddlewareFunc) Middlewares

func (Middlewares) Append

func (m Middlewares) Append(mid MiddlewareFunc) Middlewares

func (Middlewares) Prepend

func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

type Options

// Options contains the set of configuration options for a manager and/or
// producer.
type Options struct {
	// ProcessID is a unique process id for this instance of workers (for
	// proper recovery of in-progress jobs on crash).
	ProcessID    string
	// Namespace is presumably a prefix applied to Redis keys — TODO confirm.
	Namespace    string
	// PollInterval is presumably how often scheduled/retry work is polled — TODO confirm.
	PollInterval time.Duration
	// Database is the instance (index) of the Redis database.
	Database     int
	// Password is presumably the Redis password — TODO confirm.
	Password     string
	// PoolSize is the number of connections to keep open with Redis.
	PoolSize     int

	// Provide one of ServerAddr or (SentinelAddrs + RedisMasterName)
	// ServerAddr is the location of the Redis instance, e.g. "localhost:6379".
	ServerAddr      string
	// NOTE(review): SentinelAddrs is a single string; the expected format for
	// multiple addresses is not visible here — confirm.
	SentinelAddrs   string
	RedisMasterName string
	// RedisTLSConfig is presumably the TLS configuration for the Redis connection — TODO confirm.
	RedisTLSConfig  *tls.Config

	// Optional display name used when displaying manager stats
	ManagerDisplayName string
	// contains filtered or unexported fields
}

Options contains the set of configuration options for a manager and/or producer

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(options Options) (*Producer, error)

func NewProducerWithRedisClient

func NewProducerWithRedisClient(options Options, client *redis.Client) (*Producer, error)

func (*Producer) Enqueue

func (p *Producer) Enqueue(queue, class string, args interface{}) (string, error)

func (*Producer) EnqueueAt

func (p *Producer) EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)

func (*Producer) EnqueueIn

func (p *Producer) EnqueueIn(queue, class string, in float64, args interface{}) (string, error)

func (*Producer) EnqueueInWithOptions added in v0.9.3

func (p *Producer) EnqueueInWithOptions(queue, class string, in float64, args interface{}, opts EnqueueOptions) (string, error)

func (*Producer) EnqueueWithOptions

func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

func (*Producer) GetRedisClient

func (p *Producer) GetRedisClient() *redis.Client

type Retries

// Retries is the result type of Manager.GetRetries, encoded to JSON under
// the keys given in the struct tags.
type Retries struct {
	// TotalRetryCount is encoded under "total_retry_count".
	// NOTE(review): presumably the total number of jobs in the retry set,
	// independent of paging — confirm.
	TotalRetryCount int64           `json:"total_retry_count"`
	// RetryJobs lists the retry job entries, encoded under "retry_jobs".
	RetryJobs       []RetryJobStats `json:"retry_jobs"`
}

type RetryJobStats

// RetryJobStats describes a single entry in Retries.RetryJobs. Each field is
// encoded to JSON under the key given in its struct tag.
type RetryJobStats struct {
	Class        string `json:"class"`
	ErrorMessage string `json:"error_message"`
	// FailedAt is a string.
	// NOTE(review): presumably formatted with RetryTimeFormat — confirm.
	FailedAt     string `json:"failed_at"`
	// JobID is encoded under "jid".
	JobID        string `json:"jid"`
	Queue        string `json:"queue"`
	RetryCount   int64  `json:"retry_count"`
}

type Stats

// Stats is the statistics record returned by Manager.GetStats, encoded to
// JSON under the keys given in the struct tags.
type Stats struct {
	// Name is encoded under "manager_name".
	// NOTE(review): presumably Options.ManagerDisplayName — confirm.
	Name       string                 `json:"manager_name"`
	// Processed and Failed are presumably job counters — TODO confirm scope.
	Processed  int64                  `json:"processed"`
	Failed     int64                  `json:"failed"`
	// Jobs maps a queue name to its list of job statuses.
	Jobs       map[string][]JobStatus `json:"jobs"`
	// Enqueued maps a queue name to a count.
	Enqueued   map[string]int64       `json:"enqueued"`
	RetryCount int64                  `json:"retry_count"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL