workgroups

package module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2022 License: MIT Imports: 6 Imported by: 0

README

workgroups

Package workgroups is a little helper for creating workers with the help of sync.Errgroup.

build coverage report

readme

Variables

ErrInWorker is an error that gets returned if there is a error in the work function.

var ErrInWorker = errors.New("received error in worker")

Types

type Dispatcher

type Dispatcher struct { ... }

Dispatcher carries the job queue, the errgroup and the number of workers to start.

func (*Dispatcher) Append

func (d *Dispatcher) Append(job Job)

Append adds a job to the work queue.

func (*Dispatcher) Close

func (d *Dispatcher) Close()

Close closes the queue channel.

func (*Dispatcher) Start

func (d *Dispatcher) Start()

Start starts the configured number of workers and waits for jobs.

func (*Dispatcher) Wait

func (d *Dispatcher) Wait() error

Wait for all jobs to finnish.

type Job

type Job struct { ... }

Job carries a job with everything it needs. I know know that contexts shouldnt be stored in a struct. Here is an exception, because its a short living object. The context is only used as argument for the Work function. Please use the NewJob function to get around this context in struct shenanigans.

type Work

type Work func(ctx context.Context) error

Work is a type that defines worker work.

Examples

package main

import (
	"context"
	"fmt"
	"github.com/go-logr/stdr"
	"go.xsfx.dev/workgroups"
	"log"
	"os"
	"runtime"
)

func main() {
	d, ctx := workgroups.NewDispatcher(
		context.Background(),
		stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
		runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
		10,                    // Capacity of the queue.
	)

	work := func(ctx context.Context) error {
		// Check if context already expired.
		// Return if its the case, else just go forward.
		select {
		case <-ctx.Done():
			return fmt.Errorf("got error from context: %w", ctx.Err())
		default:
		}

		// Some wannebe work... printing something.
		fmt.Print("hello world from work")

		return nil
	}

	// Starting up the workers.
	d.Start()

	// Feeding the workers some work.
	d.Append(workgroups.NewJob(ctx, work))

	// Closing the channel for work.
	d.Close()

	// Waiting to finnish everything.
	if err := d.Wait(); err != nil {
		log.Fatal(err)
	}

}

Output:

hello world from work
Retry

Retry is a middleware for doing a retry in executing job work.

package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/go-logr/stdr"
	"go.xsfx.dev/workgroups"
	"log"
	"os"
	"runtime"
	"time"
)

func main() {
	d, ctx := workgroups.NewDispatcher(
		context.Background(),
		stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
		runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
		10,                    // Capacity of the queue.
	)

	// Just returning some error. So it can retry.
	failFunc := func() error {
		fmt.Print("fail ")

		return errors.New("fail") //nolint:goerr113
	}

	work := func(ctx context.Context) error {
		// Check if context already expired.
		// Return if its the case, else just go forward.
		select {
		case <-ctx.Done():
			return fmt.Errorf("got error from context: %w", ctx.Err())
		default:
		}

		if err := failFunc(); err != nil {
			return err
		}

		return nil
	}

	// Starting up the workers.
	d.Start()

	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	// Feeding the workers some work.
	d.Append(
		workgroups.NewJob(
			ctx,
			workgroups.Retry(ctx, time.Second/2)(work), // This will retry after a half second.
		),
	)

	// Closing the channel for work.
	d.Close()

	// Waiting to finnish everything.
	if err := d.Wait(); err != nil {
		fmt.Print(err)
	}

}

Output:

fail fail error on waiting: got error from context: context deadline exceeded

Documentation

Overview

Package workgroups is a little helper for creating workers with the help of (sync.Errgroup) https://pkg.go.dev/golang.org/x/sync/errgroup.

(image/build) https://ci.xsfx.dev/api/badges/xsteadfastx/workgroups/status.svg (image/coverage) https://codecov.io/gh/xsteadfastx/workgroups/branch/main/graph/badge.svg?token=RZE1ZWJSYA (image/report) https://goreportcard.com/badge/go.xsfx.dev/workgroups

(image/readme) https://git.xsfx.dev/xsteadfastx/workgroups/raw/branch/main/README.gif

Example
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"runtime"

	"github.com/go-logr/stdr"
	"go.xsfx.dev/workgroups"
)

func main() {
	d, ctx := workgroups.NewDispatcher(
		context.Background(),
		stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
		runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
		10,                    // Capacity of the queue.
	)

	work := func(ctx context.Context) error {
		// Check if context already expired.
		// Return if its the case, else just go forward.
		select {
		case <-ctx.Done():
			return fmt.Errorf("got error from context: %w", ctx.Err())
		default:
		}

		// Some wannebe work... printing something.
		fmt.Print("hello world from work")

		return nil
	}

	// Starting up the workers.
	d.Start()

	// Feeding the workers some work.
	d.Append(workgroups.NewJob(ctx, work))

	// Closing the channel for work.
	d.Close()

	// Waiting to finnish everything.
	if err := d.Wait(); err != nil {
		log.Fatal(err)
	}

}
Output:

hello world from work

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrInWorker = errors.New("received error in worker")

ErrInWorker is an error that gets returned if there is a error in the work function.

Functions

func Retry added in v0.4.0

func Retry(ctx context.Context, wait time.Duration) func(Work) Work

Retry is a middleware for doing a retry in executing job work.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"runtime"
	"time"

	"github.com/go-logr/stdr"
	"go.xsfx.dev/workgroups"
)

func main() {
	d, ctx := workgroups.NewDispatcher(
		context.Background(),
		stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
		runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
		10,                    // Capacity of the queue.
	)

	// Just returning some error. So it can retry.
	failFunc := func() error {
		fmt.Print("fail ")

		return errors.New("fail") //nolint:goerr113
	}

	work := func(ctx context.Context) error {
		// Check if context already expired.
		// Return if its the case, else just go forward.
		select {
		case <-ctx.Done():
			return fmt.Errorf("got error from context: %w", ctx.Err())
		default:
		}

		if err := failFunc(); err != nil {
			return err
		}

		return nil
	}

	// Starting up the workers.
	d.Start()

	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	// Feeding the workers some work.
	d.Append(
		workgroups.NewJob(
			ctx,
			workgroups.Retry(ctx, time.Second/2)(work), // This will retry after a half second.
		),
	)

	// Closing the channel for work.
	d.Close()

	// Waiting to finnish everything.
	if err := d.Wait(); err != nil {
		fmt.Print(err)
	}

}
Output:

fail fail error on waiting: got error from context: context deadline exceeded

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher carries the job queue, the errgroup and the number of workers to start.

func NewDispatcher

func NewDispatcher(ctx context.Context, log logr.Logger, numWorkers, workLength int) (*Dispatcher, context.Context)

NewDispatcher creates a new Dispatcher. It takes a context and adds it to the errgroup creation and returns it again.

func (*Dispatcher) Append

func (d *Dispatcher) Append(job Job)

Append adds a job to the work queue.

func (*Dispatcher) Close

func (d *Dispatcher) Close()

Close closes the queue channel.

func (*Dispatcher) Start

func (d *Dispatcher) Start()

Start starts the configured number of workers and waits for jobs.

func (*Dispatcher) Wait

func (d *Dispatcher) Wait() error

Wait for all jobs to finnish.

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job carries a job with everything it needs. I know know that contexts shouldnt be stored in a struct. Here is an exception, because its a short living object. The context is only used as argument for the Work function. Please use the NewJob function to get around this context in struct shenanigans.

func NewJob

func NewJob(ctx context.Context, work Work) Job

NewJob creates a new Job to send down the work queue with context and all that stuff.

type Work

type Work func(ctx context.Context) error

Work is a type that defines worker work.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL