workout

package module
v0.0.0-...-7d4e061 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2015 License: MIT Imports: 14 Imported by: 0

README

Workout

Workout is a work processing library for Go built on top of beanstalkd. It provides a simple API, allowing you to get started quickly with reasonable default settings. Under the hood, Workout uses goroutines and a configurable number of workers to coordinate concurrent processing.

Example

Here's an entire working program. It creates 3 tubes and inserts 1000 jobs into each. It then spawns 20 concurrent workers to plow through the work, reporting the results to stdout. It expects a beanstalkd server to be listening on localhost:11300. Give it a try!

package main

import (
  "fmt"
  "github.com/jcoene/workout"
  "os"
  "os/signal"
  "time"
)

// main inserts 1000 jobs into each of three tubes, then starts a workout
// master that processes them until the work is stopped with CTRL+C.
// It exits with status 1 if the client cannot connect or the master
// fails to start.
func main() {
  // These are the tubes we'll be using
  tubes := []string{"default", "person", "address"}

  // Create a new workout client for job insertion
  wc, err := workout.NewClient("localhost:11300", tubes)
  if err != nil {
    fmt.Printf("unable to connect to beanstalkd: %s\n", err)
    os.Exit(1)
  }

  // Insert 1000 jobs for each tube
  for i := 0; i < 1000; i++ {
    for _, t := range tubes {
      job := &workout.Job{
        Tube:      t,
        Priority:  1,
        TimeToRun: 60 * time.Second,
        Body:      fmt.Sprintf("%d", i),
      }
      // Put returns (id, err); report failures instead of dropping them,
      // but keep going so one bad insert does not abort the whole run.
      if _, err := wc.Put(job); err != nil {
        fmt.Printf("unable to insert job %d into tube %s: %s\n", i, t, err)
      }
    }
  }

  // Setup a workout master with 20 workers
  // NOTE(review): the API index for this version lists
  // NewMaster(url string, concurrency int, max_retry uint64); confirm the
  // argument list against the version of the package you import.
  wm := workout.NewMaster("localhost:11300", tubes, 20)

  // Assign a job handler, callback handler and duration (after which the
  // handler is abandoned and we return an error) for each tube.
  for _, t := range tubes {
    wm.RegisterHandler(t, jobHandler, jobCallback, 60*time.Second)
  }

  // Start processing! Without a running master, Wait below has nothing
  // to wait for, so bail out on failure.
  if err := wm.Start(); err != nil {
    fmt.Printf("unable to start workout master: %s\n", err)
    os.Exit(1)
  }

  // Tell workout to stop on CTRL+C
  go func() {
    // signal.Notify does not block when delivering, so the channel must be
    // buffered or an interrupt arriving before the receive is lost.
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
    if err := wm.Stop(); err != nil {
      fmt.Printf("unable to stop workout master: %s\n", err)
    }
  }()

  // Block until we're finished
  wm.Wait()
}

// jobHandler is the handler registered for every tube in main. A handler
// function takes a job pointer and returns an error (or nil on success).
// This example has no real work to perform, so it always reports success.
func jobHandler(job *workout.Job) (err error) {
  // we don't actually have any work to do
  return nil
}

// jobCallback is the reporting callback registered for every tube in main.
// It receives the job, the error returned for it (nil on success) and the
// elapsed duration, and prints a one-line summary to stdout.
func jobCallback(job *workout.Job, err error, dur time.Duration) {
  // Success path first; failures fall through to the error report.
  if err == nil {
    fmt.Printf("job %d succeeded (took %v)\n", job.Id, dur)
    return
  }
  fmt.Printf("job %d encountered an error: %s (took %v)\n", job.Id, err, dur)
}

License

MIT license, see LICENSE for details.

Author

Jason Coene, @jcoene

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrJobTimeout is a sentinel error whose message is "job timed out".
	// NOTE(review): presumably returned when a handler runs longer than the
	// duration given to RegisterHandler — confirm against the implementation.
	ErrJobTimeout = errors.New("job timed out")
)

Functions

func Error

func Error(f string, v ...interface{}) error

func SetLogger

func SetLogger(alogger *log.FactorLog)

Types

type Client

type Client struct {
	ReserveTimeout time.Duration

	ExitChan chan struct{}
	// contains filtered or unexported fields
}

func NewClient

func NewClient(addr string, tubes []string) (client *Client, err error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) Delete

func (c *Client) Delete(job *Job) (err error)

func (*Client) Drop

func (c *Client) Drop()

func (*Client) Exit

func (c *Client) Exit()

func (*Client) Put

func (c *Client) Put(job *Job) (id uint64, err error)

func (*Client) Release

func (c *Client) Release(job *Job, err error)

func (*Client) Reserve

func (c *Client) Reserve() (job *Job, found bool, err error)

func (*Client) Stats

func (c *Client) Stats()

func (*Client) StatsJob

func (c *Client) StatsJob(jobId uint64) (map[string]string, error)

func (*Client) TubeStats

func (c *Client) TubeStats(tube string) (map[string]string, error)

type Job

// Job describes a single unit of work. Every field carries a JSON tag, so a
// Job encodes under the key names shown; note that Attempt is encoded as
// "failure_count" and TimeToRun as "ttr".
// NOTE(review): the per-field meanings below are inferred from the field
// names and tags — confirm against the implementation.
type Job struct {
	Id        uint64        `json:"id"`            // job identifier
	Priority  uint32        `json:"priority"`      // presumably the beanstalkd priority — confirm
	Tube      string        `json:"tube"`          // name of the tube the job belongs to
	Delay     time.Duration `json:"delay"`         // presumably the delay before the job becomes ready — confirm
	TimeToRun time.Duration `json:"ttr"`           // presumably the beanstalkd time-to-run — confirm
	Body      string        `json:"body"`          // job payload
	Age       time.Duration `json:"age"`           // presumably time since the job was created — confirm
	Attempt   uint32        `json:"failure_count"` // encoded as "failure_count"; presumably a count of failed attempts — confirm
}

func (*Job) Describe

func (j *Job) Describe() string

func (*Job) NextDelay

func (j *Job) NextDelay() time.Duration

type JobCallback

type JobCallback func(*Job, error, time.Duration)

type JobHandler

type JobHandler func(*Job) error

type JobPanic

// JobPanic wraps an arbitrary value. It has an Error method on the value
// receiver, so a JobPanic can be used wherever an error is expected.
// NOTE(review): presumably carries the value recovered from a panicking job
// handler — confirm against the implementation.
type JobPanic struct {
	Value interface{} // the wrapped value; may be of any type
}

func (JobPanic) Error

func (j JobPanic) Error() string

type Master

type Master struct {
	ReserveTimeout time.Duration
	// contains filtered or unexported fields
}

func NewMaster

func NewMaster(url string, concurrency int, max_retry uint64) *Master

func (*Master) RegisterHandler

func (m *Master) RegisterHandler(name string, hfn JobHandler, cfn JobCallback, to time.Duration)

func (*Master) SetSentry

func (m *Master) SetSentry(sentry *raven.Client)

func (*Master) Start

func (m *Master) Start() (err error)

func (*Master) Stats

func (m *Master) Stats() (s *Stats)

func (*Master) Stop

func (m *Master) Stop() (err error)

func (*Master) Wait

func (m *Master) Wait()

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(m *Master, wid int) (w *Worker, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL