Documentation ¶
Index ¶
- func AdjustRoutingKey(b Interface, s *tasks.Signature)
- func IsAMQP(b Interface) bool
- type AMQPBroker
- type Broker
- func (b *Broker) GetConfig() *config.Config
- func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error)
- func (b *Broker) GetRegisteredTaskNames() []string
- func (b *Broker) IsTaskRegistered(name string) bool
- func (b *Broker) Publish(signature *tasks.Signature) error
- func (b *Broker) SetRegisteredTaskNames(names []string)
- func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
- func (b *Broker) StopConsuming()
- type ErrCouldNotUnmarshaTaskSignature
- type Interface
- type TaskProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AdjustRoutingKey ¶
AdjustRoutingKey makes sure the routing key is correct. If the routing key is an empty string: a) set it to binding key for direct exchange type b) set it to default queue name
Types ¶
type AMQPBroker ¶
type AMQPBroker struct { Broker common.AMQPConnector // contains filtered or unexported fields }
AMQPBroker represents an AMQP broker
func (*AMQPBroker) Publish ¶
func (b *AMQPBroker) Publish(signature *tasks.Signature) error
Publish places a new message on the default queue
func (*AMQPBroker) StartConsuming ¶
func (b *AMQPBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
func (*AMQPBroker) StopConsuming ¶
func (b *AMQPBroker) StopConsuming()
StopConsuming quits the loop
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a base broker structure
func (*Broker) GetPendingTasks ¶
GetPendingTasks returns a slice of task.Signatures waiting in the queue
func (*Broker) GetRegisteredTaskNames ¶
GetRegisteredTaskNames returns registered tasks names
func (*Broker) IsTaskRegistered ¶
IsTaskRegistered returns true if the task is registered with this broker
func (*Broker) SetRegisteredTaskNames ¶
SetRegisteredTaskNames sets registered task names
func (*Broker) StartConsuming ¶
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
type ErrCouldNotUnmarshaTaskSignature ¶
type ErrCouldNotUnmarshaTaskSignature struct {
// contains filtered or unexported fields
}
ErrCouldNotUnmarshaTaskSignature ...
func NewErrCouldNotUnmarshaTaskSignature ¶
func NewErrCouldNotUnmarshaTaskSignature(msg []byte, err error) ErrCouldNotUnmarshaTaskSignature
NewErrCouldNotUnmarshaTaskSignature returns new ErrCouldNotUnmarshaTaskSignature instance
func (ErrCouldNotUnmarshaTaskSignature) Error ¶
func (e ErrCouldNotUnmarshaTaskSignature) Error() string
Error implements the error interface
type Interface ¶
type Interface interface { GetConfig() *config.Config SetRegisteredTaskNames(names []string) IsTaskRegistered(name string) bool StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error) StopConsuming() Publish(task *tasks.Signature) error GetPendingTasks(queue string) ([]*tasks.Signature, error) }
Interface - a common interface for all brokers
func NewAMQPBroker ¶
NewAMQPBroker creates new AMQPBroker instance