errgroup

package module
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2021 License: BSD-3-Clause Imports: 5 Imported by: 0

README

errgroup

Go Reference ci GoReportCard

This repository provides a fork of the errgroup package, based on the version proposed in https://go-review.googlesource.com/c/sync/+/134395/3/errgroup/errgroup.go#100

Documentation

Overview

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Group

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
	// contains filtered or unexported fields
}

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

A zero Group is valid and does not cancel on error.

Example (JustErrors)

JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.

package main

import (
	"fmt"
	"github.com/DmitriyMV/errgroup"
	"net/http"
)

// main fetches a fixed list of URLs concurrently, one goroutine per URL, and
// prints a success message only when every fetch completed without error.
func main() {
	urls := []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}

	// The zero Group is usable as-is; Stop waits for any goroutines that are
	// still running when main returns.
	var g errgroup.Group
	defer g.Stop()

	for i := range urls {
		// Give each goroutine its own copy of the URL.
		// See https://golang.org/doc/faq#closures_and_goroutines
		target := urls[i]
		g.Go(func() error {
			resp, err := http.Get(target)
			if err != nil {
				return err
			}
			// Only reachability matters here, so the body is discarded.
			resp.Body.Close()
			return nil
		})
	}

	// Wait blocks until every fetch has finished and reports the first error.
	if err := g.Wait(); err != nil {
		return
	}
	fmt.Println("Successfully fetched all URLs.")
}
Output:

Example (Parallel)

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.

package main

import (
	"context"
	"fmt"
	"github.com/DmitriyMV/errgroup"
	"os"
)

// Web, Image, and Video are the search backends used by this example. Each
// one is a stub built by fakeSearch and labelled with its kind.
var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

// Result is the text returned by a single search backend.
type Result string

// Search runs query against one backend and returns its Result or an error.
type Search func(ctx context.Context, query string) (Result, error)

// fakeSearch builds a stub Search for the named kind of backend. The returned
// function ignores its Context, never fails, and produces a Result of the
// form `<kind> result for "<query>"`.
func fakeSearch(kind string) Search {
	return Search(func(_ context.Context, query string) (Result, error) {
		text := fmt.Sprintf("%s result for %q", kind, query)
		return Result(text), nil
	})
}

// main runs the three fake search backends in parallel for one query and
// prints their results in backend order, or the first error on stderr.
func main() {
	// Google fans query out to every backend concurrently and gathers the
	// results in a slice indexed by backend position.
	Google := func(ctx context.Context, query string) ([]Result, error) {
		g, ctx := errgroup.New(ctx)
		defer g.Stop()

		backends := []Search{Web, Image, Video}
		collected := make([]Result, len(backends))
		for idx := range backends {
			// Per-iteration copies for the closure below.
			// See https://golang.org/doc/faq#closures_and_goroutines
			idx, run := idx, backends[idx]
			g.Go(func() error {
				res, err := run(ctx, query)
				if err != nil {
					return err
				}
				// Each goroutine writes only its own slot, so no lock is needed.
				collected[idx] = res
				return nil
			})
		}

		if err := g.Wait(); err != nil {
			return nil, err
		}
		return collected, nil
	}

	found, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for i := range found {
		fmt.Println(found[i])
	}
}
Output:

web result for "golang"
image result for "golang"
video result for "golang"
Example (Pipeline)

Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines.

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/DmitriyMV/errgroup"
)

// main hashes every regular file under the current directory with MD5All and
// prints one "path:<TAB>hex digest" line per file. Any error from MD5All is
// fatal.
func main() {
	sums, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}
	// Map iteration order is unspecified, so the output order varies per run.
	for path, digest := range sums {
		fmt.Printf("%s:\t%x\n", path, digest)
	}
}

// result holds a path string together with an md5.Size-byte sum.
// NOTE(review): nothing in the visible example refers to this type; confirm
// whether it is still needed.
type result struct {
	path string
	sum  [md5.Size]byte
}

// token is the empty element type of the semaphore channel in MD5All; it
// carries no data.
type token struct{}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
//
// One goroutine walks the tree and feeds file paths to the calling goroutine,
// which starts a reader goroutine per file. A semaphore keeps the number of
// readers running at once to at most maxInFlight.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Stop is called or any goroutine returns an error.
	// When this version of MD5All returns, even in case of error, we know that
	// all of the goroutines have finished and the memory they were using can be
	// garbage-collected.
	g, ctx := errgroup.New(ctx)
	defer g.Stop()
	// Stage 1: walk the tree and send the path of every regular file on paths.
	paths := make(chan string)
	g.Go(func() error {
		// Closing paths when the walk ends is what lets the range loop below
		// terminate.
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			// Directories, symlinks and other non-regular entries are skipped.
			if !info.Mode().IsRegular() {
				return nil
			}
			// Give up as soon as the group is canceled instead of blocking on
			// a send that nobody will receive.
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})
	// Read and digest files concurrently,
	// storing the results so far in a channel buffer.
	// The single buffered slot works as a lock around the map: a goroutine
	// takes the map out, updates it, and puts it back.
	results := make(chan map[string][md5.Size]byte, 1)
	results <- make(map[string][md5.Size]byte)
	// Use a semaphore to bound the number of files in flight.
	const maxInFlight = 20
	semaphore := make(chan token, maxInFlight)
	// Stage 2: start one reader goroutine per path received from the walker.
	for path := range paths {
		// Acquire a semaphore slot before starting the reader; this blocks
		// while maxInFlight readers are already running.
		select {
		case <-ctx.Done():
			// We can return immediately without leaving any goroutines behind:
			// the 'defer g.Stop()' above will finish cleaning up.
			// NOTE(review): this path reports ctx.Err() rather than the error
			// returned by the goroutine that triggered the cancelation;
			// confirm that this is intended.
			return nil, ctx.Err()
		case semaphore <- token{}:
		}
		// Per-iteration copy for the closure below.
		path := path
		g.Go(func() error {
			// Release the semaphore slot however this goroutine exits.
			defer func() { <-semaphore }()
			// NOTE(review): ioutil.ReadFile is deprecated since Go 1.16 in
			// favor of os.ReadFile; switching would leave the io/ioutil import
			// unused, so the import list must be updated at the same time.
			data, err := ioutil.ReadFile(path)
			if err != nil {
				return err
			}
			sum := md5.Sum(data)
			// Take the shared map, record this file's digest, and hand the
			// map back for the next goroutine.
			m := <-results
			m[path] = sum
			results <- m
			return nil
		})
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	// Every goroutine has exited, so the map in the channel is complete.
	return <-results, nil
}
Output:

func New

func New(ctx context.Context) (*Group, context.Context)

New returns a new Group and an associated Context derived from ctx.

The derived Context is canceled if any goroutine in the group returns a non-nil error, panics, or invokes runtime.Goexit, or if any other goroutine calls Stop on the returned Group.

Stopping the Group releases resources associated with its Context, so code should call Stop as soon as the Group is no longer needed.

func WithContext deprecated

func WithContext(ctx context.Context) (*Group, context.Context)

WithContext returns a new Group and an associated Context derived from ctx.

The derived Context is canceled if any goroutine in the group returns a non-nil error, panics, or invokes runtime.Goexit, if any other goroutine calls Stop on the returned Group, or the first time Wait returns — whichever occurs first.

Deprecated: use New instead, and defer a call to Stop to clean up.

func (*Group) Go

func (g *Group) Go(f func() error)

Go calls the given function in a new goroutine, adding that goroutine to the group.

The first goroutine in the group that returns a non-nil error, panics, or invokes runtime.Goexit will cancel the group.

func (*Group) Stop

func (g *Group) Stop()

Stop cancels the Context associated with g, if any, then waits for all goroutines started by the Go method to exit.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all goroutines in the group have exited.

If any goroutine panicked or invoked runtime.Goexit, Wait panics with a corresponding value or invokes runtime.Goexit.

Otherwise, Wait returns the first non-nil error (if any) returned by any of the functions passed to Go.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL