brokers

package
v1.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2017 License: MPL-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AdjustRoutingKey added in v1.0.14

func AdjustRoutingKey(b Interface, s *tasks.Signature)

AdjustRoutingKey makes sure the routing key is correct. If the routing key is an empty string: a) set it to binding key for direct exchange type b) set it to default queue name

func IsAMQP added in v1.0.14

func IsAMQP(b Interface) bool

IsAMQP returns true if the broker is AMQP

Types

type AMQPBroker

type AMQPBroker struct {
	Broker
	common.AMQPConnector
	// contains filtered or unexported fields
}

AMQPBroker represents an AMQP broker

func (*AMQPBroker) Publish

func (b *AMQPBroker) Publish(signature *tasks.Signature) error

Publish places a new message on the default queue

func (*AMQPBroker) StartConsuming

func (b *AMQPBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*AMQPBroker) StopConsuming

func (b *AMQPBroker) StopConsuming()

StopConsuming quits the loop

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a base broker structure

func New

func New(cnf *config.Config) Broker

New creates new Broker instance

func (*Broker) GetConfig added in v1.0.14

func (b *Broker) GetConfig() *config.Config

GetConfig returns config

func (*Broker) GetPendingTasks

func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error)

GetPendingTasks returns a slice of task.Signatures waiting in the queue

func (*Broker) IsTaskRegistered

func (b *Broker) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task is registered with this broker

func (*Broker) Publish added in v1.0.14

func (b *Broker) Publish(signature *tasks.Signature) error

Publish places a new message on the default queue

func (*Broker) SetRegisteredTaskNames

func (b *Broker) SetRegisteredTaskNames(names []string)

SetRegisteredTaskNames sets registered task names

func (*Broker) StartConsuming added in v1.0.14

func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*Broker) StopConsuming added in v1.0.14

func (b *Broker) StopConsuming()

StopConsuming quits the loop

type EagerBroker

type EagerBroker struct {
	Broker
	// contains filtered or unexported fields
}

EagerBroker represents an "eager" in-memory broker

func (*EagerBroker) AssignWorker

func (eagerBroker *EagerBroker) AssignWorker(w TaskProcessor)

AssignWorker assigns a worker to the eager broker

func (*EagerBroker) GetPendingTasks

func (eagerBroker *EagerBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)

GetPendingTasks returns a slice of task.Signatures waiting in the queue

func (*EagerBroker) Publish

func (eagerBroker *EagerBroker) Publish(task *tasks.Signature) error

Publish places a new message on the default queue

func (*EagerBroker) StartConsuming

func (eagerBroker *EagerBroker) StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*EagerBroker) StopConsuming

func (eagerBroker *EagerBroker) StopConsuming()

StopConsuming quits the loop

type EagerMode

type EagerMode interface {
	AssignWorker(p TaskProcessor)
}

EagerMode interface with methods specific for this broker

type ErrCouldNotUnmarshaTaskSignature added in v1.0.12

type ErrCouldNotUnmarshaTaskSignature struct {
	// contains filtered or unexported fields
}

ErrCouldNotUnmarshaTaskSignature ...

func NewErrCouldNotUnmarshaTaskSignature added in v1.0.12

func NewErrCouldNotUnmarshaTaskSignature(msg []byte, err error) ErrCouldNotUnmarshaTaskSignature

NewErrCouldNotUnmarshaTaskSignature returns new NewErrCouldNotUnmarshaTaskSignature instance

func (ErrCouldNotUnmarshaTaskSignature) Error added in v1.0.12

Error implements the error interface

type Interface

type Interface interface {
	GetConfig() *config.Config
	SetRegisteredTaskNames(names []string)
	IsTaskRegistered(name string) bool
	StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
	StopConsuming()
	Publish(task *tasks.Signature) error
	GetPendingTasks(queue string) ([]*tasks.Signature, error)
}

Interface - a common interface for all brokers

func NewAMQPBroker

func NewAMQPBroker(cnf *config.Config) Interface

NewAMQPBroker creates new AMQPBroker instance

func NewEagerBroker

func NewEagerBroker() Interface

NewEagerBroker creates new EagerBroker instance

func NewRedisBroker

func NewRedisBroker(cnf *config.Config, host, password, socketPath string, db int) Interface

NewRedisBroker creates new RedisBroker instance

type RedisBroker

type RedisBroker struct {
	Broker
	common.RedisConnector
	// contains filtered or unexported fields
}

RedisBroker represents a Redis broker

func (*RedisBroker) GetPendingTasks

func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)

GetPendingTasks returns a slice of task signatures waiting in the queue

func (*RedisBroker) Publish

func (b *RedisBroker) Publish(signature *tasks.Signature) error

Publish places a new message on the default queue

func (*RedisBroker) StartConsuming

func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)

StartConsuming enters a loop and waits for incoming messages

func (*RedisBroker) StopConsuming

func (b *RedisBroker) StopConsuming()

StopConsuming quits the loop

type TaskProcessor

type TaskProcessor interface {
	Process(signature *tasks.Signature) error
}

TaskProcessor - can process a delivered task This will probably always be a worker instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL