Documentation ¶
Overview ¶
Package pool provides a worker pool with a job queue.
Index ¶
- Variables
- func Max(x, y int) int
- func Min(x, y int) int
- func Range(end int) []struct{}
- type Config
- type Dispatcher
- type IDispatcher
- type IPool
- type IWorker
- type Job
- type JobHandler
- type JobHandlerGen
- type Option
- type Pool
- func (p *Pool) Closed() bool
- func (p *Pool) SetLoadFactor(loadFactor int)
- func (p *Pool) SetMaxPoolNum(maxPoolNum int)
- func (p *Pool) SetResizePeriodSeconds(resizePeriodSeconds time.Duration)
- func (p *Pool) SetResizeSuccessThreshold(resizeSuccessThreshold int)
- func (p *Pool) Size() int
- func (p *Pool) Start()
- func (p *Pool) Undispatch(num ...int)
- type Resize
- type Worker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// DefaultConfig is the default Pool Config.
	DefaultConfig = Config{
		InitPoolNum: 1,
		MaxPoolNum:  3,
		WorkerNum:   50,
		LoadFactor:  20,
		Resize: Resize{
			SuccessThreshold:    2,
			PeriodSeconds:       30,
			InitialDelaySeconds: 60,
		},
		JobQueueBufferSize: 1000,
	}
)
Functions ¶
Types ¶
type Config ¶
type Config struct {
	// initial number of dispatchers
	InitPoolNum int
	// maximum number of dispatchers
	MaxPoolNum int
	// number of workers in a dispatcher
	WorkerNum int
	// LoadFactor is compared against the number of jobs in JobQueue divided by
	// the number of workers in the Pool. For example, LoadFactor 20 means
	// 1 worker handles 20 jobs in a second. If the current load exceeds
	// LoadFactor, the number of pools is resized upward.
	LoadFactor int
	Resize
	// JobQueue channel buffer size
	JobQueueBufferSize int
	// Verbose enables verbose logging when true; by default it is false.
	Verbose bool
	// If enabled, any errors that occurred while processing job requests are
	// returned on the Errors channel (default disabled). If enabled, you must
	// read from the Errors channel or it will deadlock.
	Errors bool
	// If enabled, it checks `LoadFactor` periodically and progressively resizes
	// to `MaxPoolNum`; by default it is false.
	AutoScale bool
	// Tracer is the opentracing.Tracer used for tracing.
	Tracer opentracing.Tracer
}
Config is used to initialize a Pool.
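The Pool example on this page passes an option of the form `func(c *pool.Config) error` to `pool.New`. The following is a minimal sketch of how options of that shape can be layered on top of default values. The local `Config`, `Option`, `defaultConfig`, `applyOptions`, and `withWorkers` are stand-ins written for illustration and are assumptions, not the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a local stand-in holding a few of the documented fields.
type Config struct {
	InitPoolNum int
	MaxPoolNum  int
	WorkerNum   int
	LoadFactor  int
}

// Option mirrors the shape used in the package example: func(*Config) error.
type Option func(*Config) error

// defaultConfig copies the documented DefaultConfig values for these fields.
var defaultConfig = Config{InitPoolNum: 1, MaxPoolNum: 3, WorkerNum: 50, LoadFactor: 20}

// applyOptions (hypothetical helper) starts from the defaults and applies
// each option in order, stopping at the first error.
func applyOptions(opts ...Option) (Config, error) {
	c := defaultConfig
	for _, opt := range opts {
		if err := opt(&c); err != nil {
			return Config{}, err
		}
	}
	return c, nil
}

// withWorkers (hypothetical option) sets WorkerNum and rejects
// non-positive values.
func withWorkers(n int) Option {
	return func(c *Config) error {
		if n <= 0 {
			return errors.New("WorkerNum must be positive")
		}
		c.WorkerNum = n
		return nil
	}
}

func main() {
	c, err := applyOptions(withWorkers(5))
	fmt.Println(c.WorkerNum, c.MaxPoolNum, err)
}
```

Fields that no option touches keep their default values, which is why the example on this page only needs to set `InitPoolNum` and `WorkerNum`.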
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher represents the dispatcher that dispatches jobs.
func NewDispatcher ¶
func NewDispatcher(done <-chan struct{}, wgPool *sync.WaitGroup, numWorkers int, jobQueue <-chan Job, jobHandler JobHandler, errors chan error, tracer opentracing.Tracer) *Dispatcher
NewDispatcher creates a dispatcher.
func (*Dispatcher) Closed ¶
func (d *Dispatcher) Closed() bool
Closed returns true if dispatcher received a signal to stop.
func (*Dispatcher) DeWorker ¶
func (d *Dispatcher) DeWorker(num ...int)
DeWorker signals workers to stop; num is the number of workers to stop and defaults to 1.
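The `num ...int` parameter makes the count optional. A small sketch of how an optional variadic count can be resolved to a default of 1 is shown here. `stopCount` is a hypothetical helper, and how the package treats extra or non-positive arguments is not documented on this page, so this sketch simply uses the first argument as given.

```go
package main

import "fmt"

// stopCount (hypothetical helper) resolves an optional variadic count:
// no argument means 1, otherwise the first argument is used.
func stopCount(num ...int) int {
	if len(num) == 0 {
		return 1
	}
	return num[0]
}

func main() {
	fmt.Println(stopCount(), stopCount(3))
}
```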
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
Run creates the worker pool and dispatches available jobs.
type IDispatcher ¶
IDispatcher is the Dispatcher interface.
type IPool ¶
type IPool interface {
	Start()
	Closed() bool
	Size() int
	Undispatch(...int)
	SetMaxPoolNum(int)
	SetLoadFactor(int)
	SetResizeSuccessThreshold(int)
	SetResizePeriodSeconds(time.Duration)
}
IPool is the Pool interface.
type IWorker ¶
type IWorker interface {
	Start(JobHandler)
	Closed() bool
}
IWorker is the Worker interface.
type JobHandlerGen ¶
type JobHandlerGen = func() JobHandler
JobHandlerGen returns a JobHandler when it's called.
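A generator lets each caller obtain its own handler value while still sharing state through the enclosing closure. The sketch below uses local stand-ins for `Job` and `JobHandler`, shaped like their use in the Pool example on this page (`j.Data.(int)` and `func(pool.Job) error`). `summingGen` is a hypothetical generator, and how often the pool invokes the generator is not stated here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Job and JobHandler are local stand-ins for the package types.
type Job struct {
	Data interface{}
}

type JobHandler func(Job) error

type JobHandlerGen = func() JobHandler

// summingGen (hypothetical) returns a generator whose handlers add integer
// job data to a shared total and reject any other data type.
func summingGen(total *int64) JobHandlerGen {
	return func() JobHandler {
		return func(j Job) error {
			n, ok := j.Data.(int)
			if !ok {
				return fmt.Errorf("unexpected job data type %T", j.Data)
			}
			atomic.AddInt64(total, int64(n)) // safe for concurrent handlers
			return nil
		}
	}
}

func main() {
	var total int64
	gen := summingGen(&total)
	h1, h2 := gen(), gen() // two independent handler values
	_ = h1(Job{Data: 2})
	_ = h2(Job{Data: 3})
	fmt.Println(atomic.LoadInt64(&total))
}
```

Using a checked type assertion instead of `j.Data.(int)` turns a malformed job into a returned error rather than a panic.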
type Pool ¶
type Pool struct {
	// JobQueue channel for incoming job request,
	// user should NOT close this channel to stop Pool,
	// instead done channel is used for stopping Pool.
	JobQueue chan Job
	// JobHandlerGenerator is used for new JobHandler.
	JobHandlerGenerator JobHandlerGen
	// Errors channel to receive any errors that occurred while processing job request
	Errors chan error
	// contains filtered or unexported fields
}
Pool represents a pool with dispatcher.
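The Config documentation notes that, once `Errors` is enabled, the Errors channel must be read or the pool will deadlock. The sketch below illustrates that general requirement with a plain unbuffered channel standing in for `Pool.Errors`. `drainErrors` and `collect` are hypothetical helpers, and this sketch closes the channel itself, which may not match how the package manages the real channel.

```go
package main

import (
	"fmt"
	"sync"
)

// drainErrors (hypothetical helper) reads every error from errs until the
// channel is closed and returns what it collected.
func drainErrors(errs <-chan error) []error {
	var got []error
	for err := range errs {
		got = append(got, err)
	}
	return got
}

// collect (hypothetical) starts n stand-in workers that each report one
// error, drains the channel, and returns how many errors were read.
func collect(n int) int {
	errs := make(chan error) // unbuffered: every send blocks until it is read
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			errs <- fmt.Errorf("job %d failed", id)
		}(i)
	}
	// Close once all senders are done; this sketch owns the channel.
	go func() {
		wg.Wait()
		close(errs)
	}()
	return len(drainErrors(errs))
}

func main() {
	fmt.Println(collect(3))
}
```

Without the reader, each sender would block forever on its send, which is the deadlock the Config comment warns about.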
Example ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})
	mu := &sync.RWMutex{}
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum := 0

	jobHandlerGenerator := func() pool.JobHandler {
		return func(j pool.Job) error {
			mu.Lock()
			defer mu.Unlock()
			sum += j.Data.(int)
			return nil
		}
	}

	size := 2
	opt := func(c *pool.Config) error {
		c.InitPoolNum = size
		c.WorkerNum = 5
		return nil
	}

	p := pool.New(done, jobHandlerGenerator, opt)
	p.Start()

	for i := range data {
		p.JobQueue <- pool.Job{
			Data: data[i],
		}
	}

	close(done)

	// wait for jobs to finish
	for {
		time.Sleep(1 * time.Second)
		if p.Closed() {
			break
		}
	}

	mu.RLock()
	fmt.Println(sum)
	mu.RUnlock()
}
Output: 55
func New ¶
func New(done <-chan struct{}, jobHandlerGenerator JobHandlerGen, options ...Option) *Pool
New creates a pool.
func (*Pool) SetLoadFactor ¶
SetLoadFactor applies LoadFactor to Pool Config.
func (*Pool) SetMaxPoolNum ¶
SetMaxPoolNum applies MaxPoolNum to Pool Config.
func (*Pool) SetResizePeriodSeconds ¶
SetResizePeriodSeconds applies Resize PeriodSeconds to Pool Config.
func (*Pool) SetResizeSuccessThreshold ¶
SetResizeSuccessThreshold applies Resize SuccessThreshold to Pool Config.
func (*Pool) Undispatch ¶
Undispatch signals dispatchers to stop; num is the number of dispatchers to stop and defaults to 1.
type Resize ¶
type Resize struct {
	// the number of times the check needs to succeed before running resize
	SuccessThreshold int
	// how often to check LoadFactor to determine whether to resize
	PeriodSeconds time.Duration
	// the number of seconds to wait after the Pool has started before running the check
	InitialDelaySeconds time.Duration
}
Resize holds the resize-related configuration.
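To make the interaction between `LoadFactor` and `SuccessThreshold` concrete, here is a rough sketch of a load check combined with a success counter. It assumes that the worker count is dispatchers times workers per dispatcher and that the check must succeed on consecutive runs. Neither detail is confirmed by this page, and `overloaded` and `resizer` are hypothetical names.

```go
package main

import "fmt"

// overloaded reports whether queued jobs per worker exceed loadFactor.
// The total worker count is taken as dispatchers * workersPerDispatcher,
// which is an assumption of this sketch.
func overloaded(queued, dispatchers, workersPerDispatcher, loadFactor int) bool {
	workers := dispatchers * workersPerDispatcher
	if workers == 0 {
		return queued > 0
	}
	// Same as queued/workers > loadFactor, without integer truncation.
	return queued > loadFactor*workers
}

// resizer (hypothetical) counts consecutive overloaded checks and signals a
// resize once the count reaches successThreshold, then starts over.
type resizer struct {
	successThreshold int
	streak           int
}

func (r *resizer) check(isOverloaded bool) bool {
	if !isOverloaded {
		r.streak = 0
		return false
	}
	r.streak++
	if r.streak >= r.successThreshold {
		r.streak = 0
		return true
	}
	return false
}

func main() {
	r := &resizer{successThreshold: 2}
	// Queue lengths seen at successive checks, with 1 dispatcher of
	// 5 workers and LoadFactor 20 (so the limit is 100 queued jobs).
	for _, queued := range []int{120, 90, 150, 180} {
		fmt.Println(queued, r.check(overloaded(queued, 1, 5, 20)))
	}
}
```

In this run only the last check reports a resize: the first overloaded reading is cancelled by the 90-job reading, and the threshold of 2 is reached again at 150 and 180.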
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents the worker that executes the job.
func NewWorker ¶
func NewWorker(done <-chan struct{}, workerPool chan<- chan Job, wg *sync.WaitGroup, jobPool <-chan struct{}, errors chan error, tracer opentracing.Tracer) *Worker
NewWorker creates a worker.
func (*Worker) Start ¶
func (w *Worker) Start(handler JobHandler)
Start pushes the worker into the worker queue and listens for a signal to stop.
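The `workerPool chan<- chan Job` parameter of NewWorker suggests the common pattern in which each idle worker offers its own job channel on a shared queue. The following is a self-contained sketch of that pattern under that assumption. `startWorker`, `dispatch`, and `sumWith` are hypothetical, `Job` and `JobHandler` are local stand-ins, and none of this is the package's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Job and JobHandler are local stand-ins for the package types.
type Job struct{ Data interface{} }

type JobHandler func(Job) error

// startWorker (hypothetical) repeatedly offers its own job channel on
// workerPool, then waits for a job on it, until done is closed.
func startWorker(done <-chan struct{}, workerPool chan<- chan Job, wg *sync.WaitGroup, handler JobHandler) {
	jobs := make(chan Job)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case workerPool <- jobs: // announce that this worker is idle
			case <-done:
				return
			}
			select {
			case j := <-jobs:
				_ = handler(j) // error handling omitted in this sketch
			case <-done:
				return
			}
		}
	}()
}

// dispatch (hypothetical) hands each job to the next idle worker.
func dispatch(workerPool <-chan chan Job, jobs []Job) {
	for _, j := range jobs {
		w := <-workerPool
		w <- j
	}
}

// sumWith runs the sketch end to end: n workers add up the given ints.
func sumWith(n int, data []int) int {
	done := make(chan struct{})
	workerPool := make(chan chan Job, n)
	var wg, handled sync.WaitGroup
	var mu sync.Mutex
	sum := 0
	handler := func(j Job) error {
		defer handled.Done()
		mu.Lock()
		sum += j.Data.(int)
		mu.Unlock()
		return nil
	}
	for i := 0; i < n; i++ {
		startWorker(done, workerPool, &wg, handler)
	}
	jobs := make([]Job, len(data))
	for i, d := range data {
		jobs[i] = Job{Data: d}
	}
	handled.Add(len(jobs))
	dispatch(workerPool, jobs)
	handled.Wait() // every job has been handled
	close(done)    // now tell the workers to stop
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(sumWith(3, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}))
}
```

This sketch waits for all jobs to be handled before closing `done`, so no queued work is dropped when the workers stop.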