beanstalk

Version: v1.4.7
Published: Nov 1, 2022 License: BSD-3-Clause Imports: 15 Imported by: 3

README

Package beanstalk

import "github.com/prep/beanstalk"

Overview

Package beanstalk provides a beanstalk client.

Producer

The Producer is used to put jobs into tubes. It provides a connection pool:

producer, err := beanstalk.NewProducer([]string{"localhost:11300"}, beanstalk.Config{
	// Multiply the list of URIs to create a larger pool of connections.
	Multiply: 3,
})
if err != nil {
	// handle error
}
defer producer.Stop()

Putting a job in a tube is done by calling Put, which will select a random connection for its operation:

// Create the put parameters. These can be reused between Put calls.
params := beanstalk.PutParams{Priority: 1024, Delay: 0, TTR: 30 * time.Second}

// Put the "Hello World" message in the "mytube" tube.
id, err := producer.Put(ctx, "mytube", []byte("Hello World"), params)
if err != nil {
	// handle error
}

If a Put operation fails on a connection, another connection in the pool will be selected for a retry.

Consumer

The Consumer is used to reserve jobs from tubes. It provides a connection pool:

consumer, err := beanstalk.NewConsumer([]string{"localhost:11300"}, []string{"mytube"}, beanstalk.Config{
	// Multiply the list of URIs to create a larger pool of connections.
	Multiply: 3,
	// NumGoroutines is the number of goroutines that the Receive method will
	// spin up to process jobs concurrently.
	NumGoroutines: 30,
})
if err != nil {
	// handle error
}

The ratio between Multiply and NumGoroutines is important. Multiply determines the size of the connection pool and NumGoroutines determines how many reserved jobs can be in flight at once. With few connections but many reserved jobs in flight, the TCP connection pool can become congested and processing speed will suffer. Although the right ratio depends on how quickly jobs are processed, a good rule of thumb is 1:10, that is, one connection for every ten goroutines.
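
The rule of thumb can be checked with a little arithmetic. The sketch below uses only the standard library and assumes that the pool holds one connection per URI, repeated Multiply times, which is how the Config comment reads. The helper names poolSize and jobsPerConn are made up for this illustration and are not part of the package.

```go
package main

import "fmt"

// poolSize estimates the number of pooled connections, assuming the pool
// holds one connection per URI, repeated multiply times.
func poolSize(numURIs, multiply int) int {
	if multiply < 1 {
		multiply = 1 // the documented default for Multiply
	}
	return numURIs * multiply
}

// jobsPerConn returns how many in-flight jobs each connection carries on
// average, rounded up.
func jobsPerConn(conns, numGoroutines int) int {
	if conns == 0 {
		return 0
	}
	return (numGoroutines + conns - 1) / conns
}

func main() {
	// One URI with Multiply: 3 and NumGoroutines: 30, as in the example above.
	conns := poolSize(1, 3)
	fmt.Printf("%d connections, about %d in-flight jobs per connection\n",
		conns, jobsPerConn(conns, 30))
}
```

With the values from the NewConsumer example this gives 3 connections and 10 jobs per connection, which matches the 1:10 guideline.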

Reserving jobs from the tubes specified in NewConsumer is done by calling Receive, which will reserve jobs on any of the connections in the pool:

// Call the inline function for every job that was reserved.
consumer.Receive(ctx, func(ctx context.Context, job *beanstalk.Job) {
	// handle job

	if err := job.Delete(ctx); err != nil {
		// handle error
	}
})

If the context passed to Receive is cancelled, Receive will finish processing the jobs it has reserved before returning.

Job

When Receive offers a job, the goroutine that receives it is responsible for processing that job and finishing it. A job can be deleted, released or buried:

// Delete a job, when processing was successful.
err = job.Delete(ctx)

// Release a job, putting it back in the queue for another worker to pick up.
err = job.Release(ctx)

// Release a job, but put it back with a custom priority and a delay before
// it's offered to another worker.
err = job.ReleaseWithParams(ctx, 512, 5 * time.Second)

// Bury a job, when it doesn't need to be processed but needs to be kept
// around for manual inspection or manual requeuing.
err = job.Bury(ctx)
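
One way to choose between these outcomes is to base the decision on the error returned by your own processing code. The following is a minimal sketch, not the package's behaviour: the finisher interface only lists the Job methods used here so the helper can run without a server, and errPermanent, errTemporary, finish, fakeJob and outcome are hypothetical names introduced for the illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// finisher lists the *beanstalk.Job methods this sketch relies on.
type finisher interface {
	Delete(ctx context.Context) error
	ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error
	Bury(ctx context.Context) error
}

// Hypothetical error values an application might use to classify failures.
var (
	errPermanent = errors.New("permanent failure")
	errTemporary = errors.New("temporary failure")
)

// finish deletes the job on success, buries it on a permanent failure and
// releases it with a delay on any other failure.
func finish(ctx context.Context, job finisher, procErr error) error {
	switch {
	case procErr == nil:
		return job.Delete(ctx)
	case errors.Is(procErr, errPermanent):
		return job.Bury(ctx)
	default:
		return job.ReleaseWithParams(ctx, 512, 5*time.Second)
	}
}

// fakeJob records the last method called, standing in for a real job.
type fakeJob struct{ last string }

func (j *fakeJob) Delete(context.Context) error { j.last = "delete"; return nil }
func (j *fakeJob) Bury(context.Context) error   { j.last = "bury"; return nil }
func (j *fakeJob) ReleaseWithParams(context.Context, uint32, time.Duration) error {
	j.last = "release"
	return nil
}

// outcome reports which action finish takes for the given processing error.
func outcome(procErr error) string {
	job := &fakeJob{}
	_ = finish(context.Background(), job, procErr)
	return job.last
}

func main() {
	fmt.Println(outcome(nil))
	fmt.Println(outcome(errTemporary))
	fmt.Println(outcome(fmt.Errorf("bad payload: %w", errPermanent)))
}
```

Inside a Receive callback, a *beanstalk.Job could be passed to finish directly, since it has the three methods listed in the interface.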

Conn

If the Producer and Consumer abstractions are too high-level, Conn provides the lower-level abstraction of a single connection to a beanstalk server:

conn, err := beanstalk.Dial("localhost:11300", beanstalk.Config{})
if err != nil {
	// handle error
}
defer conn.Close()

// conn.Put(...)
// conn.Watch(...)
// conn.Reserve(...)

Logging

The Config structure offers hooks for info and error logs, which make it possible to plug in a custom logging solution.

config := beanstalk.Config{
	InfoFunc: func(message string) {
		log.Info(message)
	},
	ErrorFunc: func(err error, message string) {
		log.WithError(err).Error(message)
	},
}
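
If no structured logger is in use, the same two hooks can be backed by the standard library's log package. This is a minimal sketch under that assumption. newLogHooks and demo are hypothetical helpers, and the "INFO" and "ERROR" prefixes are an arbitrary choice; only the two function signatures come from Config.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"strings"
)

// newLogHooks returns two functions with the same signatures as
// Config.InfoFunc and Config.ErrorFunc, both writing to logger.
func newLogHooks(logger *log.Logger) (func(message string), func(err error, message string)) {
	info := func(message string) {
		logger.Printf("INFO %s", message)
	}
	fail := func(err error, message string) {
		logger.Printf("ERROR %s: %v", message, err)
	}
	return info, fail
}

// demo calls both hooks against an in-memory logger and returns the output.
func demo() string {
	var out strings.Builder
	info, fail := newLogHooks(log.New(&out, "", 0))
	info("connected")
	fail(errors.New("connection refused"), "dial failed")
	return out.String()
}

func main() {
	fmt.Print(demo())
}
```

The returned functions can be assigned to the InfoFunc and ErrorFunc fields of a beanstalk.Config.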

URIs

NewProducer, NewConsumer and Dial take a URI or a list of URIs as their first argument, which can be specified in various formats. In the examples above, the beanstalk server was referenced using the host:port notation. This package also supports URI formats like beanstalk:// for a plaintext connection, beanstalks:// or tls:// for encrypted connections, and unix:// for Unix domain socket connections.

For encrypted connections, the port defaults to 11400 when none is specified, instead of the usual default of 11300.
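
The mapping from URI to connection target described above can be sketched with the standard library. This is an illustration of the documented rules, not the package's actual parser. The target type and the parseTarget and hostPort helpers are hypothetical, and treating the path of a unix:// URI as the socket path is an assumption.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// target describes where and how to connect.
type target struct {
	Network string // "tcp" or "unix"
	Address string // host:port, or a socket path for "unix"
	TLS     bool
}

// parseTarget applies the scheme and default-port rules to a single URI.
func parseTarget(uri string) (target, error) {
	// A bare host:port is treated as a plaintext beanstalk:// URI.
	if !strings.Contains(uri, "://") {
		uri = "beanstalk://" + uri
	}
	u, err := url.Parse(uri)
	if err != nil {
		return target{}, err
	}
	switch u.Scheme {
	case "unix":
		return target{Network: "unix", Address: u.Path}, nil
	case "beanstalk":
		return target{Network: "tcp", Address: hostPort(u, "11300")}, nil
	case "beanstalks", "tls":
		return target{Network: "tcp", Address: hostPort(u, "11400"), TLS: true}, nil
	default:
		return target{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

// hostPort joins the URI's host with its port, or with defaultPort if the
// URI does not specify one.
func hostPort(u *url.URL, defaultPort string) string {
	port := u.Port()
	if port == "" {
		port = defaultPort
	}
	return net.JoinHostPort(u.Hostname(), port)
}

func main() {
	for _, uri := range []string{
		"localhost:11300",
		"beanstalks://queue.example.com",
		"unix:///var/run/beanstalkd.sock",
	} {
		tgt, err := parseTarget(uri)
		fmt.Printf("%s -> %+v (err: %v)\n", uri, tgt, err)
	}
}
```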

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrBuried       = errors.New("job was buried")
	ErrDeadlineSoon = errors.New("deadline soon")
	ErrDisconnected = errors.New("client disconnected")
	ErrDraining     = errors.New("server is draining")
	ErrNotFound     = errors.New("job not found")
	ErrTimedOut     = errors.New("reserve timed out")
	ErrTooBig       = errors.New("job too big")
	ErrNotIgnored   = errors.New("tube not ignored")
	ErrTubeTooLong  = errors.New("tube name too long")
	ErrUnexpected   = errors.New("unexpected response received")
)

These errors may be returned by any of Conn's methods.

var ErrJobFinished = errors.New("job was already finished")

ErrJobFinished is returned when a job was already finished.

Functions

func ValidURIs added in v1.4.4

func ValidURIs(uris []string) error

ValidURIs returns an error if any of the specified URIs is invalid, or if the host names in the URIs could not be resolved.

Types

type Config added in v1.3.0

type Config struct {
	// Multiply the list of URIs to create a larger pool of connections.
	//
	// The default is to have 1.
	Multiply int
	// NumGoroutines is the number of goroutines that the Receive method will
	// spin up to process jobs concurrently.
	//
	// The default is to spin up 10 goroutines.
	NumGoroutines int
	// ConnTimeout configures the read and write timeout of the connection. This
	// can be overridden by a context deadline if its value is lower.
	//
	// Note that this does not work with Reserve() and might interfere with
	// ReserveWithTimeout() if configured incorrectly.
	//
	// The default is to have no timeout.
	ConnTimeout time.Duration
	// ReserveTimeout is the time a consumer connection waits between reserve
	// attempts if the last attempt failed to reserve a job.
	//
	// The default is to wait 1 second.
	ReserveTimeout time.Duration
	// ReconnectTimeout is the timeout between reconnects.
	//
	// The default is to wait 3 seconds.
	ReconnectTimeout time.Duration
	// TLSConfig describes the configuration that is used when Dial() makes a
	// TLS connection.
	TLSConfig *tls.Config
	// InfoFunc is called to log informational messages.
	InfoFunc func(message string)
	// ErrorFunc is called to log error messages.
	ErrorFunc func(err error, message string)
	// IgnoreURIValidation skips the call to ValidURIs() during initialization.
	IgnoreURIValidation bool
}

Config is used to configure a Consumer, Producer or Conn.
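
The defaults mentioned in the field comments can be summarised in code. The sketch below mirrors four of the fields in a local settings struct and fills in the documented defaults for unset values. It illustrates the comments above and is not the package's own defaulting code; settings and withDefaults are hypothetical names, and treating zero or negative values as "unset" is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// settings mirrors the Config fields that have documented defaults.
type settings struct {
	Multiply         int
	NumGoroutines    int
	ReserveTimeout   time.Duration
	ReconnectTimeout time.Duration
}

// withDefaults returns s with the documented default for every unset field.
func withDefaults(s settings) settings {
	if s.Multiply < 1 {
		s.Multiply = 1
	}
	if s.NumGoroutines < 1 {
		s.NumGoroutines = 10
	}
	if s.ReserveTimeout <= 0 {
		s.ReserveTimeout = 1 * time.Second
	}
	if s.ReconnectTimeout <= 0 {
		s.ReconnectTimeout = 3 * time.Second
	}
	return s
}

func main() {
	// Only Multiply is set; the other fields fall back to their defaults.
	fmt.Printf("%+v\n", withDefaults(settings{Multiply: 3}))
}
```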

type Conn added in v1.3.0

type Conn struct {
	URI string
	// contains filtered or unexported fields
}

Conn describes a connection to a beanstalk server.

func Dial added in v1.3.0

func Dial(uri string, config Config) (*Conn, error)

Dial into a beanstalk server.

func (*Conn) Close added in v1.3.0

func (conn *Conn) Close() error

Close this connection.

func (*Conn) Delete added in v1.4.5

func (conn *Conn) Delete(ctx context.Context, id uint64) error

Delete the job with the specified ID.

func (*Conn) Ignore added in v1.3.0

func (conn *Conn) Ignore(ctx context.Context, tube string) error

Ignore the specified tube.

func (*Conn) Kick added in v1.3.3

func (conn *Conn) Kick(ctx context.Context, tube string, bound int) (int64, error)

Kick one or more jobs in the specified tube. This function returns the number of jobs that were kicked.

func (*Conn) ListTubes added in v1.3.3

func (conn *Conn) ListTubes(ctx context.Context) ([]string, error)

ListTubes returns a list of available tubes.

func (*Conn) PeekBuried added in v1.3.3

func (conn *Conn) PeekBuried(ctx context.Context, tube string) (*Job, error)

PeekBuried peeks at a buried job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.

func (*Conn) PeekDelayed added in v1.4.2

func (conn *Conn) PeekDelayed(ctx context.Context, tube string) (*Job, error)

PeekDelayed peeks at a delayed job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.

func (*Conn) Put added in v1.3.0

func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)

Put a job in the specified tube.

func (*Conn) Reserve added in v1.4.0

func (conn *Conn) Reserve(ctx context.Context) (*Job, error)

Reserve tries to reserve a job and blocks until it finds one.

func (*Conn) ReserveWithTimeout added in v1.3.0

func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error)

ReserveWithTimeout tries to reserve a job and blocks for at most the specified timeout. If no job could be reserved, this function will return without a job or error.

func (*Conn) String added in v1.3.0

func (conn *Conn) String() string

func (*Conn) TubeStats added in v1.3.3

func (conn *Conn) TubeStats(ctx context.Context, tube string) (TubeStats, error)

TubeStats returns the statistics of the specified tube.

func (*Conn) Watch added in v1.3.0

func (conn *Conn) Watch(ctx context.Context, tube string) error

Watch the specified tube.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer maintains a pool of connections and allows workers to reserve jobs on those connections.

func NewConsumer

func NewConsumer(uris, tubes []string, config Config) (*Consumer, error)

NewConsumer returns a new Consumer.

func (*Consumer) Receive added in v1.3.0

func (consumer *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job))

Receive calls fn for each job it can reserve.

type Job

type Job struct {
	ID         uint64
	Body       []byte
	ReservedAt time.Time
	Stats      struct {
		PutParams `yaml:",inline"`
		Tube      string        `yaml:"tube"`
		State     string        `yaml:"state"`
		Age       time.Duration `yaml:"age"`
		TimeLeft  time.Duration `yaml:"time-left"`
		File      int           `yaml:"file"`
		Reserves  int           `yaml:"reserves"`
		Timeouts  int           `yaml:"timeouts"`
		Releases  int           `yaml:"releases"`
		Buries    int           `yaml:"buries"`
		Kicks     int           `yaml:"kicks"`
	}
	// contains filtered or unexported fields
}

Job describes a beanstalk job and its stats.

func (*Job) Bury

func (job *Job) Bury(ctx context.Context) error

Bury this job.

func (*Job) BuryWithPriority

func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error

BuryWithPriority buries this job with the specified priority.

func (*Job) Delete

func (job *Job) Delete(ctx context.Context) error

Delete this job.

func (*Job) Kick added in v1.4.2

func (job *Job) Kick(ctx context.Context) error

Kick moves the job into the ready queue.

func (*Job) Release

func (job *Job) Release(ctx context.Context) error

Release this job back with its original priority and without delay.

func (*Job) ReleaseWithParams

func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error

ReleaseWithParams releases this job back with the specified priority and delay.

func (*Job) Touch

func (job *Job) Touch(ctx context.Context) error

Touch the job, thereby resetting its time-to-run (TTR) countdown so that the reservation is retained.

func (*Job) TouchAfter added in v1.3.0

func (job *Job) TouchAfter() time.Duration

TouchAfter returns the duration until this job needs to be touched for its reservation to be retained.
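
For long-running jobs, Touch and TouchAfter can be combined into a keep-alive loop. The following is a minimal sketch of that idea. The toucher interface only lists the two Job methods involved so the loop can be exercised without a server, and keepReserved, fakeJob and demo are hypothetical names introduced for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// toucher lists the *beanstalk.Job methods this sketch relies on.
type toucher interface {
	Touch(ctx context.Context) error
	TouchAfter() time.Duration
}

// keepReserved touches job every time TouchAfter elapses. It returns nil
// when ctx is done, or the first error returned by Touch.
func keepReserved(ctx context.Context, job toucher) error {
	for {
		if ctx.Err() != nil {
			return nil
		}
		timer := time.NewTimer(job.TouchAfter())
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil
		case <-timer.C:
			if err := job.Touch(ctx); err != nil {
				return err
			}
		}
	}
}

// fakeJob counts touches and cancels the context after stopAfter of them.
type fakeJob struct {
	touches   int
	stopAfter int
	cancel    context.CancelFunc
}

func (j *fakeJob) TouchAfter() time.Duration { return time.Millisecond }

func (j *fakeJob) Touch(context.Context) error {
	j.touches++
	if j.touches == j.stopAfter {
		j.cancel()
	}
	return nil
}

// demo runs the loop against a fake job and returns the number of touches.
func demo(stopAfter int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	job := &fakeJob{stopAfter: stopAfter, cancel: cancel}
	_ = keepReserved(ctx, job)
	return job.touches
}

func main() {
	fmt.Println("touches:", demo(3))
}
```

In a Receive callback, the loop could be started in its own goroutine with a context that is cancelled as soon as the job has been deleted, released or buried.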

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer maintains a pool of connections to beanstalk servers on which it inserts jobs.

func NewProducer

func NewProducer(uris []string, config Config) (*Producer, error)

NewProducer returns a new Producer.

func (*Producer) IsConnected added in v1.4.0

func (pool *Producer) IsConnected() bool

IsConnected returns true when at least one producer in the pool is connected.

func (*Producer) Put added in v1.3.0

func (pool *Producer) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)

Put a job into the specified tube.

func (*Producer) Stop

func (pool *Producer) Stop()

Stop this producer.

type PutParams

type PutParams struct {
	Priority uint32        `yaml:"pri"`
	Delay    time.Duration `yaml:"delay"`
	TTR      time.Duration `yaml:"ttr"`
}

PutParams are the parameters used to perform a Put operation.

type TubeStats added in v1.2.2

type TubeStats struct {
	Name            string        `yaml:"name"`
	UrgentJobs      int64         `yaml:"current-jobs-urgent"`
	ReadyJobs       int64         `yaml:"current-jobs-ready"`
	ReservedJobs    int64         `yaml:"current-jobs-reserved"`
	DelayedJobs     int64         `yaml:"current-jobs-delayed"`
	BuriedJobs      int64         `yaml:"current-jobs-buried"`
	TotalJobs       int64         `yaml:"total-jobs"`
	CurrentUsing    int64         `yaml:"current-using"`
	CurrentWatching int64         `yaml:"current-watching"`
	CurrentWaiting  int64         `yaml:"current-waiting"`
	Deletes         int64         `yaml:"cmd-delete"`
	Pauses          int64         `yaml:"cmd-pause-tube"`
	Pause           time.Duration `yaml:"pause"`
	PauseLeft       time.Duration `yaml:"pause-time-left"`
}

TubeStats describes the statistics of a beanstalk tube.

Directories

Path Synopsis
cmd
