brokers

package
v2.0.0-...-0175b30 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 11, 2019 License: MPL-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AdjustRoutingKey

func AdjustRoutingKey(b Interface, s *tasks.Signature)

AdjustRoutingKey makes sure the routing key is correct. If the routing key is an empty string, it is set to: a) the binding key, for the direct exchange type; b) the default queue name otherwise.
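The fallback described above can be sketched in isolation. This is a minimal illustration, not the package's implementation: `signature`, `brokerConfig`, their field names, and the `isAMQP` flag are hypothetical stand-ins for `tasks.Signature`, `config.Config`, and the broker check.

```go
package main

import "fmt"

// signature and brokerConfig are hypothetical stand-ins for tasks.Signature
// and config.Config; only the fields needed for the illustration are modeled.
type signature struct {
	RoutingKey string
}

type brokerConfig struct {
	DefaultQueue string
	ExchangeType string // assumed field name, e.g. "direct"
	BindingKey   string // assumed field name
}

// adjustRoutingKey leaves a non-empty routing key untouched. An empty key
// becomes the binding key for an AMQP direct exchange, and the default
// queue name in every other case.
func adjustRoutingKey(isAMQP bool, cnf brokerConfig, s *signature) {
	if s.RoutingKey != "" {
		return
	}
	if isAMQP && cnf.ExchangeType == "direct" {
		s.RoutingKey = cnf.BindingKey
		return
	}
	s.RoutingKey = cnf.DefaultQueue
}

func main() {
	cnf := brokerConfig{DefaultQueue: "tasks_queue", ExchangeType: "direct", BindingKey: "tasks_key"}
	s := &signature{}
	adjustRoutingKey(true, cnf, s)
	fmt.Println(s.RoutingKey) // prints "tasks_key"
}
```

Returning early on a non-empty key keeps an explicitly routed task from being silently redirected.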

func IsAMQP

func IsAMQP(b Interface) bool

IsAMQP returns true if the broker is AMQP
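One plausible way to write such a check is a type assertion on the interface value. The documentation does not show how IsAMQP decides, so the sketch below is an assumption; `broker`, `amqpBroker`, and `redisLikeBroker` are hypothetical types defined only for the example.

```go
package main

import "fmt"

// broker is a hypothetical, trimmed-down stand-in for the package's Interface.
type broker interface {
	StopConsuming()
}

type amqpBroker struct{}

func (*amqpBroker) StopConsuming() {}

type redisLikeBroker struct{}

func (*redisLikeBroker) StopConsuming() {}

// isAMQP reports whether the dynamic type behind b is *amqpBroker.
func isAMQP(b broker) bool {
	_, ok := b.(*amqpBroker)
	return ok
}

func main() {
	fmt.Println(isAMQP(&amqpBroker{}), isAMQP(&redisLikeBroker{})) // prints "true false"
}
```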

Types

type AMQPBroker

type AMQPBroker struct {
	Broker
	common.AMQPConnector
	// contains filtered or unexported fields
}

AMQPBroker represents an AMQP broker
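AMQPBroker embeds Broker, so the base methods are promoted onto it while it declares its own Publish, StartConsuming, and StopConsuming. The sketch below shows that Go embedding pattern with hypothetical types (`baseBroker`, `queueBroker`). That the base Publish returns an error is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// baseBroker is a hypothetical stand-in for the shared base structure.
type baseBroker struct {
	registered []string
}

func (b *baseBroker) SetRegisteredTaskNames(names []string) { b.registered = names }

func (b *baseBroker) GetRegisteredTaskNames() []string { return b.registered }

// Publish on the base type has no transport to use (assumed behavior).
func (b *baseBroker) Publish(task string) error {
	return errors.New("not implemented")
}

// queueBroker embeds baseBroker and shadows Publish with a real implementation.
type queueBroker struct {
	baseBroker
	sent []string
}

func (q *queueBroker) Publish(task string) error {
	q.sent = append(q.sent, task)
	return nil
}

func main() {
	q := &queueBroker{}
	q.SetRegisteredTaskNames([]string{"add"}) // promoted from baseBroker
	fmt.Println(q.GetRegisteredTaskNames())
	fmt.Println(q.Publish("add"))            // outer method wins
	fmt.Println(q.baseBroker.Publish("add")) // base method is still reachable
}
```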

func (*AMQPBroker) Publish

func (b *AMQPBroker) Publish(signature *tasks.Signature) error

Publish places a new message on the default queue

func (*AMQPBroker) StartConsuming

func (b *AMQPBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*AMQPBroker) StopConsuming

func (b *AMQPBroker) StopConsuming()

StopConsuming quits the loop
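A blocking consume loop with a separate stop method is commonly built from a `select` over a delivery channel and a stop channel. The sketch below uses a hypothetical `consumer` type and plain strings as deliveries. Reading the boolean result as a "should retry" hint is an assumption, since the documentation does not say what it means.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical stand-in; it is not the package's AMQPBroker.
type consumer struct {
	deliveries chan string
	stop       chan struct{}
	stopOnce   sync.Once
}

func newConsumer() *consumer {
	return &consumer{deliveries: make(chan string, 8), stop: make(chan struct{})}
}

// startConsuming blocks, passing each delivery to process, until
// stopConsuming is called or process fails. The bool is used here as a
// "retry" hint (an assumption): false after a clean stop, true after an error.
func (c *consumer) startConsuming(process func(string) error) (bool, error) {
	for {
		select {
		case <-c.stop:
			return false, nil
		case d := <-c.deliveries:
			if err := process(d); err != nil {
				return true, err
			}
		}
	}
}

// stopConsuming ends the loop; sync.Once makes repeated calls safe.
func (c *consumer) stopConsuming() {
	c.stopOnce.Do(func() { close(c.stop) })
}

func main() {
	c := newConsumer()
	c.deliveries <- "task-1"
	c.deliveries <- "task-2"
	seen := 0
	retry, err := c.startConsuming(func(d string) error {
		seen++
		if seen == 2 {
			c.stopConsuming()
		}
		return nil
	})
	fmt.Println(seen, retry, err)
}
```

Closing a channel, rather than sending on it, lets any number of goroutines observe the stop signal.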

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a base broker structure

func New

func New(cnf *config.Config) Broker

New creates a new Broker instance

func (*Broker) GetConfig

func (b *Broker) GetConfig() *config.Config

GetConfig returns config

func (*Broker) GetPendingTasks

func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error)

GetPendingTasks returns a slice of the tasks.Signature values waiting in the queue

func (*Broker) GetRegisteredTaskNames

func (b *Broker) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns the names of the registered tasks

func (*Broker) IsTaskRegistered

func (b *Broker) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task is registered with this broker

func (*Broker) Publish

func (b *Broker) Publish(signature *tasks.Signature) error

Publish places a new message on the default queue

func (*Broker) SetRegisteredTaskNames

func (b *Broker) SetRegisteredTaskNames(names []string)

SetRegisteredTaskNames sets registered task names
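The three registration methods (set, get, and lookup) amount to a small name registry. A minimal sketch, assuming a plain slice and a linear scan; `registry` and its method names are hypothetical and the package's internal storage is not shown in this documentation.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the broker's task-name bookkeeping.
type registry struct {
	names []string
}

// setRegisteredTaskNames replaces the whole list of known task names.
func (r *registry) setRegisteredTaskNames(names []string) {
	r.names = names
}

// getRegisteredTaskNames returns the currently known task names.
func (r *registry) getRegisteredTaskNames() []string {
	return r.names
}

// isTaskRegistered reports whether name is in the list.
func (r *registry) isTaskRegistered(name string) bool {
	for _, n := range r.names {
		if n == name {
			return true
		}
	}
	return false
}

func main() {
	r := &registry{}
	r.setRegisteredTaskNames([]string{"add", "multiply"})
	fmt.Println(r.isTaskRegistered("add"), r.isTaskRegistered("divide")) // prints "true false"
}
```

A consumer could use such a lookup to decide whether a delivered task can be handled by the current worker.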

func (*Broker) StartConsuming

func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*Broker) StopConsuming

func (b *Broker) StopConsuming()

StopConsuming quits the loop

type ErrCouldNotUnmarshaTaskSignature

type ErrCouldNotUnmarshaTaskSignature struct {
	// contains filtered or unexported fields
}

ErrCouldNotUnmarshaTaskSignature ...

func NewErrCouldNotUnmarshaTaskSignature

func NewErrCouldNotUnmarshaTaskSignature(msg []byte, err error) ErrCouldNotUnmarshaTaskSignature

NewErrCouldNotUnmarshaTaskSignature returns a new ErrCouldNotUnmarshaTaskSignature instance

func (ErrCouldNotUnmarshaTaskSignature) Error

Error implements the error interface
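A dedicated error type lets callers tell a malformed message apart from other failures. The sketch below is a hypothetical analogue (`errCouldNotUnmarshal`, `decode`, and `isUnmarshalError` are not part of the package) that keeps the raw message and the decoding failure, and is detected with `errors.As`.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errCouldNotUnmarshal is a hypothetical analogue of the package's error type.
type errCouldNotUnmarshal struct {
	msg    []byte
	reason string
}

func newErrCouldNotUnmarshal(msg []byte, err error) errCouldNotUnmarshal {
	return errCouldNotUnmarshal{msg: msg, reason: err.Error()}
}

// Error implements the error interface.
func (e errCouldNotUnmarshal) Error() string {
	return fmt.Sprintf("could not unmarshal %q into a task signature: %s", e.msg, e.reason)
}

// decode tries to parse a message body and wraps any JSON failure.
func decode(msg []byte) error {
	var sig struct{ Name string }
	if err := json.Unmarshal(msg, &sig); err != nil {
		return newErrCouldNotUnmarshal(msg, err)
	}
	return nil
}

// isUnmarshalError reports whether err is (or wraps) errCouldNotUnmarshal.
func isUnmarshalError(err error) bool {
	var target errCouldNotUnmarshal
	return errors.As(err, &target)
}

func main() {
	err := decode([]byte("{not json"))
	fmt.Println(isUnmarshalError(err))
	fmt.Println(err)
}
```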

type Interface

type Interface interface {
	GetConfig() *config.Config
	SetRegisteredTaskNames(names []string)
	IsTaskRegistered(name string) bool
	StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
	StopConsuming()
	Publish(task *tasks.Signature) error
	GetPendingTasks(queue string) ([]*tasks.Signature, error)
}

Interface - a common interface for all brokers
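Because callers depend on the interface rather than on a concrete broker, an alternative implementation can be swapped in, for instance in tests. The sketch below implements a hypothetical two-method subset (`publisher`) with an in-memory queue. `signature` stands in for `tasks.Signature`, and falling back to a default queue when the routing key is empty is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// signature is a hypothetical stand-in for tasks.Signature.
type signature struct {
	Name       string
	RoutingKey string
}

// publisher is a hypothetical subset of the broker Interface.
type publisher interface {
	Publish(task *signature) error
	GetPendingTasks(queue string) ([]*signature, error)
}

// inMemoryBroker keeps published tasks in per-queue slices.
type inMemoryBroker struct {
	mu           sync.Mutex
	defaultQueue string
	queues       map[string][]*signature
}

// Compile-time check that the type satisfies the interface.
var _ publisher = (*inMemoryBroker)(nil)

func newInMemoryBroker(defaultQueue string) *inMemoryBroker {
	return &inMemoryBroker{defaultQueue: defaultQueue, queues: map[string][]*signature{}}
}

// Publish appends the task to the queue named by its routing key,
// or to the default queue when the key is empty (assumed behavior).
func (b *inMemoryBroker) Publish(task *signature) error {
	if task == nil {
		return errors.New("nil task")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	queue := task.RoutingKey
	if queue == "" {
		queue = b.defaultQueue
	}
	b.queues[queue] = append(b.queues[queue], task)
	return nil
}

// GetPendingTasks returns a copy of the tasks waiting in the queue.
func (b *inMemoryBroker) GetPendingTasks(queue string) ([]*signature, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	pending := make([]*signature, len(b.queues[queue]))
	copy(pending, b.queues[queue])
	return pending, nil
}

func main() {
	var p publisher = newInMemoryBroker("default")
	_ = p.Publish(&signature{Name: "add"})
	pending, _ := p.GetPendingTasks("default")
	fmt.Println(len(pending), pending[0].Name) // prints "1 add"
}
```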

func NewAMQPBroker

func NewAMQPBroker(cnf *config.Config) Interface

NewAMQPBroker creates a new AMQPBroker instance

type TaskProcessor

type TaskProcessor interface {
	Process(signature *tasks.Signature) error
	ProcessBatch(parent *tasks.Signature, signatures []*tasks.Signature, pool chan struct{}) error
}

TaskProcessor can process a delivered task. This will probably always be a worker instance.
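A processor satisfying this shape could look like the sketch below. `signature` is a hypothetical stand-in for `tasks.Signature`, and `recordingProcessor` is invented for the example. Treating `pool` as a pre-filled token channel that bounds concurrency is an assumption about how the parameter is meant to be used.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// signature is a hypothetical stand-in for tasks.Signature.
type signature struct {
	Name string
}

// recordingProcessor records the names of the tasks it has processed.
type recordingProcessor struct {
	mu        sync.Mutex
	processed []string
}

// Process handles a single delivered task.
func (p *recordingProcessor) Process(s *signature) error {
	if s == nil {
		return errors.New("nil signature")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.processed = append(p.processed, s.Name)
	return nil
}

// ProcessBatch processes signatures concurrently. Each goroutine takes a
// token from pool before starting and returns it when done, so at most
// cap(pool) tasks run at once (assumed semantics of pool). parent is
// accepted to match the interface shape and is unused in this sketch.
func (p *recordingProcessor) ProcessBatch(parent *signature, signatures []*signature, pool chan struct{}) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(signatures))
	for _, s := range signatures {
		<-pool // acquire a token
		wg.Add(1)
		go func(s *signature) {
			defer wg.Done()
			defer func() { pool <- struct{}{} }() // release the token
			if err := p.Process(s); err != nil {
				errs <- err
			}
		}(s)
	}
	wg.Wait()
	close(errs)
	return <-errs // first recorded error, or nil if none
}

func main() {
	pool := make(chan struct{}, 2)
	pool <- struct{}{}
	pool <- struct{}{}
	p := &recordingProcessor{}
	err := p.ProcessBatch(nil, []*signature{{Name: "a"}, {Name: "b"}, {Name: "c"}}, pool)
	fmt.Println(len(p.processed), err)
}
```

The order of names in `processed` is not deterministic, because the batch items run in separate goroutines.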
