nursery

v0.6.0
Published: Jul 8, 2021 License: Apache-2.0 Imports: 3 Imported by: 13

README

nursery: structured concurrency in Go

RunConcurrently(
    // Job 1
    func(context.Context, chan error) {
        time.Sleep(time.Millisecond * 10)
        log.Println("Job 1 done...")
    },
    // Job 2
    func(context.Context, chan error) {
        time.Sleep(time.Millisecond * 5)
        log.Println("Job 2 done...")
    },
)
log.Println("All jobs done...")

Installation

go get -u github.com/arunsworld/nursery

Notes on structured concurrency, or: Go statement considered harmful is an article that compares the dangers of goto with the go statement.

While I don't necessarily agree with the entire content, I can appreciate that, even with Go's high-level concurrency abstractions (goroutines, channels and the select statement), it is possible to end up with unreadable code, deadlocks, leaked goroutines, race conditions and poor error handling.

Implementing a higher-level abstraction for the use cases the article describes is straightforward in Go, and this small package provides just that.

The following functions are provided:

  • RunConcurrently(jobs ...ConcurrentJob) error: takes a variadic list of ConcurrentJobs and runs them concurrently, ensuring that all jobs have completed before the call returns. If all jobs terminate cleanly, the returned error is nil; otherwise the first non-nil error is returned.
  • RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error: behaves like RunConcurrently but additionally wraps the passed-in parentCtx, so that cancellation of parentCtx is propagated to the jobs.
  • RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error: makes copies of the given job and runs them concurrently. This is useful when multiple slow consumers should take work from a channel until the work is finished. The channel itself can be fed by a producer that runs concurrently with the job running the consumers. Each copy's context also carries a unique index under the key nursery.JobID (a 0-based int) that may be used as a job identity if required.
  • RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error: behaves like RunMultipleCopiesConcurrently, with a context that allows cancellation to be propagated to the jobs.
  • RunUntilFirstCompletion(jobs ...ConcurrentJob) error: takes a variadic list of ConcurrentJobs and runs them concurrently, but terminates after the earliest job completes. A key point is that, despite the early termination, the call blocks until all jobs have terminated (i.e. released any resources they used). If all jobs terminate cleanly, the returned error is nil; otherwise the first non-nil error is returned.
  • RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error: behaves like RunUntilFirstCompletion but additionally wraps the passed-in parentCtx, so that cancellation of parentCtx is propagated to the jobs.
  • RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error: is similar to RunConcurrently except that it also takes a timeout and can terminate earlier if the timeout expires. As before, it waits for all jobs to terminate cleanly.
  • RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error: is similar to RunUntilFirstCompletion, with an additional timeout.

ConcurrentJob is a simple function that takes a context and an error channel. A job should listen on the context's Done() channel and, once that channel is closed, clean up its resources and return. Errors should be published to the error channel for proper handling.

Note: while this package simplifies the semantics of defining and executing concurrent code, it cannot protect against bad concurrent programming, such as sharing resources across jobs in ways that lead to data corruption or to panics caused by race conditions.

You may also be interested in reading Structured Concurrency in Go.

The library includes a utility function, IsContextDone(context.Context), that reports whether the passed-in context is done. It can be used as a guard clause in a for loop within a ConcurrentJob, using the job's context to decide whether to stop processing and return, or to continue.

Documentation

Overview

Package nursery implements "structured concurrency" in Go.

It's based on this blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	ch := make(chan int)
	err := nursery.RunConcurrently(
		// producer job: produce numbers into ch and once done close it
		func(ctx context.Context, errCh chan error) {
			produceNumbers(ctx, ch)
			close(ch)
		},
		// consumer job
		func(ctx context.Context, errCh chan error) {
			// run 5 copies of the consumer reading from ch until closed or err encountered
			err := nursery.RunMultipleCopiesConcurrentlyWithContext(ctx, 5,
				func(ctx context.Context, errCh chan error) {
					if err := consumeNumbers(ctx, ch); err != nil {
						errCh <- err
					}
				},
			)
			if err != nil {
				errCh <- err
				// drain the channel to not block the producer in the event of an error
				for range ch {
				}
			}
		},
	)
	if err != nil {
		log.Fatal(err)
	}
}

func produceNumbers(ctx context.Context, ch chan int) {
	for i := 0; i < 200; i++ {
		select {
		case <-ctx.Done():
			fmt.Printf("producer terminating early after sending numbers up to: %d\n", i)
			return
		default:
			time.Sleep(time.Nanosecond * 100)
			ch <- i
		}
	}
	fmt.Println("all numbers produced... now exiting...")
}

func consumeNumbers(ctx context.Context, ch chan int) error {
	// JobID is set by the RunMultipleCopies* functions; it identifies this copy
	jobID := ctx.Value(nursery.JobID).(int)
	for v := range ch {
		select {
		case <-ctx.Done():
			fmt.Printf("Job %d terminating early\n", jobID)
			return nil
		default:
			if v == 10 {
				fmt.Printf("Job %d received value 10 which is an error\n", jobID)
				return errors.New("number 10 received")
			}
			fmt.Printf("Job %d received value: %d\n", jobID, v)
			time.Sleep(time.Millisecond * 10)
		}
	}
	fmt.Printf("Job %d finishing up...\n", jobID)
	return nil
}

Constants

const JobID = jobIDKey("id")

JobID is the context key under which each copy of a job started by the RunMultipleCopies functions can find its own index.

Functions

func IsContextDone added in v0.5.0

func IsContextDone(ctx context.Context) bool

IsContextDone is a utility function to check if the context is Done/Cancelled.

func RunConcurrently

func RunConcurrently(jobs ...ConcurrentJob) error

RunConcurrently runs jobs concurrently until all jobs have either finished or any one job encountered an error.

func RunConcurrentlyWithContext added in v0.3.0

func RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error

RunConcurrentlyWithContext runs jobs concurrently until all jobs have either finished or any one job encountered an error. It wraps the parent context, so if the parent context is Done the jobs get the signal to wrap up.

func RunConcurrentlyWithTimeout

func RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error

RunConcurrentlyWithTimeout runs jobs concurrently until all jobs have finished, any one job has encountered an error, or the timeout has expired.

func RunMultipleCopiesConcurrently added in v0.4.0

func RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error

RunMultipleCopiesConcurrently runs multiple copies of the given job until they have all finished or any one has encountered an error. The passed context can be optionally checked for an int value with key JobID counting up from 0 to identify uniquely the copy that is run.

func RunMultipleCopiesConcurrentlyWithContext added in v0.4.0

func RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error

RunMultipleCopiesConcurrentlyWithContext runs multiple copies of the given job until they have all finished or any one has encountered an error. The passed context can be optionally checked for an int value with key JobID counting up from 0 to identify uniquely the copy that is run. It wraps the parent context, so if the parent context is Done the jobs get the signal to wrap up.

func RunUntilFirstCompletion

func RunUntilFirstCompletion(jobs ...ConcurrentJob) error

RunUntilFirstCompletion runs jobs concurrently until at least one job has finished or any job has encountered an error.

func RunUntilFirstCompletionWithContext added in v0.3.0

func RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error

RunUntilFirstCompletionWithContext runs jobs concurrently until at least one job has finished or any job has encountered an error.

func RunUntilFirstCompletionWithTimeout

func RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error

RunUntilFirstCompletionWithTimeout runs jobs concurrently until at least one job has finished, any job has encountered an error, or the timeout has expired.

Types

type ConcurrentJob

type ConcurrentJob func(context.Context, chan error)

ConcurrentJob contains procedural code that can run concurrently with other jobs. Please ensure that you're listening to `context.Done()`, at which point you're required to clean up and exit. Publish any errors into the error channel, but note that only the first error across the jobs will be returned. Finally, ensure that you're not unsafely modifying shared state without protection, and prefer Go's built-in channels for communicating over sharing memory.

Example
package main

import (
	"context"
	"log"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	nursery.RunConcurrently(
		// Job 1
		func(context.Context, chan error) {
			time.Sleep(time.Millisecond * 10)
			log.Println("Job 1 done...")
		},
		// Job 2
		func(context.Context, chan error) {
			time.Sleep(time.Millisecond * 5)
			log.Println("Job 2 done...")
		},
	)
	log.Println("All jobs done...")
}