boltworker

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 13 Imported by: 0

README

boltWorker

A Simple boltDB based Worker which can be used with gobuffalo's background worker interface.

Suitable for managing background tasks for single standalone binaries.

Works with grift tasks of your buffalo app

w := actions.App().Worker
w.Perform(worker.Job{
	Queue:   "default",
	Handler: "send_mail",
	Args: worker.Args{
		"user_id":  123,
	},
})

The task will be performed by the background workers spawned by your buffalo App server

Setup

Default boltWorker can be configured by providing a DB filepath

import "github.com/astutic/boltWorker"

// ...
buffalo.New(buffalo.Options {
	// ...
	Worker: boltworker.NewBoltWorker(boltworker.Options{
		FilePath:       "worker.db",
		MaxConcurrency: 10,
	}),
	// ...
})

boltWorker Options

The following options can be configured for boltWorker

FilePath string
FilePath is where the boltDB will be created/accessed. Must Have parameter.
BoltOptions bolt.Options
Boltoptions are options which can be passed for boltDB
Logger Logger
Logger if passed will be used for logging, else a logrus TextFormatter with InfoLevel will be used
Name string
Name is for future use if any. Default value is 'buffalo'
MaxConcurrency int
MaxConcurrency determines how many workers will be spawned. Default is 10.
CompletedBucket string
Name of the bucket where completed jobs will be saved. Default 'completed'.
PendingBucket string
Name of the bucket where jobs which are incomplete will be saved. Default 'pending'.
FailedBucket string
Name of the bucket where jobs which fail will be saved. Default 'failed'.
DBSyncInterval string
A time.Duration string used as in interval to sync boltDB to read jobs from the DB. Default '30s'. 
MaxRetryAttempts int
Number of attempts for jobs which return RetryJobError after which the job will be declared failed. Default 10.
IdleSleepTime string
A time.Duration string which determines the sleep time when there are no jobs in the Queue. Default '5s'
JobNameHandler JobNameGenerator
A function of type ***func(worker.Job) string** which will be called to set the key of the job.
Default is UUID version 4 generator

type JobNameGenerator func(worker.Job) string

func DefaultJobNameGenerator(job worker.Job) string {
	return uuid.Must(uuid.NewV4()).String()
}

An example

import "github.com/gobuffalo/uuid"

jn := func(job worker.Job) string {
	return job.Args.String()
}

app = buffalo.New(buffalo.Options{
	// ...
	Worker: boltworker.NewBoltWorker(boltworker.Options{
		FilePath:       "worker.db",
		MaxConcurrency: 5,
		JobNameHandler: jn,
	}),
	//...
})

Disclaimer

  • Not developed for performance.
  • Not optimized for memory, Holds the entire DB in memory.
  • Not suitable for large and/or distributed workloads.
  • No Live updates to the boltDB about the working status, the DB is only updated after the job is performed.

Documentation

Index

Constants

View Source
const (
	//JobPending denotes the job is pending
	JobPending jobStatusCode = 1 << iota
	//JobInProcess denotes the job is being worked by a worker
	JobInProcess
	//JobReAttempt denotes the job needs a retry
	JobReAttempt
	//JobFailed denotes the job has failed and no need to reattempt
	JobFailed
	//JobDone denotes the job completed successfully
	JobDone
)

Variables

This section is empty.

Functions

func DefaultJobNameGenerator

func DefaultJobNameGenerator(job worker.Job) string

DefaultJobNameGenerator is the default job name generator which assigns a uuid version 4 id to the job

func NewBoltDB

func NewBoltDB(bo *Options) *boltDB

NewBoltDB creates a new instance for boltDB used for boltWorker

Types

type BoltWorker

type BoltWorker struct {
	Logger Logger

	DB                  *boltDB
	DBSyncInterval      time.Duration
	IdleSleepTime       time.Duration
	BusyWorkerSleepTime time.Duration
	Concurrency         int

	RetryAttempts int
	// contains filtered or unexported fields
}

BoltWorker is a basic implementation of buffalo worker interface which persists job data in boltDB

func NewBoltWorker

func NewBoltWorker(opts Options) *BoltWorker

NewBoltWorker creates a buffalo worker interface implementation which persists data in boltDB defined in the opts

func NewBoltWorkerWithContext

func NewBoltWorkerWithContext(ctx context.Context, opts Options) *BoltWorker

NewBoltWorkerWithContext creates a buffalo worker interface implementation which persists data in boltDB defined in opts

func (*BoltWorker) LoadPendingJobs

func (bw *BoltWorker) LoadPendingJobs()

LoadPendingJobs loads all the pending jobs from the boltDB to the Job Queue

func (*BoltWorker) Perform

func (bw *BoltWorker) Perform(job worker.Job) error

Perform a job, the job is first saved to boltDB and then performed

func (*BoltWorker) PerformAt

func (bw *BoltWorker) PerformAt(job worker.Job, t time.Time) error

PerformAt perfirms a job at a particular time, the job is first saved to boltDB

func (*BoltWorker) PerformIn

func (bw *BoltWorker) PerformIn(job worker.Job, d time.Duration) error

PerformIn performs a job after waiting for a specified time, the job is first saved to boltDB

func (*BoltWorker) Register

func (bw *BoltWorker) Register(name string, h worker.Handler) error

Register a work handler with the name of the work

func (*BoltWorker) SpawnWorkers

func (bw *BoltWorker) SpawnWorkers()

SpawnWorkers creates concurrent worker goroutines based on the MaxConcurrency option provided

func (*BoltWorker) Start

func (bw *BoltWorker) Start(ctx context.Context) error

Start boltWorker

func (*BoltWorker) Stop

func (bw *BoltWorker) Stop() error

Stop boltWorker

func (*BoltWorker) SyncWithDB

func (bw *BoltWorker) SyncWithDB() error

SyncWithDB syncs the jobQueue with DB, not threadsafe and needs to be called within a mutex Lock No other concurrent operations with the jobQueue should take place.

type JobNameGenerator

type JobNameGenerator func(worker.Job) string

JobNameGenerator function that will be run to determine the key for the job which will be saved in boltDB

type Logger

type Logger interface {
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Errorf(string, ...interface{})
	Debug(...interface{})
	Info(...interface{})
	Error(...interface{})
}

Logger is used by the worker to write logs

type Options

type Options struct {
	BoltOptions      bolt.Options
	Logger           Logger
	Name             string
	MaxConcurrency   int
	FilePath         string
	CompletedBucket  string
	PendingBucket    string
	FailedBucket     string
	DBSyncInterval   string
	MaxRetryAttempts int
	IdleSleepTime    string
	JobNameHandler   JobNameGenerator
}

Options are used to configure boltWorker

type RetryJobError

type RetryJobError struct {
	RetryIN time.Duration
	// contains filtered or unexported fields
}

RetryJobError denotes jobs which temporarily failed and should be retried

func NewRetryJobError

func NewRetryJobError(msg string) RetryJobError

NewRetryJobError returns an instance of RetryJobError, this denotes a temporary failure and the job will be retried

func (RetryJobError) Error

func (e RetryJobError) Error() string

Error returns the error as a string

func (*RetryJobError) SetRetryTime

func (e *RetryJobError) SetRetryTime(t time.Duration)

SetRetryTime sets the retry time in time.Duration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL