workers

package
v0.0.0-...-2841839 (Latest)

Warning: This package is not in the latest version of its module.
Published: Mar 11, 2023 License: MIT Imports: 7 Imported by: 0

README

workers

import "github.com/cognusion/go-jar/workers"

Package files

work.go worker.go workerpool.go

Variables

var (
    // DebugOut is a log.Logger for debug messages
    DebugOut = log.New(io.Discard, "[DEBUG] ", 0)
    // ErrorOut is a log.Logger for error messages
    ErrorOut = log.New(io.Discard, "", 0)
)

type Work

type Work interface {
    Work() interface{}
    Return(interface{})
}

Work is an interface that abstracts doing a job (Work) and delivering its result (Return), so that generic Workers can execute Work without knowing what it does.
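As a minimal sketch of what satisfying this interface can look like, the example below redeclares Work locally so it compiles without importing the package. The squareJob type and the runOnce helper are hypothetical names invented for illustration; they are not part of the package.

```go
package main

import "fmt"

// Work mirrors the interface documented above; it is redeclared locally
// so this sketch builds on its own.
type Work interface {
	Work() interface{}
	Return(interface{})
}

// squareJob is a hypothetical unit of work: it squares N and delivers
// the result on the Result channel.
type squareJob struct {
	N      int
	Result chan interface{}
}

// Work performs the computation and returns the result as an opaque value.
func (j *squareJob) Work() interface{} { return j.N * j.N }

// Return hands the value back to whoever submitted the job.
func (j *squareJob) Return(v interface{}) { j.Result <- v }

// runOnce does what a generic worker would do with any Work:
// call Work, then pass whatever comes back to Return.
func runOnce(w Work) { w.Return(w.Work()) }

func main() {
	job := &squareJob{N: 7, Result: make(chan interface{}, 1)}
	runOnce(job)
	fmt.Println(<-job.Result) // prints 49
}
```

Buffering the Result channel keeps Return from blocking the worker when the submitter is not yet listening.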

type WorkError

type WorkError struct {
    Messages string
}

WorkError is sent to Work.Return() if the Work panics.

func (*WorkError) Error
func (w *WorkError) Error() string
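The sketch below shows one way a worker could turn a panic into a WorkError and hand it to Return. It is an assumption about the general technique, not the package's actual implementation: the types are redeclared locally, the body of Error is guessed, and safeRun and panicJob are hypothetical names.

```go
package main

import "fmt"

// Work mirrors the documented interface (local copy for this sketch).
type Work interface {
	Work() interface{}
	Return(interface{})
}

// WorkError mirrors the documented struct. Returning Messages from
// Error is an assumption; the real method body is not shown in the docs.
type WorkError struct {
	Messages string
}

func (w *WorkError) Error() string { return w.Messages }

// safeRun is a hypothetical helper: it runs w.Work(), converts a panic
// into a *WorkError, and passes the outcome to w.Return exactly once.
func safeRun(w Work) {
	var result interface{}
	func() {
		defer func() {
			if r := recover(); r != nil {
				result = &WorkError{Messages: fmt.Sprint(r)}
			}
		}()
		result = w.Work()
	}()
	w.Return(result)
}

// panicJob is a hypothetical Work whose Work method always panics.
type panicJob struct{ got interface{} }

func (j *panicJob) Work() interface{}    { panic("boom") }
func (j *panicJob) Return(v interface{}) { j.got = v }

func main() {
	j := &panicJob{}
	safeRun(j)
	// Callers can detect failure by checking whether the value is an error.
	if err, ok := j.got.(error); ok {
		fmt.Println("work failed:", err)
	}
}
```

Because results arrive as interface{}, a Return implementation has to type-check what it receives if it wants to tell failures from ordinary results.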

type Worker

type Worker struct {
    // WorkChan is where the work comes from
    WorkChan chan Work
    // QuitChan will get some bools sent to it when the Worker pool needs to shrink
    QuitChan chan bool
    // KillChan will close when all the Workers need to exit
    KillChan chan struct{}
    // contains filtered or unexported fields
}

Worker is a simple primitive construct that listens on WorkChan for Work to do. It might hear a "true" on QuitChan if it is underworked, or see KillChan closed when it is time to leave expeditiously.

func (*Worker) Do
func (w *Worker) Do()

Do forks off a worker loop that listens for Work, quit signals, or the kill signal.
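A loop of the kind described here is commonly written as a select over the three channels. The following is a sketch under that assumption; loop, echoJob, and demo are hypothetical names, and the package's unexported loop may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// Work mirrors the documented interface (local copy for this sketch).
type Work interface {
	Work() interface{}
	Return(interface{})
}

// loop is a hypothetical stand-in for the worker loop: it handles Work
// until it receives on quit or sees kill closed.
func loop(work chan Work, quit chan bool, kill chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case w := <-work:
			w.Return(w.Work())
		case <-quit: // exactly one worker receives each value sent here
			return
		case <-kill: // every worker observes a closed channel
			return
		}
	}
}

// echoJob is a hypothetical Work that returns its own value.
type echoJob struct {
	v   int
	out chan interface{}
}

func (j *echoJob) Work() interface{}    { return j.v }
func (j *echoJob) Return(v interface{}) { j.out <- v }

// demo runs two workers over three jobs, shrinks by one, then kills the rest.
func demo() int {
	work := make(chan Work, 4)
	quit := make(chan bool)
	kill := make(chan struct{})
	out := make(chan interface{}, 3)

	var wg sync.WaitGroup
	wg.Add(2)
	go loop(work, quit, kill, &wg)
	go loop(work, quit, kill, &wg)

	for i := 1; i <= 3; i++ {
		work <- &echoJob{v: i, out: out}
	}
	sum := 0
	for i := 0; i < 3; i++ {
		sum += (<-out).(int)
	}

	quit <- true // ask one worker to leave
	close(kill)  // tell the remaining worker to leave
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(demo()) // prints 6
}
```

Sending on the quit channel retires one worker per value, while closing the kill channel releases all of them at once, which matches the shrink and shutdown roles the field comments describe.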

type WorkerPool

type WorkerPool struct {
    // WorkChan is where the work goes
    WorkChan chan Work
    // Stop permanently stops the pool after current work is done.
    // WorkChan is not closed, to prevent errant reads
    Stop func()

    Metrics metrics.Meter
    // contains filtered or unexported fields
}

WorkerPool is an overly-complicated mechanism to allow arbitrary work to be accomplished by an arbitrary worker, which will then return arbitrary results onto an arbitrary channel, while allowing for the evidence-driven growing or shrinking of the pool of available workers based on the fullness of the WorkChan, which should be buffered and of an appropriate size. If that hasn't turned you off yet, carry on.

func NewWorkerPool
func NewWorkerPool(WorkChan chan Work, initialSize int, autoAdjustInterval time.Duration) *WorkerPool

NewWorkerPool returns a functioning WorkerPool bound to WorkChan, with an initial pool size of initialSize. If autoAdjustInterval > 0, it runs CheckAndAdjust() at that interval. NOTE: If your WorkChan is unbuffered (no size given to make()), autoAdjust will not run, nor will calling CheckAndAdjust() result in changes. The channel's capacity and usage are key to this. It is recommended that the buffer size be around the anticipated burst size for work.
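To illustrate why a buffered channel matters, the sketch below measures how full a channel is using the built-in len and cap. The fullness helper and the noop type are hypothetical, and the package's own resize heuristic is not documented here; the point is only that an unbuffered channel has a capacity of zero and so gives nothing to measure.

```go
package main

import "fmt"

// Work mirrors the documented interface (local copy for this sketch).
type Work interface {
	Work() interface{}
	Return(interface{})
}

// fullness is a hypothetical helper: it reports the fraction of the
// channel buffer in use, and ok=false when the channel is unbuffered.
func fullness(ch chan Work) (ratio float64, ok bool) {
	if cap(ch) == 0 {
		return 0, false
	}
	return float64(len(ch)) / float64(cap(ch)), true
}

// noop is a hypothetical Work that does nothing.
type noop struct{}

func (noop) Work() interface{}  { return nil }
func (noop) Return(interface{}) {}

func main() {
	buffered := make(chan Work, 4) // sized for an anticipated burst of 4
	buffered <- noop{}
	buffered <- noop{}
	buffered <- noop{}

	r, ok := fullness(buffered)
	fmt.Println(r, ok) // prints 0.75 true

	_, ok = fullness(make(chan Work)) // unbuffered: capacity is 0
	fmt.Println(ok)                   // prints false
}
```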

func (*WorkerPool) AddWorkers
func (p *WorkerPool) AddWorkers(number int64)

AddWorkers adds the specified number of workers to the pool.

func (*WorkerPool) CheckAndAdjust
func (p *WorkerPool) CheckAndAdjust()

CheckAndAdjust asynchronously triggers the process to possibly resize the pool. While a resize process is running, subsequent resize attempts silently exit.
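The "silently exit while one is running" behavior is a common single-flight guard. One way to sketch it, assuming an atomic flag (the package may use a different mechanism), is shown below; adjuster and tryAdjust are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// adjuster is a hypothetical type holding a "resize in progress" flag.
type adjuster struct {
	running int32
}

// tryAdjust runs resize only if no other resize is in progress.
// It reports whether this call performed the resize.
func (a *adjuster) tryAdjust(resize func()) bool {
	// Atomically flip 0 -> 1; if the flag is already 1, back off quietly.
	if !atomic.CompareAndSwapInt32(&a.running, 0, 1) {
		return false
	}
	defer atomic.StoreInt32(&a.running, 0)
	resize()
	return true
}

func main() {
	var a adjuster
	var nested bool
	// The inner call happens while the outer resize is still running,
	// so it exits without doing anything.
	outer := a.tryAdjust(func() {
		nested = a.tryAdjust(func() {})
	})
	fmt.Println(outer, nested) // prints true false
}
```

Backing off instead of queuing keeps a slow resize from piling up further resizes behind it when the adjust interval is short.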

func (*WorkerPool) Max
func (p *WorkerPool) Max(max int)

Max sets the maximum number of workers.

func (*WorkerPool) Min
func (p *WorkerPool) Min(min int)

Min sets the minimum number of workers.

func (*WorkerPool) RemoveWorkers
func (p *WorkerPool) RemoveWorkers(number int64)

RemoveWorkers removes the specified number of workers, or all running workers if fewer are running.

func (*WorkerPool) Size
func (p *WorkerPool) Size() int64

Size returns the eventually-consistent number of workers in the pool.

func (*WorkerPool) Work
func (p *WorkerPool) Work() int

Work returns the quantity of Work currently queued in the work channel.


Generated by godoc2md
