worker

v0.0.0-...-33e939c Latest
Published: Oct 31, 2020 License: MIT Imports: 2 Imported by: 19

README

worker

Package worker is a Go package that makes it easy to parallelize N tasks, running up to n of them concurrently at a time.

Getting started

$ go get github.com/jimmysawczuk/worker

Using in your program

Design

To use this package, all you need to do is package your tasks into types that satisfy the following interface:

type Job interface {
	Run()
}

Implementation

From there, it's easy to add your task to the queue and start it:

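package main

import (
	"log"
	"time"

	"github.com/jimmysawczuk/worker"
)

// SampleJob sleeps for Duration when it is run.
type SampleJob struct {
	Name     string
	Duration time.Duration
}

// Run satisfies worker.Job. The receiver is a pointer, so only *SampleJob
// implements the interface; pass &SampleJob{...} to Add.
func (s *SampleJob) Run() {
	time.Sleep(s.Duration)
	log.Printf("Done, slept for %s\n", s.Duration)
}

func main() {
	// only do 3 jobs at a time
	worker.MaxJobs = 3

	w := worker.NewWorker()
	w.Add(&SampleJob{
		Name:     "sleep 1",
		Duration: 1 * time.Second,
	})

	w.Add(&SampleJob{
		Name:     "sleep 2",
		Duration: 2 * time.Second,
	})

	// ... and so forth

	w.RunUntilDone()
}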

Internally, each Job is wrapped in a Package, which stores an ID that is unique within its Worker, as well as the return value that is retrieved from the channel. Packages are mostly used for event handling; you can store your information in that value, or simply keep any custom information on your own Job type.
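
As a rough illustration of keeping custom information on your own Job type, the sketch below stores a result on the job itself and recovers it with a type assertion. It uses only the standard library: the Job interface is copied from the definition above, while UpperJob and resultOf are hypothetical names invented for this example, and the job is run directly rather than through a Worker.

```go
package main

import (
	"fmt"
	"strings"
)

// Job is copied from the interface shown above.
type Job interface {
	Run()
}

// UpperJob is a hypothetical job that keeps its input and its result.
type UpperJob struct {
	Input  string
	Output string
}

// Run stores the upper-cased input on the job itself.
func (u *UpperJob) Run() {
	u.Output = strings.ToUpper(u.Input)
}

// resultOf is a hypothetical helper: it runs j and, if j is an *UpperJob,
// returns the output stored on it.
func resultOf(j Job) (string, bool) {
	j.Run()
	u, ok := j.(*UpperJob)
	if !ok {
		return "", false
	}
	return u.Output, true
}

func main() {
	out, ok := resultOf(&UpperJob{Input: "less-tree"})
	fmt.Println(out, ok) // prints "LESS-TREE true"
}
```

The same type assertion should apply to the value returned by a Package's Job() method, since it returns the Job you originally added.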

Events

You can also listen for events from the Worker and react appropriately. Currently, three events are fired: JobAdded, JobStarted, and JobFinished. Add an event handler like so:

w.On(worker.JobStarted, func(pk *worker.Package, args ...interface{}) {
	// Use a type assertion to get your original job back from the Package:
	if job, ok := pk.Job().(*SampleJob); ok {
		log.Printf("Started %s\n", job.Name)
	}
})

Currently each event emitter only passes one argument, the relevant Package that emitted the event. There may be more added later, for other events, but the Package will always be the first argument.

More documentation

You can find more documentation at GoDoc.

Examples

  • less-tree, a recursive, per-directory LESS compiler, uses worker

Documentation

Overview

Package worker accepts Jobs and places them in a queue to be executed N at a time.
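
To make "N at a time" concrete, here is a minimal standard-library sketch of bounded concurrency using a buffered channel as a counting semaphore. It is not this package's implementation: runBounded, probeJob, and peakConcurrency are hypothetical names used only for illustration, and the Job interface is copied from the type documented below.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Job mirrors the interface documented by this package.
type Job interface {
	Run()
}

// runBounded is a hypothetical helper (not part of the worker package).
// It runs every job, allowing at most limit of them to execute at once.
func runBounded(jobs []Job, limit int) {
	sem := make(chan struct{}, limit) // counting semaphore with limit slots
	var wg sync.WaitGroup
	for _, j := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are already running
		go func(j Job) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job returns
			j.Run()
		}(j)
	}
	wg.Wait()
}

// probeJob records the highest number of jobs observed running together.
type probeJob struct {
	running *int64
	peak    *int64
}

func (p probeJob) Run() {
	n := atomic.AddInt64(p.running, 1)
	for {
		old := atomic.LoadInt64(p.peak)
		if n <= old || atomic.CompareAndSwapInt64(p.peak, old, n) {
			break
		}
	}
	time.Sleep(10 * time.Millisecond)
	atomic.AddInt64(p.running, -1)
}

// peakConcurrency runs total probe jobs under the given limit and
// reports the largest number that overlapped.
func peakConcurrency(total, limit int) int64 {
	var running, peak int64
	jobs := make([]Job, total)
	for i := range jobs {
		jobs[i] = probeJob{running: &running, peak: &peak}
	}
	runBounded(jobs, limit)
	return peak
}

func main() {
	// The reported peak never exceeds the limit of 3.
	fmt.Println("peak concurrency:", peakConcurrency(10, 3))
}
```

With this package, the equivalent limit is presumably set through MaxJobs before calling NewWorker, as in the README example.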

Constants

This section is empty.

Variables

var MaxJobs = 4

MaxJobs is the default number of jobs to run at a time. This can be changed per Worker object as well.

Functions

This section is empty.

Types

type Event

type Event int

An Event is fired when Jobs change within a Worker.

const (

	// JobAdded is fired when a Job (Package) is added to the Worker's queue.
	JobAdded Event = 1 << iota

	// JobStarted is fired when a Job (Package) begins executing.
	JobStarted

	// JobFinished is fired when a Job (Package) finishes executing Run().
	JobFinished
)
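
Because the constants are declared with 1 << iota, each Event is a distinct bit (1, 2, 4). The sketch below copies the declaration and shows the resulting values; the has helper is hypothetical, and whether On accepts several events OR-ed together is an assumption this documentation does not confirm.

```go
package main

import "fmt"

// Event and its constants are copied from the declaration above.
type Event int

const (
	JobAdded Event = 1 << iota
	JobStarted
	JobFinished
)

// has is a hypothetical helper reporting whether mask includes event e.
func has(mask, e Event) bool {
	return mask&e != 0
}

func main() {
	fmt.Println(JobAdded, JobStarted, JobFinished) // prints "1 2 4"

	mask := JobStarted | JobFinished
	fmt.Println(has(mask, JobStarted), has(mask, JobAdded)) // prints "true false"
}
```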

type ExitCode

type ExitCode int

ExitCode is a code for how the Worker should exit.

const (
	// ExitNormally indicates that the Worker is exiting without error.
	ExitNormally ExitCode = 0

	// ExitWhenDone indicates that the Worker should finish all of its Jobs first, then exit.
	ExitWhenDone = 4
)

type Job

type Job interface {
	Run()
}

A Job is an object with a Run() method, which is expected to complete a given task. Jobs can be run in parallel, so any resources shared between Jobs should be thread-safe.
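
One way to make a shared resource safe for parallel Jobs is to guard it with a sync.Mutex. The sketch below is a standard-library illustration only: Counter, CountJob, and runAll are hypothetical names, and the jobs are started with plain goroutines instead of a Worker.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a hypothetical shared resource guarded by a mutex.
type Counter struct {
	mu sync.Mutex
	n  int
}

// Inc increments the counter under the lock.
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value reads the counter under the lock.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// CountJob has a Run method, so it satisfies the Job interface.
type CountJob struct {
	c *Counter
}

// Run increments the shared counter once.
func (j *CountJob) Run() {
	j.c.Inc()
}

// runAll starts n CountJobs concurrently against one Counter and
// returns the final count.
func runAll(n int) int {
	c := &Counter{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		job := &CountJob{c: c}
		go func() {
			defer wg.Done()
			job.Run()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(runAll(100)) // prints "100"
}
```

Without the mutex, the concurrent increments would race and the final count could come up short.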

type JobStatus

type JobStatus int

JobStatus indicates where in the execution process a Job (Package) is.

const (
	// Queued means the Job is added to the Worker but hasn't started yet.
	Queued JobStatus = 1 << iota

	// Running means the Job has started executing its Run() method.
	Running

	// Finished means the Job has finished executing its Run() method and presumably has no errors.
	Finished

	// Errored means the Job errored at some point during the Run() method and has stopped executing.
	Errored
)

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map holds a lookup table for Job Packages by ID.

func NewMap

func NewMap() Map

NewMap returns a new, empty Map.

func (*Map) Get

func (m *Map) Get(id int64) *Package

Get returns the *Package at the location given by the ID provided.

func (*Map) Set

func (m *Map) Set(val *Package)

Set puts the provided Package into the Map, indexed by its ID.

type Package

type Package struct {
	ID int64
	// contains filtered or unexported fields
}

Package is a type that wraps a more generic Job object and contains some meta information about the Job used by the Worker.

func NewPackage

func NewPackage(id int64, j Job) *Package

NewPackage wraps the given Job and assigned ID in a *Package and returns it.

func (*Package) Job

func (p *Package) Job() Job

Job returns the Job associated with the Package.

func (*Package) SetStatus

func (p *Package) SetStatus(inc JobStatus)

SetStatus sets the completion status of the Package.

func (*Package) Status

func (p *Package) Status() JobStatus

Status returns the completion status of the package.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

A Queue is an ordered list of Jobs.

func NewQueue

func NewQueue() Queue

NewQueue returns a new, empty Queue.

func (*Queue) Add

func (q *Queue) Add(j *Package)

Add adds a Package to the end of the Queue.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the length of the Queue.

func (*Queue) Prepend

func (q *Queue) Prepend(j *Package)

Prepend adds a Package to the front of the Queue.

func (*Queue) Top

func (q *Queue) Top() *Package

Top returns the first Package in the Queue.

type Stats

type Stats struct {
	Total    int64
	Running  int64
	Finished int64
	Queued   int64
	Errored  int64
}

Stats contains some information about the Packages that are Queued, Running, Finished, etc.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker holds and executes a bunch of jobs N at a time.

func NewWorker

func NewWorker() *Worker

NewWorker returns a new Worker object with the maximum jobs at a time set to the default.

func (*Worker) Add

func (w *Worker) Add(j Job)

Add adds a Job to the Worker's queue.

func (*Worker) On

func (w *Worker) On(e Event, cb func(*Package, ...interface{}))

On attaches an event handler to a given Event.

func (*Worker) RunUntilDone

func (w *Worker) RunUntilDone()

RunUntilDone tells the Worker to run until all of its jobs are completed and then shut down and stop accepting Jobs.

func (*Worker) RunUntilStopped

func (w *Worker) RunUntilStopped(stopCh chan ExitCode)

RunUntilStopped tells the Worker to run until it's explicitly told to stop via an ExitCode. It'll accept new Jobs until this happens.

func (*Worker) Stats

func (w *Worker) Stats() (stats Stats)

Stats returns a collection of statistics related to how many jobs are finished, queued, running, etc.
