bounded

package module
v0.0.0-...-c70879d Latest
Published: Dec 1, 2019 License: MIT Imports: 3 Imported by: 0

README

bounded Documentation

Bounded Goroutine Management

bounded.Pool is a bounded goroutine manager. Pool provides:

  • Ensures the number of goroutines spawned stays within the limit
  • Lazily spawns new goroutines when the ones in the pool are busy, up to the limit
  • Captures the first non-nil error in the pool
  • Notifies other goroutines through the cancellation of the returned context
  • Ability to wait for all goroutines to complete
  • Little to no overhead compared to hand-rolled worker pools

go get github.com/JackyChiu/bounded

Example

pool, ctx := bounded.NewPool(context.Background(), 20)

loop:
for {
  select {
    case message := <-stream:
      pool.Go(func () error {
        // process message
        return nil
      })
    case <-ctx.Done():
      // a task failed: stop producing and check the pool for the error
      break loop
  }
}

if err := pool.Wait(); err != nil {
  // handle error
}

For a more complete example, check out the examples directory.

Why

Go encourages programming with concurrent design, allowing us to execute independent tasks with goroutines. Many programs end up with unbounded concurrency and, as a result, incur a significant amount of overhead.

This package is an attempt at providing a thin API (with synchronization and error capturing built in) that lets developers keep programming with the same mental model without concern for the overhead.

The API and error capturing were inspired by the errgroup package.

Synchronization And Error Capture

Pool provides simple synchronization and error capturing. Wait() blocks until all tasks in the pool have completed and the workers have exited. If an error occurs in the pool, it is captured and the goroutines are notified via context.Context.Done() on the context returned by NewPool. The main goroutine can also use this context to tell when it should stop producing work for the pool. Wait() returns the first error captured.
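The capture-and-notify behaviour described above can be sketched with the standard library alone. This is a minimal illustration in the style of errgroup, not the source of bounded.Pool: the firstErr type, its run and wait methods, errBoom, and captureDemo are hypothetical names, and the sketch does not bound the number of goroutines.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errBoom is a hypothetical task failure used by the demo.
var errBoom = errors.New("boom")

// firstErr is a hypothetical stand-in for the error-capturing half of a pool.
// It is not the bounded.Pool type.
type firstErr struct {
	cancel context.CancelFunc
	once   sync.Once
	wg     sync.WaitGroup
	err    error
}

// newFirstErr returns the helper and a context that is cancelled on the
// first task error.
func newFirstErr(parent context.Context) (*firstErr, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &firstErr{cancel: cancel}, ctx
}

// run starts task in its own goroutine and records only the first non-nil error.
func (f *firstErr) run(task func() error) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		if err := task(); err != nil {
			f.once.Do(func() {
				f.err = err
				f.cancel() // notify every goroutine watching the context
			})
		}
	}()
}

// wait blocks until every task has returned, then reports the first error.
func (f *firstErr) wait() error {
	f.wg.Wait()
	f.cancel() // release the context's resources
	return f.err
}

// captureDemo runs one failing task and one task that only stops once the
// context is cancelled.
func captureDemo() error {
	f, ctx := newFirstErr(context.Background())
	f.run(func() error { return errBoom })
	f.run(func() error {
		<-ctx.Done() // unblocked by the failing task's cancellation
		return ctx.Err()
	})
	return f.wait()
}

func main() {
	fmt.Println("first error:", captureDemo()) // prints "first error: boom"
}
```

The second task returns context.Canceled, but only the first error is kept, so the caller sees the root cause rather than the cancellation it triggered.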

Lazy

Pool lazily spawns workers as tasks are queued up. An existing worker is preferred for each task. If all workers are busy, the pool spawns a new worker and enqueues the task again. This continues until the pool has reached its limit.

pool, _ := bounded.NewPool(context.Background(), HUGE_NUMBER)

for message := range stream {
  pool.Go(func() error {
    // process message
    return nil
  })
}

// Pool will reuse goroutines. New goroutines are spawned when the workers
// are busy, up to the pool limit.
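One way such lazy spawning could work is sketched below with the standard library only. It is an assumption about the general technique, not the package's actual implementation: lazyPool, submit, worker, close, and lazyDemo are hypothetical names, and submit is assumed to be called from a single goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyPool is a hypothetical illustration of lazy worker spawning.
// It is not the bounded.Pool type.
type lazyPool struct {
	tasks   chan func() // unbuffered: a send succeeds only when a worker is idle
	limit   int
	spawned int // only touched by the single goroutine calling submit
	wg      sync.WaitGroup
}

func newLazyPool(limit int) *lazyPool {
	return &lazyPool{tasks: make(chan func()), limit: limit}
}

// submit prefers an idle worker. If none is ready it spawns a worker while
// under the limit, then enqueues the task again, blocking until it is taken.
func (p *lazyPool) submit(task func()) {
	select {
	case p.tasks <- task: // an idle worker picked it up
		return
	default: // every existing worker is busy
	}
	if p.spawned < p.limit {
		p.spawned++
		p.wg.Add(1)
		go p.worker()
	}
	p.tasks <- task
}

func (p *lazyPool) worker() {
	defer p.wg.Done()
	for task := range p.tasks {
		task()
	}
}

// close stops accepting tasks and waits for the workers to exit.
func (p *lazyPool) close() {
	close(p.tasks)
	p.wg.Wait()
}

// lazyDemo submits `blocking` tasks that stay busy until released (blocking
// must not exceed limit, or submit would deadlock), then `quick` tasks.
// It returns the number of workers spawned and the number of tasks completed.
func lazyDemo(limit, blocking, quick int) (spawned int, done int64) {
	p := newLazyPool(limit)
	release := make(chan struct{})
	for i := 0; i < blocking; i++ {
		p.submit(func() {
			<-release
			atomic.AddInt64(&done, 1)
		})
	}
	close(release)
	for i := 0; i < quick; i++ {
		p.submit(func() { atomic.AddInt64(&done, 1) })
	}
	p.close()
	return p.spawned, done
}

func main() {
	spawned, done := lazyDemo(5, 2, 0)
	fmt.Println(spawned, done) // prints "2 2": only two of the five allowed workers were needed
	spawned, done = lazyDemo(3, 3, 5)
	fmt.Println(spawned, done) // prints "3 8": the limit caps the worker count
}
```

The unbuffered channel is the design choice doing the work here: a non-blocking send doubles as the "is any worker idle?" check, so no separate bookkeeping of idle workers is needed.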

Gotchas

Deadlocks

Calls to Pool.Go will block when a worker isn't available. This is an issue in a producer/consumer design where the same goroutine both spawns the workers that produce results and consumes those results.

results := make(chan result)
for message := range stream {
  // This can block and cause a deadlock because nothing is able to consume
  // from the results channel.
  pool.Go(func() error {
    // process message to send to results channel
    return nil
  })
}
go func() {
  pool.Wait()
  close(results)
}()

for r := range results {
  // process results
}

Instead, consider having a separate goroutine consume the messages and spawn the workers from there. This also gives you a goroutine that is responsible for closing the results channel.

results := make(chan result)
go func() {
  defer close(results)
  for message := range stream {
    pool.Go(func() error {
      // process message to send to results channel
      return nil
    })
  }
  pool.Wait()
}()

for r := range results {
  // process results
}

if err := pool.Wait(); err != nil {
  // handle error
}
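
The same shape can be tried out without the package. The sketch below is an assumption-laden stand-in: a buffered semaphore channel plays the role of the pool limit (its send blocks the way Pool.Go does when all workers are busy), and sumSquares is a hypothetical function. The point it shows is that spawning happens in a separate goroutine, so the calling goroutine is always free to drain the results channel.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares each input with at most `limit` concurrent workers and
// returns the sum of the results. limit must be at least 1.
// The semaphore channel is a stand-in for a bounded pool, not bounded.Pool.
func sumSquares(inputs []int, limit int) int {
	results := make(chan int)

	// Spawner goroutine: it may block on the semaphore, but that cannot
	// deadlock because the caller below keeps consuming results.
	go func() {
		defer close(results) // this goroutine owns closing the channel
		sem := make(chan struct{}, limit)
		var wg sync.WaitGroup
		for _, n := range inputs {
			sem <- struct{}{} // blocks while `limit` workers are busy
			wg.Add(1)
			go func(n int) {
				defer wg.Done()
				defer func() { <-sem }() // free the slot for the next task
				results <- n * n
			}(n)
		}
		wg.Wait() // all workers done before results is closed
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 2)) // prints 30
}
```

If the loop over inputs ran in the calling goroutine instead, the third task would wait for a free slot while both workers wait for a reader on results, which is the deadlock described above.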
Performance

In the producer/consumer model shown above, only one goroutine consumes from the messages channel, whereas hand-rolled solutions may have multiple goroutines reading from the channel. This applies back pressure to the producer, at the cost of the producer being blocked. To increase the performance of the producer, give the messages channel a buffer so that the producer isn't blocked as often.

pool, ctx := bounded.NewPool(context.Background(), 20)

messages := make(chan message, 10)
pool.Go(func() error {
  defer close(messages)
  return produceMessages(messages)
})

// Consume messages
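
The effect of the buffer can be seen in isolation with a small sketch. sendUntilBlocked is a hypothetical helper, not part of the package: it counts how many values a producer can hand off before it would have to wait for a consumer.

```go
package main

import "fmt"

// sendUntilBlocked tries to send up to n values on ch without a consumer and
// returns how many sends succeeded before one would have blocked.
func sendUntilBlocked(ch chan<- int, n int) int {
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // the next send would block the producer
		}
	}
	return sent
}

func main() {
	fmt.Println(sendUntilBlocked(make(chan int), 15))     // prints 0: unbuffered, blocks immediately
	fmt.Println(sendUntilBlocked(make(chan int, 10), 15)) // prints 10: the buffer absorbs ten messages
}
```

A buffer only smooths out bursts. Once it is full, the producer is blocked again until the consumer catches up, so back pressure is preserved.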

Documentation

Types

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a bounded goroutine manager. It ensures that the number of goroutines spawned stays within the given limit. The benefit is the ability to think about and write Go programs without worrying about the overhead of spawning too many goroutines.

Pool provides some simple synchronization and error capturing abilities too. Developers can wait for all goroutines in the pool to complete and exit with Wait(). The first error captured is returned.

Pool lazily spawns workers as tasks are queued up. An existing worker is preferred for each task. If all workers are busy, the pool spawns a new worker and enqueues the task again. This continues until the pool has reached its limit.

Example (Parallel)

Parallel illustrates the use of a Pool for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/JackyChiu/bounded"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

// Parallel illustrates the use of a Pool for synchronizing a simple parallel
// task: the "Google Search 2.0" function from
// https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context
// and error-handling.
func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		pool, ctx := bounded.NewPool(ctx, 3)

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			pool.Go(func() error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := pool.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}

}

Output:

web result for "golang"
image result for "golang"
video result for "golang"

func NewPool

func NewPool(ctx context.Context, poolSize uint32) (*Pool, context.Context)

NewPool returns a Pool instance and a new context. The number of goroutines spawned is limited by the given poolSize. The new context carries cancellations from the goroutines in the Pool.

func (*Pool) Go

func (p *Pool) Go(task func() error)

Go enqueues the task for execution by one of the goroutines in the pool. Calls to Go spin up workers lazily: when the existing workers are busy, new workers are spawned until the goroutine limit has been reached.

func (*Pool) Size

func (p *Pool) Size() uint32

Size is the number of goroutines running in the pool.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait waits for the tasks and worker goroutines in the pool to exit. The first error to occur in the pool is returned, if any.

Directories

Path Synopsis
examples
