workerpool

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2022 License: MIT Imports: 4 Imported by: 0

README

Golang Worker Pool

run tests codecov goreportcard

Inspired from Java Thread Pool, Go WorkerPool aims to control heavy Go Routines.

Installation

The simplest way to install the library is to run:

go get github.com/zenthangplus/go-workerpool

Example

package main

import (
	"fmt"
	"github.com/zenthangplus/go-workerpool"
)

func main() {
	// Init worker pool with 3 workers to run concurrently.
	pool := workerpool.NewFixedSize(3)

	// Start worker pool
	pool.Start()

	// pool.Submit will block until slot available in Pool queue. 
	// Submit an identifiable job, ID will be generated randomly (using UUID)
	pool.Submit(workerpool.NewIdentifiableJob(func() {
		// Do a heavy job
	}))
	// Use NewCustomIdentifierJob if you don't want ID to be generated randomly
	pool.Submit(workerpool.NewCustomIdentifierJob("custom-id", func() { 
		// Do a heavy job
	}))
	// or Submit a simple function without identifier
	pool.SubmitFunc(func() {// simpler way of: Submit(FuncJob(func() {})) 
		// Do a heavy job
	})

	// pool.SubmitConfidently will submit a job in confident mode, 
	// this function will return ErrPoolFull when Pool queue is full.
	err := pool.SubmitConfidently(workerpool.NewIdentifiableJob(func() {
		// Do a heavy job
	}))
	if err == workerpool.ErrPoolFull {
		fmt.Println("Pool is full")
	}
}

Usage

package main

import (
	"fmt"
	"github.com/zenthangplus/go-workerpool"
)

func main() {
	// Initiate worker pool with fixed size. Eg: 3 workers to run concurrently.
	pool := workerpool.NewFixedSize(3)

	// Or initiate fixed size worker pool with custom options.
	pool = workerpool.NewFixedSize(3,
		// When you want to custom mode
		workerpool.WithMode(workerpool.FixedSize),
		
		// When you want to custom number of workers
		workerpool.WithNumberWorkers(5),
		
		// When you want to customize capacity
		workerpool.WithCapacity(6),
		
		// When you want to custom log function
		workerpool.WithLogFunc(func(msgFormat string, args ...interface{}) {
			fmt.Printf(msgFormat+"\n", args...)
		}),
	)

	// Start worker pool
	pool.Start()
	
	// Init a functional job with ID is generated randomly
	job1 := workerpool.NewIdentifiableJob(func() {})

	// init a functional job with predefined ID
	job2 := workerpool.NewCustomIdentifierJob("test-an-id", func() {})

	// Submit job in normal mode, it will block until pool has available slot.
	pool.Submit(job1)
	
	// or Submit a simple function
	pool.SubmitFunc(func() {})
	
	// Submit in confident mode, it will return ErrPoolFull when pool is full. 
	err := pool.SubmitConfidently(job2)
	if err != nil {
		fmt.Print(err)
	}
}

// CompressDirJob
// You can create a custom Job by implement `Job` interface
type CompressDirJob struct {
	directory string
}

func NewCompressDirJob(directory string) *CompressDirJob {
	return &CompressDirJob{directory: directory}
}

func (c CompressDirJob) Id() string {
	return "directory-" + c.directory
}

func (c CompressDirJob) Exec() {
	// Do compress directory
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrPoolFull = errors.New("pool is full")

Functions

This section is empty.

Types

type FuncJob

type FuncJob func()

func (FuncJob) Exec

func (f FuncJob) Exec()

func (FuncJob) Id

func (f FuncJob) Id() string

type IdentifiableJob

type IdentifiableJob struct {
	// contains filtered or unexported fields
}

func NewCustomIdentifierJob

func NewCustomIdentifierJob(id string, execFunc func()) *IdentifiableJob

func NewIdentifiableJob

func NewIdentifiableJob(execFunc func()) *IdentifiableJob

func (IdentifiableJob) Exec

func (c IdentifiableJob) Exec()

func (IdentifiableJob) Id

func (c IdentifiableJob) Id() string

type Job

type Job interface {
	Id() string
	Exec()
}

type LogFunc

type LogFunc func(msgFormat string, args ...interface{})

type Mode

type Mode int
const (
	FixedSize Mode = iota
	FlexibleSize
)

type Option

type Option struct {
	Mode          Mode
	Capacity      int
	NumberWorkers int
	LogFunc       LogFunc
}

type OptionFunc

type OptionFunc func(opt *Option)

func WithCapacity

func WithCapacity(capacity int) OptionFunc

func WithLogFunc

func WithLogFunc(logFunc LogFunc) OptionFunc

func WithMode

func WithMode(mode Mode) OptionFunc

func WithNumberWorkers

func WithNumberWorkers(numberWorkers int) OptionFunc

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func New

func New(option *Option) *Pool

func NewFixedSize

func NewFixedSize(numberWorkers int, optionFunc ...OptionFunc) *Pool

func (Pool) AssignedJobs

func (p Pool) AssignedJobs() int

func (Pool) Capacity

func (p Pool) Capacity() int

func (*Pool) Start

func (p *Pool) Start()

func (*Pool) Submit

func (p *Pool) Submit(job Job)

Submit a job. This will block until slot available in Pool queue.

func (*Pool) SubmitConfidently

func (p *Pool) SubmitConfidently(job Job) error

SubmitConfidently submit a job in confidently mode. This will return ErrPoolFull when Pool queue is full.

func (*Pool) SubmitFunc

func (p *Pool) SubmitFunc(f func())

SubmitFunc a func job. Fast way to Submit(FuncJob(func() {}))

func (Pool) SubmittedJobs

func (p Pool) SubmittedJobs() int

func (Pool) Workers

func (p Pool) Workers() []*Worker

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(id int) *Worker

func (Worker) BusiedAt

func (w Worker) BusiedAt() *time.Time

func (Worker) BusiedDuration

func (w Worker) BusiedDuration() time.Duration

func (Worker) Id

func (w Worker) Id() int

func (Worker) IdledAt

func (w Worker) IdledAt() *time.Time

func (Worker) IdledDuration

func (w Worker) IdledDuration() time.Duration

func (*Worker) Run

func (w *Worker) Run(job Job)

func (Worker) RunningJobId

func (w Worker) RunningJobId() string

func (Worker) WorkedJobs

func (w Worker) WorkedJobs() int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL