package errgroup

import "golang.org/x/sync/errgroup"

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

Package Files

errgroup.go

type Group

type Group struct {
    // contains filtered or unexported fields
}

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

A zero Group is valid and does not cancel on error.

JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.

Code:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for _, url := range urls {
    // Launch a goroutine to fetch the URL.
    url := url // https://golang.org/doc/faq#closures_and_goroutines
    g.Go(func() error {
        // Fetch the URL.
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error handling.

Code:

Google := func(ctx context.Context, query string) ([]Result, error) {
    g, ctx := errgroup.WithContext(ctx)

    searches := []Search{Web, Image, Video}
    results := make([]Result, len(searches))
    for i, search := range searches {
        i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
        g.Go(func() error {
            result, err := search(ctx, query)
            if err == nil {
                results[i] = result
            }
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

results, err := Google(context.Background(), "golang")
if err != nil {
    fmt.Fprintln(os.Stderr, err)
    return
}
for _, result := range results {
    fmt.Println(result)
}

Output:

web result for "golang"
image result for "golang"
video result for "golang"

Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines.

Code:

package main

import (
    "crypto/md5"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"

    "golang.org/x/net/context"
    "golang.org/x/sync/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
    m, err := MD5All(context.Background(), ".")
    if err != nil {
        log.Fatal(err)
    }

    for k, sum := range m {
        fmt.Printf("%s:\t%x\n", k, sum)
    }
}

type result struct {
    path string
    sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    // ctx is canceled when g.Wait() returns. When this version of MD5All returns
    // - even in case of error! - we know that all of the goroutines have finished
    // and the memory they were using can be garbage-collected.
    g, ctx := errgroup.WithContext(ctx)
    paths := make(chan string)

    g.Go(func() error {
        defer close(paths)
        return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-ctx.Done():
                return ctx.Err()
            }
            return nil
        })
    })

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    const numDigesters = 20
    for i := 0; i < numDigesters; i++ {
        g.Go(func() error {
            for path := range paths {
                data, err := ioutil.ReadFile(path)
                if err != nil {
                    return err
                }
                select {
                case c <- result{path, md5.Sum(data)}:
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        })
    }
    go func() {
        g.Wait()
        close(c)
    }()

    m := make(map[string][md5.Size]byte)
    for r := range c {
        m[r.path] = r.sum
    }
    // Check whether any of the goroutines failed. Since g is accumulating the
    // errors, we don't need to send them (or check for them) in the individual
    // results sent on the channel.
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return m, nil
}

func WithContext

func WithContext(ctx context.Context) (*Group, context.Context)

WithContext returns a new Group and an associated Context derived from ctx.

The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

func (*Group) Go

func (g *Group) Go(f func() error)

Go calls the given function in a new goroutine.

The first call to return a non-nil error cancels the group's Context, if the group was created with WithContext; its error will be returned by Wait.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.

Package errgroup imports 2 packages and is imported by 148 packages. Updated 2017-07-05.