workerpoolxt

Version: v1.1.1
Note: this package is not in the latest version of its module.
Published: Oct 16, 2020 License: MIT Imports: 5 Imported by: 0

README

workerpoolxt GitHub


Worker pool library that extends https://github.com/gammazero/workerpool

Synopsis


Hello World

  • Obligatory "as simple as it gets" example
package main

import (
    "context"
    "fmt"
    wpxt "github.com/oze4/workerpoolxt"
)

func main() {
    ctx := context.Background()
    numWorkers := 10

    wp := wpxt.New(ctx, numWorkers)

    wp.SubmitXT(wpxt.Job{
        Name: "My first job",
        Task: func(o wpxt.Options) wpxt.Response {
            return wpxt.Response{Data: "Hello, world!"}
        },
    })

    jobResults := wp.StopWaitXT()

    for _, jobresult := range jobResults {
        fmt.Println(jobresult)
    }
}

How we extend workerpool

Results

// ...
// ... pretend we submitted jobs here
// ...

results := wp.StopWaitXT() // -> []wpxt.Response

for _, result := range results {
    // If job failed, `result.Error != nil`
}

Error Handling

  • What if I encounter an error in one of my jobs?
  • How can I handle or check for errors/timeout?

Return Error From Job

// Just set the `Error` field on the `wpxt.Response` you return
wp.SubmitXT(wpxt.Job{
    Name: "How to handle errors",
    Task: func(o wpxt.Options) wpxt.Response {
        // Pretend we got an error doing something
        _, theError := strconv.Atoi("not a number")
        if theError != nil {
            return wpxt.Response{Error: theError}
        }
        // Every code path must return a response
        return wpxt.Response{Data: "no error"}
    },
})

Check For Errors In Response

// ... pretend we submitted a bunch of jobs
//
// StopWaitXT() returns []wpxt.Response.
// Each response has an `Error` field, which is non-nil
// whether the job timed out or returned an error you set.
// Check for it like this:
if someResponseFromSomeJob.Error != nil {
    // ....
}

Context

  • A default context is required when creating a new workerpoolxt
  • You can override the default context per job

Default Context

myctx := context.Background() // Any `context.Context`
numWorkers := 10
wp := wpxt.New(myctx, numWorkers)

Per Job Context

Timeouts

defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Millisecond

myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()

wp.SubmitXT(wpxt.Job{
    Name: "my ctx job",
    Context: myCtx,
    Task: func(o wpxt.Options) wpxt.Response {
        // Simulate long running task
        time.Sleep(time.Second*10) 
        return wpxt.Response{Data: "I could be anything"}
    },
})
// > `Response.Error` will be `context.DeadlineExceeded`

Retry

  • Optional
  • Seamlessly retry failed jobs

// Create the timeout context before building the job;
// keep the cancel func so the context is always released
timeoutctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

wp.SubmitXT(wpxt.Job{
    // This job is configured to fail immediately,
    // therefore it will retry 5 times
    // (as long as we have not exceeded our job timeout)
    Retry: 5,
    // ^^^^^^
    Name: "I will retry 5 times",
    // Set the timeout context on the job
    Context: timeoutctx,
    Task: func(o wpxt.Options) wpxt.Response {
        return wpxt.Response{Error: errors.New("some_err")}
    },
})

Options

  • Help make jobs flexible

Default Options

myopts := map[string]interface{}{
    "myclient": &http.Client{},
}

wp := wpxt.New(context.Background(), 10)
wp.WithOptions(myopts)

wp.SubmitXT(wpxt.Job{
    Name: "myjob",
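    Task: func(o wpxt.Options) wpxt.Response {
        // access options here; values are interface{}, so assert the type
        client, ok := o["myclient"].(*http.Client)
        if !ok {
            return wpxt.Response{Error: errors.New(`option "myclient" is not an *http.Client`)}
        }
        // ... do work with `client`
        _ = client
        return wpxt.Response{Data: "done"}
    },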
})

Per Job Options

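myhttpclient := &http.Client{}
myk8sclient := &kubernetes.Clientset{}

// This Job Only Needs an HTTP Client
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs an HTTP Client",
    Options: map[string]interface{}{
        "http": myhttpclient,
    },
    Task: func(o wpxt.Options) wpxt.Response {
        // access options here; values are interface{}, so assert the type
        httpclient, ok := o["http"].(*http.Client)
        if !ok {
            return wpxt.Response{Error: errors.New(`option "http" is not an *http.Client`)}
        }
        // ... do work with `httpclient`
        _ = httpclient
        return wpxt.Response{Data: "done"}
    },
})

// This Job Only Needs Kubernetes Clientset
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs Kubernetes Clientset",
    Options: map[string]interface{}{
        "kube": myk8sclient,
    },
    Task: func(o wpxt.Options) wpxt.Response {
        // access options here; values are interface{}, so assert the type
        kubernetesclient, ok := o["kube"].(*kubernetes.Clientset)
        if !ok {
            return wpxt.Response{Error: errors.New(`option "kube" is not a *kubernetes.Clientset`)}
        }
        // ... do work with `kubernetesclient`
        _ = kubernetesclient
        return wpxt.Response{Data: "done"}
    },
})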

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	Name    string
	Task    func(Options) Response
	Context context.Context
	Options Options
	Retry   int
	// contains filtered or unexported fields
}

Job holds job data

type Options added in v0.0.18

type Options map[string]interface{}

Options holds miscellaneous options

type Response

type Response struct {
	Error error
	Data  interface{}
	// contains filtered or unexported fields
}

Response holds job results

func (*Response) Name

func (r *Response) Name() string

Name returns the job name

func (*Response) RuntimeDuration

func (r *Response) RuntimeDuration() time.Duration

RuntimeDuration returns the amount of time it took to run the job

type WorkerPoolXT

type WorkerPoolXT struct {
	*workerpool.WorkerPool
	// contains filtered or unexported fields
}

WorkerPoolXT extends `github.com/gammazero/workerpool`

func New

func New(ctx context.Context, maxWorkers int) *WorkerPoolXT

New creates WorkerPoolXT

func (*WorkerPoolXT) StopWaitXT

func (wp *WorkerPoolXT) StopWaitXT() (rs []Response)

StopWaitXT gets results then kills the worker pool

func (*WorkerPoolXT) SubmitXT

func (wp *WorkerPoolXT) SubmitXT(job Job)

SubmitXT submits a job which you can get a response from

func (*WorkerPoolXT) WithOptions added in v0.0.18

func (wp *WorkerPoolXT) WithOptions(o Options)

WithOptions sets default options for each job
