gopool

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2015 License: MIT Imports: 6 Imported by: 0

README

gopool

GoDoc

Package gopool implements a concurrent work processing model. It is a similar to thread pools in other languages, but it uses goroutines and channels. A pool is formed wherein several goroutines get tasks from a channel. Various sources can be used to schedule tasks and given some coordination workgroups on various systems can work from the same source.

Documentation

Overview

Package gopool implements a concurrent work processing model. It is a similar to thread pools in other languages, but it uses goroutines and channels. A pool is formed wherein several goroutines get tasks from a channel. Various sources can be used to schedule tasks and given some coordination workgroups on various systems can work from the same source.

Index

Constants

This section is empty.

Variables

View Source
var ErrStopped = errors.New("stop requested")

ErrStopped is used to signal a goroutine that it should stop.

Functions

func NewSource

func NewSource(s Sourcer, verbose bool, wakeup chan struct{}) (<-chan Task,
	chan<- Task, chan chan struct{})

NewSource creates a managed source using the given Sourcer and starts a goroutine that synchronizes access to the given Interface. If a wakeup channel is non-nil, it can be used to force the goroutine to wakeup and look for new tasks. If verbose is true things happening in the channel are logged to default logger. The returned channels are the source, add and stop channels respectively. The returned source channel is used for getting tasks. The add channel is used to add tasks elsewhere.

The stop channel is used to stop the running goroutine. When it is time to stop, simply send a new channel down that channel. When the goroutine has cleaned up, the given channel will be closed. For example:

src, add, stop := New(s, false, nil)
done := make(chan struct{})
stop <- done // Send the channel we are goign to wait on.
<- done      // Once this returns, thing are cleaned up.

Types

type GoPool

type GoPool struct {
	// contains filtered or unexported fields
}

GoPool is a group of goroutines that work on Tasks. Each goroutine gets work from a channel until Stop() is called, the source channel is closed or the current task tells it to close.

func New

func New(name string, goroutines int, verbose bool, src <-chan Task) *GoPool

New creates a new GoPool with the given number of goroutines. The name is used for logging purposes. The goroutines are started as part of calling New().

To shut them down call Stop(). Once Stop() returns, all of the currently running tasks have finished. It's a good idea to wait if you don't want any tasks be stopped at an unexpected state.

The src channel is where the goroutines look for tasks. If verbose is true, information about the work being performed is logged. Otherwise only unexpected closures or errors are logged.

func (*GoPool) Stop

func (p *GoPool) Stop()

Stop gracefully signals all the goroutines to stop. It blocks until all of them are done.

func (*GoPool) String

func (p *GoPool) String() string

String implements the fmt.Stringer interface. It just prints the name given to New().

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue is an implementation of Interface using a priority queue. Higher priority tasks will be done first.

func NewPriorityQueue

func NewPriorityQueue(name string) *PriorityQueue

NewPriorityQueue creates a new PriorityQueue.

func (*PriorityQueue) Add

func (q *PriorityQueue) Add(t Task)

Add implements Sourcer.Add.

func (*PriorityQueue) Next

func (q *PriorityQueue) Next() Task

Next implements Sourcer.Next.

func (*PriorityQueue) String

func (q *PriorityQueue) String() string

type PriorityTask

type PriorityTask interface {
	Task
	Priority() int
}

PriorityTask is a Task that has a priority.

func NewPriorityTask

func NewPriorityTask(t Task, priority int) PriorityTask

NewPriorityTask returns a PriorityTask with the given task and priority.

type Sourcer

type Sourcer interface {
	fmt.Stringer

	// Next returns the next task from the source. It should return nil
	// if there is currently no work.
	Next() Task

	// Add schedules a task. It aslo reschedules a task during cleanup
	// if a task was taken but was unable to be sent. As such, it should
	// be available until the goroutine managing it is done.
	Add(t Task)
}

Sourcer is the interface that allows a type to be run as a source that communicates approptiately with a gopool. The Next() and Add() methods are synchronized internally, so as long as no other places are calling them, they won't suffer from race conditions. If they might be called concurrently, it is the implementers responsibility to synchronize usage (e.g. through a mutex).

type Task

type Task interface {
	fmt.Stringer

	// Run performs the work for this task. When the stop channel is
	// closed, processing should stop as soon as reasonably
	// possible. Long running tasks should make sure it's watching that
	// channel.
	//
	// Normally, the only error returned should be ErrStopped when the
	// stop channel is closed. If something fatal happens though and the
	// goroutine doing the work should stop, any other error can be
	// returned. The error is logged to the default logger.
	Run(stop chan struct{}) error
}

Task is a some type of work that the gopool should perform. The Stringer interface is used to aid in logging.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL