workerpool

v1.1.3
Published: Aug 12, 2022 License: MIT Imports: 5 Imported by: 310

README

workerpool

Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued.

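This implementation builds on ideas from the following:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang
http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html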

Installation

To install this package, you need to set up your Go workspace. The simplest way to install the library is to run:

$ go get github.com/gammazero/workerpool

Example

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
		})
	}

	wp.StopWait()
}

Example wrapper function to show start and finish time of submitted function.
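A wrapper of the kind described above might look like the following sketch. The helper name timed and its log callback are hypothetical and are not part of workerpool; the wrapper simply returns a new func() that reports when the wrapped task starts and finishes.

```go
package main

import (
	"fmt"
	"time"
)

// timed is a hypothetical helper (not part of workerpool). It wraps a task so
// that its start time and elapsed time are reported through the log callback.
func timed(name string, task func(), log func(string)) func() {
	return func() {
		start := time.Now()
		log(fmt.Sprintf("%s started at %s", name, start.Format(time.RFC3339Nano)))
		task()
		log(fmt.Sprintf("%s finished after %s", name, time.Since(start)))
	}
}

func main() {
	work := func() { time.Sleep(10 * time.Millisecond) }
	wrapped := timed("alpha", work, func(msg string) { fmt.Println(msg) })
	// Called directly here; with a pool it would be passed to wp.Submit(wrapped).
	wrapped()
}
```

Because the wrapper has the same func() signature as any other task, it can be handed to Submit or SubmitWait without changes to the pool.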

Usage Note

There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool. It should be solved by distributing workload over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc.

Documentation

Overview

Package workerpool queues work to a limited number of goroutines.

The purpose of the worker pool is to limit the concurrency of tasks executed by the workers. This is useful when performing tasks that require significant resources (CPU, memory, etc.), where running too many tasks at the same time would exhaust those resources.

Non-blocking task submission

A task is a function submitted to the worker pool for execution. Submitting tasks to this worker pool will not block, regardless of the number of tasks. Incoming tasks are immediately dispatched to an available worker. If no worker is immediately available, or there are already tasks waiting for an available worker, then the task is put on a waiting queue to wait for an available worker.

The intent of the worker pool is to limit the concurrency of task execution, not limit the number of tasks queued to be executed. Therefore, this unbounded input of tasks is acceptable as the tasks cannot be discarded. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool. It should be solved by distributing load over multiple systems, and/or storing input for pending processing in intermediate storage such as a database, file system, distributed message queue, etc.

Dispatcher

This worker pool uses a single dispatcher goroutine to read tasks from the input task queue and dispatch them to worker goroutines. This allows for a small input channel, and lets the dispatcher queue as many tasks as are submitted when there are no available workers. Additionally, the dispatcher can adjust the number of workers as appropriate for the work load, without having to utilize locked counters and checks incurred on task submission.

When no tasks have been submitted for a period of time, a worker is removed by the dispatcher. This is done until there are no more workers to remove. The minimum number of workers is always zero, because the time to start new workers is insignificant.
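The dispatcher idea can be illustrated with a simplified sketch that uses only the standard library. This is not the library's actual code: startPool, dispatch, worker, and runDemo are hypothetical names, the idle-worker timeout is left out, and maxWorkers is assumed to be at least 1. A single goroutine owns the waiting queue, so no locks are needed around it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// startPool is a hypothetical, simplified pool for illustration only.
// It returns a submit function and a stopWait function.
func startPool(maxWorkers int) (submit func(func()), stopWait func()) {
	tasks := make(chan func()) // small, unbuffered input channel
	done := make(chan struct{})
	go dispatch(tasks, done, maxWorkers)
	return func(t func()) { tasks <- t }, func() { close(tasks); <-done }
}

// dispatch is the single goroutine that owns the waiting queue.
func dispatch(in <-chan func(), done chan<- struct{}, maxWorkers int) {
	defer close(done)
	var waiting []func() // FIFO waiting queue
	var wg sync.WaitGroup
	workers := 0
	workerQueue := make(chan func())
	for in != nil || len(waiting) > 0 {
		var out chan func() // stays nil (disabled) while nothing is waiting
		var next func()
		if len(waiting) > 0 {
			out, next = workerQueue, waiting[0]
		}
		select {
		case task, ok := <-in:
			if !ok {
				in = nil // input closed: drain the waiting queue, then exit
				continue
			}
			if len(waiting) == 0 {
				select {
				case workerQueue <- task: // an idle worker took it
					continue
				default:
				}
				if workers < maxWorkers { // start a new worker with this task
					workers++
					wg.Add(1)
					go worker(task, workerQueue, &wg)
					continue
				}
			}
			waiting = append(waiting, task)
		case out <- next:
			waiting = waiting[1:]
		}
	}
	close(workerQueue) // workers receive nil and exit
	wg.Wait()
}

// worker runs its first task, then keeps taking tasks until the queue closes.
func worker(task func(), queue <-chan func(), wg *sync.WaitGroup) {
	defer wg.Done()
	for task != nil {
		task()
		task = <-queue
	}
}

// runDemo pushes n tasks through the pool and reports the peak concurrency.
func runDemo(n, maxWorkers int) (completed, peak int) {
	var mu sync.Mutex
	running := 0
	submit, stopWait := startPool(maxWorkers)
	for i := 0; i < n; i++ {
		submit(func() {
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			time.Sleep(time.Millisecond)
			mu.Lock()
			running--
			completed++
			mu.Unlock()
		})
	}
	stopWait()
	return
}

func main() {
	completed, peak := runDemo(20, 3)
	fmt.Println("completed:", completed, "peak concurrency:", peak)
}
```

In this sketch submit waits only for the dispatcher to receive the task, never for a worker to become free, which is why the input channel can stay small.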

Usage note

It is advisable to use different worker pools for tasks that are bound by different resources, or that have different resource use patterns. For example, tasks that use X MB of memory may need different concurrency limits than tasks that use Y MB of memory.

Waiting queue vs goroutines

In this implementation, when no workers are available to handle incoming tasks, the tasks are put on a waiting queue. The implementations mentioned in the credits below instead passed these tasks to goroutines. Using a queue is faster and has less memory overhead than creating a separate goroutine for each waiting task, which allows a much higher number of waiting tasks. A waiting queue also ensures that tasks are given to workers in the order in which they were received.

Credits

This implementation builds on ideas from the following:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang
http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html

Types

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a collection of goroutines, where the number of concurrent goroutines processing requests does not exceed the specified maximum.

func New

func New(maxWorkers int) *WorkerPool

New creates and starts a pool of worker goroutines.

The maxWorkers parameter specifies the maximum number of workers that can execute tasks concurrently. When there are no incoming tasks, workers are gradually stopped until there are no remaining workers.

func (*WorkerPool) Pause (added in v1.1.0)

func (p *WorkerPool) Pause(ctx context.Context)

Pause causes all workers to wait on the given Context, thereby making them unavailable to run tasks. Pause returns when all workers are waiting. Tasks can continue to be queued to the workerpool, but are not executed until the Context is canceled or times out.

Calling Pause when the worker pool is already paused causes Pause to wait until all previous pauses are canceled. This allows a goroutine to take control of pausing and unpausing the pool as soon as other goroutines have unpaused it.

When the workerpool is stopped, workers are unpaused and queued tasks are executed during StopWait.

func (*WorkerPool) Size

func (p *WorkerPool) Size() int

Size returns the maximum number of concurrent workers.

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop stops the worker pool and waits for only currently running tasks to complete. Pending tasks that are not currently running are abandoned. Tasks must not be submitted to the worker pool after calling Stop.

Since creating the worker pool starts at least one goroutine, for the dispatcher, Stop() or StopWait() should be called when the worker pool is no longer needed.

func (*WorkerPool) StopWait

func (p *WorkerPool) StopWait()

StopWait stops the worker pool and waits for all queued tasks to complete. No additional tasks may be submitted, but all pending tasks are executed by workers before this function returns.

func (*WorkerPool) Stopped

func (p *WorkerPool) Stopped() bool

Stopped returns true if this worker pool has been stopped.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task func())

Submit enqueues a function for a worker to execute.

Any external values needed by the task function must be captured in a closure. Any return values should be returned over a channel that is captured in the task function closure.

Submit will not block regardless of the number of tasks submitted. Each task is immediately given to an available worker or to a newly started worker. If there are no available workers, and the maximum number of workers has already been created, then the task is put onto a waiting queue.

When there are tasks on the waiting queue, any additional new tasks are put on the waiting queue. Tasks are removed from the waiting queue as workers become available.

As long as no new tasks arrive, one available worker is shut down each time period until there are no more idle workers. Since the time to start new goroutines is not significant, there is no need to retain idle workers indefinitely.

func (*WorkerPool) SubmitWait

func (p *WorkerPool) SubmitWait(task func())

SubmitWait enqueues the given function and waits for it to be executed.

func (*WorkerPool) WaitingQueueSize

func (p *WorkerPool) WaitingQueueSize() int

WaitingQueueSize returns the count of tasks in the waiting queue.

Directories

Path    Synopsis
pacer   Package pacer provides a utility to limit the rate at which concurrent goroutines begin execution.