workerpool

package module
v0.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2023 License: MIT Imports: 10 Imported by: 0

README

Workerpool

GoDoc Go Report Card License

Asynchronous in-memory workers for Go based projects.

It is based on workerpool, a lightweight workers solution, and tomb that handles clean goroutine tracking and termination.

This project aims to provide the following features:

  • Dynamic concurrency (expand and shrink the number of workers)
  • Trackable background jobs
    • Cancelable jobs
    • Jobs' status

Usage

go get github.com/mdouchement/workerpool
package main

import (
	"fmt"

	"github.com/mdouchement/workerpool"
)

// main builds a job that sets every available hook, enqueues it with the
// package-level Send and then waits for the job to finish before exiting.
func main() {
	// It is not mandatory to implement all *Func.
	job := &workerpool.Job{
		// Triggered when the job's status changes.
		OnStatusChangeFunc: func(j *workerpool.Job) error {
			fmt.Println("Status changed to:", j.Status())
			return nil
		},

		// Before starting job
		BeforeFunc: func(j *workerpool.Job) error {
			fmt.Println("Before starting job")
			return nil
		},

		// Task to perform
		ActionFunc: func(j *workerpool.Job) error {
			fmt.Println("Starting job")
			return nil
		},

		// When job is completed
		AfterFunc: func(j *workerpool.Job) error {
			fmt.Println("After job")
			return nil
		},

		// Triggered when workerpool.CancelJob("job_id") is called.
		// You can use j.Context() to forward the cancellation signal.
		CancelFunc: func(j *workerpool.Job) error {
			fmt.Println("Execute cleanup function")
			return nil
		},

		// Triggered when an error or panic occurred.
		// The flag is named isPanic to avoid shadowing the builtin panic.
		ErrHandler: func(j *workerpool.Job, err error, isPanic bool) {
			// Do something with the error (and j.Context()).
		},
	}

	id := workerpool.Send(job)
	fmt.Println("Job id:", id)

	// Send only enqueues the job; the workers are asynchronous, so returning
	// from main right away could end the program before the job has run.
	// Shutdown waits for job completion and shrinks the pool to 0.
	workerpool.Shutdown()
}

License

MIT

Contributing

All PRs are welcome.

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

If possible, run the following commands to format and lint the code:

# Format
find . -name '*.go' -not -path './vendor*' -exec gofmt -s -w {} \;

# Lint
gometalinter --config=gometalinter.json ./...

Documentation

Index

Constants

View Source
// Job states.
const (
	// PENDING is the state of a job that is queued.
	PENDING = "pending"
	// RUNNING is the state of a job that is currently being executed.
	RUNNING = "running"
	// COMPLETED is the state of a job that terminated without error.
	COMPLETED = "completed"
	// FAILED is the state of a job that terminated with an error.
	FAILED = "failed"
	// CANCELLED is the state of a job that was cancelled by the user.
	CANCELLED = "cancelled"
)

Variables

View Source
var (
	// ErrActionNotDefined is returned when there is no ActionFunc declared in the job.
	ErrActionNotDefined = errors.New("Action function is not defined")
	// EmptyAction is a no-op action for a job's func: it ignores the job
	// it is given and always returns nil.
	EmptyAction = func(j *Job) error {
		return nil
	}
)

Functions

func CancelJob

func CancelJob(id string)

CancelJob stops the job for the given id in an asynchronous routine.

func GetJobStatus

func GetJobStatus(id string) string

GetJobStatus returns the job's status for the given id.

func GetJobsMetrics

func GetJobsMetrics() map[string]interface{}

GetJobsMetrics returns the metrics about the workerpool.

func GetPoolSize

func GetPoolSize() int

GetPoolSize returns the number of running workers.

func RecordJobsMetrics added in v0.4.0

func RecordJobsMetrics(enabled bool)

RecordJobsMetrics enables or disables the recording of metrics about the workerpool.

func Send

func Send(job *Job) string

Send enqueues the given job and returns its ID.

func SetLogger

func SetLogger(l Logger)

SetLogger defines the workerpool logger.

func SetPoolSize

func SetPoolSize(n int)

SetPoolSize defines the number of wanted workers. n is absolute so the pool can be expanded or shrunk according to n.

func Shutdown

func Shutdown()

Shutdown waits for job completion and shrinks the pool to 0.

Types

type Job

type Job struct {
	// OnStatusChangeFunc is triggered when the job's status changes.
	OnStatusChangeFunc JobAction
	// BeforeFunc is executed before starting the job.
	BeforeFunc JobAction
	// ActionFunc is the task to perform.
	ActionFunc JobAction
	// AfterFunc is executed when the job is completed.
	AfterFunc JobAction
	// CancelFunc is triggered when the job is cancelled; use it for cleanup.
	CancelFunc JobAction
	// ErrHandler is executed when an error or a panic occurs.
	// Its panic argument presumably reports whether err comes from a
	// recovered panic — TODO confirm against the implementation.
	ErrHandler func(j *Job, err error, panic bool)
	// contains filtered or unexported fields
}

A Job performs actions.

func GetJob

func GetJob(id string) *Job

GetJob returns the job for the given id.

func (*Job) Cancel

func (j *Job) Cancel()

Cancel stops the job execution.

func (*Job) Context

func (j *Job) Context() context.Context

Context returns a new context for the job.

func (*Job) Error

func (j *Job) Error() error

Error returns the job's error, if any.

func (*Job) ID

func (j *Job) ID() string

ID returns the job's identifier.

func (*Job) Init

func (j *Job) Init(log Logger)

Init initializes the job. It should be only called by the worker.

func (*Job) Run

func (j *Job) Run()

Run starts the job.

func (*Job) SetContext

func (j *Job) SetContext(ctx context.Context)

SetContext sets the given context to the job. It panics if the job is queued.

func (*Job) Status

func (j *Job) Status() string

Status returns the job's status.

type JobAction

type JobAction func(j *Job) error

A JobAction defines the format of the function held by a job.

type Logger

// Logger is the set of logging methods the workerpool requires from the
// logger passed to SetLogger.
// NOTE(review): the method names match those of the standard library's
// *log.Logger, so one can presumably be passed directly — confirm.
type Logger interface {
	// Print-style methods.
	Print(...interface{})
	Printf(string, ...interface{})
	Println(...interface{})

	// Fatal-style methods.
	Fatal(...interface{})
	Fatalf(string, ...interface{})
	Fatalln(...interface{})

	// Panic-style methods.
	Panic(...interface{})
	Panicf(string, ...interface{})
	Panicln(...interface{})
}

A Logger defines all the functions needed by the workerpool's logger.

type Workerpool

type Workerpool struct {
	// contains filtered or unexported fields
}

A Workerpool manages asynchronous jobs.

func New

func New(workers, queueSize int) *Workerpool

New instantiates a new Workerpool.

func NewDefault

func NewDefault() *Workerpool

NewDefault instantiates a new Workerpool with a queue size of 10k elements.

func (*Workerpool) CancelJob

func (w *Workerpool) CancelJob(id string)

CancelJob stops the job for the given id in an asynchronous routine.

func (*Workerpool) GetJob

func (w *Workerpool) GetJob(id string) *Job

GetJob returns the job for the given id.

func (*Workerpool) GetJobStatus

func (w *Workerpool) GetJobStatus(id string) string

GetJobStatus returns the job's status for the given id.

func (*Workerpool) GetJobsMetrics

func (w *Workerpool) GetJobsMetrics() map[string]interface{}

GetJobsMetrics returns the metrics about the workerpool.

func (*Workerpool) GetPoolSize

func (w *Workerpool) GetPoolSize() int

GetPoolSize returns the number of running workers.

func (*Workerpool) RecordJobsMetrics added in v0.4.0

func (w *Workerpool) RecordJobsMetrics(enabled bool)

RecordJobsMetrics enables or disables the recording of metrics about the workerpool.

func (*Workerpool) Send

func (w *Workerpool) Send(job *Job) string

Send enqueues the given job and returns its ID.

func (*Workerpool) SetLogger

func (w *Workerpool) SetLogger(l Logger)

SetLogger defines the workerpool logger.

func (*Workerpool) SetPoolSize

func (w *Workerpool) SetPoolSize(n int)

SetPoolSize defines the number of wanted workers. n is absolute so the pool can be expanded or shrunk according to n.

func (*Workerpool) Shutdown

func (w *Workerpool) Shutdown()

Shutdown waits for job completion and shrinks the pool to 0.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL