beanstalk

Published: Jan 27, 2017 License: MIT Imports: 11 Imported by: 0

README

Beanstalk

Go client for beanstalkd.

Install

$ go get github.com/kr/beanstalk

Use

Produce jobs:

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
id, err := c.Put([]byte("hello"), 1, 0, 120*time.Second)

Consume jobs:

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
id, body, err := c.Reserve(5 * time.Second)

Documentation

Overview

Package beanstalk provides a client for the beanstalk protocol. See http://kr.github.com/beanstalkd/ for the server.

This package is synchronized internally and safe to use from multiple goroutines without other coordination.

Example (Put)
package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"time"
)

var conn, _ = beanstalk.Dial("tcp", "127.0.0.1:11300")

func main() {
	id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
	if err != nil {
		panic(err)
	}
	fmt.Println("job", id)
}

Example (PutOtherTube)
package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"time"
)

var conn, _ = beanstalk.Dial("tcp", "127.0.0.1:11300")

func main() {
	// Keyed fields keep the literal valid if the struct gains fields later.
	tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
	id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
	if err != nil {
		panic(err)
	}
	fmt.Println("job", id)
}

Example (Reserve)
package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"time"
)

var conn, _ = beanstalk.Dial("tcp", "127.0.0.1:11300")

func main() {
	id, body, err := conn.Reserve(5 * time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println("job", id)
	fmt.Println(string(body))
}

Example (ReserveOtherTubeSet)
package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"time"
)

var conn, _ = beanstalk.Dial("tcp", "127.0.0.1:11300")

func main() {
	tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
	id, body, err := tubeSet.Reserve(10 * time.Hour)
	if err != nil {
		panic(err)
	}
	fmt.Println("job", id)
	fmt.Println(string(body))
}

Constants

const NameChars = `\-+/;.$_()0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz`

Characters allowed in a name in the beanstalkd protocol.

Variables

var (
	ErrBadFormat  = errors.New("bad command format")
	ErrBuried     = errors.New("buried")
	ErrDeadline   = errors.New("deadline soon")
	ErrDraining   = errors.New("draining")
	ErrInternal   = errors.New("internal error")
	ErrJobTooBig  = errors.New("job too big")
	ErrNoCRLF     = errors.New("expected CR LF")
	ErrNotFound   = errors.New("not found")
	ErrNotIgnored = errors.New("not ignored")
	ErrOOM        = errors.New("server is out of memory")
	ErrTimeout    = errors.New("timeout")
	ErrUnknown    = errors.New("unknown command")
)

Error messages returned by the server.

var (
	ErrEmpty   = errors.New("name is empty")
	ErrBadChar = errors.New("name has bad char") // contains a character not in NameChars
	ErrTooLong = errors.New("name is too long")
)

Name format errors. The Err field of NameError contains one of these.

var ErrPoolExhausted = errors.New("beanstalk: connection pool exhausted")

Types

type Conn

type Conn struct {
	Tube
	TubeSet
	// contains filtered or unexported fields
}

A Conn represents a connection to a beanstalkd server. It consists of a default Tube and TubeSet as well as the underlying network connection. The embedded types carry methods with them; see the documentation of those types for details.

func Dial

func Dial(network, addr string) (*Conn, error)

Dial connects to the given address on the given network using net.Dial and then returns a new Conn for the connection.

func NewConn

func NewConn(conn io.ReadWriteCloser) *Conn

NewConn returns a new Conn using conn for I/O.

func (*Conn) Bury

func (c *Conn) Bury(id uint64, pri uint32) error

Bury places the given job in a holding area in the job's tube and sets its priority to pri. The job will not be scheduled again until it has been kicked; see also the documentation of Kick.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the underlying network connection.

func (*Conn) Delete

func (c *Conn) Delete(id uint64) error

Delete deletes the given job.

func (*Conn) ListTubes

func (c *Conn) ListTubes() ([]string, error)

ListTubes returns the names of the tubes that currently exist on the server.

func (*Conn) Peek

func (c *Conn) Peek(id uint64) (body []byte, err error)

Peek gets a copy of the specified job from the server.

func (*Conn) Release

func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error

Release tells the server to perform the following actions: set the priority of the given job to pri, remove it from the list of jobs reserved by c, wait for the duration delay, then place the job in the ready queue, which makes it available for reservation by any client.

func (*Conn) Stats

func (c *Conn) Stats() (map[string]string, error)

Stats retrieves global statistics from the server.
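
Because the statistics arrive as a map of strings, numeric values have to be parsed by the caller. The helper below is a hypothetical sketch (statInt is not part of the package), and the sample map is made up; the key current-jobs-ready is one of the fields that beanstalkd reports in its stats output.

```go
package main

import (
	"fmt"
	"strconv"
)

// statInt looks up key in a stats map and parses its value as an integer.
func statInt(stats map[string]string, key string) (int, error) {
	v, ok := stats[key]
	if !ok {
		return 0, fmt.Errorf("stat %q not present", key)
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("stat %q: %w", key, err)
	}
	return n, nil
}

func main() {
	// Sample values are invented; a real map would come from a Stats call.
	stats := map[string]string{"current-jobs-ready": "3", "version": "1.10"}
	n, err := statInt(stats, "current-jobs-ready")
	fmt.Println(n, err)
}
```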

func (*Conn) StatsJob

func (c *Conn) StatsJob(id uint64) (map[string]string, error)

StatsJob retrieves statistics about the given job.

func (*Conn) Touch

func (c *Conn) Touch(id uint64) error

Touch resets the reservation timer for the given job. It is an error if the job isn't currently reserved by c. See the documentation of Reserve for more details.

func (*Conn) UseTube

func (c *Conn) UseTube(name string) error

func (*Conn) UseTubes

func (c *Conn) UseTubes(name ...string) error

type ConnError

type ConnError struct {
	Conn *Conn
	Op   string
	Err  error
}

ConnError records an error message from the server and the operation and connection that caused it.

func (ConnError) Error

func (e ConnError) Error() string

type IConn

type IConn interface {
	Close() error
	Delete(uint64) error
	Release(uint64, uint32, time.Duration) error
	Bury(uint64, uint32) error
	Touch(uint64) error
	Peek(uint64) ([]byte, error)
	Stats() (map[string]string, error)
	StatsJob(uint64) (map[string]string, error)
	ListTubes() ([]string, error)

	// Implemented in tube
	Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error)
	PeekReady() (id uint64, body []byte, err error)
	PeekDelayed() (id uint64, body []byte, err error)
	PeekBuried() (id uint64, body []byte, err error)
	Kick(bound int) (n int, err error)
	Pause(d time.Duration) error

	// Implemented in tubeset
	Reserve(timeout time.Duration) (id uint64, body []byte, err error)

	UseTube(name string) error
	UseTubes(name ...string) error
}

type NameError

type NameError struct {
	Name string
	Err  error
}

NameError indicates that a name was malformed and records the specific error describing how.

func (NameError) Error

func (e NameError) Error() string

type Pool

type Pool struct {

	// Dial is an application supplied function for creating and configuring a
	// connection.
	//
	// The connection returned from Dial must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	Dial func() (IConn, error)

	// TestOnBorrow is an optional application supplied function for checking
	// the health of an idle connection before the connection is used again by
	// the application. Argument t is the time that the connection was returned
	// to the pool. If the function returns an error, then the connection is
	// closed.
	TestOnBorrow func(c IConn, t time.Time) error

	// Maximum number of idle connections in the pool.
	MaxIdle int

	// Maximum number of connections allocated by the pool at a given time.
	// When zero, there is no limit on the number of connections in the pool.
	MaxActive int

	// Close connections after remaining idle for this duration. If the value
	// is zero, then idle connections are not closed. Applications should set
	// the timeout to a value less than the server's timeout.
	IdleTimeout time.Duration

	// If Wait is true and the pool is at the MaxActive limit, then Get() waits
	// for a connection to be returned to the pool before returning.
	Wait bool
	// contains filtered or unexported fields
}

func NewPool (deprecated)

func NewPool(newFn func() (IConn, error), maxIdle int) *Pool

NewPool creates a new pool.

Deprecated: Initialize the Pool directly as shown in the example.

func (*Pool) ActiveCount

func (p *Pool) ActiveCount() int

ActiveCount returns the number of active connections in the pool.

func (*Pool) Close

func (p *Pool) Close() error

Close releases the resources used by the pool.

func (*Pool) Get

func (p *Pool) Get() IConn

Get gets a connection. The application must close the returned connection. This method always returns a valid connection so that applications can defer error handling to the first use of the connection. If there is an error getting an underlying connection, then the returned connection's methods return that error.

type Tube

type Tube struct {
	Conn *Conn
	Name string
}

Tube represents tube Name on the server connected to by Conn. It has methods for commands that operate on a single tube.

func (*Tube) Kick

func (t *Tube) Kick(bound int) (n int, err error)

Kick takes up to bound jobs from the holding area and moves them into the ready queue, then returns the number of jobs moved. Jobs will be taken in the order in which they were last buried.

func (*Tube) Pause

func (t *Tube) Pause(d time.Duration) error

Pause pauses new reservations in t for time d.

func (*Tube) PeekBuried

func (t *Tube) PeekBuried() (id uint64, body []byte, err error)

PeekBuried gets a copy of the job in the holding area that would be kicked next by Kick.

func (*Tube) PeekDelayed

func (t *Tube) PeekDelayed() (id uint64, body []byte, err error)

PeekDelayed gets a copy of the delayed job that is next to be put in t's ready queue.

func (*Tube) PeekReady

func (t *Tube) PeekReady() (id uint64, body []byte, err error)

PeekReady gets a copy of the job at the front of t's ready queue.

func (*Tube) Put

func (t *Tube) Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error)

Put puts a job into tube t with priority pri and TTR ttr, and returns the id of the newly-created job. If delay is nonzero, the server will wait the given amount of time after returning to the client and before putting the job into the ready queue.

func (*Tube) Stats

func (t *Tube) Stats() (map[string]string, error)

Stats retrieves statistics about tube t.

type TubeSet

type TubeSet struct {
	Conn *Conn
	Name map[string]bool
}

TubeSet represents a set of tubes on the server connected to by Conn. Name names the tubes represented.

func NewTubeSet

func NewTubeSet(c *Conn, name ...string) *TubeSet

NewTubeSet returns a new TubeSet representing the given names.
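
The Name field is a set keyed by tube name. As a rough illustration of how variadic names map onto such a set, consider the sketch below; newNameSet is a hypothetical helper and is not claimed to match what NewTubeSet does internally.

```go
package main

import (
	"fmt"
	"sort"
)

// newNameSet builds a map[string]bool set of the kind stored in TubeSet.Name.
// Duplicate names collapse into a single entry.
func newNameSet(names ...string) map[string]bool {
	set := make(map[string]bool, len(names))
	for _, n := range names {
		set[n] = true
	}
	return set
}

func main() {
	set := newNameSet("mytube1", "mytube2", "mytube1")
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for display
	fmt.Println(keys)
}
```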

func (*TubeSet) Reserve

func (t *TubeSet) Reserve(timeout time.Duration) (id uint64, body []byte, err error)

Reserve reserves and returns a job from one of the tubes in t. If no job is available before time timeout has passed, Reserve returns a ConnError recording ErrTimeout.

Typically, a client will reserve a job, perform some work, then delete the job with Conn.Delete.
