workerpool

package module
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2020 License: MIT Imports: 3 Imported by: 2

README ¶

👨‍🔧 worker-pool

Build Status Go Report Card codecov FOSSA Status license

logo

A simple asynchronous worker pool for Go.

📖 ABOUT

Contributors:

Want to contribute? Feel free to send pull requests!

Have problems, bugs, or feature ideas? We use the GitHub issue tracker to manage them.

📚 Documentation

For examples visit godoc#pkg-examples

For GoDoc reference, visit pkg.go.dev

๐Ÿš HOW TO USE

🚅 Benchmark

CPU: 3,3 GHz Intel Core i7

RAM: 16 GB 2133 MHz LPDDR3

➜  worker-pool git:(master) ✗ go test -bench=. -cpu=4 -benchmem
goos: darwin
goarch: amd64
pkg: github.com/vardius/worker-pool/v2
BenchmarkWorker1-4                	 3944299	       284 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker1Parallel-4        	 7394715	       138 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorker100-4              	 1657569	       693 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker100Parallel-4      	 3673483	       368 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorkerNumCPU-4           	 2590293	       445 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorkerNumCPUParallel-4   	 3591553	       298 ns/op	      48 B/op	       2 allocs/op
PASS
ok  	github.com/vardius/worker-pool/v2	9.511s

๐Ÿซ Basic example

package main

import (
    "fmt"
    "sync"

    "github.com/vardius/worker-pool/v2"
)

// main pushes jobsAmount jobs through a pool of workersAmount workers and
// prints the sum of the job arguments (0+1+2).
func main() {
	var wg sync.WaitGroup

	poolSize := 1 // job queue length handed to workerpool.New
	jobsAmount := 3
	workersAmount := 2

	// create new pool
	pool := workerpool.New(poolSize)
	// out has room for every job result, so a worker's send never blocks
	out := make(chan int, jobsAmount)
	worker := func(i int) {
		defer wg.Done()
		out <- i
	}

	for i := 1; i <= workersAmount; i++ {
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	wg.Add(jobsAmount)

	for i := 0; i < jobsAmount; i++ {
		// A job that is not accepted never reaches wg.Done, so fail loudly
		// here instead of letting wg.Wait block forever.
		if err := pool.Delegate(i); err != nil {
			panic(err)
		}
	}

	go func() {
		// stop all workers after jobs are done
		wg.Wait()
		close(out)
		pool.Stop() // stop removes all workers from pool, to resume work add them again
	}()

	// range ends once out is closed by the goroutine above
	sum := 0
	for n := range out {
		sum += n
	}

	fmt.Println(sum)
	// Output:
	// 3
}

📜 License

This package is released under the MIT license. See the complete license in the package

FOSSA Status

Documentation ¶

Overview ¶

Package workerpool provides simple async workers

Example ¶
package main

import (
	"fmt"
	"sync"

	workerpool "github.com/vardius/worker-pool/v2"
)

// main delegates jobsAmount jobs to a pool of workersAmount workers, collects
// each job's argument from the out channel and prints their sum.
func main() {
	var wg sync.WaitGroup

	poolSize := 1 // job queue length handed to workerpool.New
	jobsAmount := 3
	workersAmount := 2

	// create new pool
	pool := workerpool.New(poolSize)
	// buffered for all results so workers never block while sending
	out := make(chan int, jobsAmount)
	worker := func(i int) {
		defer wg.Done()
		out <- i
	}

	for i := 1; i <= workersAmount; i++ {
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	wg.Add(jobsAmount)

	for i := 0; i < jobsAmount; i++ {
		// Do not drop the error: a job that was never queued would leave
		// the WaitGroup counter above zero and wg.Wait would hang.
		if err := pool.Delegate(i); err != nil {
			panic(err)
		}
	}

	go func() {
		// stop all workers after jobs are done
		wg.Wait()
		close(out)
		pool.Stop()
	}()

	// the loop terminates when out is closed above
	sum := 0
	for n := range out {
		sum += n
	}

	fmt.Println(sum)
}
Output:

3
Example (Second) ¶
package main

import (
	"fmt"

	workerpool "github.com/vardius/worker-pool/v2"
)

// main delegates jobsAmount jobs, each of which sends its argument to ch,
// counts the received results and prints the count.
func main() {
	poolSize := 2 // job queue length handed to workerpool.New
	jobsAmount := 8
	workersAmount := 3

	ch := make(chan int, jobsAmount)
	defer close(ch)

	// create new pool
	pool := workerpool.New(poolSize)
	// deferred calls run in reverse order: the pool is stopped before ch is closed
	defer pool.Stop()

	worker := func(i int, out chan<- int) { out <- i }

	for i := 1; i <= workersAmount; i++ {
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	// delegate from a separate goroutine so main can start receiving
	// while jobs are still being queued
	go func() {
		for n := 0; n < jobsAmount; n++ {
			// A job that fails to queue never sends on ch, which would
			// leave the receive loop below waiting forever.
			if err := pool.Delegate(n, ch); err != nil {
				panic(err)
			}
		}
	}()

	// receive exactly one result per job
	sum := 0
	for sum < jobsAmount {
		<-ch
		sum++
	}

	fmt.Println(sum)
}
Output:

8
Example (Third) ¶
package main

import (
	"fmt"
	"strconv"
	"sync"

	workerpool "github.com/vardius/worker-pool/v2"
)

func main() {
	poolSize := 2
	jobsAmount := 8
	workersAmount := 3

	var wg sync.WaitGroup
	wg.Add(jobsAmount)

	// allocate queue
	pool := workerpool.New(poolSize)

	// moc arg
	argx := make([]string, jobsAmount)
	for j := 0; j < jobsAmount; j++ {
		argx[j] = "_" + strconv.Itoa(j) + "_"
	}

	// assign job
	for i := 0; i < jobsAmount; i++ {
		go func(i int) {
			pool.Delegate(argx[i])
		}(i)
	}

	worker := func(s string) {
		defer wg.Done()
		defer fmt.Println("job " + s + " is done !")
		fmt.Println("job " + s + " is running ..")
	}

	// start workers
	for i := 1; i <= workersAmount; i++ {
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	// clean up
	wg.Wait()
	pool.Stop()

	// fmt.Println("# hi: ok?")
	
Output:

Index ¶

Examples ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type Pool ¶

// Pool is the worker pool API: jobs are handed over with Delegate and
// workers are registered with AddWorker.
type Pool interface {
	// Delegate hands a job, described by its arguments, over to the workers.
	// It will block if the channel is full; you might want to wrap the call
	// in a goroutine to avoid that.
	// It will panic if called after Stop().
	Delegate(args ...interface{}) error

	// AddWorker adds a worker to the pool.
	AddWorker(fn interface{}) error
	// RemoveWorker removes a worker from the pool.
	RemoveWorker(fn interface{}) error

	// WorkersNum returns the number of workers in the pool.
	WorkersNum() int

	// Stop removes all workers from the pool.
	// To resume work, add them again.
	Stop()
}

Pool implements a worker pool.

func New ¶

func New(queueLength int) Pool

New creates a new worker pool with the given job queue length.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL