goku

package module
v0.0.0-...-9ae4d45 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2016 License: MIT Imports: 8 Imported by: 0

README

goku

distributed task queue for go

how to get

go get github.com/i/goku

defining jobs

package jobs

import "email"

/*
  Defining jobs is easy. A job only needs to implement two methods: Name() and Execute().
  Name() simply returns the name of the job. The name should be unique so that workers know how what type of job it is and if they can correctly process it.
*/

// All fields used in Execute() must be exported!
type SendEmailJob struct {
  To         string
  From       string
  Subject    string
  Body       string
}

// Receivers of Execute() must be structs.
func (j SendEmailJob) Execute() error {
  return email.Send(j.To, j.From, j.Subject, j.Body)
}

// The return value from Name() should be unique from other jobs because it 
// is used to differentiate between different jobs.
func (j SendEmailJob) Name() string {
  return "send_email_job_v0"
}

how to queue up jobs

package main

import (
  "time"

  "github.com/i/goku"

  "./jobs"
)

func main() {
  err := goku.Configure(goku.BrokerConfig{
    Hostport:      "127.0.0.1:6379",
    Timeout:       time.Second,
    DefaultQueue:  "goku_queue",
  })
  if err != nil {
    log.Fatalf("Couldn't configure goku: %v", err)
  }

  job := jobs.SendEmailJob{
    To:      "Will Smith",
    From:    "Ian",
    Subject: "re: Men in Black 2",
    Body:    "I thought it was pretty good",
  }

  // schedule the job to run immediately on the broker's default queue
  if err := goku.Run(job); err != nil {
    panic("will probably won't get this...")
  }

  // schedule the job to run in an hour
  if err := goku.RunAt(time.Now().Add(time.Hour)); err != nil {
    panic("he's never gonna read these messages :(")
  }
}

how to execute jobs

package main

import (
  "time"

  "github.com/i/goku"

  "./jobs"
)

func main() {
  config := goku.WorkerConfig{
    NumWorkers:   1,
    Queues:       []string{"hi_priority"],
    Timeout:      time.Second,
    Hostport:     "127.0.0.1:6379",
  }

  opts := goku.WorkerPoolOptions{
    Jobs: []goku.Job{
      jobs.WriteMessageJob{},
    }
  }

  wp, err := goku.NewWorkerPool(config, opts)
  if err != nil {
    log.Fatalf("Error creating worker pool: %v", err)
  }

  // doesn't block
  wp.Start(config, jobs)

  // wait for something...

  // waits for all current jobs to finish
  wp.Stop()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPointer           = errors.New("method receiver was a pointer when it shouldn't be")
	ErrStdNotInitialized = errors.New("default broker hasn't been initialized")
	ErrInvalidQueue      = errors.New("invalid queue name")
	ErrNoDefaultQueue    = errors.New("no default queue name provided")
	ErrNoRedis           = errors.New("can't establish a connection to redis")
	ErrInvalidJob        = errors.New("invalid job")
)

generic goku errors

Functions

func Configure

func Configure(cfg BrokerConfig) error

Configure configures the default broker for package level use

func Run

func Run(j Job, opts ...JobOption) error

Run schedules a job using the default broker. Before calling goku.Run, the default client must be configured using goku.Configure.

func RunAt

func RunAt(j Job, t time.Time, opts ...JobOption) error

RunAt is the same as Run, except it schedules a job to run no sooner than time t.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker objects schedule jobs to be processed

func NewBroker

func NewBroker(cfg BrokerConfig) (*Broker, error)

NewBroker returns a new *Broker.

func (*Broker) Run

func (b *Broker) Run(job Job, opts ...JobOption) error

Run schedules jobs to be run asynchronously. If queue is not specified, the job will be schedules on the default queue.

func (*Broker) RunAt

func (b *Broker) RunAt(job Job, t time.Time, opts ...JobOption) error

type BrokerConfig

type BrokerConfig struct {
	Hostport     string
	Password     string
	Timeout      time.Duration
	DefaultQueue string
}

BrokerConfig is the information needed to set up a new broker

type FailureFunc

type FailureFunc func(worker int, job Job, r interface{})

FailureFunc is a function that gets executed when a job fails. It will get run when a job returns an error or panics.

type Job

type Job interface {
	Name() string
	Execute() error
}

Job is any type that implements Execute and Name. In order for a job to be valid, all fields used within its Execute method must be exported.

type JobOption

type JobOption struct {
	// contains filtered or unexported fields
}

JobOption specifies an option for a job.

func JobQueue

func JobQueue(queue string) JobOption

JobQueue specifies which queue to run a job on.

type WorkerConfig

type WorkerConfig struct {
	NumWorkers int           // number of workers that belong to the pool
	Queues     []string      // what queues to pull jobs from
	Hostport   string        // redis hostport
	Password   string        // redis auth password (optional)
	Timeout    time.Duration // redis timeout

	// If a worker doesn't know how to handle a job it will be requeued.
	// sometimes requeuing can fail. This field is max number of retries before
	// losing the job.
	RequeRetries int
}

WorkerConfig describes the configuration needed for setting up a new worker pool.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is what will pull jobs from redis and distribute them to workers within the pool.

func NewWorkerPool

func NewWorkerPool(cfg WorkerConfig, opts WorkerPoolOptions) (*WorkerPool, error)

NewWorkerPool returns a new WorkerPool. It fails when a connection to redis cannot be established.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start tells the worker pool to start pulling things off the queue to be processed.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop waits for all jobs to finish executing, and then returns.

type WorkerPoolOptions

type WorkerPoolOptions struct {
	Failure FailureFunc
	Jobs    []Job
}

WorkerPoolOptions exists for defining things that wouldn't be possible within a yaml configuration file. Failure is optional, but jobs are required if you want the workers to do anything.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL