hop

package module
v0.0.0-...-0210273 Latest
Published: Oct 15, 2018 License: MIT Imports: 5 Imported by: 0

README

Hop

An AMQP client wrapper that provides easy work queue semantics

Introduction

Hop consists of a simple set of abstractions over AMQP concepts that allow you to think in terms of jobs and topics, rather than queues, exchanges and routing.

Jobs

The most basic abstraction in Hop is the Job: a work unit that your worker processes may mark as done or failed. When a Job is completed successfully, it is removed from the queue. If a Job fails, the worker can either requeue it or drop it altogether. If a worker dies in the middle of processing a Job, the Job is placed back into the queue. All of this maps onto AMQP messages and their ack and reject mechanics.
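The done/failed decision described above can be sketched as a small helper. This is only an illustration: settle, fakeJob and errBoom are hypothetical names that are not part of Hop, and the job interface simply lists the Body, Done and Fail signatures documented for *hop.Job further down. The in-memory fakeJob stands in for a real Job so the sketch runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// job lists the methods this sketch needs. The documented *hop.Job type has
// methods with these signatures.
type job interface {
	Body() []byte
	Done() error
	Fail(requeue bool) error
}

// errBoom is a hypothetical handler error used by the demo below.
var errBoom = errors.New("boom")

// settle runs handle on the job body, then marks the job as done on success
// or as failed (requeued or dropped, depending on requeue) on error.
// It is a hypothetical helper, not part of Hop.
func settle(j job, handle func([]byte) error, requeue bool) error {
	if err := handle(j.Body()); err != nil {
		if failErr := j.Fail(requeue); failErr != nil {
			return fmt.Errorf("handler failed (%v) and Fail failed: %w", err, failErr)
		}
		return err
	}
	return j.Done()
}

// fakeJob is an in-memory stand-in that records how it was settled.
type fakeJob struct {
	body  []byte
	state string // "", "done", "requeued" or "dropped"
}

func (f *fakeJob) Body() []byte { return f.body }

func (f *fakeJob) Done() error {
	f.state = "done"
	return nil
}

func (f *fakeJob) Fail(requeue bool) error {
	if requeue {
		f.state = "requeued"
	} else {
		f.state = "dropped"
	}
	return nil
}

func main() {
	ok := &fakeJob{body: []byte("Hello")}
	_ = settle(ok, func([]byte) error { return nil }, true)

	bad := &fakeJob{body: []byte("Hello")}
	_ = settle(bad, func([]byte) error { return errBoom }, true)

	fmt.Println(ok.state, bad.state) // done requeued
}
```

Keeping the handler free of any ack or reject calls means the requeue policy lives in one place and can be changed without touching the job logic.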

Topics

Similarly to the beanstalkd concept of tubes, Hop introduces Topics, which are distinct queues into which producers can put work units and from which workers can pull them for processing. Underneath, a Topic maps to an AMQP channel (multiplexed over the TCP connection to the broker) and a queue with the same name as the Topic. Note that Topics are not the same as AMQP topic exchanges.

The Work Queue

The third Hop abstraction is the Work Queue, which is nothing more than the group of Topics a worker or producer can interact with. The Work Queue abstracts over the TCP connection to the AMQP broker and performs tasks such as dialing, graceful shutdown, and declaration of new channels. Underneath, it maps to an AMQP direct exchange.
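To make the mapping concrete, here is a toy in-memory model of a direct exchange with one queue per Topic. It is not how Hop is implemented: directExchange, declareTopic and publish are hypothetical names, and binding each queue under a key equal to the Topic name is an assumption, since the text above only states that the queue shares the Topic's name. The one property taken from AMQP itself is that a direct exchange delivers a message to the queues whose binding key exactly matches the routing key.

```go
package main

import "fmt"

// directExchange is a toy model of an AMQP direct exchange: a message goes
// to every queue whose binding key equals the routing key exactly.
type directExchange struct {
	bindings map[string][]string // binding key -> bound queue names
	queues   map[string][][]byte // queue name -> pending message bodies
}

func newDirectExchange() *directExchange {
	return &directExchange{
		bindings: make(map[string][]string),
		queues:   make(map[string][][]byte),
	}
}

// declareTopic creates a queue named after the Topic and binds it under the
// same key (the binding key is an assumption of this sketch). Declaring an
// existing Topic again changes nothing.
func (e *directExchange) declareTopic(name string) {
	if _, ok := e.queues[name]; ok {
		return
	}
	e.queues[name] = nil
	e.bindings[name] = append(e.bindings[name], name)
}

// publish routes body by exact key match and reports how many queues
// received it.
func (e *directExchange) publish(key string, body []byte) int {
	bound := e.bindings[key]
	for _, q := range bound {
		e.queues[q] = append(e.queues[q], body)
	}
	return len(bound)
}

func main() {
	ex := newDirectExchange()
	ex.declareTopic("tasks")
	ex.declareTopic("emails")
	ex.declareTopic("tasks") // repeated declaration is a no-op

	fmt.Println(ex.publish("tasks", []byte("Hello")))   // 1
	fmt.Println(ex.publish("unknown", []byte("Hello"))) // 0
	fmt.Println(len(ex.queues["tasks"]), len(ex.queues["emails"])) // 1 0
}
```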

Usage

package main

import (
	log "github.com/sirupsen/logrus"
	"github.com/mazingstudio/hop"
)

func main() {
	// Initialize a WorkQueue with the default configuration
	wq, err := hop.DefaultQueue("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("error creating queue: %s", err)
	}
	// Make sure to gracefully shut down
	defer wq.Close()

	// Get the "tasks" Topic handle
	tasks, err := wq.GetTopic("tasks")
	if err != nil {
		log.Fatalf("error getting topic: %s", err)
	}

	// Put a Job into the "tasks" Topic
	err = tasks.Put([]byte("Hello"))
	if err != nil {
		log.Fatalf("error putting: %s", err)
	}

	// Pull the Job from the Topic
	hello, err := tasks.Pull()
	if err != nil {
		log.Fatalf("error pulling: %s", err)
	}
	// The logged message should read "job body: Hello"
	log.Infof("job body: %s", hello.Body())

	// Mark the Job as failed and requeue
	hello.Fail(true)

	// Pull the Job again
	hello2, err := tasks.Pull()
	if err != nil {
		log.Fatalf("error pulling: %s", err)
	}
	log.Infof("job body: %s", hello2.Body())

	// Mark the Job as done
	hello2.Done()
}

License & Third Party Code

Hop uses streadway/amqp internally.

Hop is distributed under the MIT License. Please refer to LICENSE for more details.

Documentation

Index

Constants

This section is empty.

Variables

var DefaultConfig = &Config{
	ExchangeName:       "hop.exchange",
	Persistent:         false,
	MaxConnectionRetry: 15 * time.Minute,
}

DefaultConfig holds the default values used when initializing the default queue.

Functions

This section is empty.

Types

type Config

type Config struct {
	// ExchangeName is used when naming the underlying exchange for the work
	// queue. Please DO NOT use a pre-existing exchange name, as this would not
	// guarantee that the pre-existing exchange has the necessary properties
	// to provide work queue semantics.
	ExchangeName string
	// Persistent indicates whether the underlying queues and messages should be
	// saved to disk to survive server restarts and crashes.
	Persistent bool
	// MaxConnectionRetry indicates for how long dialing should be retried
	// during connection recovery
	MaxConnectionRetry time.Duration
	// AMQPConfig is optionally used to set up the internal connection to the
	// AMQP broker. If nil, default values (determined by the underlying
	// library) are used.
	AMQPConfig *amqp.Config
}

Config represents a WorkQueue configuration.

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents a work unit taken from the work queue.

func (*Job) Body

func (j *Job) Body() []byte

Body returns the Job's body.

func (*Job) Delivery

func (j *Job) Delivery() *amqp.Delivery

Delivery returns the Job's underlying delivery. Use this if you need more control over the AMQP message.

func (*Job) Done

func (j *Job) Done() error

Done marks the Job for queue removal.

func (*Job) Fail

func (j *Job) Fail(requeue bool) error

Fail marks the Job as failed. If requeue is true, the Job will be added back to the queue; otherwise, it will be dropped.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a named "tube" that jobs can be put into and pulled from. Underneath, each topic corresponds to a queue bound to a direct exchange, and each topic instance manages its own channel.

func (*Topic) Name

func (t *Topic) Name() string

Name returns the Topic's name.

func (*Topic) Pull

func (t *Topic) Pull() (*Job, error)

Pull blocks until a Job is available to consume and returns it.
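Because Pull blocks, a worker that needs a deadline has to impose one itself. The following is one possible sketch, assuming Go 1.18 or later for generics. pullWithTimeout, pullResult, errPullTimeout and demoTimeout are hypothetical names that are not part of Hop. The helper accepts any function shaped like Pull, so a method value such as tasks.Pull should fit without an adapter.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPullTimeout is returned when no result arrives in time.
var errPullTimeout = errors.New("pull timed out")

// demoTimeout is an arbitrary value used by the demo below.
const demoTimeout = 200 * time.Millisecond

// pullResult carries the outcome of one blocking pull.
type pullResult[T any] struct {
	val T
	err error
}

// pullWithTimeout runs a blocking pull function in a goroutine and stops
// waiting for it after d. It is a hypothetical helper, not part of Hop.
func pullWithTimeout[T any](pull func() (T, error), d time.Duration) (T, error) {
	// Buffered so the goroutine can deliver its result and exit even if
	// the caller has already given up.
	ch := make(chan pullResult[T], 1)
	go func() {
		v, err := pull()
		ch <- pullResult[T]{val: v, err: err}
	}()

	select {
	case r := <-ch:
		return r.val, r.err
	case <-time.After(d):
		var zero T
		return zero, errPullTimeout
	}
}

func main() {
	// A pull that returns immediately.
	ready := func() (string, error) { return "Hello", nil }
	v, err := pullWithTimeout(ready, demoTimeout)
	fmt.Println(v, err) // Hello <nil>

	// A pull that blocks until the channel is closed.
	block := make(chan struct{})
	never := func() (string, error) {
		<-block
		return "", nil
	}
	_, err = pullWithTimeout(never, demoTimeout)
	fmt.Println(err) // pull timed out
	close(block)
}
```

One caveat with this approach: after a timeout the goroutine is still waiting inside pull, so a job that arrives later may be received by it and never reach the caller. Treat the helper as a starting point, not a complete cancellation mechanism.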

func (*Topic) Put

func (t *Topic) Put(body []byte) error

Put places a Job in the queue with the given body. Use PutPublishing if you need more control over your messages.
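Since Put accepts a raw byte slice and Job.Body returns one, structured payloads need an encoding step on each side. A minimal sketch using encoding/json follows. resizeTask, encodeTask and decodeTask are hypothetical names chosen for illustration, and JSON is only one possible encoding. The encoded bytes are what a producer would pass to Put, and a worker would hand the result of Body to decodeTask.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// resizeTask is a hypothetical job payload.
type resizeTask struct {
	ImageID string `json:"image_id"`
	Width   int    `json:"width"`
}

// encodeTask produces the bytes a producer would put into a Topic.
func encodeTask(t resizeTask) ([]byte, error) {
	return json.Marshal(t)
}

// decodeTask parses the bytes a worker would read from a Job body.
func decodeTask(body []byte) (resizeTask, error) {
	var t resizeTask
	err := json.Unmarshal(body, &t)
	return t, err
}

func main() {
	body, err := encodeTask(resizeTask{ImageID: "img-42", Width: 640})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", body) // {"image_id":"img-42","width":640}

	task, err := decodeTask(body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", task) // {ImageID:img-42 Width:640}
}
```

A body that fails to decode will fail again on every redelivery, so a worker would probably want to drop such a Job instead of requeueing it.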

func (*Topic) PutPublishing

func (t *Topic) PutPublishing(p *amqp.Publishing) error

PutPublishing puts an AMQP message into the queue. Use this if you need more granularity in your message parameters.

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue is an abstraction over an AMQP connection that automatically performs management and configuration to provide easy work queue semantics.

func ConnectQueue

func ConnectQueue(addr string, config *Config) (*WorkQueue, error)

ConnectQueue dials using the provided address string and creates a queue with the passed configuration.

func DefaultQueue

func DefaultQueue(addr string) (*WorkQueue, error)

DefaultQueue creates a queue with the default configuration and connection parameters.

func (*WorkQueue) Close

func (q *WorkQueue) Close() error

Close gracefully closes the connection to the message broker.

func (*WorkQueue) GetTopic

func (q *WorkQueue) GetTopic(name string) (*Topic, error)

GetTopic returns a "tube" handle that jobs can be put into and pulled from. Thanks to AMQP protocol rules, Topic declarations are idempotent, meaning that a Topic is only created if it doesn't already exist.
