workerpool

package module
v1.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2019 License: MIT Imports: 2 Imported by: 0

README

Vardius - worker-pool

Build Status Go Report Card codecov license

A simple asynchronous worker pool for Go.

ABOUT

Contributors:

Want to contribute? Feel free to send pull requests!

Have problems, bugs, or feature ideas? We use the GitHub issue tracker to manage them.

HOW TO USE

  1. GoDoc

Benchmark

CPU: 3.3 GHz Intel Core i7

RAM: 16 GB 2133 MHz LPDDR3

➜  worker-pool git:(master) ✗ go test -bench=. -cpu=4 -benchmem
goos: darwin
goarch: amd64
BenchmarkWorker1-4                	 3000000	       453 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker1Parallel-4        	 3000000	       506 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorker100-4              	 3000000	       485 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker100Parallel-4      	 3000000	       444 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorkerNumCPU-4           	 3000000	       467 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorkerNumCPUParallel-4   	 3000000	       431 ns/op	      48 B/op	       2 allocs/op
PASS
ok  	worker-pool	11.570s

Basic example

package main

import (
    "fmt"
    "sync"

    "github.com/vardius/worker-pool"
)

// main runs the basic example: it delegates jobsAmount integers to the pool,
// lets the workers forward them to a channel and prints their sum.
func main() {
	var wg sync.WaitGroup

	poolSize := 1
	jobsAmount := 3
	workersAmount := 2

	// register every job before any worker can call wg.Done
	wg.Add(jobsAmount)

	// create new pool
	pool := workerpool.New(poolSize)
	// buffered for every job, so a worker never blocks on its send
	out := make(chan int, jobsAmount)

	// Start returns an error; do not silently ignore it
	if err := pool.Start(workersAmount, func(i int) {
		defer wg.Done()
		out <- i
	}); err != nil {
		fmt.Println(err)
		return
	}

	for i := 0; i < jobsAmount; i++ {
		pool.Delegate(i)
	}

	go func() {
		// stop all workers after jobs are done
		wg.Wait()
		// closing out ends the range loop below
		close(out)
		pool.Stop()
	}()

	sum := 0
	for n := range out {
		sum += n
	}

	fmt.Println(sum)
	// Output:
	// 3
}

License

This package is released under the MIT license. See the complete license in the package:

LICENSE

Documentation

Overview

Package workerpool provides simple async workers.

Example
package main

import (
	"fmt"
	"sync"

	workerpool "github.com/vardius/worker-pool"
)

// main runs the basic example: it delegates jobsAmount integers to the pool,
// lets the workers forward them to a channel and prints their sum.
func main() {
	var wg sync.WaitGroup

	poolSize := 1
	jobsAmount := 3
	workersAmount := 2

	// register every job before any worker can call wg.Done
	wg.Add(jobsAmount)

	// create new pool
	pool := workerpool.New(poolSize)
	// buffered for every job, so a worker never blocks on its send
	out := make(chan int, jobsAmount)

	// Start returns an error; do not silently ignore it
	if err := pool.Start(workersAmount, func(i int) {
		defer wg.Done()
		out <- i
	}); err != nil {
		fmt.Println(err)
		return
	}

	for i := 0; i < jobsAmount; i++ {
		pool.Delegate(i)
	}

	go func() {
		// stop all workers after jobs are done
		wg.Wait()
		// closing out ends the range loop below
		close(out)
		pool.Stop()
	}()

	sum := 0
	for n := range out {
		sum += n
	}

	fmt.Println(sum)
}
Output:

3
Example (Second)
package main

import (
	"fmt"

	workerpool "github.com/vardius/worker-pool"
)

// main runs the second example: every job carries the result channel as an
// argument, and main counts results until all jobs have reported back.
func main() {
	poolSize := 2
	jobsAmount := 8
	workersAmount := 3

	ch := make(chan int, jobsAmount)
	defer close(ch)

	// create new pool
	pool := workerpool.New(poolSize)
	// deferred calls run last-in-first-out, so the pool is stopped before ch is closed
	defer pool.Stop()

	// Start returns an error; do not silently ignore it
	if err := pool.Start(workersAmount, func(i int, out chan<- int) { out <- i }); err != nil {
		fmt.Println(err)
		return
	}

	// delegate from a separate goroutine because Delegate blocks while the queue is full
	go func() {
		for n := 0; n < jobsAmount; n++ {
			pool.Delegate(n, ch)
		}
	}()

	// count results; the values themselves are not needed
	received := 0
	for received < jobsAmount {
		<-ch
		received++
	}

	fmt.Println(received)
}
Output:

8
Example (Third)
package main

import (
	"fmt"
	"strconv"
	"sync"

	workerpool "github.com/vardius/worker-pool"
)

func main() {
	poolSize := 2
	jobsAmount := 8
	workersAmount := 3

	var wg sync.WaitGroup
	wg.Add(jobsAmount)

	// allocate queue
	pool := workerpool.New(poolSize)

	// moc arg
	argx := make([]string, jobsAmount)
	for j := 0; j < jobsAmount; j++ {
		argx[j] = "_" + strconv.Itoa(j) + "_"
	}

	// assign job
	for i := 0; i < jobsAmount; i++ {
		go func(i int) {
			pool.Delegate(argx[i])
		}(i)
	}

	// start worker
	pool.Start(workersAmount, func(s string) {
		defer wg.Done()
		defer fmt.Println("job " + s + " is done !")
		fmt.Println("job " + s + " is running ..")
	})

	// clean up
	wg.Wait()
	pool.Stop()

	// fmt.Println("# hi: ok?")
	
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pool

type Pool interface {
	// Delegate hands a job to the workers.
	// It blocks while the job channel is full; wrap the call in a goroutine to avoid blocking the caller.
	// It panics if called after Stop().
	Delegate(args ...interface{})
	// Start launches the given number of workers, which take jobs from the queue.
	// NOTE(review): the conditions under which an error is returned are not visible here — confirm against the implementation.
	Start(maxWorkers int, fn interface{}) error
	// Stop stops all workers.
	Stop()
}

Pool is the worker pool interface.

func New

func New(queueLength int) Pool

New creates a new worker pool with the given job queue length.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL