core

package module
v0.0.0-...-aa1e2ff Latest
Warning

This package is not in the latest version of its module.

Published: Aug 11, 2022 License: MIT Imports: 6 Imported by: 0

README

Go Worker Pools

This repo demonstrates three different strategies for implementing Go worker pools with bounded concurrency.

I do not guarantee that any of this code is production-ready. This is simply an experiment to compare different strategies for managing concurrency in Go.

If something in this repo is incorrect or misleading, I'd like to correct it. Please open a GitHub issue to let me know.

Structure

These worker pools satisfy a basic common interface so that we can test and bench them against each other:

  • Users can Submit() a function task to the pool. The Submit() call itself does not block.
  • Users can Wait() for all work in the pool to complete.

Strategies

The following worker pool strategies are featured here:

  • Channel pool: Use a buffered channel to limit concurrency. To start a task, first send a struct{} into the channel; this blocks if too many tasks are already running. Once the task is done, receive a struct{} from the channel and discard it, which frees up a slot for the next task.
  • Semaphore pool: Use a semaphore to limit concurrency as described in the package docs.
  • Worker pool: Use the gammazero/workerpool package to limit concurrency. This is my favorite interface and it's what I use in my projects.
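
The channel strategy above can be sketched with the standard library alone. This is a minimal illustration, not the repo's actual ChannelPool: the names chanPool, newChanPool, slots, and runSquares are hypothetical, and the sketch assumes the blocking send happens inside the task's goroutine so that Submit can return immediately.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// chanPool is a hypothetical stand-in for a channel-based pool.
// Field names are assumptions, not the repo's implementation.
type chanPool struct {
	slots chan struct{}   // buffered channel; capacity = max concurrent tasks
	wg    *sync.WaitGroup // tracks submitted tasks so Wait can block on them
}

func newChanPool(limit int) chanPool {
	return chanPool{slots: make(chan struct{}, limit), wg: &sync.WaitGroup{}}
}

// Submit returns immediately; the task's goroutine blocks until a slot is free.
func (p chanPool) Submit(task func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.slots <- struct{}{}        // acquire a slot (blocks when the pool is full)
		defer func() { <-p.slots }() // release the slot once the task returns
		task()
	}()
}

// Wait blocks until every submitted task has finished.
func (p chanPool) Wait() { p.wg.Wait() }

// runSquares sums i*i for i in 1..n, running at most limit tasks at once.
func runSquares(n, limit int) int64 {
	pool := newChanPool(limit)
	var sum int64
	for i := 1; i <= n; i++ {
		i := i // capture the loop variable for the closure
		pool.Submit(func() { atomic.AddInt64(&sum, int64(i*i)) })
	}
	pool.Wait()
	return sum
}

func main() {
	fmt.Println(runSquares(10, 3)) // prints 385
}
```

One trade-off of this design is that every submitted task gets its own goroutine right away; the channel only limits how many of them run their task at the same time.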

Testing and Benchmarking

Run tests with go test ./... -race. This tests all pools and ensures that they complete all jobs, use the full concurrency available, and do not exceed the specified concurrency.

Run benchmarks with go test -bench=. (the trailing dot is part of the flag). Here are the results from running an extended benchmark on my 2020 M1 MacBook Air:

$ go test -bench=. -benchtime=60s
goos: darwin
goarch: arm64
pkg: github.com/mplewis/go-worker-pools
BenchmarkWorkerPool-8      	   21027	   3554945 ns/op
BenchmarkChannelPool-8     	   31254	   2253164 ns/op
BenchmarkSemaphorePool-8   	   28473	   2572525 ns/op

Bar graph of benchmark results

Documentation

Functions

func VerifyConcurrency

func VerifyConcurrency(cm ConcurrencyManager, p Params, task func()) error

VerifyConcurrency verifies that the worker pool runs tasks at the expected concurrency. The task must have a non-zero runtime to ensure that we test the concurrency limit of the worker pool.
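
One way a check like this could work is to count running tasks with atomic counters and record the peak. The sketch below is an assumption about the general approach, not the package's actual VerifyConcurrency: checkConcurrency, concurrencyManager, and serialPool are hypothetical names, and serialPool is a deliberately trivial pool (concurrency of exactly 1) chosen so the result is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// concurrencyManager mirrors the documented two-method interface.
type concurrencyManager interface {
	Submit(func())
	Wait()
}

// serialPool is a hypothetical pool with a concurrency of exactly 1:
// a mutex lets only one task run at a time.
type serialPool struct {
	mu sync.Mutex
	wg sync.WaitGroup
}

func (p *serialPool) Submit(f func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.mu.Lock()
		defer p.mu.Unlock()
		f()
	}()
}

func (p *serialPool) Wait() { p.wg.Wait() }

// checkConcurrency runs workQuantity copies of task through cm and compares
// the highest number of simultaneously running tasks with expected.
func checkConcurrency(cm concurrencyManager, expected int64, workQuantity int, task func()) error {
	var running, peak int64
	for i := 0; i < workQuantity; i++ {
		cm.Submit(func() {
			n := atomic.AddInt64(&running, 1)
			defer atomic.AddInt64(&running, -1)
			// Raise peak to n unless a higher value was already recorded.
			for {
				old := atomic.LoadInt64(&peak)
				if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
					break
				}
			}
			task()
		})
	}
	cm.Wait()
	if got := atomic.LoadInt64(&peak); got != expected {
		return fmt.Errorf("peak concurrency was %d, expected %d", got, expected)
	}
	return nil
}

func main() {
	task := func() { time.Sleep(time.Millisecond) }
	fmt.Println(checkConcurrency(&serialPool{}, 1, 5, task)) // prints <nil>
	fmt.Println(checkConcurrency(&serialPool{}, 2, 5, task)) // reports a mismatch
}
```

With a real bounded pool, the task needs a non-zero runtime (as noted above) so that enough tasks overlap for the peak to reach the limit; an instant task could finish before the next one starts.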

Types

type ChannelPool

type ChannelPool struct {
	// contains filtered or unexported fields
}

ChannelPool is a worker pool that limits concurrency using a fixed-size buffered channel.

func NewChannelPool

func NewChannelPool(workerCount int) ChannelPool

func (ChannelPool) Submit

func (p ChannelPool) Submit(f func())

func (ChannelPool) Wait

func (p ChannelPool) Wait()

type ConcurrencyManager

type ConcurrencyManager interface {
	// Submit submits a work item to the worker pool.
	Submit(func())
	// Wait blocks until all work items have been completed.
	Wait()
}

ConcurrencyManager represents a worker pool.

type Params

type Params struct {
	ExpectedConcurrency int
	WorkQuantity        int
}

Params is the parameters for a concurrency verification test.

type SemaphorePool

type SemaphorePool struct {
	// contains filtered or unexported fields
}

SemaphorePool is a worker pool that limits concurrency using a semaphore.

func NewSemaphorePool

func NewSemaphorePool(concurrency int64) *SemaphorePool

func (SemaphorePool) Submit

func (s SemaphorePool) Submit(f func())

func (SemaphorePool) Wait

func (s SemaphorePool) Wait()

type WorkerPoolWrapper

type WorkerPoolWrapper struct {
	*workerpool.WorkerPool
}

WorkerPoolWrapper is a worker pool that limits concurrency using the gammazero/workerpool package.

func NewWorkerPool

func NewWorkerPool(workerCount int) *WorkerPoolWrapper

func (*WorkerPoolWrapper) Submit

func (w *WorkerPoolWrapper) Submit(f func())

func (*WorkerPoolWrapper) Wait

func (w *WorkerPoolWrapper) Wait()
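
The embedding pattern used by WorkerPoolWrapper can be illustrated with a stand-in type. This sketch makes no claim about the gammazero/workerpool API: thirdPartyPool and its StopWait method are hypothetical, chosen to show how a wrapper can add a Wait method to a type whose blocking call has a different name. Unlike the documented wrapper, which declares its own Submit, this sketch relies on method promotion for Submit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// manager mirrors the two-method interface described in the docs.
type manager interface {
	Submit(func())
	Wait()
}

// thirdPartyPool is a hypothetical stand-in for an external pool type. It has
// Submit but names its blocking call StopWait, so it does not satisfy manager
// by itself. It does not limit concurrency; it only exists to be wrapped.
type thirdPartyPool struct{ wg sync.WaitGroup }

func (p *thirdPartyPool) Submit(f func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		f()
	}()
}

func (p *thirdPartyPool) StopWait() { p.wg.Wait() }

// wrapper embeds the pool pointer, so Submit is promoted automatically.
type wrapper struct{ *thirdPartyPool }

// Wait adapts StopWait to the name the interface expects.
func (w *wrapper) Wait() { w.StopWait() }

// countTasks submits n tasks through any manager and returns how many ran.
func countTasks(m manager, n int) int64 {
	var done int64
	for i := 0; i < n; i++ {
		m.Submit(func() { atomic.AddInt64(&done, 1) })
	}
	m.Wait()
	return done
}

func main() {
	w := &wrapper{&thirdPartyPool{}}
	fmt.Println(countTasks(w, 100)) // prints 100
}
```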
