pond

package module
v1.8.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 31, 2023 License: MIT Imports: 8 Imported by: 115

README

pond

Minimalistic and high-performance goroutine worker pool written in Go

Motivation

This library is meant to provide a simple way to limit concurrency when executing some function over a limited resource or service.

Some common scenarios include:

  • Executing queries against a database with a limited number of connections
  • Sending HTTP requests to a rate- or concurrency-limited API

Features:

  • Zero dependencies
  • Create pools with fixed or dynamic size
  • Worker goroutines are only created when needed (backpressure detection) and automatically purged after being idle for some time (configurable)
  • Minimalistic APIs for:
    • Creating worker pools with fixed or dynamic size
    • Submitting tasks to a pool in a fire-and-forget fashion
    • Submitting tasks to a pool and waiting for them to complete
    • Submitting tasks to a pool with a deadline
    • Submitting a group of tasks and waiting for them to complete
    • Submitting a group of tasks associated with a Context
    • Getting the number of running workers (goroutines)
    • Stopping a worker pool
  • Task panics are handled gracefully (configurable panic handler)
  • Supports non-blocking and blocking task submission modes (buffered / unbuffered)
  • Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (see benchmarks)
  • Configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
  • Complete pool metrics such as number of running workers, tasks waiting in the queue and more.
  • New (since v1.7.0): configurable parent context and graceful shutdown with deadline.
  • API reference

How to install

go get -u github.com/alitto/pond

How to use

Worker pool with dynamic size
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a buffered (non-blocking) pool that can scale up to 100 workers
	// and has a buffer capacity of 1000 tasks
	pool := pond.New(100, 1000)

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}
Worker pool with fixed size
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create an unbuffered (blocking) pool with a fixed 
	// number of workers
	pool := pond.New(10, 0, pond.MinWorkers(10))

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}
Submitting a group of tasks
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group
	group := pool.Group()

	// Submit a group of tasks
	for i := 0; i < 20; i++ {
		n := i
		group.Submit(func() {
			fmt.Printf("Running group task #%d\n", n)
		})
	}

	// Wait for all tasks in the group to complete
	group.Wait()
}
Submitting a group of tasks associated with a context (since v1.8.0)

This feature provides synchronization, error propagation, and Context cancelation for subtasks of a common task. It is similar to errgroup.Group from the golang.org/x/sync/errgroup package, except that concurrency is bounded by the worker pool.

package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/alitto/pond"
)

func main() {

	// Create a worker pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group associated to a context
	group, ctx := pool.GroupContext(context.Background())

	var urls = []string{
		"https://www.golang.org/",
		"https://www.google.com/",
		"https://www.github.com/",
	}

	// Submit tasks to fetch each URL
	for _, url := range urls {
		url := url
		group.Submit(func() error {
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
			if err != nil {
				return err
			}
			resp, err := http.DefaultClient.Do(req)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}

	// Wait for all HTTP requests to complete.
	err := group.Wait()
	if err != nil {
		fmt.Printf("Failed to fetch URLs: %v\n", err)
	} else {
		fmt.Println("Successfully fetched all URLs")
	}
}
Pool Configuration Options
  • MinWorkers: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:
// This will create a pool with 5 running worker goroutines 
pool := pond.New(10, 1000, pond.MinWorkers(5))
  • IdleTimeout: Defines how long to wait before removing idle worker goroutines from the pool. The default value is 5 seconds. Example:
// This will create a pool that will remove workers 100ms after they become idle 
pool := pond.New(10, 1000, pond.IdleTimeout(100 * time.Millisecond))
  • PanicHandler: Allows configuring a custom function to handle panics raised by tasks submitted to the pool. The default handler just writes a message to standard output using fmt.Printf with the following contents: Worker exits from a panic: [panic] \n Stack trace: [stack trace]. Example:
// Custom panic handler function
panicHandler := func(p interface{}) {
	fmt.Printf("Task panicked: %v\n", p)
}

// This will create a pool that will handle panics using a custom panic handler
pool := pond.New(10, 1000, pond.PanicHandler(panicHandler))
  • Strategy: Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the pond.ResizingStrategy interface or choose one of the 3 presets:
    • Eager: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks. This is the default strategy.
    • Balanced: tries to find a balance between responsiveness and throughput. It's suitable for general purpose worker pools or those that will operate close to 50% of their capacity most of the time.
    • Lazy: maximizes throughput at the expense of responsiveness. This strategy is meant for worker pools that will operate close to their maximum capacity most of the time.
// Example: create pools with different resizing strategies
eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
  • Context: Configures a parent context on this pool to stop all workers when it is cancelled. The default value is context.Background(). Example:
// This creates a pool that is stopped when myCtx is cancelled 
pool := pond.New(10, 1000, pond.Context(myCtx))
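
The worker behaviours configured by IdleTimeout, PanicHandler and Context can be pictured with a single worker loop. The following is a minimal, stdlib-only sketch under stated assumptions, not pond's implementation: the names worker, runSafely and runDemo are hypothetical, and each worker is assumed to read tasks from a shared channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSafely executes task and forwards any panic to panicHandler instead of
// letting it crash the worker goroutine.
func runSafely(task func(), panicHandler func(interface{})) {
	defer func() {
		if p := recover(); p != nil {
			panicHandler(p)
		}
	}()
	task()
}

// worker is a hypothetical worker loop (not pond's code). It exits when the
// parent context is cancelled, when the task channel is closed, or after
// idleTimeout passes without receiving a task.
func worker(ctx context.Context, tasks <-chan func(), idleTimeout time.Duration, panicHandler func(interface{})) {
	for {
		select {
		case <-ctx.Done():
			return // parent context cancelled
		case <-time.After(idleTimeout):
			return // idle for too long: this worker is purged
		case task, ok := <-tasks:
			if !ok {
				return // task channel closed
			}
			runSafely(task, panicHandler)
		}
	}
}

// runDemo queues three tasks (one of which panics) for a single worker and
// returns how many tasks completed normally and how many panicked.
func runDemo() (ran, panics int) {
	tasks := make(chan func(), 3)
	tasks <- func() { ran++ }
	tasks <- func() { panic("boom") }
	tasks <- func() { ran++ }

	handler := func(p interface{}) {
		panics++
		fmt.Printf("Task panicked: %v\n", p)
	}
	// The worker runs every queued task, then exits after 50ms of idleness.
	worker(context.Background(), tasks, 50*time.Millisecond, handler)
	return ran, panics
}

func main() {
	ran, panics := runDemo()
	fmt.Println(ran, panics) // 2 1
}
```

Because time.After is called on every loop iteration, the idle countdown restarts whenever a task arrives, which matches the "idle for some time" wording above.
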
Resizing strategies

The following chart illustrates the behaviour of the different pool resizing strategies as the number of submitted tasks increases. Each line represents the number of worker goroutines in the pool (pool size) and the x-axis reflects the number of submitted tasks (cumulative).

[Figure: pool resizing strategies behaviour]

As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idle workers, which causes the pool to grow almost linearly with the number of submitted tasks. At the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs (GOMAXPROCS). The "Balanced" strategy is a middle ground between the two: it creates a worker every N/2 submitted tasks.
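
To make the arithmetic concrete, the sketch below assumes GOMAXPROCS is 8 and assumes that a rate-based strategy adds a worker on the 1st, (rate+1)-th, (2*rate+1)-th, ... task that finds no idle worker. Both the helper spawnPoints and that exact counting rule are illustrative assumptions, not taken from pond's source.

```go
package main

import "fmt"

// spawnPoints returns the 1-based positions (among tasks that found no idle
// worker) at which a rate-based strategy would add a worker, assuming it
// adds one on the 1st, (rate+1)-th, (2*rate+1)-th ... such task.
func spawnPoints(rate, tasks int) []int {
	if rate < 1 {
		rate = 1 // e.g. Balanced with GOMAXPROCS=1 gives 1/2 == 0
	}
	var points []int
	for n := 1; n <= tasks; n++ {
		if (n-1)%rate == 0 {
			points = append(points, n)
		}
	}
	return points
}

func main() {
	const maxProcs = 8 // assumed GOMAXPROCS value, fixed for illustration
	presets := []struct {
		name string
		rate int
	}{
		{"Eager", 1},
		{"Balanced", maxProcs / 2},
		{"Lazy", maxProcs},
	}
	for _, p := range presets {
		fmt.Printf("%-8s rate=%d new workers at tasks %v\n", p.name, p.rate, spawnPoints(p.rate, 16))
	}
}
```

Under these assumptions, 16 such tasks would start 16 workers with Eager, 4 with Balanced (at tasks 1, 5, 9 and 13) and 2 with Lazy (at tasks 1 and 9); a real pool would also stop growing at maxWorkers.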

Stopping a pool

There are 3 methods available to stop a pool and release associated resources:

  • pool.Stop(): stop accepting new tasks and signal all workers to stop processing new tasks. Tasks being processed by workers will continue until completion unless the process is terminated; tasks still waiting in the queue will not be executed.
  • pool.StopAndWait(): stop accepting new tasks and wait until all running and queued tasks have completed before returning.
  • pool.StopAndWaitFor(deadline time.Duration): similar to StopAndWait but with a deadline to prevent waiting indefinitely.
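
The deadline-bounded wait behind StopAndWaitFor can be sketched with the standard library alone. In this hypothetical example (waitFor and finishedWithin are illustrative names, not pond's API), a sync.WaitGroup stands in for the pool's in-flight tasks and a timer bounds how long the caller waits.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitFor blocks until wg's counter reaches zero or deadline elapses,
// whichever comes first, and reports whether all work finished in time.
func waitFor(wg *sync.WaitGroup, deadline time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait() // keeps running in the background if the deadline wins
		close(done)
	}()
	timer := time.NewTimer(deadline)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// finishedWithin starts one task lasting taskMs milliseconds and waits for
// it for at most deadlineMs milliseconds.
func finishedWithin(taskMs, deadlineMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(taskMs) * time.Millisecond)
	}()
	return waitFor(&wg, time.Duration(deadlineMs)*time.Millisecond)
}

func main() {
	fmt.Println(finishedWithin(10, 1000)) // the task should beat the deadline
	fmt.Println(finishedWithin(500, 10))  // the deadline should be reached first
}
```

If the deadline wins, the helper goroutine keeps waiting on the WaitGroup in the background, so the slow task is abandoned by the caller but not interrupted, consistent with running tasks being allowed to finish.
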
Metrics & monitoring

Each worker pool instance exposes useful metrics that can be queried through the following methods:

  • pool.RunningWorkers() int: Current number of running workers
  • pool.IdleWorkers() int: Current number of idle workers
  • pool.MinWorkers() int: Minimum number of worker goroutines
  • pool.MaxWorkers() int: Maximum number of worker goroutines
  • pool.MaxCapacity() int: Maximum number of tasks that can be waiting in the queue at any given time (queue capacity)
  • pool.SubmittedTasks() uint64: Total number of tasks submitted since the pool was created
  • pool.WaitingTasks() uint64: Current number of tasks in the queue that are waiting to be executed
  • pool.SuccessfulTasks() uint64: Total number of tasks that have successfully completed their execution since the pool was created
  • pool.FailedTasks() uint64: Total number of tasks that completed with panic since the pool was created
  • pool.CompletedTasks() uint64: Total number of tasks that have completed their execution either successfully or with panic since the pool was created
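
The relationship between these counters (completed = successful + failed) can be illustrated with a few atomic counters. The sketch below is hypothetical: the taskMetrics type and its fields are not pond's internals, a panic is assumed to be the only way a task fails (as the FailedTasks description states), and atomic.Uint64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// taskMetrics holds hypothetical counters that mirror SubmittedTasks,
// SuccessfulTasks and FailedTasks; it is not pond's internal layout.
type taskMetrics struct {
	submitted  atomic.Uint64
	successful atomic.Uint64
	failed     atomic.Uint64
}

// execute runs task and counts it as successful, or as failed if it panics.
func (m *taskMetrics) execute(task func()) {
	defer func() {
		if p := recover(); p != nil {
			m.failed.Add(1)
			return
		}
		m.successful.Add(1)
	}()
	task()
}

// completed follows the definition above: successful plus failed tasks.
func (m *taskMetrics) completed() uint64 {
	return m.successful.Load() + m.failed.Load()
}

// simulate submits n concurrent tasks; every task whose index is a
// multiple of failEvery panics.
func simulate(n, failEvery int) *taskMetrics {
	m := &taskMetrics{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		i := i
		m.submitted.Add(1)
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.execute(func() {
				if i%failEvery == 0 {
					panic("task failed")
				}
			})
		}()
	}
	wg.Wait()
	return m
}

func main() {
	m := simulate(10, 5)
	fmt.Println(m.submitted.Load(), m.successful.Load(), m.failed.Load(), m.completed()) // 10 8 2 10
}
```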

In our Prometheus example we showcase how to configure collectors for these metrics and expose them to Prometheus.

Examples

API Reference

Full API reference is available at https://pkg.go.dev/github.com/alitto/pond

Benchmarks

See Benchmarks.

Resources

Here are some of the resources which have served as inspiration when writing this library:

Contribution & Support

Feel free to send a pull request if you consider there's something which can be improved. Also, please open up an issue if you run into a problem when using this library or just have a question.

Documentation

Constants

This section is empty.

Variables

var (
	// Eager maximizes responsiveness at the expense of higher resource usage,
	// which can reduce throughput under certain conditions.
	// This strategy is meant for worker pools that will operate at a small percentage of their capacity
	// most of the time and may occasionally receive bursts of tasks. It's the default strategy.
	Eager = func() ResizingStrategy { return RatedResizer(1) }
	// Balanced tries to find a balance between responsiveness and throughput.
	// It's suitable for general purpose worker pools or those
	// that will operate close to 50% of their capacity most of the time.
	Balanced = func() ResizingStrategy { return RatedResizer(maxProcs / 2) }
	// Lazy maximizes throughput at the expense of responsiveness.
	// This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
	Lazy = func() ResizingStrategy { return RatedResizer(maxProcs) }
)

Preset pool resizing strategies

var (
	// ErrSubmitOnStoppedPool is thrown when attempting to submit a task to a pool that has been stopped
	ErrSubmitOnStoppedPool = errors.New("worker pool has been stopped and is no longer accepting tasks")
)

Functions

This section is empty.

Types

type Option

type Option func(*WorkerPool)

Option represents an option that can be passed when instantiating a worker pool to customize it
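
Option follows Go's functional-options pattern: each option is a function that mutates the pool while it is being constructed. A minimal sketch of that pattern, using a hypothetical poolConfig struct and hypothetical withMinWorkers/withIdleTimeout helpers in place of the real WorkerPool and pond options:

```go
package main

import (
	"fmt"
	"time"
)

// poolConfig is a hypothetical stand-in for *WorkerPool, used only to show
// how func(*T) options customize a value at construction time.
type poolConfig struct {
	maxWorkers  int
	maxCapacity int
	minWorkers  int
	idleTimeout time.Duration
}

// option mirrors the shape of pond's Option type: func(*WorkerPool).
type option func(*poolConfig)

func withMinWorkers(n int) option {
	return func(c *poolConfig) { c.minWorkers = n }
}

func withIdleTimeout(d time.Duration) option {
	return func(c *poolConfig) { c.idleTimeout = d }
}

// newConfig applies defaults first, then each option in order,
// so later options override earlier ones.
func newConfig(maxWorkers, maxCapacity int, opts ...option) *poolConfig {
	c := &poolConfig{
		maxWorkers:  maxWorkers,
		maxCapacity: maxCapacity,
		idleTimeout: 5 * time.Second, // the IdleTimeout default documented above
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(10, 1000, withMinWorkers(5), withIdleTimeout(100*time.Millisecond))
	fmt.Printf("%+v\n", *c)
}
```

Applying defaults first and options afterwards is what lets callers override only the settings they care about; when the same option is passed twice, the later one wins in this sketch.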

func Context added in v1.7.0

func Context(parentCtx context.Context) Option

Context configures a parent context on a worker pool to stop all workers when it is cancelled

func IdleTimeout

func IdleTimeout(idleTimeout time.Duration) Option

IdleTimeout allows changing the idle timeout for a worker pool

func MinWorkers

func MinWorkers(minWorkers int) Option

MinWorkers allows changing the minimum number of workers of a worker pool

func PanicHandler

func PanicHandler(panicHandler func(interface{})) Option

PanicHandler allows changing the panic handler function of a worker pool

func Strategy added in v1.3.0

func Strategy(strategy ResizingStrategy) Option

Strategy allows changing the strategy used to resize the pool

type ResizingStrategy added in v1.3.0

type ResizingStrategy interface {
	Resize(runningWorkers, minWorkers, maxWorkers int) bool
}

ResizingStrategy represents a pool resizing strategy
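
A custom strategy only needs a Resize method with the signature above. The sketch below declares a local resizingStrategy interface with the same method so it compiles without the pond module, and assumes (this is not stated in the reference) that returning true asks the pool to start one more worker. The eagerUntilHalf type is hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// resizingStrategy copies the method set of pond.ResizingStrategy shown
// above so this sketch compiles without importing pond.
type resizingStrategy interface {
	Resize(runningWorkers, minWorkers, maxWorkers int) bool
}

// eagerUntilHalf is a hypothetical custom strategy. Assuming a true result
// means "start another worker", it grows on every call while fewer than
// minWorkers or half of maxWorkers are running, and on every 4th call after that.
type eagerUntilHalf struct {
	calls uint64 // updated atomically in case Resize is called concurrently
}

func (s *eagerUntilHalf) Resize(runningWorkers, minWorkers, maxWorkers int) bool {
	if runningWorkers < minWorkers || runningWorkers < maxWorkers/2 {
		return true
	}
	n := atomic.AddUint64(&s.calls, 1)
	return n%4 == 1
}

// Compile-time check that the type satisfies the interface.
var _ resizingStrategy = (*eagerUntilHalf)(nil)

func main() {
	s := &eagerUntilHalf{}
	for running := 0; running < 8; running++ {
		fmt.Printf("running=%d grow=%v\n", running, s.Resize(running, 0, 8))
	}
}
```

With the real library, such a value would presumably be passed through the Strategy option, e.g. pond.New(10, 1000, pond.Strategy(&eagerUntilHalf{})).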

func RatedResizer added in v1.4.0

func RatedResizer(rate int) ResizingStrategy

RatedResizer creates a resizing strategy which can be configured to create workers at a specific rate when the pool has no idle workers. rate: determines the number of tasks to receive before creating an extra worker. A value of 3 can be interpreted as: "Create a new worker every 3 tasks".

type TaskGroup

type TaskGroup struct {
	// contains filtered or unexported fields
}

TaskGroup represents a group of related tasks

func (*TaskGroup) Submit

func (g *TaskGroup) Submit(task func())

Submit adds a task to this group and sends it to the worker pool to be executed

func (*TaskGroup) Wait

func (g *TaskGroup) Wait()

Wait waits until all the tasks in this group have completed

type TaskGroupWithContext added in v1.8.0

type TaskGroupWithContext struct {
	TaskGroup
	// contains filtered or unexported fields
}

TaskGroupWithContext represents a group of related tasks associated with a context

func (*TaskGroupWithContext) Submit added in v1.8.0

func (g *TaskGroupWithContext) Submit(task func() error)

Submit adds a task to this group and sends it to the worker pool to be executed

func (*TaskGroupWithContext) Wait added in v1.8.0

func (g *TaskGroupWithContext) Wait() error

Wait blocks until either all the tasks submitted to this group have completed, one of them returned a non-nil error or the context associated to this group was canceled.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool models a pool of workers

func New

func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool

New creates a worker pool that can scale up to the given maximum number of workers (maxWorkers). The maxCapacity parameter determines the number of tasks that can be submitted to this pool without blocking, because it defines the size of the buffered channel used to receive tasks. The options parameter can take a list of functions to customize configuration values on this worker pool.
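
The effect of maxCapacity follows from Go channel semantics: sends to a buffered channel complete without a receiver until the buffer is full, while sends to an unbuffered channel (capacity 0) wait for a receiver. The sketch below demonstrates this with plain channels; acceptedWithoutBlocking is a hypothetical helper, and its select with a default case is the usual way to attempt a send without blocking, which is also the behaviour TrySubmit describes.

```go
package main

import "fmt"

// acceptedWithoutBlocking counts how many of n sends to a channel with the
// given buffer capacity succeed immediately while no worker is receiving.
// It illustrates plain Go channel semantics, not pond's code.
func acceptedWithoutBlocking(capacity, n int) int {
	queue := make(chan func(), capacity)
	noop := func() {}
	accepted := 0
	for i := 0; i < n; i++ {
		select {
		case queue <- noop:
			accepted++
		default:
			// The buffer is full (or unbuffered) and nobody is receiving:
			// a plain send here would block.
		}
	}
	return accepted
}

func main() {
	fmt.Println(acceptedWithoutBlocking(1000, 1500)) // 1000
	fmt.Println(acceptedWithoutBlocking(0, 10))      // 0
}
```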

func (*WorkerPool) CompletedTasks added in v1.5.0

func (p *WorkerPool) CompletedTasks() uint64

CompletedTasks returns the total number of tasks that have completed their execution either successfully or with panic since the pool was created

func (*WorkerPool) FailedTasks added in v1.5.0

func (p *WorkerPool) FailedTasks() uint64

FailedTasks returns the total number of tasks that completed with panic since the pool was created

func (*WorkerPool) Group

func (p *WorkerPool) Group() *TaskGroup

Group creates a new task group

func (*WorkerPool) GroupContext added in v1.8.0

func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, context.Context)

GroupContext creates a new task group and an associated Context derived from ctx.

The derived Context is canceled the first time a function submitted to the group returns a non-nil error or the first time Wait returns, whichever occurs first.
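
The semantics described for GroupContext can be approximated with the standard library. The following hypothetical sketch (boundedGroup, newBoundedGroup and demo are illustrative names, not pond's types) uses a semaphore channel to bound concurrency, sync.Once to record the first error and cancel the derived context, and a sync.WaitGroup to wait for the tasks. One simplification: its Wait always waits for every task, like errgroup.Group.Wait, whereas pond's TaskGroupWithContext.Wait is documented to return as soon as a task fails or the context is canceled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// boundedGroup is a hypothetical, stdlib-only sketch (not pond's code).
type boundedGroup struct {
	sem    chan struct{} // one slot per task allowed to run concurrently
	wg     sync.WaitGroup
	once   sync.Once
	err    error
	cancel context.CancelFunc
}

func newBoundedGroup(parent context.Context, limit int) (*boundedGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &boundedGroup{sem: make(chan struct{}, limit), cancel: cancel}, ctx
}

// Submit blocks while `limit` tasks are running, then runs task in a goroutine.
func (g *boundedGroup) Submit(task func() error) {
	g.wg.Add(1)
	g.sem <- struct{}{}
	go func() {
		defer func() {
			<-g.sem
			g.wg.Done()
		}()
		if err := task(); err != nil {
			g.once.Do(func() {
				g.err = err
				g.cancel() // the first error cancels the derived context
			})
		}
	}()
}

// Wait blocks until every task has returned, cancels the derived context
// and returns the first error, if any.
func (g *boundedGroup) Wait() error {
	g.wg.Wait()
	g.cancel()
	return g.err
}

// demo runs n tasks with at most limit in flight; task failAt returns an
// error (use -1 for no failure). It reports the first error, the derived
// context's error after Wait, and the peak number of concurrent tasks.
func demo(limit, n, failAt int) (err, ctxErr error, peak int) {
	g, ctx := newBoundedGroup(context.Background(), limit)
	var mu sync.Mutex
	running := 0
	for i := 0; i < n; i++ {
		i := i
		g.Submit(func() error {
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			defer func() {
				mu.Lock()
				running--
				mu.Unlock()
			}()
			if i == failAt {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	err = g.Wait()
	return err, ctx.Err(), peak
}

func main() {
	err, ctxErr, peak := demo(2, 5, 3)
	fmt.Println(err, ctxErr, peak <= 2) // task 3 failed context canceled true
}
```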

func (*WorkerPool) IdleWorkers added in v1.5.0

func (p *WorkerPool) IdleWorkers() int

IdleWorkers returns the current number of idle workers

func (*WorkerPool) MaxCapacity added in v1.5.0

func (p *WorkerPool) MaxCapacity() int

MaxCapacity returns the maximum number of tasks that can be waiting in the queue at any given time (queue size)

func (*WorkerPool) MaxWorkers added in v1.5.0

func (p *WorkerPool) MaxWorkers() int

MaxWorkers returns the maximum number of worker goroutines

func (*WorkerPool) MinWorkers added in v1.5.0

func (p *WorkerPool) MinWorkers() int

MinWorkers returns the minimum number of worker goroutines

func (*WorkerPool) RunningWorkers added in v1.5.0

func (p *WorkerPool) RunningWorkers() int

RunningWorkers returns the current number of running workers

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop causes this pool to stop accepting new tasks and signals all workers to exit. Tasks being executed by workers will continue until completion (unless the process is terminated). Tasks in the queue will not be executed.

func (*WorkerPool) StopAndWait

func (p *WorkerPool) StopAndWait()

StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue to complete before returning.

func (*WorkerPool) StopAndWaitFor added in v1.7.0

func (p *WorkerPool) StopAndWaitFor(deadline time.Duration)

StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached, whichever comes first.

func (*WorkerPool) Stopped added in v1.6.1

func (p *WorkerPool) Stopped() bool

Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise.

func (*WorkerPool) Strategy added in v1.5.0

func (p *WorkerPool) Strategy() ResizingStrategy

Strategy returns the configured pool resizing strategy

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task func())

Submit sends a task to this worker pool for execution. If the queue is full, it will wait until the task is dispatched to a worker goroutine.

func (*WorkerPool) SubmitAndWait

func (p *WorkerPool) SubmitAndWait(task func())

SubmitAndWait sends a task to this worker pool for execution and waits for it to complete before returning
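
SubmitAndWait can be understood as ordinary submission plus a completion signal. In this hypothetical sketch, submitAndWait accepts any fire-and-forget submit function (here, a send to a channel drained by one worker goroutine) and blocks on a done channel that the wrapped task closes.

```go
package main

import "fmt"

// submitAndWait wraps task so that its completion closes a channel, passes
// the wrapper to submit, and blocks until the channel is closed.
// `submit` is a hypothetical stand-in for a fire-and-forget Submit.
func submitAndWait(submit func(func()), task func()) {
	done := make(chan struct{})
	submit(func() {
		defer close(done) // closed even if task panics (assuming the pool recovers it)
		task()
	})
	<-done
}

// demoSubmitAndWait feeds tasks to a single background worker and returns
// a value written by the task; reading it is safe once submitAndWait returns.
func demoSubmitAndWait() int {
	queue := make(chan func())
	defer close(queue) // lets the worker goroutine exit
	go func() {
		for task := range queue {
			task()
		}
	}()
	result := 0
	submitAndWait(func(t func()) { queue <- t }, func() { result = 42 })
	return result
}

func main() {
	fmt.Println(demoSubmitAndWait()) // 42
}
```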

func (*WorkerPool) SubmitBefore

func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration)

SubmitBefore attempts to send a task for execution to this worker pool but aborts it if the task did not start before the given deadline.
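
One way to get the behaviour described for SubmitBefore is to record the deadline at submission time and have the wrapped task check it when a worker finally starts it. This is a hypothetical sketch, not pond's code; withStartDeadline and ranWithin are illustrative names, and time spent waiting in the queue is simulated with time.Sleep.

```go
package main

import (
	"fmt"
	"time"
)

// withStartDeadline returns a wrapper that runs task only if the wrapper
// itself starts before the deadline, measured from the moment
// withStartDeadline is called. The wrapper reports whether task ran.
func withStartDeadline(task func(), deadline time.Duration) func() bool {
	expires := time.Now().Add(deadline)
	return func() bool {
		if time.Now().After(expires) {
			return false // started too late: the task is skipped
		}
		task()
		return true
	}
}

// ranWithin simulates a task that waits delayMs in the queue before a
// worker picks it up, with a start deadline of deadlineMs.
func ranWithin(delayMs, deadlineMs int) bool {
	executed := false
	wrapped := withStartDeadline(func() { executed = true }, time.Duration(deadlineMs)*time.Millisecond)
	time.Sleep(time.Duration(delayMs) * time.Millisecond) // time spent queued
	return wrapped() && executed
}

func main() {
	fmt.Println(ranWithin(0, 1000)) // started well before the deadline
	fmt.Println(ranWithin(50, 5))   // started after the deadline, so skipped
}
```

Since Submit takes a func(), the wrapper would be adapted before submission, for example pool.Submit(func() { wrapped() }).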

func (*WorkerPool) SubmittedTasks added in v1.5.0

func (p *WorkerPool) SubmittedTasks() uint64

SubmittedTasks returns the total number of tasks submitted since the pool was created

func (*WorkerPool) SuccessfulTasks added in v1.5.0

func (p *WorkerPool) SuccessfulTasks() uint64

SuccessfulTasks returns the total number of tasks that have successfully completed their execution since the pool was created

func (*WorkerPool) TrySubmit added in v1.4.0

func (p *WorkerPool) TrySubmit(task func()) bool

TrySubmit attempts to send a task to this worker pool for execution. If the queue is full, it will not wait for a worker to become idle. It returns true if it was able to dispatch the task and false otherwise.

func (*WorkerPool) WaitingTasks added in v1.5.0

func (p *WorkerPool) WaitingTasks() uint64

WaitingTasks returns the current number of tasks in the queue that are waiting to be executed

Directories

examples
  • dynamic_size (module)
  • fixed_size (module)
  • group_context (module)
  • pool_context (module)
  • task_group (module)
