consumer

package
v0.0.0-...-bdfe0f4

This package is not in the latest version of its module.
Published: Jun 8, 2018 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

const (
	TOPIC_EXCHANGE = "topic"
)

Variables

var (
	ERRNAMEREQUIRED = errors.New("name is a required exchange field")
)

Functions

This section is empty.

Types

type BrokerConfig

type BrokerConfig struct {
	Exchange  ExchangeConfig
	Consumers map[string]ConsumerConfig
}

type Consumer

type Consumer interface {
	// Init can be used to get a custom
	// consumer config. If it returns nil a consumer
	// with default params will be setup
	Init() (*ConsumerConfig, error)
	// Prefix defines a common prefix to be added
	// to all queue names
	Prefix() string
	// Middleware can be used to implement custom
	// middleware which gets called before messages
	// are passed to handlers
	Middleware(HandlerFunc) HandlerFunc
	// Queues is used to define the queues, keys and handlers
	// Config is passed which can be used to set QOS and consumer name
	Queues(context.Context) map[string]*Routes
}

Consumer is an interface which can be implemented to create a consumer.

A consumer can set up multiple queues, define n routing keys for each queue, and assign a handler to each queue to manage the messages received.

Custom middleware can be added using the Middleware method.

Example implementation shown below:

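type MyConsumer struct{}

func NewMyConsumer() consumer.Consumer {
	return &MyConsumer{}
}

// Init returns nil so that a consumer with default params is set up.
func (c *MyConsumer) Init() (*consumer.ConsumerConfig, error) {
	return nil, nil
}

func (c *MyConsumer) Prefix() string {
	return ""
}

// Middleware passes every message straight through to the handler.
func (c *MyConsumer) Middleware(h consumer.HandlerFunc) consumer.HandlerFunc {
	return func(ctx context.Context, m amqp.Delivery) {
		h(ctx, m)
	}
}

// Queues binds one queue to two routing keys and a single handler.
func (c *MyConsumer) Queues(ctx context.Context) map[string]*consumer.Routes {
	return map[string]*consumer.Routes{
		"mark.queue": {
			Keys:         []string{"test.message", "mark.#"},
			DeliveryFunc: c.TestHandler,
		},
	}
}

func (c *MyConsumer) TestHandler(ctx context.Context, m amqp.Delivery) error {
	return nil
}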

type ConsumerConfig

type ConsumerConfig struct {
	Name           string
	Durable        *bool
	AutoDelete     *bool
	NoWait         *bool
	Exclusive      *bool
	Ttl            *uint
	PrefetchCount  *uint
	PrefetchSize   *uint
	Args           map[string]interface{}
	HasDeadletter  *bool
	DeadletterName *string
}

ConsumerConfig defines the setup of a consumer. If it isn't set, default values will be used. To set a custom config for a consumer, set up a new consumer struct, pass it the config, and return the config from the Init() method.

func (*ConsumerConfig) BuildDeadletterQueue

func (c *ConsumerConfig) BuildDeadletterQueue(routes *Routes, ch *amqp.Channel, con *amqp.Connection, ex string) (err error)

func (*ConsumerConfig) BuildQueue

func (c *ConsumerConfig) BuildQueue(queueName string, routes *Routes, ch *amqp.Channel, ex string) (err error)

func (*ConsumerConfig) GetArgs

func (e *ConsumerConfig) GetArgs() map[string]interface{}

GetArgs gets a table of arbitrary arguments which are passed to the exchange

func (*ConsumerConfig) GetAutoDelete

func (c *ConsumerConfig) GetAutoDelete() bool

GetAutoDelete determines whether the queue is deleted on server restart. The default is false.

func (*ConsumerConfig) GetDeadletterName

func (e *ConsumerConfig) GetDeadletterName() string

GetDeadletterName gets the name for the deadletter queue to be set up. If nil, a name of %QueueName%.deadletter is used.

func (*ConsumerConfig) GetDurable

func (c *ConsumerConfig) GetDurable() bool

GetDurable returns the type of durability set in config, if nil then it returns a default of true
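The pointer-typed fields in ConsumerConfig let a getter tell "not set" apart from an explicit false. The sketch below illustrates that nil-means-default pattern. QueueConfig and boolPtr are hypothetical stand-ins written for this example, not the package's own code, and the defaults are the ones stated in the getter descriptions.

```go
package main

import "fmt"

// QueueConfig is a hypothetical stand-in for ConsumerConfig that keeps
// only the fields needed to show the nil-means-default pattern.
type QueueConfig struct {
	Durable    *bool
	AutoDelete *bool
}

// GetDurable returns the configured value, or true when it is unset.
func (c *QueueConfig) GetDurable() bool {
	if c == nil || c.Durable == nil {
		return true
	}
	return *c.Durable
}

// GetAutoDelete returns the configured value, or false when it is unset.
func (c *QueueConfig) GetAutoDelete() bool {
	if c == nil || c.AutoDelete == nil {
		return false
	}
	return *c.AutoDelete
}

// boolPtr is a small helper for taking the address of a bool literal.
func boolPtr(b bool) *bool { return &b }

func main() {
	var defaults QueueConfig
	custom := QueueConfig{Durable: boolPtr(false)}

	fmt.Println(defaults.GetDurable(), defaults.GetAutoDelete()) // true false
	fmt.Println(custom.GetDurable(), custom.GetAutoDelete())     // false false
}
```

A plain bool field could not express "use the default of true", because its zero value is already false.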

func (*ConsumerConfig) GetExclusive

func (c *ConsumerConfig) GetExclusive() bool

GetExclusive reports whether the queue is exclusive. Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. The default is false.

func (*ConsumerConfig) GetHasDeadletter

func (e *ConsumerConfig) GetHasDeadletter() bool

GetHasDeadletter reports whether a deadletter queue should be set up for this consumer.

func (*ConsumerConfig) GetName

func (c *ConsumerConfig) GetName() string

GetName returns the consumer name if set in config; otherwise it returns a random uuid.

func (*ConsumerConfig) GetNoWait

func (c *ConsumerConfig) GetNoWait() bool

GetNoWait returns the noWait setting. When true, the queue is assumed to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or when attempting to modify an existing queue from a different connection. The default is false.

func (*ConsumerConfig) GetPrefetchCount

func (c *ConsumerConfig) GetPrefetchCount() uint

GetPrefetchCount returns the QoS value for the number of messages pulled from the queue at a time. The default is 0, which will pull the default count for most libs.

func (*ConsumerConfig) GetPrefetchSize

func (c *ConsumerConfig) GetPrefetchSize() uint

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

type ExchangeConfig

type ExchangeConfig struct {
	Name string
	// contains filtered or unexported fields
}

ExchangeConfig sets up a new exchange with the provided params. Defaults are enabled, so not all params need to be set, depending on requirements.

func (*ExchangeConfig) BuildExchange

func (e *ExchangeConfig) BuildExchange(ch *amqp.Channel) (err error)

BuildExchange builds an exchange

func (*ExchangeConfig) GetArgs

func (e *ExchangeConfig) GetArgs() map[string]interface{}

GetArgs gets a table of arbitrary arguments which are passed to the exchange

func (*ExchangeConfig) GetAutoDelete

func (e *ExchangeConfig) GetAutoDelete() bool

GetAutoDelete returns the type of deletion policy set in config, if nil then it returns a default of false

func (*ExchangeConfig) GetDurable

func (e *ExchangeConfig) GetDurable() bool

GetDurable returns the type of durability set in config, if nil then it returns a default of true

func (*ExchangeConfig) GetInternal

func (e *ExchangeConfig) GetInternal() bool

GetInternal determines whether this exchange can only be published to from other exchanges. The default is false, meaning external sources can publish to this exchange.

func (*ExchangeConfig) GetName

func (e *ExchangeConfig) GetName() (string, error)

GetName returns the exchange name set in config. Name is a required exchange field, so an error is returned when it is missing (see ERRNAMEREQUIRED).
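The (string, error) return of ExchangeConfig.GetName, together with the ERRNAMEREQUIRED variable, suggests that an empty name is rejected. The following is a minimal sketch under that assumption. exchangeConfig and errNameRequired are local stand-ins, and the real method may validate differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errNameRequired mirrors the message of ERRNAMEREQUIRED.
var errNameRequired = errors.New("name is a required exchange field")

// exchangeConfig is a hypothetical stand-in for ExchangeConfig.
type exchangeConfig struct {
	Name string
}

// GetName returns the name, or an error when no name was configured.
// Assumption: this is how the required-field check behaves.
func (e *exchangeConfig) GetName() (string, error) {
	if e == nil || e.Name == "" {
		return "", errNameRequired
	}
	return e.Name, nil
}

func main() {
	for _, cfg := range []*exchangeConfig{{Name: "orders"}, {}} {
		name, err := cfg.GetName()
		if err != nil {
			fmt.Println("invalid exchange config:", err)
			continue
		}
		fmt.Println("exchange:", name)
	}
}
```

Callers can compare the returned error against the sentinel to tell a missing name apart from other failures.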

func (*ExchangeConfig) GetType

func (e *ExchangeConfig) GetType() string

GetType returns the type of exchange set in config, if nil then it returns a default of Topic

type HandlerFunc

type HandlerFunc func(context.Context, amqp.Delivery)

func JsonHandler

func JsonHandler(h HandlerFunc) HandlerFunc

func MessageDump

func MessageDump(h HandlerFunc) HandlerFunc

MessageDump will output the entire amqp message with the body converted to a string. Handy for debugging.

func (HandlerFunc) HandleMessage

func (f HandlerFunc) HandleMessage(ctx context.Context, m amqp.Delivery)

type Host

type Host interface {
	// AddBroker will register an exchange and n consumers
	// which will consume from that exchange
	AddBroker(context.Context, *ExchangeConfig, []Consumer) error
	// Run will set up all queues and routing keys
	// assigned to each consumer and then in turn start them
	Run(context.Context) (err error)
	// Middleware can be used to implement custom
	// middleware which gets called before messages
	// are passed to handlers
	Middleware(...HostMiddleware)
	// Stop can be called when you wish to shut down the host
	Stop(context.Context) error

	GetConnectionStatus() bool
}

Host is the container which is used to host all consumers that are registered. It is responsible for the amqp connection and for starting and gracefully stopping all running consumers.

	h := NewRabbitHost().Init(cfg.Host)
	h.AddBroker(NewBroker(cfg.Exchange, [])

func NewConsumerHost

func NewConsumerHost(cfg *HostConfig) Host

NewConsumerHost sets up the initial connection and quality of service to be used by all registered consumers.

type HostConfig

type HostConfig struct {
	Address string
}

HostConfig contains global config used for the rabbit connection

type HostMiddleware

type HostMiddleware func(handler HandlerFunc) HandlerFunc

type KeyHandlerFunc

type KeyHandlerFunc func(context.Context, amqp.Delivery) error

type MessageHandler

type MessageHandler interface {
	HandleMessage(context.Context, amqp.Delivery)
}

MessageHandler works in the same way as http.Handler does, allowing middleware etc. to be used on consumers.

type MiddlewareList

type MiddlewareList []HostMiddleware

type RabbitHost

type RabbitHost struct {
	// contains filtered or unexported fields
}

func (*RabbitHost) AddBroker

func (h *RabbitHost) AddBroker(ctx context.Context, cfg *ExchangeConfig, consumers []Consumer) error

AddBroker will register an exchange and n consumers which will consume from that exchange

func (*RabbitHost) GetConnectionStatus

func (h *RabbitHost) GetConnectionStatus() bool

func (*RabbitHost) Middleware

func (h *RabbitHost) Middleware(fn ...HostMiddleware)

func (*RabbitHost) Run

func (h *RabbitHost) Run(ctx context.Context) (err error)

Run will set up all queues and routing keys assigned to each consumer and then in turn start them.

func (*RabbitHost) Stop

func (h *RabbitHost) Stop(context.Context) error

type Routes

type Routes struct {
	Keys         []string
	DeliveryFunc KeyHandlerFunc
}

Routes contains a set of routing keys and a handler func that will be used to process messages matching those routing keys.
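Keys such as "test.message" and "mark.#" are AMQP topic patterns: words are separated by dots, `*` matches exactly one word and `#` matches zero or more words. The matching itself is done by the broker, not by this package. The sketch below only illustrates those rules so it is easier to predict which messages a Routes entry will receive. topicMatch and matchWords are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key matches a topic pattern.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares pattern words against key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may absorb zero or more words; try every split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" must consume exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	keys := []string{"test.message", "mark.#"}
	for _, rk := range []string{"test.message", "mark.created.v1", "mark", "other.message"} {
		matched := false
		for _, p := range keys {
			if topicMatch(p, rk) {
				matched = true
				break
			}
		}
		fmt.Printf("%-16s %v\n", rk, matched)
	}
}
```

With these two keys, only "other.message" is not delivered to the queue.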
