workerpool

v0.0.1 · Published: Aug 19, 2019 · License: GPL-3.0

README

WorkerPool

Simple worker pool for parallel processing.

Example

package main

import (
	"fmt"
	"log"
	"log/syslog"
	"strconv"
	"time"

	"github.com/trencat/workerpool"
)

func DoWork(job interface{}, workerID string) interface{} {
	n := job.(int)
	time.Sleep(100 * time.Millisecond)
	return n * 2
}

func main() {
	logger, err := syslog.Dial("tcp", "localhost:514", syslog.LOG_INFO, "workerpool")
	if err != nil {
		log.Fatal(err)
	}

	// Start a pool with 20 workers, 1 dispatcher and a results buffer of 40
	pool := workerpool.StartPool(20, 1, 40, logger)

	// We do indeed have 20 workers
	fmt.Printf("Number of workers: %d\n", pool.NumberWorkers())

	// Read worker results in a separate goroutine and, when done, send the total to the getSum channel
	getSum := make(chan int)
	go func() {
		sum := 0
		// pool.Stop closes pool.Collector.Results after all workers shut down
		for result := range pool.Collector.Results {
			sum += result.(int) // each result is an int
		}
		getSum <- sum
	}()

	// Create 100 works and push them to the pool. This can also be done from another goroutine
	for i := 1; i <= 100; i++ {
		work := workerpool.Work{
			ID:     strconv.Itoa(i), // note: string(i) would yield a rune, not "1", "2", ...
			Job:    i,
			DoWork: DoWork,
		}
		if err := pool.AddWork(work, 0); err != nil {
			log.Fatal(err)
		}
	}

	// No more work to add, so close the pool
	pool.Stop(true)

	// Wait until all results are gathered
	fmt.Printf("The sum is %d\n", <-getSum)

	// All workers are indeed down
	fmt.Printf("Number of workers after closing: %d\n", pool.NumberWorkers()) // 0
}

Makefile

Use the Makefile to build the package, run the tests and generate the documentation.

# Build package
make build

# Run short tests
make test
make test NOCACHE=1 # Don't use cache

# Run all tests
make test-long
make test-long NOCACHE=1 # Don't use cache

# Generate html docs
make docs

Documentation

Overview

Package workerpool provides a simple pool of workers.



Types

type Collector

type Collector struct {
	Works   chan Work        // receives Work to send to workers
	Results chan interface{} // receives results from the workers
}

Collector contains channels to send work to workers and to receive worker results. Only non-nil return values are added to the Results channel.

type DoWorkFunc

type DoWorkFunc func(job interface{}, workerID string) (result interface{})

DoWorkFunc processes a job.
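Since only non-nil results reach the Collector, a DoWorkFunc can use a comma-ok type assertion to drop malformed jobs instead of panicking. A minimal sketch (the function type is redeclared locally so the snippet stands alone, and upperCase is a hypothetical example, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

// DoWorkFunc mirrors the signature documented above; it is redeclared
// here only so this sketch compiles on its own.
type DoWorkFunc func(job interface{}, workerID string) (result interface{})

// upperCase is a sample DoWorkFunc. The comma-ok type assertion guards
// against jobs of an unexpected type: returning nil means the result
// is simply never added to the Results channel.
var upperCase DoWorkFunc = func(job interface{}, workerID string) interface{} {
	s, ok := job.(string)
	if !ok {
		return nil // silently drop malformed jobs
	}
	return strings.ToUpper(s)
}

func main() {
	fmt.Println(upperCase("hello", "worker-1")) // HELLO
	fmt.Println(upperCase(42, "worker-1"))      // <nil>
}
```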

type Pool

type Pool struct {
	Collector Collector
	// contains filtered or unexported fields
}

func StartPool

func StartPool(workerCount int, dispatcherCount int, resultsBuffer int, log *syslog.Writer) *Pool

StartPool creates a new Pool and launches its workers.

func (*Pool) AddWork

func (p *Pool) AddWork(work Work, timeout int) error

AddWork adds a work item to the queue. The timeout argument is in milliseconds; set it to 0 to wait indefinitely. It returns an error if called after the pool has been closed, or if the timeout is reached.
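The timeout contract described above corresponds to a bounded channel send. A stdlib sketch of the idea, assuming a plain buffered channel as the queue (this is an illustration of the pattern, not the library's actual implementation):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// addWithTimeout sketches AddWork's contract: block on the send, but
// give up once the timeout (in milliseconds) elapses. A timeout of 0
// waits indefinitely.
func addWithTimeout(queue chan int, job int, timeoutMs int) error {
	if timeoutMs == 0 {
		queue <- job // wait indefinitely
		return nil
	}
	select {
	case queue <- job:
		return nil
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return errors.New("timeout reached before the job was queued")
	}
}

func main() {
	queue := make(chan int, 1)
	fmt.Println(addWithTimeout(queue, 1, 50)) // buffer has room: <nil>
	fmt.Println(addWithTimeout(queue, 2, 50)) // buffer full, no reader: timeout error
}
```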

func (*Pool) HardStop

func (p *Pool) HardStop(block bool)

HardStop sends a stop signal to the dispatcher, which in turn signals all workers to stop. New jobs are not accepted and jobs still queued are not processed. The pool.Collector.Results channel is closed after all workers shut down.

If block is true, the method will block until all workers are shut down.

func (*Pool) NumberWorkers

func (p *Pool) NumberWorkers() int

NumberWorkers returns the current number of workers in the pool.

func (*Pool) Stop

func (p *Pool) Stop(block bool)

Stop sends a stop signal to the dispatcher, which in turn signals all workers to stop. New jobs are not accepted, but jobs still queued are processed. The pool.Collector.Results channel is closed after all workers shut down.

If block is true, the method will block until all workers are shut down.

type Work

type Work struct {
	ID     string
	Job    interface{}
	DoWork DoWorkFunc
}

Work contains the information needed to process a job: the job itself and the function that processes it.
