workerpool

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2022 License: BSD-2-Clause Imports: 3 Imported by: 0

README

github.com/tniswong/workerpool

Go Docs Unit Tests Coverage Status

This package provides a concurrent worker pool implementation using a semaphore for bounded concurrency

package main

import (
    "context"
    "fmt"
    "github.com/tniswong/workerpool"
    "time"
)

func NewCounterTask(name string, limit int) *CounterTask {
    return &CounterTask{name: name, limit: limit}
}

type CounterTask struct {
    name  string
    count int
    limit int
}

func (c *CounterTask) Invoke(ctx context.Context) error {

loop:
    for {

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.Tick(1 * time.Second / 2):

            c.count++
            fmt.Printf("name: %v, count:%v\n", c.name, c.count)

            if c.count >= c.limit {
                break loop
            }

        }

    }

    return nil

}

func main() {

    wp := workerpool.New(2)
    ctx, cancel := context.WithCancel(context.Background())
    
    go wp.Run(ctx) // runs until context is cancelled

    wp.Push(NewCounterTask("task 1", 2))
    wp.Push(NewCounterTask("task 2", 3))

    wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
    cancel() // stops the workerpool
    
    // Unordered output:
    // name: task 1, count:1
    // name: task 2, count:1
    // name: task 2, count:2
    // name: task 1, count:2
    // name: task 2, count:3

}

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"github.com/tniswong/workerpool"
	"time"
)

func NewCounterTask(name string, limit int) *CounterTask {
	return &CounterTask{name: name, limit: limit}
}

type CounterTask struct {
	name  string
	count int
	limit int
}

func (c *CounterTask) Invoke(ctx context.Context) error {

loop:
	for {

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.Tick(1 * time.Second / 2):

			c.count++
			fmt.Printf("name: %v, count:%v\n", c.name, c.count)

			if c.count >= c.limit {
				break loop
			}

		}

	}

	return nil

}

func main() {

	wp := workerpool.New(2)
	ctx, cancel := context.WithCancel(context.Background())

	go wp.Run(ctx) // runs until context is cancelled

	wp.Push(NewCounterTask("task 1", 2))
	wp.Push(NewCounterTask("task 2", 3))

	wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
	cancel()  // stops the workerpool

}
Output:

name: task 1, count:1
name: task 2, count:1
name: task 2, count:2
name: task 1, count:2
name: task 2, count:3

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Task

type Task interface {
	Invoke(ctx context.Context) error
}

Task represents a unit of work for the WorkerPool

type TaskOption

type TaskOption func(j *job)

TaskOption allows for customization of task behavior

func Retry

func Retry(b bool) TaskOption

Retry will cause a Task to be invoked again if an error is returned. By default, a Task will be retried indefinitely until either the Task terminates successfully or the WorkerPool is stopped by context cancellation. To place limits on this behavior, see the RetryMax(n int) TaskOption

func RetryMax

func RetryMax(n int) TaskOption

RetryMax will cause a Task to be invoked again error is returned, with a limit of `n` total retries

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a concurrent worker pool implementation based on a semaphore

func New

func New(n int64) WorkerPool

New is a constructor function for WorkerPool. `n` specifies the number of parallel workers

func (*WorkerPool) Push

func (p *WorkerPool) Push(t Task, opts ...TaskOption)

Push adds a Task to the queue with the provided options

func (*WorkerPool) Run

func (p *WorkerPool) Run(ctx context.Context)

Run the WorkerPool. To stop processing, cancel the context

func (WorkerPool) Wait

func (p WorkerPool) Wait()

Wait blocks until each Task in the queue completes, or the context passed to Run is cancelled

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL