wait

package
Version: v0.0.0-...-a5b82e5

Warning: This package is not in the latest version of its module.

Published: Apr 3, 2024 | License: MIT | Imports: 8 | Imported by: 0

README

wait: A Safer WaitGroup for Production Applications

Introduction

The wait package provides a safer alternative to sync.WaitGroup for managing parallel operations in production applications. It prevents common mistakes such as mismatched Add() and Done() calls, and it has built-in support for running jobs on various goroutine pool primitives that reuse goroutines. It also supports OpenTelemetry tracing for application diagnostics. Note that the operations performed by this package are parallel rather than concurrent, as those terms are defined in Rob Pike's Go talk "Concurrency is not Parallelism". We also offer parallel and concurrent pipelines in our github.com/gostdlib/concurrency/pipelines package set.

Installation

To install the wait package, use the following command:

go get github.com/gostdlib/concurrency/

Key Features

  • Use wait.Group if you need:
    • A safer version of sync.WaitGroup for parallel jobs
    • A parallel job runner that collects errors after all jobs complete
    • A parallel job runner that can stop processing on the first error
    • A parallel job runner that can be cancelled with a Context
    • Goroutine reuse and limits, by supplying a goroutines.Pool
    • Integration with your preferred logging solution, including support for OpenTelemetry spans

Package Purpose

The wait package aims to achieve the following goals:

Reduce Programmatic Errors
Deadlocks

Consider the following example that demonstrates a common mistake when using sync.WaitGroup:

func main() {
    wg := sync.WaitGroup{}

    for i := 0; i < 100; i++ {
        i := i
        wg.Add(1)
        go func() {
            // BUG: missing `defer wg.Done()` here, so wg.Wait() never returns.
            fmt.Println(i)
        }()
    }

    wg.Wait()
}

In this code, the missing defer wg.Done() means the counter never returns to zero, so wg.Wait() blocks forever and the program deadlocks. Our Group calls Add() and Done() for you, so this mistake cannot happen:

func main() {
    ctx := context.Background()
    wg := wait.Group{}

    for i := 0; i < 100; i++ {
        i := i
        wg.Go(
            ctx,
            func(ctx context.Context) error {
                fmt.Println(i)
                return nil
            },
        )
    }

    if err := wg.Wait(ctx); err != nil {
        panic(err) // or handle the error as per your requirements
    }
}
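
To make the mechanism concrete, here is a minimal, standard-library-only sketch of how a Go method can own the Add()/Done() bookkeeping. The miniGroup type and countTo helper are hypothetical names for illustration. This is not the wait package's actual implementation, which also collects errors, supports pools, and emits OTEL spans.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniGroup is a hypothetical, stripped-down stand-in for wait.Group. It only
// shows how a Go method can own the Add/Done bookkeeping; errors returned by
// f are ignored here for brevity.
type miniGroup struct {
	wg sync.WaitGroup
}

// Go increments the counter before starting the goroutine and decrements it
// with a deferred Done, so callers cannot forget either step.
func (g *miniGroup) Go(ctx context.Context, f func(ctx context.Context) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		_ = f(ctx)
	}()
}

// Wait blocks until every function passed to Go has returned.
func (g *miniGroup) Wait() { g.wg.Wait() }

// countTo launches n goroutines through miniGroup and returns how many ran.
func countTo(n int) int {
	var (
		mu    sync.Mutex
		count int
	)
	g := &miniGroup{}
	for i := 0; i < n; i++ {
		g.Go(context.Background(), func(ctx context.Context) error {
			mu.Lock()
			count++
			mu.Unlock()
			return nil
		})
	}
	g.Wait()
	return count
}

func main() {
	fmt.Println(countTo(100)) // 100
}
```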

Error Capturing

Old way of capturing only the first error:

func main() {
    wg := sync.WaitGroup{}
    ch := make(chan error, 1)

    for i := 0; i < 100; i++ {
        i := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            if i % 2 == 0 {
                select {
                case ch <- errors.New("error"):
                default:
                }
            }
            fmt.Println(i)
        }()
    }

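    wg.Wait()

    // Only the first error was kept; every later error was dropped.
    select {
    case err := <-ch:
        fmt.Println("first error:", err)
    default:
    }
}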

Using our error-capturing approach:

func main() {
    wg := wait.Group{}

    for i := 0; i < 100; i++ {
        i := i
        wg.Go(
            context.Background(),
            func(ctx context.Context) error {
                if i == 3 {
                    return errors.New("error")
                }
                fmt.Println(i)
                return nil
            },
        )
    }

    if err := wg.Wait(context.Background()); err != nil {
        panic(err) // or handle the error as per your requirements
    }

    // Individual errors can be unpacked using errors.Unwrap()
}

To stop processing on the first error, modify the WaitGroup as follows:

ctx, cancel := context.WithCancel(context.Background())
wg := wait.Group{CancelOnErr: cancel}
...

We capture all errors using error wrapping and retrieve them using the errors.Unwrap method. If a Context deadline error is the first error encountered, only that error is captured.
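
A minimal sketch of cancel-on-first-error semantics, assuming only the standard library: the first failing goroutine records its error and calls the CancelFunc, and Wait calls it once more at the end to release the context. The cancelGroup type and firstError helper are hypothetical, not the package's code. Cancellation only helps if the submitted functions check ctx; here, jobs that see a cancelled context skip their work and return nil, so the recorded error is always the one that triggered cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// cancelGroup is a hypothetical sketch of CancelOnErr-style behaviour.
type cancelGroup struct {
	wg     sync.WaitGroup
	once   sync.Once
	err    error
	cancel context.CancelFunc // plays the role of Group.CancelOnErr
}

// Go runs f; the first non-nil error is stored and triggers cancel.
func (g *cancelGroup) Go(ctx context.Context, f func(ctx context.Context) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(ctx); err != nil {
			g.once.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

// Wait blocks for all goroutines, then calls cancel to release the context.
func (g *cancelGroup) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// firstError runs n jobs where job failAt fails (use -1 for no failure).
// Jobs that observe a cancelled context skip their work and return nil.
func firstError(n, failAt int) error {
	ctx, cancel := context.WithCancel(context.Background())
	g := &cancelGroup{cancel: cancel}
	for i := 0; i < n; i++ {
		i := i
		g.Go(ctx, func(ctx context.Context) error {
			if ctx.Err() != nil {
				return nil // cancelled: skip the remaining work
			}
			if i == failAt {
				return errors.New("boom")
			}
			return nil
		})
	}
	return g.Wait()
}

func main() {
	fmt.Println(firstError(50, 7))  // boom
	fmt.Println(firstError(50, -1)) // <nil>
}
```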

Reuse Resources and Limit Goroutines

The standard way to limit the number of goroutines is to use a buffered channel as a semaphore. However, this approach still creates a new goroutine for every job, and it deadlocks if you forget to defer the receive that frees a slot in the channel. Consider the following example:

func main() {
    wg := sync.WaitGroup{}
    limit := make(chan struct{}, runtime.NumCPU())

    for i := 0; i < 100; i++ {
        i := i
        wg.Add(1)
        limit <- struct{}{}
        go func() {
            defer wg.Done()
            defer func() { <-limit }()

            fmt.Println(i)
        }()
    }

    wg.Wait()
}

With our wait package, you can limit the number of goroutines to the number of available CPUs and reuse the goroutines for subsequent calls:

func main() {
    ctx := context.Background()

    // Reuse goroutines and limit their number to the available CPUs.
    p, err := pooled.New("poolName", runtime.NumCPU())
    if err != nil {
        panic(err)
    }
    wg := wait.Group{Pool: p}

    for i := 0; i < 100; i++ {
        i := i
        wg.Go(
            ctx,
            func(ctx context.Context) error {
                fmt.Println(i)
                return nil
            },
        )
    }

    if err := wg.Wait(ctx); err != nil {
        panic(err) // or handle the error as per your requirements
    }
}
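
The goroutine reuse described above can be sketched with a fixed set of long-lived workers reading jobs from a channel. The workerPool type and maxInFlight helper below are hypothetical and use only the standard library; this is not the goroutines/pooled implementation. maxInFlight records the highest number of jobs running at once to show that the limit holds.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// workerPool is a hypothetical, minimal fixed-size pool: n long-lived
// goroutines pull jobs from a channel, so goroutines are reused and at most
// n jobs run at once.
type workerPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newWorkerPool(n int) *workerPool {
	p := &workerPool{jobs: make(chan func())}
	p.wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs { // each worker runs many jobs
				job()
			}
		}()
	}
	return p
}

// Submit blocks until a worker is free to take the job.
func (p *workerPool) Submit(job func()) { p.jobs <- job }

// Close stops accepting jobs and waits for the workers to finish the jobs
// already handed to them.
func (p *workerPool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

// maxInFlight runs jobs on a pool of the given size and returns the highest
// number of jobs observed running at the same time.
func maxInFlight(workers, jobs int) int {
	p := newWorkerPool(workers)
	var (
		mu        sync.Mutex
		cur, high int
	)
	for i := 0; i < jobs; i++ {
		p.Submit(func() {
			mu.Lock()
			cur++
			if cur > high {
				high = cur
			}
			mu.Unlock()
			runtime.Gosched() // give other workers a chance to overlap
			mu.Lock()
			cur--
			mu.Unlock()
		})
	}
	p.Close() // all jobs have finished once Close returns
	return high
}

func main() {
	fmt.Println(maxInFlight(runtime.NumCPU(), 100) <= runtime.NumCPU()) // true
}
```

Unlike the semaphore-channel version, no goroutine is created per job; the trade-off is that Submit blocks whenever all workers are busy.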

Comparison with golang.org/x/sync/errgroup and github.com/sourcegraph/conc

Differences from golang.org/x/sync/errgroup
  • Our package does not provide direct support for streaming. However, you can build pipelines similar to those possible with errgroup using our concurrency/pipelines packages, which offer a more formal approach.

Differences from github.com/sourcegraph/conc
  • We offer an advanced WaitGroup that combines the functionality of conc#WaitGroup and conc/pool#Pool, with more capabilities than either.
  • Our goroutines report failures by returning errors rather than by panicking. We consider catching panics inside goroutines bad practice: if your own libraries panic on non-critical errors, fix them to return errors instead. For panics from third-party code, recover them inside the goroutine and return them as errors.

Additional Features
  • Our package supports goroutine reuse, rate limiting, and more through the optional concurrency/goroutines package.
  • We offer OpenTelemetry integration (spans) for advanced debugging.
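
For the third-party case, the usual Go idiom is a deferred recover() that converts the panic into an error. In this sketch, thirdPartyParse and safeParse are hypothetical names; the same pattern works inside a FuncCall passed to Group.Go, because the deferred recover runs in the goroutine that calls the panicking code.

```go
package main

import "fmt"

// thirdPartyParse stands in for a hypothetical third-party function that
// panics on bad input instead of returning an error.
func thirdPartyParse(s string) int {
	if s == "" {
		panic("empty input")
	}
	return len(s)
}

// safeParse converts a panic from thirdPartyParse into an ordinary error, so
// it can be returned from a goroutine (for example, from a FuncCall).
func safeParse(s string) (n int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("thirdPartyParse panicked: %v", r)
		}
	}()
	return thirdPartyParse(s), nil
}

func main() {
	fmt.Println(safeParse("abc")) // 3 <nil>
	_, err := safeParse("")
	fmt.Println(err) // thirdPartyParse panicked: empty input
}
```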

Acknowledgments

This package, along with github.com/sourcegraph/conc, builds upon the great work done in golang.org/x/sync/errgroup by Bryan Mills.

Documentation

Overview

Package wait provides a safer alternative to sync.WaitGroup. It is an alternative to the errgroup package, but it does not implement streaming as that package can; for streaming, we provide a better alternative in our stagepipe framework.

This package can use our goroutines.Pool types for more control over concurrency, and it records information about what is happening in your goroutines using OTEL spans.

Here is a basic example:

ctx := context.Background()
g := wait.Group{Name: "Print me"}

for i := 0; i < 100; i++ {
	i := i
	g.Go(ctx, func(ctx context.Context) error {
		fmt.Println(i)
		return nil
	})
}

if err := g.Wait(ctx); err != nil {
	// Handle error
}

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FuncCall

type FuncCall func(ctx context.Context) error

FuncCall is a function call that can be used in various functions or methods in this package.

type Group

type Group struct {

	// Pool is an optional goroutines.Pool for concurrency control and reuse.
	Pool goroutines.Pool
	// CancelOnErr holds a CancelFunc that will be called if any goroutine
	// returns an error. This will automatically be called when Wait() is
	// finished and then reset to nil to allow reuse.
	CancelOnErr context.CancelFunc
	// Name provides an optional name for a WaitGroup for the purpose of
	// OTEL logging information.
	Name string
	// PoolOptions are the options to use when submitting jobs to the Pool.
	// If you have set PoolOptions but have not supplied a pool or the pool
	// doesn't support the option, the result is undefined.
	PoolOptions []goroutines.SubmitOption
	// contains filtered or unexported fields
}

Group lets you launch goroutines more safely by calling Add() and Done() on an underlying sync.WaitGroup for you, which prevents problems where you forget to increment or decrement the counter. You can also supply a goroutines.Pool for concurrency control and goroutine reuse; without one, Group starts a new goroutine per call. Its Running() method reports how many goroutines are currently running, which you can combine with the goroutines.Pool stats to understand which goroutines are in use. Its CancelOnErr field lets you mimic the golang.org/x/sync/errgroup package. Finally, Group provides OTEL support and can be named via the Group.Name field; when Wait() is called, it adds span messages to the current span and records any errors in that span.

Example (Cancel_on_err)

CancelOnErr illustrates how to use Group to run parallel tasks and cancel all remaining tasks if a single task returns an error.

ctx, cancel := context.WithCancel(context.Background())
p, _ := pooled.New("poolName", 10)

wg := Group{Pool: p, CancelOnErr: cancel}

for i := 0; i < 10000; i++ {
	i := i

	wg.Go(
		ctx,
		func(ctx context.Context) error {
			if i == 100 {
				return errors.New("error")
			}
			return nil
		},
	)
}

if err := wg.Wait(ctx); err != nil {
	fmt.Println(err)
}
Output:

error

Example (Just_errors)

JustErrors illustrates the use of Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from errgroup.Group from golang.org/x/sync/errgroup.

ctx := context.Background()
wg := Group{}

var urls = []string{
	"http://www.golang.org/",
	"http://www.google.com/",
	"http://www.somestupidname.com/",
}
for _, url := range urls {
	// Launch a goroutine to fetch the URL.
	url := url // https://golang.org/doc/faq#closures_and_goroutines
	wg.Go(ctx, func(ctx context.Context) error {
		// Fetch the URL.
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
		}
		return err
	})
}

// Wait for all HTTP fetches to complete.
if err := wg.Wait(ctx); err == nil {
	fmt.Println("Successfully fetched all URLs.")
}
Output:

Example (Parallel)

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error handling. This example is derived from errgroup.Group from golang.org/x/sync/errgroup.

Google := func(ctx context.Context, query string) ([]Result, error) {
	wg := Group{}

	searches := []Search{Web, Image, Video}
	results := make([]Result, len(searches))
	for i, search := range searches {
		i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
		wg.Go(ctx, func(context.Context) error {
			result, err := search(ctx, query)
			if err == nil {
				results[i] = result
			}
			return err
		})
	}
	if err := wg.Wait(ctx); err != nil {
		return nil, err
	}
	return results, nil
}

results, err := Google(context.Background(), "golang")
if err != nil {
	fmt.Fprintln(os.Stderr, err)
	return
}
for _, result := range results {
	fmt.Println(result)
}
Output:

web result for "golang"
image result for "golang"
video result for "golang"

func (*Group) Go

func (w *Group) Go(ctx context.Context, f FuncCall)

Go spins off a goroutine that executes f(ctx). This will use the underlying goroutines.Pool if provided. If you have set PoolOptions but have not supplied a pool or the pool doesn't support the option, the result is undefined.

func (*Group) Running

func (w *Group) Running() int

Running returns the number of goroutines that are currently running.
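
One way to maintain a count like the one Running() reports is an atomic counter adjusted around each goroutine. The runningGroup type and demo helper below are hypothetical and are not the package's implementation. Incrementing inside Go, before the goroutine starts, means a job counts as running from the moment it is launched, so the count never reports fewer goroutines than the caller has launched and not yet seen finish.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runningGroup is a hypothetical sketch of a group with a Running() count.
type runningGroup struct {
	wg      sync.WaitGroup
	running atomic.Int64 // requires Go 1.19+
}

// Go counts the job as running before it starts; deferred calls run in
// reverse order, so the count drops before Done is signalled.
func (g *runningGroup) Go(f func()) {
	g.wg.Add(1)
	g.running.Add(1)
	go func() {
		defer g.wg.Done()
		defer g.running.Add(-1)
		f()
	}()
}

// Running returns the number of goroutines started by Go that have not returned.
func (g *runningGroup) Running() int { return int(g.running.Load()) }

// Wait blocks until all goroutines have returned.
func (g *runningGroup) Wait() { g.wg.Wait() }

// demo starts three blocked jobs and reports Running() before and after they finish.
func demo() (during, after int) {
	g := &runningGroup{}
	release := make(chan struct{})
	for i := 0; i < 3; i++ {
		g.Go(func() { <-release })
	}
	during = g.Running() // all three jobs are blocked on release
	close(release)
	g.Wait()
	after = g.Running()
	return during, after
}

func main() {
	fmt.Println(demo()) // 3 0
}
```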

func (*Group) Wait

func (w *Group) Wait(ctx context.Context) error

Wait blocks until all goroutines are finished. The passed Context cannot be cancelled.
