proletarian

package module
v0.0.0-...-3206d3c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2022 License: MIT Imports: 4 Imported by: 0

README

proletarian GoDoc GitHub go.mod Go version of a Go module Go Report Card

Worker pool with retries and gracefull shutdown for Go

Installation

Using go get

go get github.com/eliastor/proletarian

Usage


type Task struct {
	proletarian.TaskHeader // embed this to make your task supported by proletarian
    ... // all other fields for your task
}

pool := proletarian.NewPool(context.TODO(), proletarian.PoolConfig{
    LobbySize: 0,
    Size:      2,
    Retries:   2,
    Func:      func(poolTask proletarian.Task) error {
		task := poolTask.(*Task)
        ... // working with task
		return nil // return error, if nil, if returned error is not nil, then retry mechanism will be applied
	},
})

pool.Run() // Run pool workers and internals

go func() {
	for i := range tasks {
		pool.Queue(tasks[i])
	}
	pool.Shutdown() // pool.Queue will not add new tasks to the pool and it will wait until all tasks will be finished (including retires) and stops pool. After Shutdown() pool is not usable anymore.
}()

go func() {
	for {
		errTask := pool.ErroredTask()
		if errTask == nil {
			return
		}
		// process errored task
	}
}()

pool.Wait() // Wait for wait pool workers to finish all tasks

Documentation

Overview

Package proletarian providers worker pool with graceful shutdowns and retires of tasks

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PoolConfig

type PoolConfig struct {
	// LobbbySize sets size of input queue, default value is 0
	LobbySize int

	// Size sets size of workers, default value is 1, it is normalized to range [1 .. runtime.GOMAXPROCS(0) * 32]
	Size int

	// Retries limits number of retries for every task. Set this value to something bigger than 0
	Retries int

	// Func is function that will be executed in every worker
	Func func(t Task) error
}

PoolConfig includes configuration for the pool. All values are normalized to limits.

type Pooler

type Pooler interface {
	// Run triggers start of the pool. Must be called only once, can be called without creating new goroutine.
	Run()

	// Queue puts new task in input queue.
	Queue(t Task)

	// Shutdown gracefully stops pool, waiting for all task will be finished (successfully or errored after retries). The pool must not be used after Shutdown
	Shutdown()

	// Cancel stops pool ungracefully. The pool must not be used after Cancel
	Cancel()

	// ErroredTask returns task with error. It waits while such task appears and returns the task or nil if pool was shut down and no more errored task available.
	ErroredTask() Task

	// Wait holds execution and waits until all tasks pool execution queue will be empty
	Wait()
}

func NewPool

func NewPool(ctx context.Context, cfg PoolConfig) Pooler

NewPool creates new Pool

type Task

type Task interface {
	// SetError sets error and increase error counter if err != nil
	SetError(error)
	// ErrorCount returns number of errors for the task
	ErrorCount() int

	// Err returns last error set by SetError
	Err() error

	// Unwrap is same as Err and returns last error set by SetError
	Unwrap() error
}

Task is the interface that every task in pool must satisfy

type TaskHeader

type TaskHeader struct {
	// contains filtered or unexported fields
}

TaskHeader is structure that satisfy Task interface and intended to be embedded in user-defined tasks

func (*TaskHeader) Err

func (t *TaskHeader) Err() error

Err returns last error occurred in task

func (*TaskHeader) ErrorCount

func (t *TaskHeader) ErrorCount() int

ErrorCount returns number of errors for the task

func (*TaskHeader) SetError

func (t *TaskHeader) SetError(err error)

SetError sets error in TaskHeader and increase error counter if err != nil

func (*TaskHeader) Unwrap

func (t *TaskHeader) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL