workerpool

v0.0.1
Published: Jan 28, 2023 License: MIT Imports: 7 Imported by: 0

README

workerpool

Simple goroutine pool. Limits the concurrency of task execution.

Installation

$ go get github.com/cdragon1116/workerpool

Usage

  • Create a new pool with max concurrency count.
pool := workerpool.NewWorkerPool(10)
  • Add Task

AddTask starts a new worker that executes a task asynchronously. It blocks while the number of active workers is at the configured maximum. It returns an error if the wait is interrupted by the context or if the workerpool is closed.

err := pool.AddTask(context.Background(), func() {
    fmt.Println("hello world!")
})
  • Wait for all tasks to complete
pool.WaitForTaskCompleted()
  • Close pool

Stop waits for all active workers to finish, then stops the workerpool. Once the workerpool is closed, it cannot be reopened. It takes a context that can interrupt the waiting.

pool.Stop(context.Background())

Example

ctx := context.Background()

// initialize a workerpool with max size = 10
pool := workerpool.NewWorkerPool(10)

// wait group that tracks when every task has been handed to the pool
wg := new(sync.WaitGroup)

for i := 0; i < 15; i++ {
    wg.Add(1)

    go func() {
        defer wg.Done()

        // AddTask blocks while all 10 workers are busy
        err := pool.AddTask(ctx, func() {
            fmt.Println("hello world!")
            time.Sleep(1 * time.Second)
        })
        if err != nil {
            fmt.Println("add task:", err)
        }
    }()
}

// wait until all tasks have been enqueued
wg.Wait()

// wait for all tasks to complete, then close the pool
pool.Stop(ctx)

Custom Configurations

  • Custom panic handler
handFunc := func(interface{}) { fmt.Println("panic occurs!") }
pool := workerpool.NewWorkerPool(10, workerpool.PanicHandler(handFunc))

Documentation

Overview

Package workerpool ...

Constants

This section is empty.

Variables

var (
	// ErrPoolClosed ...
	ErrPoolClosed = errors.New("pool closed")
	// ErrContextInterrupt ...
	ErrContextInterrupt = errors.New("context interrupt")
)

Functions

This section is empty.

Types

type Option

type Option func(*WorkerPool)

Option represents an option that can be passed when instantiating a worker pool to customize it.

func PanicHandler

func PanicHandler(panicHandler func(interface{})) Option

PanicHandler sets the panic handler function of a worker pool.
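
Option and PanicHandler follow the functional options pattern. The sketch below shows the general shape of that pattern with hypothetical stand-ins (`pool`, `option`, `withPanicHandler`, `newPool`); the no-op default handler is an assumption, not a documented default of the package.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for WorkerPool with the same two settings.
type pool struct {
	maxWorkers   int32
	panicHandler func(interface{})
}

// option mirrors the shape of the package's Option type.
type option func(*pool)

// withPanicHandler mirrors the shape of PanicHandler.
func withPanicHandler(h func(interface{})) option {
	return func(p *pool) { p.panicHandler = h }
}

// newPool sets defaults first, then applies each option in order.
func newPool(maxWorkers int32, opts ...option) *pool {
	p := &pool{
		maxWorkers:   maxWorkers,
		panicHandler: func(interface{}) {}, // assumed default: ignore the value
	}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	called := false
	p := newPool(10, withPanicHandler(func(interface{}) { called = true }))
	p.panicHandler("boom")
	fmt.Println(p.maxWorkers, called)
}
```

Because options are applied after the defaults, a caller that passes no options still gets a usable value, and new settings can be added later without changing the constructor's signature.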

type WorkerPool

type WorkerPool struct {
	// Configurable settings
	MaxWorkers   int32
	PanicHandler func(interface{})
	// contains filtered or unexported fields
}

WorkerPool ...

func NewWorkerPool

func NewWorkerPool(maxWorkers int32, options ...Option) *WorkerPool

NewWorkerPool ... maxWorkers defines the maximum number of workers that can execute tasks concurrently.

func (*WorkerPool) AddTask

func (w *WorkerPool) AddTask(ctx context.Context, fn func()) error

AddTask starts a new goroutine to execute a task asynchronously. It blocks while activeWorkers is at the configured maximum. It takes a context that can interrupt the waiting.

func (*WorkerPool) Closed

func (w *WorkerPool) Closed() bool

Closed reports whether the workerpool has been closed.

func (*WorkerPool) Stop

func (w *WorkerPool) Stop(ctx context.Context) error

Stop waits for all active workers to finish, then stops the workerpool. It takes a context that can interrupt the waiting. Once the workerpool is closed, it cannot be reopened.

func (*WorkerPool) WaitForTaskCompleted

func (w *WorkerPool) WaitForTaskCompleted()

WaitForTaskCompleted waits for all active workers to finish.
