rabbit

package
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DialError                   error
	ConnectionChannelError      error
	ChannelExchangeDeclareError error
	ChannelQueueDeclareError    error
	ChannelQueueBindError       error
	ChannelConsumeError         error
	MockSubCh                   <-chan amqp.Delivery
	ChannelPublishError         error
)

Functions

func InitializeContext

func InitializeContext(ctx context.Context) context.Context

InitializeContext adds the rabbit session to the context. The new context is returned because context is immutable.

func WaitForWithWantedErrorNormalizer added in v0.16.1

func WaitForWithWantedErrorNormalizer(wantErr bool, err error, propertiesType string) error

WaitForWithWantedErrorNormalizer normalizes the error for "wait for" steps depending on whether an error is expected or not.

Types

type AMQPService added in v0.16.0

type AMQPService struct{}

func NewAMQPService added in v0.16.0

func NewAMQPService() *AMQPService

func (AMQPService) ChannelClose added in v0.16.0

func (a AMQPService) ChannelClose(channel *amqp.Channel) error

func (AMQPService) ChannelConsume added in v0.16.0

func (a AMQPService) ChannelConsume(channel *amqp.Channel, queue string,
) (<-chan amqp.Delivery, error)

func (AMQPService) ChannelExchangeDeclare added in v0.16.0

func (a AMQPService) ChannelExchangeDeclare(channel *amqp.Channel, name string) error

func (AMQPService) ChannelPublish added in v0.16.0

func (a AMQPService) ChannelPublish(channel *amqp.Channel,
	exchange string,
	msg amqp.Publishing,
) error

func (AMQPService) ChannelQueueBind added in v0.16.0

func (a AMQPService) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error

func (AMQPService) ChannelQueueDeclare added in v0.16.0

func (a AMQPService) ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)

func (AMQPService) ConnectionChannel added in v0.16.0

func (a AMQPService) ConnectionChannel(connection *amqp.Connection) (*amqp.Channel, error)

func (AMQPService) Dial added in v0.16.0

func (a AMQPService) Dial(url string) (*amqp.Connection, error)

type AMQPServiceFuncMock added in v0.16.0

type AMQPServiceFuncMock struct{}

func (AMQPServiceFuncMock) ChannelClose added in v0.16.0

func (a AMQPServiceFuncMock) ChannelClose(channel *amqp.Channel) error

func (AMQPServiceFuncMock) ChannelConsume added in v0.16.0

func (a AMQPServiceFuncMock) ChannelConsume(channel *amqp.Channel, queue string,
) (<-chan amqp.Delivery, error)

func (AMQPServiceFuncMock) ChannelExchangeDeclare added in v0.16.0

func (a AMQPServiceFuncMock) ChannelExchangeDeclare(channel *amqp.Channel, name string) error

func (AMQPServiceFuncMock) ChannelPublish added in v0.16.0

func (a AMQPServiceFuncMock) ChannelPublish(channel *amqp.Channel,
	exchange string,
	msg amqp.Publishing,
) error

func (AMQPServiceFuncMock) ChannelQueueBind added in v0.16.0

func (a AMQPServiceFuncMock) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error

func (AMQPServiceFuncMock) ChannelQueueDeclare added in v0.16.0

func (a AMQPServiceFuncMock) ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)

func (AMQPServiceFuncMock) ConnectionChannel added in v0.16.0

func (a AMQPServiceFuncMock) ConnectionChannel(c *amqp.Connection) (*amqp.Channel, error)

func (AMQPServiceFuncMock) Dial added in v0.16.0

type AMQPServiceFunctions added in v0.16.0

type AMQPServiceFunctions interface {
	Dial(url string) (*amqp.Connection, error)
	ConnectionChannel(c *amqp.Connection) (*amqp.Channel, error)
	ChannelExchangeDeclare(channel *amqp.Channel, name string) error
	ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)
	ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
	ChannelConsume(channel *amqp.Channel, queue string,
	) (<-chan amqp.Delivery, error)
	ChannelClose(channel *amqp.Channel) error
	ChannelPublish(channel *amqp.Channel,
		exchange string,
		msg amqp.Publishing,
	) error
}

type ContextKey

type ContextKey string

ContextKey defines a type to store the rabbit session in context.Context.

type Logger

type Logger struct {
	Log *golium.Logger
}

Logger logs in a configurable file.

func GetLogger

func GetLogger() *Logger

GetLogger returns the logger for rabbit messages in publish/subscribe. If the logger is not created yet, it creates a new instance of Logger.

func (Logger) LogPublishedMessage

func (l Logger) LogPublishedMessage(msg, topic, corr string)

LogPublishedMessage logs a rabbit message published to a topic.

func (Logger) LogReceivedMessage

func (l Logger) LogReceivedMessage(msg, topic, corr string)

LogReceivedMessage logs a rabbit message received from a topic.

func (Logger) LogSubscribedTopic

func (l Logger) LogSubscribedTopic(topic string)

LogSubscribedTopic logs the subscription to a rabbit topic.

type Session

type Session struct {
	Connection *amqp.Connection
	// Messages received from the publish/subscribe channel
	Messages []amqp.Delivery
	// Messages consumed from the publish/subscribe channel
	ConsumedMessages []amqp.Delivery
	// Correlator is used to correlate the messages for a specific session
	Correlator string

	// AMQP service
	AMQPServiceClient AMQPServiceFunctions
	// contains filtered or unexported fields
}

Session contains the information of a rabbit session.

func GetSession

func GetSession(ctx context.Context) *Session

GetSession returns the rabbit session stored in context. Note that the context should be previously initialized with InitializeContext function.

func (*Session) ConfigureConnection

func (s *Session) ConfigureConnection(ctx context.Context, uri string) error

ConfigureConnection creates a rabbit connection based on the URI.

func (*Session) ConfigureHeaders

func (s *Session) ConfigureHeaders(ctx context.Context, t *godog.Table) error

ConfigureHeaders stores a table of rabbit headers in the application context.

func (*Session) ConfigureStandardProperties

func (s *Session) ConfigureStandardProperties(ctx context.Context, t *godog.Table) error

ConfigureStandardProperties stores a table of rabbit properties in the application context.

func (*Session) PublishJSONMessage

func (s *Session) PublishJSONMessage(
	ctx context.Context,
	topic string,
	t *godog.Table,
) error

PublishJSONMessage publishes a JSON message in a rabbit topic.

func (*Session) PublishTextMessage

func (s *Session) PublishTextMessage(ctx context.Context, topic, message string) error

PublishTextMessage publishes a text message in a rabbit topic.

func (*Session) SubscribeTopic

func (s *Session) SubscribeTopic(ctx context.Context, topic string) error

SubscribeTopic subscribes to a rabbit topic to receive messages via a channel.

func (*Session) Unsubscribe

func (s *Session) Unsubscribe(ctx context.Context) error

Unsubscribe unsubscribes from rabbit by closing the associated channel. If this method is not invoked, the goroutine created with SubscribeTopic is never closed and keeps processing messages from the topic until the program finishes.

func (*Session) ValidateMessageHeaders

func (s *Session) ValidateMessageHeaders(
	ctx context.Context,
	t *godog.Table,
) error

ValidateMessageHeaders checks if the message rabbit headers are equal to the expected values.

func (*Session) ValidateMessageJSONBody

func (s *Session) ValidateMessageJSONBody(ctx context.Context,
	t *godog.Table,
	pos int,
) error

ValidateMessageJSONBody checks if the JSON body properties of the message in position 'pos' are equal to the expected values. If pos == -1, the last message stored is used, that is, the one stored in s.msg.

func (*Session) ValidateMessageStandardProperties

func (s *Session) ValidateMessageStandardProperties(
	props amqp.Delivery,
) error

ValidateMessageStandardProperties checks if the message standard rabbit properties are equal to the expected values.

func (*Session) ValidateMessageTextBody

func (s *Session) ValidateMessageTextBody(ctx context.Context, expectedMsg string) error

ValidateMessageTextBody checks if the message text body is equal to the expected value.

func (*Session) WaitForJSONMessageWithProperties

func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context,
	timeout time.Duration,
	t *godog.Table,
	wantErr bool,
) error

WaitForJSONMessageWithProperties waits up to timeout and verifies whether a message with the requested properties has been received in the topic. When wantErr is set to true, the function returns an error if a message with the JSON properties is found and returns no error when no such message is found after the timeout.

func (*Session) WaitForMessagesWithStandardProperties

func (s *Session) WaitForMessagesWithStandardProperties(
	ctx context.Context,
	timeout time.Duration,
	count int,
	strictly bool,
	t *godog.Table,
	wantErr bool,
) error

WaitForMessagesWithStandardProperties waits for 'count' messages with standard rabbit properties that are equal to the expected values. If strictly is set to true, the function loops through all received messages to check that the amount exactly matches the expected count. When wantErr is set to true, the function returns an error if a message with the expected properties is found and returns no error when no such message is found after the timeout.

func (*Session) WaitForTextMessage

func (s *Session) WaitForTextMessage(ctx context.Context,
	timeout time.Duration,
	expectedMsg string,
) error

WaitForTextMessage waits up to timeout until the expected message is found in the received messages for this session.

type Steps

type Steps struct {
}

Steps to initialize common steps.

func (Steps) InitializeSteps

func (cs Steps) InitializeSteps(ctx context.Context, scenCtx *godog.ScenarioContext) context.Context

InitializeSteps initializes all the steps.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL