flowmatic

v0.23.4
Published: Aug 9, 2023 License: MIT Imports: 6 Imported by: 5

README

Flowmatic is a generic Go library that provides a structured approach to concurrent programming. It lets you easily manage concurrent tasks in a manner that is simple, yet effective and flexible.

Flowmatic has an easy-to-use API with functions for handling common concurrency patterns. It automatically handles spawning workers, collecting errors, and propagating panics.

Flowmatic requires Go 1.20+.

Features

  • Has a simple API that improves readability over channels/waitgroups/mutexes
  • Handles a variety of concurrency problems such as heterogenous task groups, homogenous execution of a task over a slice, and dynamic work spawning
  • Aggregates errors
  • Properly propagates panics across goroutine boundaries
  • Has helpers for context cancelation
  • Few dependencies
  • Good test coverage

How to use Flowmatic

Execute heterogenous tasks

One problem that Flowmatic solves is managing the execution of multiple tasks in parallel that are independent of each other. For example, let's say you want to send data to three different downstream APIs. If any of the sends fail, you want to return an error. With traditional Go concurrency, this can quickly become complex and difficult to manage, with Goroutines, channels, and sync.WaitGroups to keep track of. Flowmatic makes it simple.

To execute heterogenous tasks, just use flowmatic.Do:

With flowmatic:

err := flowmatic.Do(
    func() error {
        return doThingA()
    },
    func() error {
        return doThingB()
    },
    func() error {
        return doThingC()
    })

With the standard library:

var wg sync.WaitGroup
var errs []error
errChan := make(chan error)

wg.Add(3)
go func() {
    defer wg.Done()
    if err := doThingA(); err != nil {
        errChan <- err
    }
}()
go func() {
    defer wg.Done()
    if err := doThingB(); err != nil {
        errChan <- err
    }
}()
go func() {
    defer wg.Done()
    if err := doThingC(); err != nil {
        errChan <- err
    }
}()

go func() {
    wg.Wait()
    close(errChan)
}()

for err := range errChan {
    errs = append(errs, err)
}

err := errors.Join(errs...)

To create a context for tasks that is canceled on the first error, use flowmatic.All. To create a context for tasks that is canceled on the first success, use flowmatic.Race.

// Make variables to hold responses
var pageA, pageB, pageC string
// Race the requests to see who can answer first
err := flowmatic.Race(ctx,
	func(ctx context.Context) error {
		var err error
		pageA, err = request(ctx, "A")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageB, err = request(ctx, "B")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageC, err = request(ctx, "C")
		return err
	},
)

Execute homogenous tasks

flowmatic.Each is useful if you need to execute the same task on each item in a slice using a worker pool:

With flowmatic:

things := []someType{thingA, thingB, thingC}

err := flowmatic.Each(numWorkers, things,
    func(thing someType) error {
        foo := thing.Frobincate()
        return foo.DoSomething()
    })

With the standard library:

things := []someType{thingA, thingB, thingC}

work := make(chan someType)
errs := make(chan error)

for i := 0; i < numWorkers; i++ {
    go func() {
        for thing := range work {
            // Omitted: panic handling!
            foo := thing.Frobincate()
            errs <- foo.DoSomething()
        }
    }()
}

go func() {
    for _, thing := range things {
        work <- thing
    }

    // Closing the work channel lets the workers exit their range loops
    close(work)
}()

var collectedErrs []error
for i := 0; i < len(things); i++ {
    collectedErrs = append(collectedErrs, <-errs)
}

err := errors.Join(collectedErrs...)

Use flowmatic.Map to map an input slice to an output slice.

func main() {
	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
With flowmatic:

func Google(ctx context.Context, query string) ([]Result, error) {
	searches := []Search{Web, Image, Video}
	return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
		func(ctx context.Context, search Search) (Result, error) {
			return search(ctx, query)
		})
}

With x/sync/errgroup:

func Google(ctx context.Context, query string) ([]Result, error) {
	g, ctx := errgroup.WithContext(ctx)

	searches := []Search{Web, Image, Video}
	results := make([]Result, len(searches))
	for i, search := range searches {
		i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			result, err := search(ctx, query)
			if err == nil {
				results[i] = result
			}
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

Manage tasks that spawn new tasks

For tasks that may create more work, use flowmatic.ManageTasks. Create a manager that will be serially executed, and have it save the results and examine the output of tasks to decide if there is more work to be done.

// Task fetches a page and extracts the URLs
task := func(u string) ([]string, error) {
    page, err := getURL(ctx, u)
    if err != nil {
        return nil, err
    }
    return getLinks(page), nil
}

// Map from page to links
// Doesn't need a lock because only the manager touches it
results := map[string][]string{}
var managerErr error

// Manager keeps track of which pages have been visited and the results graph
manager := func(req string, links []string, err error) ([]string, bool) {
    // Halt execution after the first error
    if err != nil {
        managerErr = err
        return nil, false
    }
    // Save final results in map
    results[req] = links

    // Check for new pages to scrape
    var newpages []string
    for _, link := range links {
        if _, ok := results[link]; ok {
            // Seen it, try the next link
            continue
        }
        // Add to list of new pages
        newpages = append(newpages, link)
        // Add placeholder to map to prevent double scraping
        results[link] = nil
    }
    return newpages, true
}

// Process the tasks with as many workers as GOMAXPROCS
flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "http://example.com/")
// Check if anything went wrong
if managerErr != nil {
    fmt.Println("error", managerErr)
}

Normally, it is very difficult to keep track of concurrent code because any combination of events could occur in any order or simultaneously, and each combination has to be accounted for by the programmer. flowmatic.ManageTasks makes it simple to write concurrent code because everything follows a simple rule: tasks happen concurrently; the manager runs serially.

Centralizing control in the manager makes reasoning about the code radically simpler. When writing locking code, if you have M states and N methods, you need to think about all M states in each of the N methods, giving you an M × N code explosion. By centralizing the logic, the M states only need to be considered in one location: the manager.
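
To make the "tasks happen concurrently; the manager runs serially" rule concrete, here is a minimal standard-library sketch of such a loop. It is not flowmatic's implementation: manageTasks and taskResult are hypothetical names, numWorkers is assumed to be at least 1, panics are not handled, and discarding results that arrive after the manager halts is a choice made for this sketch only.

```go
package main

import "sync"

// taskResult carries one finished task back to the manager loop.
type taskResult[In, Out any] struct {
	in  In
	out Out
	err error
}

// manageTasks is a hypothetical sketch: workers run task concurrently,
// while manager is only ever called from this single goroutine.
func manageTasks[In, Out any](
	numWorkers int,
	task func(In) (Out, error),
	manager func(In, Out, error) ([]In, bool),
	initial ...In,
) {
	work := make(chan In)
	results := make(chan taskResult[In, Out])

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range work {
				out, err := task(in)
				results <- taskResult[In, Out]{in, out, err}
			}
		}()
	}

	queue := append([]In(nil), initial...)
	inflight := 0
	halted := false
	for len(queue) > 0 || inflight > 0 {
		// A nil channel disables the send case when nothing is queued.
		var send chan In
		var next In
		if len(queue) > 0 {
			send, next = work, queue[0]
		}
		select {
		case send <- next:
			queue = queue[1:]
			inflight++
		case r := <-results:
			inflight--
			if halted {
				continue // sketch choice: drop results after a halt
			}
			more, ok := manager(r.in, r.out, r.err)
			if !ok {
				halted, queue = true, nil
				continue
			}
			queue = append(queue, more...)
		}
	}
	close(work)
	wg.Wait()
}
```

Because only the loop goroutine calls manager, any maps or counters the manager touches need no locks, which is the property the paragraph above relies on.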

Advanced patterns with TaskPool

For very advanced uses, flowmatic.TaskPool takes the boilerplate out of managing a pool of workers. Compare Flowmatic to this example from x/sync/errgroup:

func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}
With flowmatic:

// MD5All reads all the files in the file tree rooted at root
// and returns a map from file path to the MD5 sum of the file's contents.
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// Make a pool of 20 digesters
	in, out := flowmatic.TaskPool(20, digest)

	m := make(map[string][md5.Size]byte)
	// Open two goroutines:
	// one for reading file names by walking the filesystem
	// one for recording results from the digesters in a map
	err := flowmatic.All(ctx,
		func(ctx context.Context) error {
			return walkFilesystem(ctx, root, in)
		},
		func(ctx context.Context) error {
			for r := range out {
				if r.Err != nil {
					return r.Err
				}
				m[r.In] = *r.Out
			}
			return nil
		},
	)

	return m, err
}

func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
	defer close(in)

	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		select {
		case in <- path:
		case <-ctx.Done():
			return ctx.Err()
		}

		return nil
	})
}

func digest(path string) (*[md5.Size]byte, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	hash := md5.Sum(data)
	return &hash, nil
}

With x/sync/errgroup:

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

Note on panicking

In Go, if there is a panic in a goroutine, and that panic is not recovered, then the whole process is shut down. There are pros and cons to this approach. The pro is that if the panic is the symptom of a programming error in the application, no further damage can be done by the application. The con is that in many cases, this leads to a shutdown in a situation that might be recoverable.

As a result, although the Go standard HTTP server catches panics that occur in one of its HTTP handlers and continues serving requests, it cannot catch panics that occur in separate goroutines, and these will cause the whole server to go offline.

Flowmatic fixes this problem by catching a panic that occurs in one of its worker goroutines and repropagating it in the parent goroutine, so the panic can be caught and logged at the appropriate level.
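
The general technique can be sketched with the standard library alone. The following is an illustration of catching a panic in a worker goroutine and rethrowing it in the parent, not flowmatic's actual code; runAndRethrow is a hypothetical name, and keeping only the first recovered value is an assumption of this sketch.

```go
package main

import "sync"

// runAndRethrow runs each task in its own goroutine, waits for all of
// them, and then re-panics in the calling goroutine with the first
// panic value that was recovered from a worker.
func runAndRethrow(tasks ...func()) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		panicVal any
		panicked bool
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() {
				// recover only works in the goroutine that panicked,
				// so each worker records its own panic for the parent.
				if r := recover(); r != nil {
					mu.Lock()
					if !panicked {
						panicked, panicVal = true, r
					}
					mu.Unlock()
				}
			}()
			task()
		}(task)
	}
	wg.Wait()
	if panicked {
		panic(panicVal)
	}
}
```

A caller can then wrap runAndRethrow in its own deferred recover, for example in an HTTP handler, and the panic surfaces where it can be logged instead of taking the process down.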

Documentation

Overview

Package flowmatic contains easy-to-use, generic, concurrent task runners.

Comparison of simple helpers:

       Tasks       Cancels Context?   Collect results?
Do     Different   No                 No
All    Different   On error           No
Race   Different   On success         No
Each   Same        No                 No
Map    Same        On error           Yes

ManageTasks and TaskPool allow for advanced concurrency patterns.

Constants

const MaxProcs = -1

MaxProcs means use GOMAXPROCS workers when doing tasks.

Functions

func All added in v0.23.3

func All(ctx context.Context, tasks ...func(context.Context) error) error

All runs each task concurrently and waits for them all to finish. Each task receives a child context which is canceled once one task returns an error or panics. All returns nil if all tasks succeed. Otherwise, All returns a multierror containing the errors encountered. If a task panics during execution, a panic will be caught and rethrown in the parent Goroutine.

Example
ctx := context.Background()
start := time.Now()
err := flowmatic.All(ctx,
	func(ctx context.Context) error {
		// This task sleeps then returns an error
		d := 1 * time.Millisecond
		time.Sleep(d)
		fmt.Println("slept for", d)
		return fmt.Errorf("abort after %v", d)
	},
	func(ctx context.Context) error {
		// sleepFor is a cancelable time.Sleep.
		// The error of first task
		// causes the early cancelation of this one.
		if !sleepFor(ctx, 1*time.Minute) {
			fmt.Println("canceled")
		}
		return nil
	},
)
fmt.Println("err:", err)
fmt.Println("exited early?", time.Since(start) < 10*time.Millisecond)
Output:

slept for 1ms
canceled
err: abort after 1ms
exited early? true

func Do

func Do(tasks ...func() error) error

Do runs each task concurrently and waits for them all to finish. Errors returned by tasks do not cancel execution, but are joined into a multierror return value. If a task panics during execution, a panic will be caught and rethrown in the parent Goroutine.

Example
package main

import (
	"fmt"
	"time"

	"github.com/carlmjohnson/flowmatic"
)

func main() {
	start := time.Now()
	err := flowmatic.Do(
		func() error {
			time.Sleep(50 * time.Millisecond)
			fmt.Println("hello")
			return nil
		}, func() error {
			time.Sleep(100 * time.Millisecond)
			fmt.Println("world")
			return nil
		}, func() error {
			time.Sleep(200 * time.Millisecond)
			fmt.Println("from flowmatic.Do")
			return nil
		})
	if err != nil {
		fmt.Println("error", err)
	}
	fmt.Println("executed concurrently?", time.Since(start) < 250*time.Millisecond)
}
Output:

hello
world
from flowmatic.Do
executed concurrently? true

func Each

func Each[Input any](numWorkers int, items []Input, task func(Input) error) error

Each starts numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) and processes each item as a task. Errors returned by a task do not halt execution, but are joined into a multierror return value. If a task panics during execution, the panic will be caught and rethrown in the parent Goroutine.

Example
package main

import (
	"fmt"
	"time"

	"github.com/carlmjohnson/flowmatic"
)

func main() {
	times := []time.Duration{
		50 * time.Millisecond,
		100 * time.Millisecond,
		200 * time.Millisecond,
	}
	start := time.Now()
	err := flowmatic.Each(3, times, func(d time.Duration) error {
		time.Sleep(d)
		fmt.Println("slept", d)
		return nil
	})
	if err != nil {
		fmt.Println("error", err)
	}
	fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
}
Output:

slept 50ms
slept 100ms
slept 200ms
executed concurrently? true

func ManageTasks

func ManageTasks[Input, Output any](numWorkers int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input)

ManageTasks manages tasks using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) which produce output consumed by a serially run manager. The manager should return a slice of new task inputs based on prior task results, or return false to halt processing. If a task panics during execution, the panic will be caught and rethrown in the parent Goroutine.

Example
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"slices"
	"strings"
	"testing/fstest"

	"github.com/carlmjohnson/flowmatic"
)

func main() {
	// Example site to crawl with recursive links
	srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
		"index.html": &fstest.MapFile{
			Data: []byte("/a.html"),
		},
		"a.html": &fstest.MapFile{
			Data: []byte("/b1.html\n/b2.html"),
		},
		"b1.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"b2.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"c.html": &fstest.MapFile{
			Data: []byte("/"),
		},
	})))
	defer srv.Close()
	cl := srv.Client()

	// Task fetches a page and extracts the URLs
	task := func(u string) ([]string, error) {
		res, err := cl.Get(srv.URL + u)
		if err != nil {
			return nil, err
		}
		defer res.Body.Close()
		body, err := io.ReadAll(res.Body)
		if err != nil {
			return nil, err
		}

		return strings.Split(string(body), "\n"), nil
	}

	// Manager keeps track of which pages have been visited and the results graph
	tried := map[string]int{}
	results := map[string][]string{}
	manager := func(req string, urls []string, err error) ([]string, bool) {
		if err != nil {
			// If there's a problem fetching a page, try three times
			if tried[req] < 3 {
				tried[req]++
				return []string{req}, true
			}
			return nil, false
		}
		results[req] = urls
		var newurls []string
		for _, u := range urls {
			if tried[u] == 0 {
				newurls = append(newurls, u)
				tried[u]++
			}
		}
		return newurls, true
	}

	// Process the tasks with as many workers as GOMAXPROCS
	flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "/")

	keys := make([]string, 0, len(results))
	for key := range results {
		keys = append(keys, key)
	}
	slices.Sort(keys)
	for _, key := range keys {
		fmt.Println(key, "links to:")
		for _, v := range results[key] {
			fmt.Println("- ", v)
		}
	}

}
Output:

/ links to:
-  /a.html
/a.html links to:
-  /b1.html
-  /b2.html
/b1.html links to:
-  /c.html
/b2.html links to:
-  /c.html
/c.html links to:
-  /

func Map added in v0.23.3

func Map[Input, Output any](ctx context.Context, numWorkers int, items []Input, task func(context.Context, Input) (Output, error)) (results []Output, err error)

Map starts numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1) and attempts to map the input slice to an output slice. Each task receives a child context. The first error or panic returned by a task cancels the child context and halts further task scheduling. If a task panics during execution, the panic will be caught and rethrown in the parent Goroutine.

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/carlmjohnson/flowmatic"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func Google(ctx context.Context, query string) ([]Result, error) {
	searches := []Search{Web, Image, Video}
	return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
		func(ctx context.Context, search Search) (Result, error) {
			return search(ctx, query)
		})
}

func main() {
	// Compare to https://pkg.go.dev/golang.org/x/sync/errgroup#example-Group-Parallel
	// and https://pkg.go.dev/sync#example-WaitGroup
	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

web result for "golang"
image result for "golang"
video result for "golang"
Example (Simple)
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/carlmjohnson/flowmatic"
)

func main() {
	ctx := context.Background()

	// Start with some slice of input work
	input := []string{"0", "1", "42", "1337"}
	// Have a task that takes a context
	decodeAndDouble := func(ctx context.Context, s string) (int, error) {
		// Do some work
		n, err := strconv.Atoi(s)
		if err != nil {
			return 0, err
		}
		// Return early if context was canceled
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		// Do more work
		return 2 * n, nil
	}
	// Concurrently process input into output
	output, err := flowmatic.Map(ctx, flowmatic.MaxProcs, input, decodeAndDouble)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(output)
}
Output:

[0 2 84 2674]

func Race added in v0.23.3

func Race(ctx context.Context, tasks ...func(context.Context) error) error

Race runs each task concurrently and waits for them all to finish. Each function receives a child context which is canceled once one function has successfully completed or panicked. Race returns nil if at least one function completes without an error. If all functions return an error, Race returns a multierror containing all the errors. If a function panics during execution, a panic will be caught and rethrown in the parent Goroutine.

Example
ctx := context.Background()
start := time.Now()
err := flowmatic.Race(ctx,
	func(ctx context.Context) error {
		// This task sleeps for only 1ms
		d := 1 * time.Millisecond
		time.Sleep(d)
		fmt.Println("slept for", d)
		return nil
	},
	func(ctx context.Context) error {
		// This task wants to sleep for a whole minute.
		d := 1 * time.Minute
		// But sleepFor is a cancelable time.Sleep.
		// So when the other task completes,
		// it cancels this one, causing it to return early.
		if !sleepFor(ctx, d) {
			fmt.Println("canceled")
		}
		// The error here is ignored
		// because the other task succeeded
		return errors.New("ignored")
	},
)
// Err is nil as long as one task succeeds
fmt.Println("err:", err)
fmt.Println("exited early?", time.Since(start) < 10*time.Millisecond)
Output:

slept for 1ms
canceled
err: <nil>
exited early? true
Example (FakeRequest)
// Setup fake requests
request := func(ctx context.Context, page string) (string, error) {
	var sleepLength time.Duration
	switch page {
	case "A":
		sleepLength = 10 * time.Millisecond
	case "B":
		sleepLength = 100 * time.Millisecond
	case "C":
		sleepLength = 10 * time.Second
	}
	if !sleepFor(ctx, sleepLength) {
		return "", ctx.Err()
	}
	return "got " + page, nil
}
ctx := context.Background()
// Make variables to hold responses
var pageA, pageB, pageC string
// Race the requests to see who can answer first
err := flowmatic.Race(ctx,
	func(ctx context.Context) error {
		var err error
		pageA, err = request(ctx, "A")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageB, err = request(ctx, "B")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageC, err = request(ctx, "C")
		return err
	},
)
fmt.Println("err:", err)
fmt.Printf("A: %q B: %q C: %q\n", pageA, pageB, pageC)
Output:

err: <nil>
A: "got A" B: "" C: ""

func TaskPool

func TaskPool[Input, Output any](numWorkers int, task Task[Input, Output]) (in chan<- Input, out <-chan Result[Input, Output])

TaskPool starts numWorkers workers (or GOMAXPROCS workers if numWorkers < 1) which consume the in channel, execute task, and send the Result on the out channel. Callers should close the in channel to stop the workers from waiting for tasks. The out channel will be closed once the last result has been sent.
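
The channel contract described above (the caller closes in, the pool closes out after the last result) can be sketched with the standard library. This is an illustration, not the package's implementation: taskPool and result are hypothetical names, result omits the Panic field of the real Result type, and numWorkers is assumed to be at least 1.

```go
package main

import "sync"

// result pairs an input with the output or error of its task.
// It is a simplified stand-in for the documented Result type.
type result[In, Out any] struct {
	In  In
	Out Out
	Err error
}

// taskPool starts numWorkers goroutines that read from the returned in
// channel, run task, and send a result on the returned out channel.
func taskPool[In, Out any](numWorkers int, task func(In) (Out, error)) (chan<- In, <-chan result[In, Out]) {
	in := make(chan In)
	out := make(chan result[In, Out])

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			// The loop ends once the caller closes the in channel.
			for v := range in {
				o, err := task(v)
				out <- result[In, Out]{In: v, Out: o, Err: err}
			}
		}()
	}

	// Close out only after every worker has sent its last result.
	go func() {
		wg.Wait()
		close(out)
	}()
	return in, out
}
```

Note that the producer and the consumer must run in different goroutines, as in the MD5All example: both channels are unbuffered here, so a single goroutine that sends everything before reading anything would block.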

Example
package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/carlmjohnson/flowmatic"
)

func main() {
	// Compare to https://pkg.go.dev/golang.org/x/sync/errgroup#example-Group-Pipeline and https://blog.golang.org/pipelines

	m, err := MD5All(context.Background(), "testdata/md5all")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}

}

// MD5All reads all the files in the file tree rooted at root
// and returns a map from file path to the MD5 sum of the file's contents.
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// Make a pool of 20 digesters
	in, out := flowmatic.TaskPool(20, digest)

	m := make(map[string][md5.Size]byte)
	// Open two goroutines:
	// one for reading file names by walking the filesystem
	// one for recording results from the digesters in a map
	err := flowmatic.All(ctx,
		func(ctx context.Context) error {
			return walkFilesystem(ctx, root, in)
		},
		func(ctx context.Context) error {
			for r := range out {
				if r.Err != nil {
					return r.Err
				}
				m[r.In] = *r.Out
			}
			return nil
		},
	)

	return m, err
}

func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
	defer close(in)

	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		select {
		case in <- path:
		case <-ctx.Done():
			return ctx.Err()
		}

		return nil
	})
}

func digest(path string) (*[md5.Size]byte, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	hash := md5.Sum(data)
	return &hash, nil
}
Output:

testdata/md5all/hello.txt:	bea8252ff4e80f41719ea13cdf007273

Types

type Manager

type Manager[Input, Output any] func(Input, Output, error) (tasks []Input, ok bool)

Manager is a function that serially examines Task results to see if they produced any new Inputs. Returning false will halt the processing of future tasks.

type Result

type Result[Input, Output any] struct {
	In    Input
	Out   Output
	Err   error
	Panic any
}

Result is the type returned by the output channel of TaskPool.

type Task

type Task[Input, Output any] func(in Input) (out Output, err error)

Task is a function that can concurrently transform an input into an output.
