amqptee

v0.0.4
Published: Aug 15, 2014 License: MIT Imports: 11 Imported by: 0

README

amqp-tee

Work In Progress

API is unstable.

An AMQP consumer that consumes all messages from a given queue and inserts them into a configurable database with a static schema (the schema is defined in delivery_store.go). Currently it stores only the properties and body of each message, not the headers.

Usage

Run amqp-tee -h for a listing of flags.

Development

Requires SQLite3 header files to be present for testing.

Requires the gosqlite package for testing (go get -v code.google.com/p/gosqlite/sqlite3).

Test via go test github.com/modcloth-labs/amqptee.

Documentation

Index

Constants

This section is empty.

Variables

var (
	//Version is the git version injected via build flags (see Makefile)
	Version string
	//Rev is the git rev injected via build flags (see Makefile)
	Rev string
)
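
The Makefile is not reproduced on this page, so the exact build flags are not shown. As a minimal sketch, assuming the values are set through the Go linker's -X flag, string variables like these can be populated at build time and given a fallback when left empty. The sketch declares the variables in package main for simplicity; the describeBuild helper and the "dev" and "unknown" placeholders are hypothetical and not part of amqptee.

```go
package main

import "fmt"

// Version and Rev mirror the package variables above. They stay empty
// unless the linker sets them, for example (assumed invocation):
//
//	go build -ldflags "-X main.Version=v0.0.4 -X main.Rev=$(git rev-parse HEAD)"
var (
	Version string
	Rev     string
)

// describeBuild is a hypothetical helper that formats the build info and
// substitutes placeholders when nothing was injected.
func describeBuild(version, rev string) string {
	if version == "" {
		version = "dev"
	}
	if rev == "" {
		rev = "unknown"
	}
	return fmt.Sprintf("%s (%s)", version, rev)
}

func main() {
	fmt.Println(describeBuild(Version, Rev))
}
```

For variables declared in a library package, the -X flag takes the full import path of that package in place of main.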

Functions

This section is empty.

Types

type AMQPConsumer

type AMQPConsumer struct {
	// contains filtered or unexported fields
}

AMQPConsumer pulls messages from an AMQP queue and executes a callback for each.

func NewAMQPConsumer

func NewAMQPConsumer(amqpURI string, queueName string) (amqpConsumer *AMQPConsumer, err error)

NewAMQPConsumer creates a new AMQPConsumer, connects to an AMQP service, and starts consuming from the queue.

func (*AMQPConsumer) Close

func (ac *AMQPConsumer) Close()

Close closes the connections to the AMQP service.

func (*AMQPConsumer) Consume

func (ac *AMQPConsumer) Consume(deliveryHandler func(*amqp.Delivery) (err error)) (err error)

Consume starts consuming all messages and executes the callback for each. If the callback returns an error, this function exits with the same error. It also returns an error if it cannot ack a delivery.

type DeliveryStore

type DeliveryStore struct {
	// contains filtered or unexported fields
}

DeliveryStore represents a backing database to insert the AMQP messages into

func NewDeliveryStore

func NewDeliveryStore(databaseDriver string, databaseURI string, table string) (deliveryStore *DeliveryStore, err error)

NewDeliveryStore opens a connection to the given database and initializes the schema.

func (*DeliveryStore) Close

func (ds *DeliveryStore) Close()

Close closes the connection to the database

func (*DeliveryStore) Store

func (ds *DeliveryStore) Store(delivery *amqp.Delivery) (err error)

Store stores the given delivery object in the database
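
Store takes a *amqp.Delivery and returns an error, which is the same shape as the callback that Consume accepts, so a store's Store method value can presumably be passed to Consume directly. The command's main program is not shown here, so the following is only a sketch of that wiring with stand-ins: delivery, memoryStore, and forEach are hypothetical substitutes for amqp.Delivery, DeliveryStore, and Consume.

```go
package main

import "fmt"

// delivery stands in for amqp.Delivery.
type delivery struct {
	ContentType string
	Body        []byte
}

// memoryStore is a hypothetical in-memory substitute for DeliveryStore.
type memoryStore struct {
	rows []delivery
}

// Store has the same shape as (*DeliveryStore).Store: one delivery in,
// one error out.
func (s *memoryStore) Store(d *delivery) error {
	s.rows = append(s.rows, *d)
	return nil
}

// forEach stands in for (*AMQPConsumer).Consume, minus the broker.
func forEach(ds []delivery, handler func(*delivery) error) error {
	for i := range ds {
		if err := handler(&ds[i]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	store := &memoryStore{}
	msgs := []delivery{
		{ContentType: "text/plain", Body: []byte("hello")},
		{ContentType: "text/plain", Body: []byte("world")},
	}
	// The method value store.Store is passed directly as the handler.
	if err := forEach(msgs, store.Store); err != nil {
		fmt.Println("store failed:", err)
		return
	}
	fmt.Println(len(store.rows), "rows stored")
}
```

Passing the method value avoids a wrapper closure and keeps any error from Store flowing back to the caller of the consume loop.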
