Documentation ¶
Overview ¶
Package flowmatic contains easy-to-use, generic, concurrent task runners.
Comparison of simple helpers:
        Tasks      Cancels Context?  Collect results?
Do      Different  No                No
All     Different  On error          No
Race    Different  On success        No
Each    Same       No                No
Map     Same       On error          Yes
ManageTasks and TaskPool allow for advanced concurrency patterns.
Index ¶
- Constants
- func All(ctx context.Context, tasks ...func(context.Context) error) error
- func Do(tasks ...func() error) error
- func Each[Input any](numWorkers int, items []Input, task func(Input) error) error
- func ManageTasks[Input, Output any](numWorkers int, task Task[Input, Output], manager Manager[Input, Output], ...)
- func Map[Input, Output any](ctx context.Context, numWorkers int, items []Input, ...) (results []Output, err error)
- func Race(ctx context.Context, tasks ...func(context.Context) error) error
- func TaskPool[Input, Output any](numWorkers int, task Task[Input, Output]) (in chan<- Input, out <-chan Result[Input, Output])
- type Manager
- type Result
- type Task
Constants ¶
const MaxProcs = -1
MaxProcs means use GOMAXPROCS workers when doing tasks.
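The fallback described here can be pictured with a small standard-library sketch. It is not flowmatic's own code: resolveWorkers and the lowercase maxProcs constant are hypothetical names used only for illustration, and the rule "anything below 1 means GOMAXPROCS" is taken from the function descriptions on this page.

```go
package main

import (
	"fmt"
	"runtime"
)

// maxProcs mirrors the documented sentinel value; the name is local to this sketch.
const maxProcs = -1

// resolveWorkers is a hypothetical helper: it returns n when n >= 1
// and otherwise falls back to the current GOMAXPROCS setting.
func resolveWorkers(n int) int {
	if n < 1 {
		// An argument of 0 queries the setting without changing it.
		return runtime.GOMAXPROCS(0)
	}
	return n
}

func main() {
	fmt.Println(resolveWorkers(4))             // 4
	fmt.Println(resolveWorkers(maxProcs) >= 1) // true
}
```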
Functions ¶
func All ¶ added in v0.23.3
func All(ctx context.Context, tasks ...func(context.Context) error) error
All runs each task concurrently and waits for them all to finish. Each task receives a child context which is canceled once one task returns an error or panics. All returns nil if all tasks succeed. Otherwise, All returns a multierror containing the errors encountered. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	ctx := context.Background()
	start := time.Now()
	err := flowmatic.All(ctx,
		func(ctx context.Context) error {
			// This task sleeps then returns an error
			d := 1 * time.Millisecond
			time.Sleep(d)
			fmt.Println("slept for", d)
			return fmt.Errorf("abort after %v", d)
		},
		func(ctx context.Context) error {
			// sleepFor is a cancelable time.Sleep.
			// The error of first task
			// causes the early cancelation of this one.
			if !sleepFor(ctx, 1*time.Minute) {
				fmt.Println("canceled")
			}
			return nil
		},
	)
	fmt.Println("err:", err)
	fmt.Println("exited early?", time.Since(start) < 10*time.Millisecond)
Output:
	slept for 1ms
	canceled
	err: abort after 1ms
	exited early? true
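The example above calls sleepFor, which this page does not define. One plausible shape for such a helper, assuming it reports true when the full duration elapsed and false when the context was canceled first (which is how the example uses it), is sketched below. demoSleepFor is a hypothetical driver added only to show both outcomes.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepFor waits for d or until ctx is canceled, whichever comes first.
// It reports true if the full duration elapsed and false if it was cut short.
// This is an assumed implementation, not code taken from flowmatic.
func sleepFor(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// demoSleepFor exercises both branches of sleepFor.
func demoSleepFor() (first, second bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front so the one-minute sleep returns immediately
	first = sleepFor(ctx, time.Minute)
	second = sleepFor(context.Background(), time.Millisecond)
	return first, second
}

func main() {
	fmt.Println(demoSleepFor()) // false true
}
```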
func Do ¶
func Do(tasks ...func() error) error
Do runs each task concurrently and waits for them all to finish. Errors returned by tasks do not cancel execution, but are joined into a multierror return value. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	package main

	import (
		"fmt"
		"time"

		"github.com/carlmjohnson/flowmatic"
	)

	func main() {
		start := time.Now()
		err := flowmatic.Do(
			func() error {
				time.Sleep(50 * time.Millisecond)
				fmt.Println("hello")
				return nil
			}, func() error {
				time.Sleep(100 * time.Millisecond)
				fmt.Println("world")
				return nil
			}, func() error {
				time.Sleep(200 * time.Millisecond)
				fmt.Println("from flowmatic.Do")
				return nil
			})
		if err != nil {
			fmt.Println("error", err)
		}
		fmt.Println("executed concurrently?", time.Since(start) < 250*time.Millisecond)
	}
Output:
	hello
	world
	from flowmatic.Do
	executed concurrently? true
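To make the "run everything, then join the errors" behavior concrete, here is a rough standard-library sketch. It is not how flowmatic implements Do: doAll and errDisk are hypothetical names, panic handling is left out, and it assumes Go 1.20 or later for errors.Join.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDisk is a sample sentinel error used only by this sketch.
var errDisk = errors.New("disk full")

// doAll runs every task in its own goroutine, waits for all of them,
// and joins whatever errors they returned. A failing task does not
// stop the others.
func doAll(tasks ...func() error) error {
	errs := make([]error, len(tasks)) // one slot per task, so no mutex is needed
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task()
		}(i, task)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when every slot is nil
}

func main() {
	err := doAll(
		func() error { return nil },
		func() error { return errDisk },
		func() error { return errors.New("timeout") },
	)
	// A joined error still matches each of its parts.
	fmt.Println(errors.Is(err, errDisk)) // true
}
```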
func Each ¶
func Each[Input any](numWorkers int, items []Input, task func(Input) error) error
Each starts numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) and processes each item as a task. Errors returned by a task do not halt execution, but are joined into a multierror return value. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	package main

	import (
		"fmt"
		"time"

		"github.com/carlmjohnson/flowmatic"
	)

	func main() {
		times := []time.Duration{
			50 * time.Millisecond,
			100 * time.Millisecond,
			200 * time.Millisecond,
		}
		start := time.Now()
		err := flowmatic.Each(3, times, func(d time.Duration) error {
			time.Sleep(d)
			fmt.Println("slept", d)
			return nil
		})
		if err != nil {
			fmt.Println("error", err)
		}
		fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
	}
Output:
	slept 50ms
	slept 100ms
	slept 200ms
	executed concurrently? true
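The worker limit that Each describes can be illustrated with a bounded pool built from the standard library. This is only a sketch under assumptions: eachSketch, peakConcurrency, and errBad are hypothetical names, panics are not handled, and flowmatic's real implementation may differ. It requires Go 1.20 or later for errors.Join.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"sync"
	"time"
)

// errBad is a sample sentinel error used only by this sketch.
var errBad = errors.New("bad item")

// eachSketch feeds items to a fixed number of workers and joins task errors.
func eachSketch[In any](numWorkers int, items []In, task func(In) error) error {
	if numWorkers < 1 {
		numWorkers = runtime.GOMAXPROCS(0)
	}
	jobs := make(chan In)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range jobs {
				if err := task(item); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	for _, item := range items {
		jobs <- item
	}
	close(jobs) // lets the workers' range loops finish
	wg.Wait()
	return errors.Join(errs...)
}

// peakConcurrency reports the largest number of tasks seen running at once.
func peakConcurrency(numWorkers, numItems int) int {
	var mu sync.Mutex
	running, peak := 0, 0
	_ = eachSketch(numWorkers, make([]int, numItems), func(int) error {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // hold the slot briefly so overlap is observable
		mu.Lock()
		running--
		mu.Unlock()
		return nil
	})
	return peak
}

func main() {
	fmt.Println(peakConcurrency(3, 12) <= 3) // true
	err := eachSketch(2, []int{1, 2, 3}, func(n int) error {
		if n == 2 {
			return errBad
		}
		return nil
	})
	fmt.Println(errors.Is(err, errBad)) // true
}
```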
func ManageTasks ¶
func ManageTasks[Input, Output any](numWorkers int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input)
ManageTasks manages tasks using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) which produce output consumed by a serially run manager. After each task finishes, the manager returns a slice of new task inputs based on that task's result, together with a boolean; returning false halts processing. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	package main

	import (
		"fmt"
		"io"
		"net/http"
		"net/http/httptest"
		"slices"
		"strings"
		"testing/fstest"

		"github.com/carlmjohnson/flowmatic"
	)

	func main() {
		// Example site to crawl with recursive links
		srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
			"index.html": &fstest.MapFile{
				Data: []byte("/a.html"),
			},
			"a.html": &fstest.MapFile{
				Data: []byte("/b1.html\n/b2.html"),
			},
			"b1.html": &fstest.MapFile{
				Data: []byte("/c.html"),
			},
			"b2.html": &fstest.MapFile{
				Data: []byte("/c.html"),
			},
			"c.html": &fstest.MapFile{
				Data: []byte("/"),
			},
		})))
		defer srv.Close()
		cl := srv.Client()

		// Task fetches a page and extracts the URLs
		task := func(u string) ([]string, error) {
			res, err := cl.Get(srv.URL + u)
			if err != nil {
				return nil, err
			}
			defer res.Body.Close()

			body, err := io.ReadAll(res.Body)
			if err != nil {
				return nil, err
			}

			return strings.Split(string(body), "\n"), nil
		}

		// Manager keeps track of which pages have been visited and the results graph
		tried := map[string]int{}
		results := map[string][]string{}
		manager := func(req string, urls []string, err error) ([]string, bool) {
			if err != nil {
				// If there's a problem fetching a page, try three times
				if tried[req] < 3 {
					tried[req]++
					return []string{req}, true
				}
				return nil, false
			}
			results[req] = urls
			var newurls []string
			for _, u := range urls {
				if tried[u] == 0 {
					newurls = append(newurls, u)
					tried[u]++
				}
			}
			return newurls, true
		}

		// Process the tasks with as many workers as GOMAXPROCS
		flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "/")

		keys := make([]string, 0, len(results))
		for key := range results {
			keys = append(keys, key)
		}
		slices.Sort(keys)
		for _, key := range keys {
			fmt.Println(key, "links to:")
			for _, v := range results[key] {
				fmt.Println("- ", v)
			}
		}
	}
Output:
	/ links to:
	-  /a.html
	/a.html links to:
	-  /b1.html
	-  /b2.html
	/b1.html links to:
	-  /c.html
	/b2.html links to:
	-  /c.html
	/c.html links to:
	-  /
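The contract between task and manager is easier to follow with concurrency stripped away. The sketch below is a deliberately serial stand-in, not flowmatic's implementation: manageSerially and visitCountdown are hypothetical names, and the manager signature is inferred from the crawler example above (input, output, error in; new inputs and a continue flag out).

```go
package main

import "fmt"

// manageSerially runs tasks one at a time from a queue. After each task,
// the manager sees the input, output, and error, and returns new inputs
// plus a flag; a false flag stops processing immediately.
func manageSerially[In, Out any](
	task func(In) (Out, error),
	manager func(In, Out, error) ([]In, bool),
	initial ...In,
) {
	queue := append([]In(nil), initial...)
	for len(queue) > 0 {
		in := queue[0]
		queue = queue[1:]
		out, err := task(in)
		next, ok := manager(in, out, err)
		if !ok {
			return // the manager asked to halt
		}
		queue = append(queue, next...)
	}
}

// visitCountdown schedules n, n-1, ... 1 by letting the manager enqueue
// each task's output as the next input.
func visitCountdown(start int) []int {
	var visited []int
	task := func(n int) (int, error) { return n - 1, nil }
	manager := func(in, out int, err error) ([]int, bool) {
		visited = append(visited, in)
		if out <= 0 {
			return nil, true // no new work; processing ends when the queue drains
		}
		return []int{out}, true
	}
	manageSerially(task, manager, start)
	return visited
}

func main() {
	fmt.Println(visitCountdown(3)) // [3 2 1]
}
```

Because the manager is the only place that touches visited, no locking is needed; the documentation's statement that the manager runs serially is what makes the maps in the crawler example safe as well.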
func Map ¶ added in v0.23.3
func Map[Input, Output any](ctx context.Context, numWorkers int, items []Input, task func(context.Context, Input) (Output, error)) (results []Output, err error)
Map starts numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) and attempts to map the input slice to an output slice. Each task receives a child context. The first task to return an error or panic cancels the child context and halts further task scheduling. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	package main

	import (
		"context"
		"fmt"
		"os"

		"github.com/carlmjohnson/flowmatic"
	)

	var (
		Web   = fakeSearch("web")
		Image = fakeSearch("image")
		Video = fakeSearch("video")
	)

	type Result string
	type Search func(ctx context.Context, query string) (Result, error)

	func fakeSearch(kind string) Search {
		return func(_ context.Context, query string) (Result, error) {
			return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
		}
	}

	func Google(ctx context.Context, query string) ([]Result, error) {
		searches := []Search{Web, Image, Video}
		return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
			func(ctx context.Context, search Search) (Result, error) {
				return search(ctx, query)
			})
	}

	func main() {
		// Compare to https://pkg.go.dev/golang.org/x/sync/errgroup#example-Group-Parallel
		// and https://pkg.go.dev/sync#example-WaitGroup
		results, err := Google(context.Background(), "golang")
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}

		for _, result := range results {
			fmt.Println(result)
		}
	}
Output:
	web result for "golang"
	image result for "golang"
	video result for "golang"
Example (Simple) ¶
	package main

	import (
		"context"
		"fmt"
		"strconv"

		"github.com/carlmjohnson/flowmatic"
	)

	func main() {
		ctx := context.Background()

		// Start with some slice of input work
		input := []string{"0", "1", "42", "1337"}
		// Have a task that takes a context
		decodeAndDouble := func(ctx context.Context, s string) (int, error) {
			// Do some work
			n, err := strconv.Atoi(s)
			if err != nil {
				return 0, err
			}
			// Return early if context was canceled
			if ctx.Err() != nil {
				return 0, ctx.Err()
			}
			// Do more work
			return 2 * n, nil
		}
		// Concurrently process input into output
		output, err := flowmatic.Map(ctx, flowmatic.MaxProcs, input, decodeAndDouble)
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println(output)
	}
Output:
	[0 2 84 2674]
func Race ¶ added in v0.23.3
func Race(ctx context.Context, tasks ...func(context.Context) error) error
Race runs each task concurrently and waits for them all to finish. Each task receives a child context which is canceled once one task has successfully completed or panicked. Race returns nil if at least one task completes without an error. If all tasks return an error, Race returns a multierror containing all the errors. If a task panics during execution, the panic will be caught and rethrown in the parent goroutine.
Example ¶
	ctx := context.Background()
	start := time.Now()
	err := flowmatic.Race(ctx,
		func(ctx context.Context) error {
			// This task sleeps for only 1ms
			d := 1 * time.Millisecond
			time.Sleep(d)
			fmt.Println("slept for", d)
			return nil
		},
		func(ctx context.Context) error {
			// This task wants to sleep for a whole minute.
			d := 1 * time.Minute
			// But sleepFor is a cancelable time.Sleep.
			// So when the other task completes,
			// it cancels this one, causing it to return early.
			if !sleepFor(ctx, d) {
				fmt.Println("canceled")
			}
			// The error here is ignored
			// because the other task succeeded
			return errors.New("ignored")
		},
	)
	// Err is nil as long as one task succeeds
	fmt.Println("err:", err)
	fmt.Println("exited early?", time.Since(start) < 10*time.Millisecond)
Output:
	slept for 1ms
	canceled
	err: <nil>
	exited early? true
Example (FakeRequest) ¶
	// Setup fake requests
	request := func(ctx context.Context, page string) (string, error) {
		var sleepLength time.Duration
		switch page {
		case "A":
			sleepLength = 10 * time.Millisecond
		case "B":
			sleepLength = 100 * time.Millisecond
		case "C":
			sleepLength = 10 * time.Second
		}
		if !sleepFor(ctx, sleepLength) {
			return "", ctx.Err()
		}
		return "got " + page, nil
	}
	ctx := context.Background()
	// Make variables to hold responses
	var pageA, pageB, pageC string
	// Race the requests to see who can answer first
	err := flowmatic.Race(ctx,
		func(ctx context.Context) error {
			var err error
			pageA, err = request(ctx, "A")
			return err
		},
		func(ctx context.Context) error {
			var err error
			pageB, err = request(ctx, "B")
			return err
		},
		func(ctx context.Context) error {
			var err error
			pageC, err = request(ctx, "C")
			return err
		},
	)
	fmt.Println("err:", err)
	fmt.Printf("A: %q B: %q C: %q\n", pageA, pageB, pageC)
Output:
	err: <nil>
	A: "got A" B: "" C: ""
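The "first success cancels the rest" rule can be approximated with the standard library alone. The following is a sketch under assumptions, not flowmatic's code: raceSketch, oneWinner, and allFail are hypothetical names, panic handling is omitted, and it needs Go 1.20 or later for errors.Join (and 1.19 for atomic.Bool).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// raceSketch runs all tasks concurrently with a shared child context.
// The first task to succeed cancels that context; the result is nil if
// any task succeeded, otherwise the joined errors of every task.
func raceSketch(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	errs := make([]error, len(tasks)) // one slot per task
	var (
		wg  sync.WaitGroup
		won atomic.Bool
	)
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				errs[i] = err
				return
			}
			won.Store(true)
			cancel() // tell the remaining tasks to stop
		}(i, task)
	}
	wg.Wait()
	if won.Load() {
		return nil
	}
	return errors.Join(errs...)
}

// oneWinner pairs an instant success with a task that waits for cancelation.
func oneWinner() error {
	return raceSketch(context.Background(),
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error {
			<-ctx.Done() // unblocks once the winner cancels the shared context
			return ctx.Err()
		},
	)
}

// allFail shows the case where no task succeeds.
func allFail() error {
	return raceSketch(context.Background(),
		func(ctx context.Context) error { return errors.New("a failed") },
		func(ctx context.Context) error { return errors.New("b failed") },
	)
}

func main() {
	fmt.Println(oneWinner())      // <nil>
	fmt.Println(allFail() != nil) // true
}
```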
func TaskPool ¶
func TaskPool[Input, Output any](numWorkers int, task Task[Input, Output]) (in chan<- Input, out <-chan Result[Input, Output])
TaskPool starts numWorkers workers (or GOMAXPROCS workers if numWorkers < 1) which consume the in channel, execute task, and send the Result on the out channel. Callers should close the in channel to stop the workers from waiting for tasks. The out channel will be closed once the last result has been sent.
Example ¶
	package main

	import (
		"context"
		"crypto/md5"
		"fmt"
		"log"
		"os"
		"path/filepath"

		"github.com/carlmjohnson/flowmatic"
	)

	func main() {
		// Compare to https://pkg.go.dev/golang.org/x/sync/errgroup#example-Group-Pipeline
		// and https://blog.golang.org/pipelines
		m, err := MD5All(context.Background(), "testdata/md5all")
		if err != nil {
			log.Fatal(err)
		}

		for k, sum := range m {
			fmt.Printf("%s:\t%x\n", k, sum)
		}
	}

	// MD5All reads all the files in the file tree rooted at root
	// and returns a map from file path to the MD5 sum of the file's contents.
	// If the directory walk fails or any read operation fails,
	// MD5All returns an error.
	func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
		// Make a pool of 20 digesters
		in, out := flowmatic.TaskPool(20, digest)

		m := make(map[string][md5.Size]byte)
		// Open two goroutines:
		// one for reading file names by walking the filesystem
		// one for recording results from the digesters in a map
		err := flowmatic.All(ctx,
			func(ctx context.Context) error {
				return walkFilesystem(ctx, root, in)
			},
			func(ctx context.Context) error {
				for r := range out {
					if r.Err != nil {
						return r.Err
					}
					m[r.In] = *r.Out
				}
				return nil
			},
		)

		return m, err
	}

	func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
		defer close(in)

		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case in <- path:
			case <-ctx.Done():
				return ctx.Err()
			}

			return nil
		})
	}

	func digest(path string) (*[md5.Size]byte, error) {
		data, err := os.ReadFile(path)
		if err != nil {
			return nil, err
		}
		hash := md5.Sum(data)
		return &hash, nil
	}
Output:
	testdata/md5all/hello.txt:	bea8252ff4e80f41719ea13cdf007273
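The channel lifecycle described for TaskPool (the caller closes in, the pool closes out after the last result) can be sketched with the standard library as follows. This is an illustration under assumptions rather than flowmatic's implementation: poolSketch, result, and squares are hypothetical names, the result fields mirror the In, Out, and Err fields that the example above reads, and the numWorkers < 1 fallback is omitted.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// result carries a task's input alongside its output and error.
type result[In, Out any] struct {
	In  In
	Out Out
	Err error
}

// poolSketch starts numWorkers goroutines that read from in, run task,
// and send a result on out. out is closed once in has been closed and
// every worker has sent its last result.
func poolSketch[In, Out any](numWorkers int, task func(In) (Out, error)) (chan<- In, <-chan result[In, Out]) {
	in := make(chan In)
	out := make(chan result[In, Out])
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				o, err := task(v)
				out <- result[In, Out]{In: v, Out: o, Err: err}
			}
		}()
	}
	go func() {
		wg.Wait() // all workers have drained in
		close(out)
	}()
	return in, out
}

// squares sends nums through a two-worker pool and collects the outputs.
func squares(nums ...int) []int {
	in, out := poolSketch(2, func(n int) (int, error) { return n * n, nil })
	// Feed from a separate goroutine so sending and receiving overlap;
	// both channels are unbuffered, so doing them in sequence would deadlock.
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	var got []int
	for r := range out {
		got = append(got, r.Out)
	}
	sort.Ints(got) // results arrive in completion order, so sort for stable output
	return got
}

func main() {
	fmt.Println(squares(3, 1, 2)) // [1 4 9]
}
```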
Types ¶
type Manager ¶
Manager is a function that serially examines each Task result to see whether it produced any new Inputs. Returning false will halt the processing of future tasks.