pool


README

pool

makes concurrency easier

worker pool with job queue

This library is inspired by the Go blog post Go Concurrency Patterns: Pipelines and cancellation.


the challenge

This library grew out of work on a microservice that had to handle a large volume of requests from thousands of endpoints.

The initial implementation used native goroutines, one goroutine per request, but this quickly proved not to work well at scale: there was no way to control how many goroutines were spawned, and as the number of requests grew, the service ran out of memory and crashed.

The second iteration queued requests in a buffered channel and had a handler process them; since the maximum number of queued requests is fixed, the out-of-memory crashes went away.

The better solution is to use the fan-out pattern from the blog post: a two-tier channel system, with one channel to queue requests and another to control how many workers operate on the queue concurrently.
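
A minimal sketch of that two-tier idea in plain Go, independent of this package (the channel size and worker count are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workerNum = 5         // tier 2: upper bound on concurrency
	jobs := make(chan int, 100) // tier 1: bounded queue for incoming requests

	var wg sync.WaitGroup
	for w := 0; w < workerNum; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := range jobs { // fan-out: every worker reads from the same queue
				fmt.Printf("worker %d processed job %d\n", id, j)
			}
		}(w)
	}

	for i := 0; i < 10; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
}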

how it works

When the pool is instantiated, it creates a request queue of size JobQueueBufferSize and InitPoolNum dispatchers with WorkerNum workers each. Every dispatcher reads from the same queue until it is closed and distributes requests amongst its workers so they are processed in parallel.
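
These knobs map onto the Config fields documented below; a minimal sketch with illustrative sizes (the no-op handler stands in for real work):

package main

import "github.com/andy2046/pool"

func main() {
	done := make(chan struct{})
	defer close(done)

	handlerGen := func() pool.JobHandler {
		return func(j pool.Job) error { return nil } // no-op handler for illustration
	}

	// Override the sizing knobs described above.
	opt := func(c *pool.Config) error {
		c.JobQueueBufferSize = 500 // request queue size
		c.InitPoolNum = 2          // number of dispatchers
		c.WorkerNum = 10           // workers per dispatcher
		return nil
	}

	p := pool.New(done, handlerGen, opt)
	p.Start() // 2 dispatchers, 10 workers each, now serve the shared queue
	p.JobQueue <- pool.Job{Data: "hello"}
}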

flow

Install
go get github.com/andy2046/pool
Usage
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})
	mu := &sync.RWMutex{}
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum := 0
	jobHandlerGenerator := func() pool.JobHandler {
		return func(j pool.Job) error {
			mu.Lock()
			defer mu.Unlock()
			sum += j.Data.(int)
			return nil
		}
	}
	size := 2
	opt := func(c *pool.Config) error {
		c.InitPoolNum = size
		c.WorkerNum = 5
		return nil
	}

	p := pool.New(done, jobHandlerGenerator, opt)
	p.Start()

	for i := range data {
		p.JobQueue <- pool.Job{
			Data: data[i],
		}
	}

	close(done)

	// wait for jobs to finish
	for {
		time.Sleep(1 * time.Second)
		if p.Closed() {
			break
		}
	}
	mu.RLock()
	fmt.Println(sum)
	// Output: 55
	mu.RUnlock()
}

Documentation

Overview

Package pool provides worker pool with job queue.


Constants

This section is empty.

Variables

var (
	// DefaultConfig is the default Pool Config.
	DefaultConfig = Config{
		InitPoolNum: 1,
		MaxPoolNum:  3,
		WorkerNum:   50,
		LoadFactor:  20,
		Resize: Resize{
			SuccessThreshold:    2,
			PeriodSeconds:       30,
			InitialDelaySeconds: 60,
		},
		JobQueueBufferSize: 1000,
	}
)

Functions

func Max

func Max(x, y int) int

Max returns the larger of x or y.

func Min

func Min(x, y int) int

Min returns the smaller of x or y.

func Range

func Range(end int) []struct{}

Range creates a range progressing from zero up to, but not including, end.
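
A quick illustration of these helpers (the values are arbitrary):

package main

import (
	"fmt"

	"github.com/andy2046/pool"
)

func main() {
	fmt.Println(pool.Max(3, 7)) // 7
	fmt.Println(pool.Min(3, 7)) // 3

	// Range(3) returns a 3-element slice, so the index runs 0, 1, 2.
	for i := range pool.Range(3) {
		fmt.Println(i)
	}
}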

Types

type Config

type Config struct {
	// initial number of dispatchers
	InitPoolNum int

	// maximum number of dispatchers
	MaxPoolNum int

	// number of workers in a dispatcher
	WorkerNum int

	// LoadFactor is the target ratio of jobs in the JobQueue to workers in the Pool;
	// for example, LoadFactor 20 means one worker is expected to handle 20 jobs per second.
	// If the current load exceeds LoadFactor, the number of dispatchers is resized upward.
	LoadFactor int

	Resize

	// JobQueue channel buffer size
	JobQueueBufferSize int

	// Verbose enables verbose logging; it defaults to false.
	Verbose bool

	// If enabled, any errors that occur while processing a job request are returned on
	// the Errors channel (default disabled). If enabled, you must read from
	// the Errors channel or it will deadlock.
	Errors bool

	// If enabled, the pool periodically checks `LoadFactor` and progressively resizes up to `MaxPoolNum`;
	// it defaults to false.
	AutoScale bool

	// Tracer is the opentracing.Tracer used for tracing.
	Tracer opentracing.Tracer
}

Config used to init Pool.
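
As a sketch of the Errors option described above: when it is enabled, the Errors channel must be drained, so a reader goroutine is started before any jobs are queued (the always-failing handler here is purely illustrative):

package main

import (
	"errors"
	"fmt"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})

	handlerGen := func() pool.JobHandler {
		return func(j pool.Job) error {
			return errors.New("boom") // always fail, just to exercise the Errors channel
		}
	}

	// Enable the Errors channel; once enabled it must be read or the pool deadlocks.
	opt := func(c *pool.Config) error {
		c.Errors = true
		return nil
	}

	p := pool.New(done, handlerGen, opt)
	p.Start()

	// Drain errors in the background.
	go func() {
		for err := range p.Errors {
			fmt.Println("job failed:", err)
		}
	}()

	p.JobQueue <- pool.Job{Data: 1}
	close(done)
}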

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher represents the dispatcher that dispatches jobs.

func NewDispatcher

func NewDispatcher(done <-chan struct{}, wgPool *sync.WaitGroup, numWorkers int, jobQueue <-chan Job,
	jobHandler JobHandler, errors chan error, tracer opentracing.Tracer) *Dispatcher

NewDispatcher creates a dispatcher.

func (*Dispatcher) Closed

func (d *Dispatcher) Closed() bool

Closed returns true if the dispatcher received a signal to stop.

func (*Dispatcher) DeWorker

func (d *Dispatcher) DeWorker(num ...int)

DeWorker signals workers to stop; num is the number of workers to stop, defaulting to 1.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run creates the worker pool and dispatches available jobs.

type IDispatcher

type IDispatcher interface {
	Run()
	Closed() bool
	DeWorker(...int)
}

IDispatcher is the Dispatcher interface.

type IPool

type IPool interface {
	Start()
	Closed() bool
	Size() int
	Undispatch(...int)
	SetMaxPoolNum(int)
	SetLoadFactor(int)
	SetResizeSuccessThreshold(int)
	SetResizePeriodSeconds(time.Duration)
}

IPool is the Pool interface.
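
Code that only needs the behaviour above can accept an IPool instead of a concrete *Pool, which makes it easy to substitute a fake pool in tests. A small sketch with a hypothetical waitUntilClosed helper (not part of this package):

package main

import (
	"time"

	"github.com/andy2046/pool"
)

// waitUntilClosed is a hypothetical helper written against IPool
// rather than *Pool, so a fake pool can be swapped in for tests.
func waitUntilClosed(p pool.IPool) {
	for !p.Closed() {
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	done := make(chan struct{})
	gen := func() pool.JobHandler {
		return func(j pool.Job) error { return nil }
	}

	p := pool.New(done, gen)
	p.Start()

	close(done)
	waitUntilClosed(p) // *Pool provides all the methods IPool requires
}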

type IWorker

type IWorker interface {
	Start(JobHandler)
	Closed() bool
}

IWorker is the Worker interface.

type Job

type Job struct {
	Data interface{}
}

Job represents the job to be run.

type JobHandler

type JobHandler func(Job) error

JobHandler completes the job.

type JobHandlerGen

type JobHandlerGen = func() JobHandler

JobHandlerGen returns a new JobHandler each time it is called.

type Option

type Option = func(*Config) error

Option applies config to Pool Config.

type Pool

type Pool struct {
	// JobQueue is the channel for incoming job requests.
	// Users should NOT close this channel to stop the Pool;
	// use the done channel to stop the Pool instead.
	JobQueue chan Job

	// JobHandlerGenerator is used to create a new JobHandler.
	JobHandlerGenerator JobHandlerGen

	// Errors is the channel that receives any errors that occur while processing job requests.
	Errors chan error
	// contains filtered or unexported fields
}

Pool represents a pool with dispatcher.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})
	mu := &sync.RWMutex{}
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum := 0
	jobHandlerGenerator := func() pool.JobHandler {
		return func(j pool.Job) error {
			mu.Lock()
			defer mu.Unlock()
			sum += j.Data.(int)
			return nil
		}
	}
	size := 2
	opt := func(c *pool.Config) error {
		c.InitPoolNum = size
		c.WorkerNum = 5
		return nil
	}

	p := pool.New(done, jobHandlerGenerator, opt)
	p.Start()

	for i := range data {
		p.JobQueue <- pool.Job{
			Data: data[i],
		}
	}

	close(done)

	// wait for jobs to finish
	for {
		time.Sleep(1 * time.Second)
		if p.Closed() {
			break
		}
	}
	mu.RLock()
	fmt.Println(sum)

	mu.RUnlock()
}
Output:

55

func New

func New(done <-chan struct{}, jobHandlerGenerator JobHandlerGen, options ...Option) *Pool

New creates a pool.

func (*Pool) Closed

func (p *Pool) Closed() bool

Closed returns true if the pool received a signal to stop.

func (*Pool) SetLoadFactor

func (p *Pool) SetLoadFactor(loadFactor int)

SetLoadFactor applies LoadFactor to Pool Config.

func (*Pool) SetMaxPoolNum

func (p *Pool) SetMaxPoolNum(maxPoolNum int)

SetMaxPoolNum applies MaxPoolNum to Pool Config.

func (*Pool) SetResizePeriodSeconds

func (p *Pool) SetResizePeriodSeconds(resizePeriodSeconds time.Duration)

SetResizePeriodSeconds applies Resize PeriodSeconds to Pool Config.

func (*Pool) SetResizeSuccessThreshold

func (p *Pool) SetResizeSuccessThreshold(resizeSuccessThreshold int)

SetResizeSuccessThreshold applies Resize SuccessThreshold to Pool Config.

func (*Pool) Size

func (p *Pool) Size() int

Size returns the current number of dispatchers.

func (*Pool) Start

func (p *Pool) Start()

Start runs the dispatchers in the pool.

func (*Pool) Undispatch

func (p *Pool) Undispatch(num ...int)

Undispatch signals dispatchers to stop; num is the number of dispatchers to stop, defaulting to 1.
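
Taken together, the setters and Undispatch above allow tuning a pool while it runs; a minimal sketch with illustrative values:

package main

import (
	"fmt"

	"github.com/andy2046/pool"
)

func main() {
	done := make(chan struct{})
	defer close(done)

	handlerGen := func() pool.JobHandler {
		return func(j pool.Job) error { return nil }
	}

	p := pool.New(done, handlerGen)
	p.Start()

	// Tune the running pool; the values here are illustrative.
	p.SetMaxPoolNum(5)             // allow up to 5 dispatchers
	p.SetLoadFactor(10)            // resize upward sooner under load
	p.SetResizeSuccessThreshold(3) // require 3 successful checks before resizing
	p.SetResizePeriodSeconds(15)   // resize check period; DefaultConfig stores these values as plain second counts

	fmt.Println(p.Size()) // current number of dispatchers

	p.Undispatch() // ask one dispatcher to stop
}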

type Resize

type Resize struct {
	// the number of times the check needs to succeed before running resize
	SuccessThreshold int
	// how often to check LoadFactor to determine whether to resize
	PeriodSeconds time.Duration
	// the number of seconds to wait after the Pool has started before running the check
	InitialDelaySeconds time.Duration
}

Resize related config.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents the worker that executes the job.

func NewWorker

func NewWorker(done <-chan struct{}, workerPool chan<- chan Job, wg *sync.WaitGroup,
	jobPool <-chan struct{}, errors chan error, tracer opentracing.Tracer) *Worker

NewWorker creates a worker.

func (*Worker) Closed

func (w *Worker) Closed() bool

Closed returns true if the worker received a signal to stop.

func (*Worker) Start

func (w *Worker) Start(handler JobHandler)

Start pushes the worker into the worker queue and listens for a signal to stop.
