goworkers

Version: v1.8.0
Published: Dec 25, 2020 License: MIT Imports: 2 Imported by: 2

README

GoWorkers

A minimal, efficient and scalable worker pool implementation in Go using goroutines.

Note: Do not use the master branch. Use the latest release.

Installation

$ go get github.com/dpaks/goworkers

Examples

Basic
package main

import "github.com/dpaks/goworkers"

func main() {
	// initialise
	gw := goworkers.New()

	// non-blocking call
	gw.Submit(func() {
		// do your work here
	})

	// wait till your job finishes
	gw.Stop(false)
}
With arguments
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/dpaks/goworkers"
)

func main() {
	opts := goworkers.Options{Workers: 20}
	gw := goworkers.New(opts)
	
	// your actual work
	fn := func(i int) {
		fmt.Println("Start Job", i)
		time.Sleep(time.Duration(i) * time.Second)
		fmt.Println("End Job", i)
	}
	
	for _, value := range []int{9, 7, 1, 2, 3} {
		i := value
		gw.Submit(func() {
			fn(i)
		})
	}
	log.Println("Submitted!")
	
	gw.Stop(false)
}
Without arguments
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/dpaks/goworkers"
)

func main() {
	gw := goworkers.New()
	
	fn := func(i int) {
		fmt.Println("Start Job", i)
		time.Sleep(time.Duration(i) * time.Second)
		fmt.Println("End Job", i)
	}
	
	for _, value := range []int{9, 7, 1, 2, 3} {
		i := value
		gw.Submit(func() {
			fn(i)
		})
	}
	log.Println("Submitted!")
	
	gw.Stop(false)
}
Benchmark
package main

import (
    "log"
    "time"

    "github.com/dpaks/goworkers"
)

func main() {
    tStart := time.Now()

    gw := goworkers.New()

    fn := func() {
        time.Sleep(time.Duration(5) * time.Second)
    }

    for value := 500; value > 0; value-- {
        gw.Submit(func() {
            fn()
        })
    }

    gw.Stop(false)

    tEnd := time.Now()
    tDiff := tEnd.Sub(tStart)

    log.Println("Time taken to execute 500 jobs that were 5 seconds long is only", tDiff.Seconds(), "seconds!")
}

Output: 2020/07/03 20:03:01 Time taken to execute 500 jobs that were 5 seconds long is only 5.001186599 seconds!
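
Run one after another, 500 jobs of 5 seconds each would need 500 × 5 = 2500 seconds. The run above finishes in about 5 seconds because the jobs only sleep and all of them run at the same time. The sketch below reproduces that arithmetic with plain goroutines and the standard library only; it does not use goworkers, and `runAll` and `beatsSequential` are hypothetical helper names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runAll starts n goroutines that each sleep for d and returns the
// wall-clock time until all of them have finished.
func runAll(n int, d time.Duration) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			time.Sleep(d)
		}()
	}
	wg.Wait()
	return time.Since(start)
}

// beatsSequential reports whether n concurrent sleeps of ms milliseconds
// finish in less than half the time a sequential run would need.
func beatsSequential(n, ms int) bool {
	d := time.Duration(ms) * time.Millisecond
	return runAll(n, d) < time.Duration(n)*d/2
}

func main() {
	// 50 jobs of 20 ms each: 1000 ms sequentially, roughly 20 ms concurrently.
	fmt.Println(beatsSequential(50, 20))
}
```

The same reasoning stops holding once jobs are CPU-bound or the pool is capped with a small Workers value, so treat the 5 second figure as a best case for sleeping or I/O-bound jobs.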

To Receive Error from Job
package main

import (
	"fmt"
	"log"

	"github.com/dpaks/goworkers"
)

func main() {
    gw := goworkers.New()

    // You must strictly start reading from the error channel before invoking
    // SubmitCheckError() else you'll miss the updates.
    // You can employ any mechanism to read from this channel.
    go func() {
        // Error channel provides errors from job, if any
        for err := range gw.ErrChan {
            fmt.Println(err)
        }
    }()

    // This is your actual function
    fn := func(i int) error {
        // Do work here
        return fmt.Errorf("Got error %d", i)
    }

    // The job submit part
    for _, value := range []int{3, 2, 1} {
        i := value
        gw.SubmitCheckError(func() error {
            return fn(i)
        })
    }
    log.Println("Submitted!")

    // Wait for jobs to finish
    // Here, wait flag is set to true. Setting wait to true ensures that
    // the output channels are read from completely.
    // Stop(true) exits only when the error channel is completely read from.
    gw.Stop(true)
}
To Receive Output and Error from Job
package main

import (
	"fmt"
	"log"

	"github.com/dpaks/goworkers"
)

func main() {
    gw := goworkers.New()

    type myOutput struct {
        Idx  int
        Name string
    }

    // You must strictly start reading from the error and output channels
    // before invoking SubmitCheckResult() else you'll miss the updates.
    // You can employ any mechanism to read from these channels.
    go func() {
        for {
            select {
            // Error channel provides errors from job, if any
            case err, ok := <-gw.ErrChan:
                // The error channel is closed when the workers are done with their tasks.
                // When the channel is closed, ok is set to false
                if !ok {
                    return
                }
                fmt.Printf("Error: %s\n", err.Error())
            // Result channel provides output from job, if any
            // It will be of type interface{}
            case res, ok := <-gw.ResultChan:
                // The result channel is closed when the workers are done with their tasks.
                // When the channel is closed, ok is set to false
                if !ok {
                    return
                }
                fmt.Printf("Type: %T, Value: %+v\n", res, res)
            }
        }
    }()

    // This is your actual function
    fn := func(i int) (interface{}, error) {
        // Do work here

        // return error
        if i%2 == 0 {
            return nil, fmt.Errorf("Got error %d", i)
        }
        // return output
        return myOutput{Idx: i, Name: "dummy"}, nil
    }

    // The job submit part
    for _, value := range []int{3, 2, 1} {
        i := value
        gw.SubmitCheckResult(func() (interface{}, error) {
            return fn(i)
        })
    }
    log.Println("Submitted!")

    // Wait for jobs to finish
    // Here, wait flag is set to true. Setting wait to true ensures that
    // the output channels are read from completely.
    // Stop(true) exits only when both the result and the error channels are completely read from.
    gw.Stop(true)
}
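
The reader goroutine above returns as soon as either channel reports that it is closed. With Stop(true) that is fine, since Stop waits until both channels are emptied. If you would rather not depend on that ordering, a common Go idiom is to set a closed channel to nil and keep looping until both are done. Below is a minimal sketch of that idiom using plain channels in place of ErrChan and ResultChan; `drainBoth` is a hypothetical helper, not part of goworkers.

```go
package main

import (
	"errors"
	"fmt"
)

// drainBoth reads from both channels until each one has been closed.
// A closed channel is set to nil so that its case can never fire again.
func drainBoth(errs <-chan error, results <-chan interface{}) (gotErrs []error, gotResults []interface{}) {
	for errs != nil || results != nil {
		select {
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			gotErrs = append(gotErrs, err)
		case res, ok := <-results:
			if !ok {
				results = nil
				continue
			}
			gotResults = append(gotResults, res)
		}
	}
	return gotErrs, gotResults
}

func main() {
	// Stand-ins for gw.ErrChan and gw.ResultChan.
	errs := make(chan error, 4)
	results := make(chan interface{}, 4)
	errs <- errors.New("job 2 failed")
	results <- "job 1 ok"
	results <- "job 3 ok"
	close(errs)
	close(results)

	gotErrs, gotResults := drainBoth(errs, results)
	fmt.Println(len(gotErrs), len(gotResults))
}
```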

TODO

  • Add logs toggle
  • When the goworkers machine is stopped, ensure that everything is cleaned up
  • Add support for a 'results' channel
  • An option to auto-adjust worker pool size
  • Introduce timeout

FAQ

Q. I don't want to use the error channel. I only need output. What do I do?

A. Listen only to the output channel. You are not required to listen to a channel whose values you don't need.

Q. I get duplicate output.

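A. In the wrong snippet below, the closure captures the loop variables k and v themselves rather than their current values. In Go versions before 1.22 these variables are created once for the whole loop, so a job that runs later may see the values of a later iteration.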

Wrong code

for k, v := range myMap {
    gw.SubmitCheckResult(func() (interface{}, error) {
        return myFunc(k, v)
    })
}

Correct code

for i, j := range myMap {
    k := i
    v := j
    gw.SubmitCheckResult(func() (interface{}, error) {
        return myFunc(k, v)
    })
}
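
Another way to give each job its own copy is to pass the value as a function parameter instead of declaring a new variable. The following is a small standard-library sketch of that approach; it does not use goworkers, and `buildJobs` is a hypothetical helper name.

```go
package main

import "fmt"

// buildJobs returns one closure per input value. Each closure holds its own
// copy of the value because the value is passed in as a function parameter.
func buildJobs(values []int) []func() int {
	jobs := make([]func() int, 0, len(values))
	for _, v := range values {
		jobs = append(jobs, func(n int) func() int {
			return func() int { return n }
		}(v))
	}
	return jobs
}

func main() {
	// The jobs run after the loop has ended, yet each one still returns
	// the value it was created with.
	for _, job := range buildJobs([]int{3, 2, 1}) {
		fmt.Println(job())
	}
}
```

Both forms behave the same on every Go version, which makes them a safe choice if you cannot be sure which toolchain will build your code.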

Q. Can I use a combination of Submit(), SubmitCheckError() and SubmitCheckResult() and still use output and error channels?

A. It is absolutely safe.

Documentation

Overview

Package goworkers implements a simple, flexible and lightweight goroutine worker pool.

Example
gw := New()

fn := func(i int) {
	fmt.Println("Start Job", i)
	time.Sleep(time.Duration(i) * time.Second)
	fmt.Println("End Job", i)
}

for _, value := range []int{9, 7, 1, 2, 3} {
	// Copy the loop variable so that each job captures its own value.
	i := value
	gw.Submit(func() {
		fn(i)
	})
}

log.Println("Submitted!")

gw.Stop(false)
Output:

Example (Benchmark)
tStart := time.Now()

opts := Options{Workers: 500}
gw := New(opts)

fn := func(i int) {
	fmt.Println("Start Job", i)
	time.Sleep(time.Duration(5) * time.Second)
	fmt.Println("End Job", i)
}

for value := 500; value > 0; value-- {
	i := value
	gw.Submit(func() {
		fn(i)
	})
}
log.Println("Submitted!")

gw.Stop(false)

tEnd := time.Now()
tDiff := tEnd.Sub(tStart)

log.Println("Time taken to execute 500 jobs that are 5 seconds long is", tDiff.Seconds())
Output:

Example (ErrorChannel)
gw := New()

// You must strictly start reading from the error channel before invoking
// SubmitCheckError() else you'll miss the updates.
// You can employ any mechanism to read from this channel.
go func() {
	// Error channel provides errors from job, if any
	for err := range gw.ErrChan {
		fmt.Println(err)
	}
}()

// This is your actual function
fn := func(i int) error {
	// Do work here
	return fmt.Errorf("Got error %d", i)
}

// The job submit part
for _, value := range []int{3, 2, 1} {
	i := value
	gw.SubmitCheckError(func() error {
		return fn(i)
	})
}
log.Println("Submitted!")

// Wait for jobs to finish
// Here, wait flag is set to true. Setting wait to true ensures that
// the output channels are read from completely.
// Stop(true) exits only when the error channel is completely read from.
gw.Stop(true)
Output:

Example (OutputChannel)
gw := New()

type myOutput struct {
	Idx  int
	Name string
}

// You must strictly start reading from the error and output channels
// before invoking SubmitCheckResult() else you'll miss the updates.
// You can employ any mechanism to read from these channels.
go func() {
	for {
		select {
		// Error channel provides errors from job, if any
		case err, ok := <-gw.ErrChan:
			// The error channel is closed when the workers are done with their tasks.
			// When the channel is closed, ok is set to false
			if !ok {
				return
			}
			fmt.Printf("Error: %s\n", err.Error())
		// Result channel provides output from job, if any
		// It will be of type interface{}
		case res, ok := <-gw.ResultChan:
			// The result channel is closed when the workers are done with their tasks.
			// When the channel is closed, ok is set to false
			if !ok {
				return
			}
			fmt.Printf("Type: %T, Value: %+v\n", res, res)
		}
	}
}()

// This is your actual function
fn := func(i int) (interface{}, error) {
	// Do work here

	// return error
	if i%2 == 0 {
		return nil, fmt.Errorf("Got error %d", i)
	}
	// return output
	return myOutput{Idx: i, Name: "dummy"}, nil
}

// The job submit part
for _, value := range []int{3, 2, 1} {
	i := value
	gw.SubmitCheckResult(func() (interface{}, error) {
		return fn(i)
	})
}
log.Println("Submitted!")

// Wait for jobs to finish
// Here, wait flag is set to true. Setting wait to true ensures that
// the output channels are read from completely.
// Stop(true) exits only when both the result and the error channels are completely read from.
gw.Stop(true)
Output:

Example (Simple)
gw := New()

gw.Submit(func() {
	fmt.Println("Hello, how are you?")
})

gw.Submit(func() {
	fmt.Println("I'm fine, thank you!")
})

log.Println("Submitted!")

gw.Stop(false)
Output:

Example (WithArgs)
opts := Options{Workers: 3, QSize: 256}
gw := New(opts)

fn := func(i int) {
	fmt.Println("Start Job", i)
	time.Sleep(time.Duration(i) * time.Second)
	fmt.Println("End Job", i)
}

for _, value := range []int{9, 7, 1, 2, 3} {
	i := value
	gw.Submit(func() {
		fn(i)
	})
}
log.Println("Submitted!")

gw.Stop(false)
Output:

Types

type GoWorkers

type GoWorkers struct {

	// ErrChan is a safe buffered output channel of size 100 on which the error
	// returned by a job, if any, can be received. The channel is closed
	// after Stop() returns. Valid only for SubmitCheckError() and SubmitCheckResult().
	// Start listening to this channel before submitting jobs so that no
	// updates are missed. The buffer size of 100 keeps the chance of a slow
	// receiver missing updates small.
	ErrChan chan error
	// ResultChan is a safe buffered output channel of size 100 on which the
	// output returned by a job, if any, can be received. The channel is
	// closed after Stop() returns. Valid only for SubmitCheckResult().
	// Start listening to this channel before submitting jobs so that no
	// updates are missed. The buffer size of 100 keeps the chance of a slow
	// receiver missing updates small.
	ResultChan chan interface{}
	// contains filtered or unexported fields
}

GoWorkers is a collection of worker goroutines.

After Stop() is called, each worker exits once its current job has finished.
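
Values arrive on ResultChan as interface{}, so the receiver has to recover the concrete type itself. A type switch is one idiomatic way to do that. The sketch below uses only the standard library; `myOutput` mirrors the struct from the README example and `describe` is a hypothetical helper.

```go
package main

import "fmt"

// myOutput mirrors the struct used in the README example.
type myOutput struct {
	Idx  int
	Name string
}

// describe recovers the concrete type of a value received as interface{}.
func describe(v interface{}) string {
	switch out := v.(type) {
	case myOutput:
		return fmt.Sprintf("myOutput idx=%d name=%s", out.Idx, out.Name)
	case string:
		return "string " + out
	default:
		return fmt.Sprintf("unexpected type %T", v)
	}
}

func main() {
	fmt.Println(describe(myOutput{Idx: 3, Name: "dummy"}))
	fmt.Println(describe("hello"))
	fmt.Println(describe(42))
}
```

Keeping a default case is useful when jobs of different kinds share one pool, because a value of an unplanned type is then reported rather than silently dropped.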

func New

func New(args ...Options) *GoWorkers

New creates a new worker pool.

It accepts an optional Options argument.

Example (WithArgs)
opts := Options{Workers: 3, QSize: 256}
_ = New(opts)
Output:

Example (WithoutArgs)
_ = New()
Output:

func (*GoWorkers) JobNum

func (gw *GoWorkers) JobNum() uint32

JobNum returns the number of active jobs.

func (*GoWorkers) Stop

func (gw *GoWorkers) Stop(wait bool)

Stop gracefully waits for the jobs to finish running and releases the associated resources.

This is a blocking call and returns when all the active and queued jobs are finished. If wait is true, Stop() waits until the result and the error channels are emptied. Setting wait to true ensures that you can read all the values from the result and the error channels before your parent program exits.

func (*GoWorkers) Submit

func (gw *GoWorkers) Submit(job func())

Submit is a non-blocking call with arg of type `func()`

Example
gw := New()

gw.Submit(func() {
	fmt.Println("Hello, how are you?")
})

gw.Stop(false)
Output:

func (*GoWorkers) SubmitCheckError added in v1.0.0

func (gw *GoWorkers) SubmitCheckError(job func() error)

SubmitCheckError is a non-blocking call with arg of type `func() error`

Use this if your job returns an error. Read the error, if any, from the buffered ErrChan channel.

Example
gw := New()

gw.SubmitCheckError(func() error {
	// Do some work here
	return fmt.Errorf("This is an error message")
})

gw.Stop(true)
Output:

func (*GoWorkers) SubmitCheckResult added in v1.0.0

func (gw *GoWorkers) SubmitCheckResult(job func() (interface{}, error))

SubmitCheckResult is a non-blocking call with arg of type `func() (interface{}, error)`

Use this if your job returns output and an error. Read the error, if any, from the buffered ErrChan channel and the output, if any, from the buffered ResultChan channel. For each job, either the error or the output is sent, whichever is available.
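
The "either error or output" rule can be pictured as a small routing step that runs after each job. The sketch below is an assumption about that behaviour written with plain channels; it is not taken from the goworkers source, and `deliver`, `deliverAll` and `errFailed` are hypothetical names. In particular, skipping a nil result with a nil error is a guess at what "if available" means.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailed is a sample error used by the demonstration jobs.
var errFailed = errors.New("failed")

// deliver runs job and forwards its outcome: a non-nil error goes to errs,
// otherwise a non-nil result goes to results. Assumed behaviour only.
func deliver(job func() (interface{}, error), errs chan<- error, results chan<- interface{}) {
	res, err := job()
	switch {
	case err != nil:
		errs <- err
	case res != nil:
		results <- res
	}
}

// deliverAll runs the jobs in order and reports how many values ended up
// on each channel.
func deliverAll(jobs ...func() (interface{}, error)) (nErrs, nResults int) {
	errs := make(chan error, len(jobs))
	results := make(chan interface{}, len(jobs))
	for _, job := range jobs {
		deliver(job, errs, results)
	}
	return len(errs), len(results)
}

func main() {
	ok := func() (interface{}, error) { return "output", nil }
	bad := func() (interface{}, error) { return nil, errFailed }
	nErrs, nResults := deliverAll(ok, bad, ok)
	fmt.Println(nErrs, nResults)
}
```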

Example
gw := New()

gw.SubmitCheckResult(func() (interface{}, error) {
	// Do some work here
	return "This is an output message", nil
})

gw.Stop(true)
Output:

func (*GoWorkers) Wait added in v1.8.0

func (gw *GoWorkers) Wait(wait bool)

Wait waits for the jobs to finish running.

This is a blocking call and returns when all the active and queued jobs are finished. If the wait argument is true, Wait() also waits until the result and the error channels are emptied, which ensures that you can read all the values from both channels before this function unblocks. Jobs cannot be submitted until this function returns; any job submitted in the meantime is discarded.
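
The difference between Wait and Stop is that the pool stays usable after Wait. One way to picture this is a pool that tracks unfinished jobs with a sync.WaitGroup and only closes its queue on stop. The sketch below is built on that assumption with the standard library alone; `batchPool` is a hypothetical type, not the goworkers implementation, and it does not model the discarding of jobs submitted while a wait is in progress.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batchPool is a hypothetical type used only for illustration.
type batchPool struct {
	jobs    chan func()
	pending sync.WaitGroup // jobs submitted but not yet finished
}

func newBatchPool(workers int) *batchPool {
	p := &batchPool{jobs: make(chan func(), 128)}
	for i := 0; i < workers; i++ {
		go func() {
			for job := range p.jobs {
				job()
				p.pending.Done()
			}
		}()
	}
	return p
}

func (p *batchPool) submit(job func()) {
	p.pending.Add(1)
	p.jobs <- job
}

// wait blocks until every submitted job has finished; the pool stays usable.
func (p *batchPool) wait() { p.pending.Wait() }

// stop waits for outstanding jobs and then shuts the workers down.
func (p *batchPool) stop() {
	p.pending.Wait()
	close(p.jobs)
}

// twoBatches returns the job counter as seen after each of two batches.
func twoBatches() (first, second int64) {
	var n int64
	p := newBatchPool(4)
	defer p.stop()
	for i := 0; i < 10; i++ {
		p.submit(func() { atomic.AddInt64(&n, 1) })
	}
	p.wait()
	first = atomic.LoadInt64(&n)
	for i := 0; i < 5; i++ {
		p.submit(func() { atomic.AddInt64(&n, 1) })
	}
	p.wait()
	second = atomic.LoadInt64(&n)
	return first, second
}

func main() {
	first, second := twoBatches()
	fmt.Println(first, second)
}
```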

Example
gw := New()
defer gw.Stop(false)

gw.Submit(func() {
	fmt.Println("Hello, how are you?")
})

gw.Wait(false)

gw.Submit(func() {
	fmt.Println("I'm good, thank you!")
})

gw.Wait(false)
Output:

func (*GoWorkers) WorkerNum

func (gw *GoWorkers) WorkerNum() uint32

WorkerNum returns the number of active workers.

type Options

type Options struct {
	Workers uint32
	QSize   uint32
}

Options configures the behaviour of worker pool.

Workers specifies the number of workers that will be spawned. If unspecified or zero, workers will be spawned as per demand.

QSize specifies the size of the queue that holds incoming jobs. Minimum value is 128.
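
To make the two options concrete, here is a minimal sketch of a fixed-size pool in which `workers` goroutines read from a buffered queue of `qsize` jobs. It uses only the standard library and is an illustration of the general technique, not the goworkers implementation; `fixedPool`, `newFixedPool` and `runCounted` are hypothetical names. Unlike Submit, which is documented as non-blocking, `submit` in this sketch blocks when the queue is full.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fixedPool is a hypothetical illustration type, not part of goworkers.
type fixedPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// newFixedPool starts `workers` goroutines that read from a queue able to
// hold up to qsize pending jobs.
func newFixedPool(workers, qsize int) *fixedPool {
	p := &fixedPool{jobs: make(chan func(), qsize)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job()
			}
		}()
	}
	return p
}

// submit enqueues a job; it blocks only while the queue is full.
func (p *fixedPool) submit(job func()) { p.jobs <- job }

// stop closes the queue and waits for the workers to finish what is left.
func (p *fixedPool) stop() {
	close(p.jobs)
	p.wg.Wait()
}

// runCounted runs n trivial jobs and returns how many of them completed.
func runCounted(workers, qsize, n int) int64 {
	var done int64
	p := newFixedPool(workers, qsize)
	for i := 0; i < n; i++ {
		p.submit(func() { atomic.AddInt64(&done, 1) })
	}
	p.stop()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runCounted(3, 128, 1000))
}
```

A larger queue lets producers run further ahead of the workers at the cost of memory, while the worker count bounds how many jobs execute at once.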
