Documentation ¶
Overview ¶
Package goworkers implements a simple, flexible and lightweight goroutine worker pool implementation.
Example ¶
gw := New() fn := func(i int) { fmt.Println("Start Job", i) time.Sleep(time.Duration(i) * time.Second) fmt.Println("End Job", i) } for _, i := range []int{9, 7, 1, 2, 3} { gw.Submit(func() { fn(i) }) } log.Println("Submitted!") gw.Stop(false)
Output:
Example (Benchmark) ¶
tStart := time.Now() opts := Options{Workers: 500} gw := New(opts) fn := func(i int) { fmt.Println("Start Job", i) time.Sleep(time.Duration(5) * time.Second) fmt.Println("End Job", i) } for value := 500; value > 0; value-- { i := value gw.Submit(func() { fn(i) }) } log.Println("Submitted!") gw.Stop(false) tEnd := time.Now() tDiff := tEnd.Sub(tStart) log.Println("Time taken to execute 500 jobs that are 5 seconds long is", tDiff.Seconds())
Output:
Example (ErrorChannel) ¶
gw := New() // You must strictly start reading from the error channel before invoking // SubmitCheckError() else you'll miss the updates. // You can employ any mechanism to read from this channel. go func() { // Error channel provides errors from job, if any for err := range gw.ErrChan { fmt.Println(err) } }() // This is your actual function fn := func(i int) error { // Do work here return fmt.Errorf("Got error %d", i) } // The job submit part for _, value := range []int{3, 2, 1} { i := value gw.SubmitCheckError(func() error { return fn(i) }) } log.Println("Submitted!") // Wait for jobs to finish // Here, wait flag is set to true. Setting wait to true ensures that // the output channels are read from completely. // Stop(true) exits only when the error channel is completely read from. gw.Stop(true)
Output:
Example (OutputChannel) ¶
gw := New() type myOutput struct { Idx int Name string } // You must strictly start reading from the error and output channels // before invoking SubmitCheckResult() else you'll miss the updates. // You can employ any mechanism to read from these channels. go func() { for { select { // Error channel provides errors from job, if any case err, ok := <-gw.ErrChan: // The error channel is closed when the workers are done with their tasks. // When the channel is closed, ok is set to false if !ok { return } fmt.Printf("Error: %s\n", err.Error()) // Result channel provides output from job, if any // It will be of type interface{} case res, ok := <-gw.ResultChan: // The result channel is closed when the workers are done with their tasks. // When the channel is closed, ok is set to false if !ok { return } fmt.Printf("Type: %T, Value: %+v\n", res, res) } } }() // This is your actual function fn := func(i int) (interface{}, error) { // Do work here // return error if i%2 == 0 { return nil, fmt.Errorf("Got error %d", i) } // return output return myOutput{Idx: i, Name: "dummy"}, nil } // The job submit part for _, value := range []int{3, 2, 1} { i := value gw.SubmitCheckResult(func() (interface{}, error) { return fn(i) }) } log.Println("Submitted!") // Wait for jobs to finish // Here, wait flag is set to true. Setting wait to true ensures that // the output channels are read from completely. // Stop(true) exits only when both the result and the error channels are completely read from. gw.Stop(true)
Output:
Example (Simple) ¶
gw := New() gw.Submit(func() { fmt.Println("Hello, how are you?") }) gw.Submit(func() { fmt.Println("I'm fine, thank you!") }) log.Println("Submitted!") gw.Stop(false)
Output:
Example (WithArgs) ¶
opts := Options{Workers: 3, QSize: 256} gw := New(opts) fn := func(i int) { fmt.Println("Start Job", i) time.Sleep(time.Duration(i) * time.Second) fmt.Println("End Job", i) } for _, value := range []int{9, 7, 1, 2, 3} { i := value gw.Submit(func() { fn(i) }) } log.Println("Submitted!") gw.Stop(false)
Output:
Index ¶
- type GoWorkers
- func (gw *GoWorkers) JobNum() uint32
- func (gw *GoWorkers) Stop(wait bool)
- func (gw *GoWorkers) Submit(job func())
- func (gw *GoWorkers) SubmitCheckError(job func() error)
- func (gw *GoWorkers) SubmitCheckResult(job func() (interface{}, error))
- func (gw *GoWorkers) Wait(wait bool)
- func (gw *GoWorkers) WorkerNum() uint32
- type Options
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GoWorkers ¶
type GoWorkers struct { // ErrChan is a safe buffered output channel of size 100 on which error // returned by a job can be caught, if any. The channel will be closed // after Stop() returns. Valid only for SubmitCheckError() and SubmitCheckResult(). // You must start listening to this channel before submitting jobs so that no // updates would be missed. This is comfortably sized at 100 so that chances // that a slow receiver missing updates would be minute. ErrChan chan error // ResultChan is a safe buffered output channel of size 100 on which error // and output returned by a job can be caught, if any. The channels will be // closed after Stop() returns. Valid only for SubmitCheckResult(). // You must start listening to this channel before submitting jobs so that no // updates would be missed. This is comfortably sized at 100 so that chances // that a slow receiver missing updates would be minute. ResultChan chan interface{} // contains filtered or unexported fields }
GoWorkers is a collection of worker goroutines.
All workers will be killed after Stop() is called if their respective job finishes.
func New ¶
New creates a new worker pool.
Accepts optional Options{} argument.
Example (WithArgs) ¶
opts := Options{Workers: 3, QSize: 256} _ = New(opts)
Output:
Example (WithoutArgs) ¶
_ = New()
Output:
func (*GoWorkers) Stop ¶
Stop gracefully waits for the jobs to finish running and releases the associated resources.
This is a blocking call and returns when all the active and queued jobs are finished. If wait is true, Stop() waits until the result and the error channels are emptied. Setting wait to true ensures that you can read all the values from the result and the error channels before your parent program exits.
func (*GoWorkers) Submit ¶
func (gw *GoWorkers) Submit(job func())
Submit is a non-blocking call with arg of type `func()`
Example ¶
gw := New() gw.Submit(func() { fmt.Println("Hello, how are you?") }) gw.Stop(false)
Output:
func (*GoWorkers) SubmitCheckError ¶ added in v1.0.0
SubmitCheckError is a non-blocking call with arg of type `func() error`
Use this if your job returns 'error'. Use ErrChan buffered channel to read error, if any.
Example ¶
gw := New() gw.SubmitCheckError(func() error { // Do some work here return fmt.Errorf("This is an error message") }) gw.Stop(true)
Output:
func (*GoWorkers) SubmitCheckResult ¶ added in v1.0.0
SubmitCheckResult is a non-blocking call with arg of type `func() (interface{}, error)`
Use this if your job returns output and error. Use ErrChan buffered channel to read error, if any. Use ResultChan buffered channel to read output, if any. For a job, either of error or output would be sent if available.
Example ¶
gw := New() gw.SubmitCheckResult(func() (interface{}, error) { // Do some work here return fmt.Sprintf("This is an output message"), nil }) gw.Stop(true)
Output:
func (*GoWorkers) Wait ¶ added in v1.8.0
Wait waits for the jobs to finish running.
This is a blocking call and returns when all the active and queued jobs are finished. If 'wait' argument is set true, Wait() waits until the result and the error channels are emptied. Setting 'wait' argument to true ensures that you can read all the values from the result and the error channels before this function unblocks. Jobs cannot be submitted until this function returns. If any, will be discarded.
Example ¶
gw := New() defer gw.Stop(false) gw.Submit(func() { fmt.Println("Hello, how are you?") }) gw.Wait(false) gw.Submit(func() { fmt.Println("I'm good, thank you!") }) gw.Wait(false)
Output: