workerpool

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 7 Imported by: 4

Documentation

Overview

Package workerpool defines abstractions for parallelizing tasks.

Example (HTTP)
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()
	w := workerpool.New[string](&workerpool.Config{
		Concurrency: 0,
	})

	urls := []string{
		"https://apple.com",
		"https://example.com",
		"https://google.com",
	}

	for _, u := range urls {
		// Make a local copy for the closure.
		u := u

		if err := w.Do(ctx, func() (string, error) {
			resp, err := http.Get(u)
			if err != nil {
				return "", err
			}
			defer resp.Body.Close()

			b, err := io.ReadAll(resp.Body)
			if err != nil {
				return "", err
			}
			return string(b), nil
		}); err != nil {
			// TODO: check err
		}
	}

	results, err := w.Done(ctx)
	if err != nil {
		// TODO: check err
	}

	for i, result := range results {
		fmt.Printf("%s: body(%d), err(%v)\n", urls[i], len(result.Value), result.Error)
	}
}
Output:

Example (Sleep)
package main

import (
	"context"
	"time"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()
	pool := workerpool.New[*workerpool.Void](&workerpool.Config{
		Concurrency: 3,
	})

	for i := 0; i < 5; i++ {
		if err := pool.Do(ctx, func() (*workerpool.Void, error) {
			time.Sleep(10 * time.Millisecond)
			return nil, nil
		}); err != nil {
			// TODO: check err
		}
	}

	results, err := pool.Done(ctx)
	if err != nil {
		// TODO: check err
	}
	_ = results
}
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrStopped = fmt.Errorf("worker is stopped")

ErrStopped is the error returned when the worker is stopped.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Concurrency is the maximum number of jobs to run in parallel.
	Concurrency int64

	// StopOnError instructs the worker pool to stop processing new work after the
	// first error is returned. In-flight jobs may still be processed, even if
	// they complete after the first error is returned.
	StopOnError bool
}

Config represents the input configuration to the worker.

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool represents an instance of a worker pool. It is same for concurrent use, but see function documentation for more specific semantics.

func New

func New[T any](c *Config) *Pool[T]

New creates a new worker pool that executes work in parallel, up to the maximum provided concurrency. Work is guaranteed to be executed in the order in which it was enqueued, but is not guaranteed to complete in the order in which it was enqueued (i.e. this is not a pipeline).

If the provided concurrency is less than 1, it defaults to the number of CPU cores.

func (*Pool[T]) Do

func (p *Pool[T]) Do(ctx context.Context, fn WorkFunc[T]) error

Do adds new work into the queue. If there are no available workers in the pool, it blocks until a worker becomes available or until the provided context is cancelled. The function returns when the work has been successfully scheduled.

To wait for all work to be completed and read the results, call Pool.Done. This function only returns an error on two conditions:

  • The worker pool was stopped via a call to Pool.Done. You should not enqueue more work. The error will be ErrStopped.
  • The incoming context was cancelled. You should probably not enqueue more work, but this is an application-specific decision. The error will be context.DeadlineExceeded or context.Canceled.

Never call Do from within a Do function because it will deadlock.

func (*Pool[T]) Done

func (p *Pool[T]) Done(ctx context.Context) ([]*Result[T], error)

Done immediately stops the worker pool and prevents new work from being enqueued. Then it waits for all existing work to finish and results the results.

The results are returned in the order in which jobs were enqueued into the worker pool. Each result will include a result value or corresponding error type.

The function will return an error if:

If the worker pool is already done, it returns ErrStopped.

type Result

type Result[T any] struct {
	Value T
	Error error
}

Result is the final result returned to the caller.

type Void

type Void struct{}

Void is a convenience struct for workers that do not actually return values.

type WorkFunc

type WorkFunc[T any] func() (T, error)

WorkFunc is a function for executing work.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL