beanstalkworker

package module
v0.0.0-...-7ad72d1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2019 License: GPL-2.0 Imports: 9 Imported by: 0

README

GoDoc

beanstalkworker

A helper library for creating beanstalkd consumer processes.

Usage

go get -u github.com/tomponline/beanstalkworker

Docs/Examples

Please see Go Docs for usage and examples:

https://godoc.org/github.com/tomponline/beanstalkworker

Aims

  • To provide a generic way for consuming beanstalkd jobs without all of the boiler plate code
  • To provide an easy way to spin up concurrent worker Go routines
  • To use Go's interfaces to make unit testing your workers easy

Details

The library is broken down into the following components:

  • JobManager interface - represents a way to handle a job's lifecycle.
  • RawJob - an implementation of JobManager for managing a Raw job's life cycle.
  • Worker - an implementation of a beanstalkd client process that consumes raw jobs from one or more tubes. It will automatically reconnect to beanstalkd server if it loses the connection.

See also

This library is a wrapper around the low-level Beanstalkd client written in Go:

https://github.com/beanstalkd/go-beanstalk

This client talks to Beanstalkd queue server:

https://beanstalkd.github.io/

Documentation

Overview

Example (Worker)
package main

import "github.com/tomponline/beanstalkworker"
import "context"
import "os"
import "os/signal"
import "syscall"
import "log"
import "fmt"
import "time"

func main() {
	//Setup context for cancelling beanstalk worker.
	ctx, cancel := context.WithCancel(context.Background())

	//Start up signal handler that will cleanly shutdown beanstalk worker.
	go signalHandler(cancel)

	//Define a new worker process - how to connect to the beanstalkd server.
	bsWorker := beanstalkworker.NewWorker("127.0.0.1:11300")

	//Optional custom logger - see below.
	bsWorker.SetLogger(&MyLogger{})

	//Set concurrent worker threads to 2.
	bsWorker.SetNumWorkers(2)

	//Job is deleted from the queue if unmarshal error appears. We can
	//decide to bury or release (default behaviour) it as well.
	bsWorker.SetUnmarshalErrorAction(beanstalkworker.ActionDeleteJob)

	//Define a common value (example a shared database connection)
	commonVar := "some common value"

	//Add one or more subcriptions to specific tubes with a handler function.
	bsWorker.Subscribe("job1", func(jobMgr beanstalkworker.JobManager, jobData Job1Data) {
		//Create a fresh handler struct per job (this ensures fresh state for each job).
		handler := &Job1Handler{
			JobManager: jobMgr,    //Embed the JobManager into the handler.
			commonVar:  commonVar, //Pass the commonVar into the handler.
		}

		handler.Run(jobData)
	})

	//Run the beanstalk worker, this blocks until the context is cancelled.
	//It will also handle reconnecting to beanstalkd server automatically.
	bsWorker.Run(ctx)
}

// signalHandler catches OS signals for program to end.
func signalHandler(cancel context.CancelFunc) {
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
	for {
		<-sigc
		log.Print("Got signal, cancelling context")
		cancel()
	}
}

//Custom Logging Example

// MyLogger provides custom logging.
type MyLogger struct {
}

// Info logs a custom info message regarding the job.
func (l *MyLogger) Info(v ...interface{}) {
	log.Print("MyInfo: ", fmt.Sprint(v...))
}

// Infof logs a custom info message regarding the job.
func (l *MyLogger) Infof(format string, v ...interface{}) {
	format = "MyInfof: " + format
	log.Print(fmt.Sprintf(format, v...))
}

// Error logs a custom error message regarding the job.
func (l *MyLogger) Error(v ...interface{}) {
	log.Print("MyError: ", fmt.Sprint(v...))
}

// Errorf logs a custom error message regarding the job.
func (l *MyLogger) Errorf(format string, v ...interface{}) {
	format = "MyErrorf: " + format
	log.Print(fmt.Sprintf(format, v...))
}

//Job Handler

// Job1Handler contains the business logic to handle the Job1 type jobs.
type Job1Handler struct {
	beanstalkworker.JobManager
	commonVar string
}

// Job1Data is a struct that represents the Job1 data that arrives from the queue.
type Job1Data struct {
	SomeField      string `json:"someField"`
	SomeOtherField int    `json:"someOtherField"`
}

// LogError example of overriding a function provided in beanstalkworker.JobManager
// and calling the underlying function in order to add context.
func (handler *Job1Handler) LogError(a ...interface{}) {
	handler.JobManager.LogError("Job1 error: ", fmt.Sprint(a...))
}

// Run is executed by the beanstalk worker when a Job1 type job is received.
func (handler *Job1Handler) Run(jobData Job1Data) {
	handler.LogInfo("Starting job with commonVar value: ", handler.commonVar)
	handler.LogInfo("Job Data received: ", jobData)
	handler.LogInfo("Job Priority: ", handler.GetPriority())
	handler.LogInfo("Job Releases: ", handler.GetReleases())
	handler.LogInfo("Job Reserves: ", handler.GetReserves())
	handler.LogInfo("Job Age: ", handler.GetAge())
	handler.LogInfo("Job Delay: ", handler.GetDelay())
	handler.LogInfo("Job Timeouts: ", handler.GetTimeouts())
	handler.LogInfo("Job Tube: ", handler.GetTube())
	// Retrieve the server's hostname where the job is running
	conn := handler.GetConn()
	stats, err := conn.Stats()
	if err != nil {
		handler.Release()
		return
	}
	handler.LogInfo("Hostname: ", stats["hostname"])

	//Simulate job processing time
	time.Sleep(2 * time.Second)

	if handler.GetTimeouts() == 0 {
		handler.LogInfo("Simulating a timeout by not releasing/deleting job")
		return
	}

	if handler.GetReserves() == 2 {
		handler.LogInfo("Release without setting custom delay or priority")
		handler.Release()
		return
	}

	handler.SetReturnDelay(5 * time.Second) //Optional return delay (defaults to current delay)
	handler.SetReturnPriority(5)            //Optional return priority (defaults to current priority)

	if handler.GetReleases() >= 3 {
		handler.Delete()
		handler.LogError("Deleting job as too many releases")
		return
	}

	handler.LogInfo("Releasing job to be retried...")
	handler.Release() //Pretend job process failed and needs retrying
}
Output:

Index

Examples

Constants

View Source
const (
	ActionDeleteJob  = "delete"
	ActionBuryJob    = "bury"
	ActionReleaseJob = "release"
)

Actions the user can choose in case of an unmarshal error.

Variables

This section is empty.

Functions

This section is empty.

Types

type CustomLogger

type CustomLogger interface {
	Info(v ...interface{})
	Infof(format string, args ...interface{})
	Error(v ...interface{})
	Errorf(format string, args ...interface{})
}

CustomLogger provides support for the creation of custom logging.

type Handler

type Handler interface{}

Handler provides an interface type for callback functions.

type JobManager

type JobManager interface {
	Delete()
	Touch()
	Release()
	LogError(a ...interface{})
	LogInfo(a ...interface{})
	GetAge() time.Duration
	GetPriority() uint32
	GetReleases() uint32
	GetReserves() uint32
	GetTimeouts() uint32
	GetDelay() time.Duration
	GetTube() string
	GetConn() *beanstalk.Conn
	SetReturnPriority(prio uint32)
	SetReturnDelay(delay time.Duration)
}

JobManager interface represents a way to handle a job's lifecycle.

type Logger

type Logger struct {
	Info   func(v ...interface{})
	Infof  func(format string, v ...interface{})
	Error  func(v ...interface{})
	Errorf func(format string, v ...interface{})
}

Logger provides support for standard logging.

func NewDefaultLogger

func NewDefaultLogger() *Logger

NewDefaultLogger creates a new Logger initialised to use the global log package.

type RawJob

type RawJob struct {
	// contains filtered or unexported fields
}

RawJob represents the raw job data that is returned by beanstalkd.

func NewEmptyJob

func NewEmptyJob(cl CustomLogger) *RawJob

NewEmptyJob initialises a new empty RawJob with a custom logger. Useful for testing methods that log messages on the job.

func (*RawJob) Bury

func (job *RawJob) Bury()

Bury function buries the job from the queue.

func (*RawJob) Delete

func (job *RawJob) Delete()

Delete function deletes the job from the queue.

func (*RawJob) GetAge

func (job *RawJob) GetAge() time.Duration

GetAge gets the age of the job from the job stats.

func (*RawJob) GetConn

func (job *RawJob) GetConn() *beanstalk.Conn

GetConn returns the beanstalk connection used to receive the job.

func (*RawJob) GetDelay

func (job *RawJob) GetDelay() time.Duration

GetDelay gets the delay of the job from the job stats.

func (*RawJob) GetPriority

func (job *RawJob) GetPriority() uint32

GetPriority gets the priority of the job.

func (*RawJob) GetReleases

func (job *RawJob) GetReleases() uint32

GetReleases gets the count of release of the job.

func (*RawJob) GetReserves

func (job *RawJob) GetReserves() uint32

GetReserves gets the count of reserves of the job.

func (*RawJob) GetTimeouts

func (job *RawJob) GetTimeouts() uint32

GetTimeouts gets the count of timeouts of the job.

func (*RawJob) GetTube

func (job *RawJob) GetTube() string

GetTube returns the tube name we got this job from.

func (*RawJob) LogError

func (job *RawJob) LogError(a ...interface{})

LogError function logs an error message regarding the job.

func (*RawJob) LogInfo

func (job *RawJob) LogInfo(a ...interface{})

LogInfo function logs an info message regarding the job.

func (*RawJob) Release

func (job *RawJob) Release()

Release function releases the job from the queue.

func (*RawJob) SetReturnDelay

func (job *RawJob) SetReturnDelay(delay time.Duration)

SetReturnDelay sets the return delay to use if a job is released back to queue.

func (*RawJob) SetReturnPriority

func (job *RawJob) SetReturnPriority(prio uint32)

SetReturnPriority sets the return priority to use if a job is released or buried.

func (*RawJob) Touch

func (job *RawJob) Touch()

Touch function touches the job from the queue.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a single process that is connecting to beanstalkd and is consuming jobs from one or more tubes.

func NewWorker

func NewWorker(addr string) *Worker

NewWorker creates a new worker process, but does not actually connect to beanstalkd server yet.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run starts one or more worker threads based on the numWorkers value. If numWorkers is set to zero or less then 1 worker is started.

func (*Worker) SetLogger

func (w *Worker) SetLogger(cl CustomLogger)

SetLogger switches logging to use a custom Logger.

func (*Worker) SetNumWorkers

func (w *Worker) SetNumWorkers(numWorkers int)

SetNumWorkers sets the number of concurrent workers threads that should be started. Each thread establishes a separate connection to the beanstalkd server.

func (*Worker) SetUnmarshalErrorAction

func (w *Worker) SetUnmarshalErrorAction(action string)

SetUnmarshalErrorAction defines what to do if there is an unmarshal error.

func (*Worker) Subscribe

func (w *Worker) Subscribe(tube string, cb Handler)

Subscribe adds a handler function to be run for jobs coming from a particular tube.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL