Documentation ¶
Overview ¶
Package nursery implements "structured concurrency" in Go.
It's based on this blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
Example ¶
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	ch := make(chan int)
	err := nursery.RunConcurrently(
		// producer job: produce numbers into ch and close it once done
		func(ctx context.Context, errCh chan error) {
			produceNumbers(ctx, ch)
			close(ch)
		},
		// consumer job
		func(ctx context.Context, errCh chan error) {
			// run 5 copies of the consumer reading from ch until it is closed or an error is encountered
			err := nursery.RunMultipleCopiesConcurrentlyWithContext(ctx, 5,
				func(ctx context.Context, errCh chan error) {
					if err := consumeNumbers(ctx, ch); err != nil {
						errCh <- err
					}
				},
			)
			if err != nil {
				errCh <- err
				// drain the channel so the producer is not blocked in the event of an error
				for range ch {
				}
			}
		},
	)
	if err != nil {
		log.Fatal(err)
	}
}

// produceNumbers sends 0..199 into ch, stopping early if ctx is done.
func produceNumbers(ctx context.Context, ch chan int) {
	for i := 0; i < 200; i++ {
		select {
		case <-ctx.Done():
			fmt.Printf("producer terminating early after sending numbers up to: %d\n", i)
			return
		default:
			time.Sleep(time.Nanosecond * 100)
			ch <- i
		}
	}
	fmt.Println("all numbers produced... now exiting...")
}

// consumeNumbers reads from ch until it is closed, ctx is done, or the value 10 arrives.
func consumeNumbers(ctx context.Context, ch chan int) error {
	jobID := ctx.Value(nursery.JobID).(int)
	for v := range ch {
		select {
		case <-ctx.Done():
			fmt.Printf("Job %d terminating early\n", jobID)
			return nil
		default:
			if v == 10 {
				fmt.Printf("Job %d received value 10 which is an error\n", jobID)
				return errors.New("number 10 received")
			}
			fmt.Printf("Job %d received value: %d\n", jobID, v)
			time.Sleep(time.Millisecond * 10)
		}
	}
	fmt.Printf("Job %d finishing up...\n", jobID)
	return nil
}
Index ¶
- Constants
- func IsContextDone(ctx context.Context) bool
- func RunConcurrently(jobs ...ConcurrentJob) error
- func RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error
- func RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error
- func RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error
- func RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error
- func RunUntilFirstCompletion(jobs ...ConcurrentJob) error
- func RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error
- func RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error
- type ConcurrentJob
Constants ¶
const JobID = jobIDKey("id")
JobID is the context key under which each copy of a job run in multiple copies can look up its own job ID.
Functions ¶
func IsContextDone ¶ added in v0.5.0
func IsContextDone(ctx context.Context) bool
IsContextDone is a utility function that reports whether the context is done (cancelled or past its deadline).
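A check of this kind is commonly written as a non-blocking select on ctx.Done(). The following is a stand-alone sketch using only the standard library; the helper name isDone is hypothetical, and this is not necessarily how the package implements IsContextDone.

```go
package main

import (
	"context"
	"fmt"
)

// isDone is a hypothetical stand-in for IsContextDone: it reports whether
// ctx has been cancelled or has passed its deadline, without blocking.
func isDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// demo returns the result of isDone before and after cancellation.
func demo() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	before = isDone(ctx)
	cancel()
	after = isDone(ctx)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // prints "false true"
}
```

Inside a long-running loop, a check like this lets a job decide between iterations whether to keep going or wrap up.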
func RunConcurrently ¶
func RunConcurrently(jobs ...ConcurrentJob) error
RunConcurrently runs jobs concurrently until either all jobs have finished or any one job has encountered an error.
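To make the "all finished, or first error" behaviour concrete, here is a minimal stand-alone sketch built on the standard library. The names job and runAll are hypothetical, and the sketch assumes each job publishes at most one error; it illustrates one way such semantics could be realized and is not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// job mirrors the shape of the function literals used in the examples.
type job func(ctx context.Context, errCh chan error)

// runAll is a hypothetical illustration: it runs all jobs, cancels the shared
// context on the first error, waits for every job to return, and reports
// that first error (or nil).
func runAll(jobs ...job) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Assumption: at most one error per job, so this buffer never fills.
	errCh := make(chan error, len(jobs))
	var wg sync.WaitGroup
	for _, j := range jobs {
		wg.Add(1)
		go func(j job) {
			defer wg.Done()
			j(ctx, errCh)
		}(j)
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()

	var first error
	select {
	case first = <-errCh:
		cancel() // tell the remaining jobs to wrap up
		<-done   // wait for them, so no goroutine outlives the call
	case <-done:
		// All jobs returned; pick up an error published just before that.
		select {
		case first = <-errCh:
		default:
		}
	}
	return first
}

// demo runs one failing job next to one that only exits on cancellation.
func demo() error {
	return runAll(
		func(ctx context.Context, errCh chan error) {
			errCh <- errors.New("job 1 failed")
		},
		func(ctx context.Context, errCh chan error) {
			<-ctx.Done() // would block forever without cancellation
		},
	)
}

func main() {
	fmt.Println(demo()) // prints "job 1 failed"
}
```

The key property shown is that the call does not return until every job has returned, which is what makes the concurrency "structured".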
func RunConcurrentlyWithContext ¶ added in v0.3.0
func RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error
RunConcurrentlyWithContext runs jobs concurrently until either all jobs have finished or any one job has encountered an error. It wraps the parent context, so when the parent context is done the jobs receive the signal to wrap up.
func RunConcurrentlyWithTimeout ¶
func RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error
RunConcurrentlyWithTimeout runs jobs concurrently until all jobs have finished, any one job has encountered an error, or the timeout has expired.
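A timeout only helps if the jobs observe it. The sketch below uses the standard library's context.WithTimeout to show how a job body might return early when the deadline passes; the function waitOrTimeout is hypothetical and stands in for real work. What the package itself returns on timeout is not stated here, so the sketch makes no claim about that.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrTimeout is a hypothetical job body: it simulates work lasting `work`
// and returns early with ctx.Err() if the context expires first.
func waitOrTimeout(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs a short and a long piece of work under the same 50ms timeout.
func demo() (fast, slow error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fast = waitOrTimeout(ctx, time.Millisecond) // finishes well within the timeout
	slow = waitOrTimeout(ctx, time.Second)      // cut short when the timeout expires
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast) // prints "<nil>"
	fmt.Println(slow) // prints "context deadline exceeded"
}
```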
func RunMultipleCopiesConcurrently ¶ added in v0.4.0
func RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error
RunMultipleCopiesConcurrently runs multiple copies of the given job until either they have all finished or any one has encountered an error. The context passed to each copy carries an int value under the key JobID, counting up from 0, which a job can optionally read to identify which copy it is.
func RunMultipleCopiesConcurrentlyWithContext ¶ added in v0.4.0
func RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error
RunMultipleCopiesConcurrentlyWithContext runs multiple copies of the given job until either they have all finished or any one has encountered an error. The context passed to each copy carries an int value under the key JobID, counting up from 0, which a job can optionally read to identify which copy it is. It wraps the parent context, so when the parent context is done the jobs receive the signal to wrap up.
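The per-copy identifier relies on the standard context value mechanism. The sketch below shows the idea with a hypothetical key (copyKey, copyID) and a hypothetical runner (runCopies); it is an illustration of the pattern under those assumptions, not the package's code. Note the checked type assertion, which avoids a panic if the value is absent.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// copyKey is a hypothetical key type, analogous in spirit to the package's
// JobID key; a dedicated type avoids collisions with keys from other packages.
type copyKey string

const copyID = copyKey("id")

// runCopies starts n copies of job, each with a context derived from parent
// that carries the copy's index (0..n-1), and waits for all of them.
func runCopies(parent context.Context, n int, job func(ctx context.Context)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			job(context.WithValue(parent, copyID, id))
		}(i)
	}
	wg.Wait()
}

// demo collects the IDs seen by three copies, sorted for a stable result.
func demo() []int {
	var mu sync.Mutex
	var seen []int
	runCopies(context.Background(), 3, func(ctx context.Context) {
		id, ok := ctx.Value(copyID).(int) // checked assertion: no panic if absent
		if !ok {
			return
		}
		mu.Lock()
		seen = append(seen, id)
		mu.Unlock()
	})
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(demo()) // prints "[0 1 2]"
}
```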
func RunUntilFirstCompletion ¶
func RunUntilFirstCompletion(jobs ...ConcurrentJob) error
RunUntilFirstCompletion runs jobs concurrently until at least one job has finished or any job has encountered an error.
func RunUntilFirstCompletionWithContext ¶ added in v0.3.0
func RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error
RunUntilFirstCompletionWithContext runs jobs concurrently until at least one job has finished or any job has encountered an error.
func RunUntilFirstCompletionWithTimeout ¶
func RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error
RunUntilFirstCompletionWithTimeout runs jobs concurrently until at least one job has finished, any job has encountered an error, or the timeout has expired.
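One plausible reading of "until at least one job has finished" is that the first job to return cancels the shared context, and the call then waits for the remaining jobs to wrap up. The stand-alone sketch below illustrates that reading with a hypothetical runUntilFirst; error handling is omitted, and this is an assumption about the semantics rather than the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runUntilFirst is a hypothetical illustration: the first job to return
// cancels the shared context, and the call waits for all jobs to exit.
func runUntilFirst(jobs ...func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for _, j := range jobs {
		wg.Add(1)
		go func(j func(ctx context.Context)) {
			defer wg.Done()
			defer cancel() // runs as soon as this job returns
			j(ctx)
		}(j)
	}
	wg.Wait()
}

// demo reports whether the slow job was cancelled by the quick one finishing.
func demo() (cancelled bool) {
	runUntilFirst(
		// Quick job: finishes on its own after a short sleep.
		func(ctx context.Context) { time.Sleep(10 * time.Millisecond) },
		// Slow job: only finishes early if it is told to stop.
		func(ctx context.Context) {
			select {
			case <-ctx.Done():
				cancelled = true
			case <-time.After(5 * time.Second):
			}
		},
	)
	return cancelled
}

func main() {
	fmt.Println(demo()) // prints "true"
}
```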
Types ¶
type ConcurrentJob ¶
type ConcurrentJob func(context.Context, chan error)
ConcurrentJob contains procedural code that can run concurrently with other jobs. A job should listen on the context's Done channel (`ctx.Done()`) and, once it is closed, clean up and exit. Publish any errors to the error channel, but note that only the first error across all jobs is returned. Finally, make sure a job does not modify shared state without protection; prefer Go's built-in channels, communicating rather than sharing memory.
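Because only the first error is returned, a job may want to avoid getting stuck on a send that nobody will receive. Whether the package's error channel can ever block is not stated here, so the following is a defensive sketch only: the helper publish is hypothetical and simply gives up on the send once the context is done.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// publish is a hypothetical helper: it tries to send err on errCh but
// abandons the attempt if ctx is done first. It reports whether the error
// was delivered.
func publish(ctx context.Context, errCh chan error, err error) bool {
	select {
	case errCh <- err:
		return true
	case <-ctx.Done():
		return false
	}
}

// demo exercises both outcomes of publish.
func demo() (delivered, abandoned bool) {
	// Live context, room in the channel: the send succeeds.
	delivered = publish(context.Background(), make(chan error, 1), errors.New("boom"))

	// Cancelled context, nobody receiving: publish gives up instead of hanging.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	abandoned = !publish(ctx, make(chan error), errors.New("boom"))
	return delivered, abandoned
}

func main() {
	fmt.Println(demo()) // prints "true true"
}
```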
Example ¶
package main

import (
	"context"
	"log"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	nursery.RunConcurrently(
		// Job 1
		func(context.Context, chan error) {
			time.Sleep(time.Millisecond * 10)
			log.Println("Job 1 done...")
		},
		// Job 2
		func(context.Context, chan error) {
			time.Sleep(time.Millisecond * 5)
			log.Println("Job 2 done...")
		},
	)
	log.Println("All jobs done...")
}