workerpoolxt
Worker pool library that extends https://github.com/gammazero/workerpool
Synopsis
- Allows you to retain access to underlying
*WorkerPool
object as if you imported workerpool
directly
- Hello, world!
- How we extend
workerpool
- Results
- Context
- Retry
int
that defines N number of retries
- Can only supply retry on a per job basis
- Options
- Options are optional
- Provide either global/default options or per job options
- Options are nothing more than
map[string]interface{}
so that you may supply anything you wish
- Job options override default options, we do NOT merge options
- Runtime duration
- Access a job's runtime duration via it's response
- e.g.
howLongItTook := someResponseFromSomeJob.RuntimeDuration() //-> time.Duration
Hello World
- Obligatory "as simple as it gets" example
package main
import (
"context"
"fmt"
wpxt "github.com/oze4/workerpoolxt"
)
func main() {
ctx := context.Background()
numWorkers := 10
wp := wpxt.New(ctx, numWorkers)
wp.SubmitXT(wpxt.Job{
Name: "My first job",
Task: func(o wpxt.Options) wpxt.Response {
return wpxt.Response{Data: "Hello, world!"}
},
})
jobResults := wp.StopWaitXT()
for _, jobresult := range jobResults {
fmt.Println(jobresult)
}
}
How we extend workerpool
Results
// ...
// ... pretend we submitted jobs here
// ...
results := wp.StopWaitXT() // -> []wpxt.Response
for _, result := range results {
// If job failed, `result.Error != nil`
}
Error Handling
- What if I encounter an error in one of my jobs?
- How can I handle or check for errors/timeout?
Return Error From Job
// Just set the `Error` field on the `wpxt.Response` you return
wp.SubmitXT(wpxt.Job{
Name: "How to handle errors",
Task: func(o wpxt.Options) wpxt.Response {
// Pretend we got an error doing something
if theError != nil {
return wpxt.Response{Error: theError}
}
},
})
Check For Errors In Response
// ... pretend we submitted a bunch of jobs
//
// StopWaitXT() returns []wpxt.Response
// Each response has an `Error` field
// Whether a timeout, or an error you set
// Check for it like
if someResponseFromSomeJob.Error != nil {
// ....
}
Context
- Required default context when creating new
workerpoolxt
- You can override default context per job
Default Context
myctx := context.Background() // Any `context.Context`
numWorkers := 10
wp := wpxt.New(myctx, numWorkers)
Per Job Context
Timeouts
defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Duration(time.Millisecond)
myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()
wp.SubmitXT(wpxt.Job{
Name: "my ctx job",
Context: myCtx,
Task: func(o wpxt.Options) wpxt.Response {
// Simulate long running task
time.Sleep(time.Second*10)
return wpxt.Response{Data: "I could be anything"}
},
})
// > `Response.Error` will be `context.DeadlineExceeded`
Retry
- Optional
- Seamlessly retry failed jobs
wp.SubmitXT(wpxt.Job{
// This job is configured to fail immediately,
// therefore it will retry 5 times
// (as long as we have not exceeded our job timeout)
timeoutctx, _ := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
Retry: 5,
// ^^^^^^
Name: "I will retry 5 times",
// Set timeout field on job
Context: timeoutctx,
Task: func(o wpxt.Options) wpxt.Response {
return wpxt.Response{Error: errors.New("some_err")}
},
})
Options
Default Options
myopts := map[string]interface{}{
"myclient": &http.Client{},
}
wp := wpxt.New(context.Background(), 10)
wp.WithOptions(myopts)
wp.SubmitXT(wpxt.Job{
Name: "myjob",
Task: func(o wpxt.Options) wpxt.Response {
// access options here
client := o["myclient"]
},
})
Per Job Options
myhttpclient := &http.Client{}
myk8sclient := kubernetes.Clientset{}
// This Job Only Needs an HTTP Client
wp.SubmitXT(wpxt.Job{
Name: "This Job Only Needs an HTTP Client",
Options: map[string]interface{}{
"http": myhttpclient,
},
Task: func(o wpxt.Options) wpxt.Response {
// access options here
httpclient := o["http"]
// ... do work with `httpclient`
},
})
// This Job Only Needs Kubernetes Clientset
wp.SubmitXT(wpxt.Job{
Name: "This Job Only Needs Kubernetes Clientset",
Options: map[string]interface{}{
"kube": myk8sclient,
},
Task: func(o wpxt.Options) wpxt.Response {
// access options here
kubernetesclient := o["kube"]
// ... do work with `kubernetesclient`
},
})