tcr

package
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: MIT Imports: 27 Imported by: 6

Documentation

Index

Constants

View Source
const (
	// GzipCompressionType helps identify which compression/decompression to use.
	GzipCompressionType = "gzip"

	// ZstdCompressionType helps identify which compression/decompression to use.
	ZstdCompressionType = "zstd"

	//AesSymmetricType helps identity which encryption/decryption to use.
	AesSymmetricType = "aes"
)
View Source
const (
	// QueueTypeQuorum indicates a queue of type quorum.
	QueueTypeQuorum = "quorum"

	// QueueTypeClassic indicates a queue of type classic.
	QueueTypeClassic = "classic"
)

Variables

This section is empty.

Functions

func CompareArgon2Hash

func CompareArgon2Hash(passphrase, salt string, multiplier uint32, hashedPassword []byte) (bool, error)

CompareArgon2Hash creates an Argon hash and then compares it to a provided hash.

func CompressWithGzip

func CompressWithGzip(data []byte, buffer *bytes.Buffer) error

CompressWithGzip uses the standard Gzip Writer to compress data and places data in the supplied buffer.

func CompressWithZstd

func CompressWithZstd(data []byte, buffer *bytes.Buffer) error

CompressWithZstd uses an external dependency for Zstd to compress data and places data in the supplied buffer.

func CreatePayload

func CreatePayload(
	input interface{},
	compression *CompressionConfig,
	encryption *EncryptionConfig) ([]byte, error)

CreatePayload creates a JSON marshal and optionally compresses and encrypts the bytes.

func CreateTLSConfig

func CreateTLSConfig(pemLocation string, localLocation string) (*tls.Config, error)

CreateTLSConfig creates a x509 TLS Config for use in TLS-based communication.

func CreateWrappedPayload

func CreateWrappedPayload(
	input interface{},
	letterID uuid.UUID,
	metadata string,
	compression *CompressionConfig,
	encryption *EncryptionConfig) ([]byte, error)

CreateWrappedPayload wraps your data in a plaintext wrapper called ModdedLetter and performs the selected modifications to data.

func DecompressWithGzip

func DecompressWithGzip(buffer *bytes.Buffer) error

DecompressWithGzip uses the standard Gzip Reader to decompress data and replaces the supplied buffer with a new buffer with data in it.

func DecompressWithZstd

func DecompressWithZstd(buffer *bytes.Buffer) error

DecompressWithZstd uses an external dependency for Zstd to decompress data and replaces the supplied buffer with a new buffer with data in it.

func DecryptWithAes

func DecryptWithAes(cipherDataWithNonce, hashedKey []byte, nonceSize int) ([]byte, error)

DecryptWithAes decrypts bytes based on an Aes compatible hashed key.

func EncryptWithAes

func EncryptWithAes(data, hashedKey []byte, nonceSize int) ([]byte, error)

EncryptWithAes encrypts bytes based on an AES-256 compatible hashed key. If nonceSize is less than 12, the standard, 12, is used.

func GetHashWithArgon

func GetHashWithArgon(passphrase, salt string, timeConsideration uint32, multiplier uint32, threads uint8, hashLength uint32) []byte

GetHashWithArgon uses Argon2 version 0x13 to hash a plaintext password with a provided salt string and return hash as bytes.

func GetStringHashWithArgon

func GetStringHashWithArgon(passphrase, salt string, timeConsideration uint32, threads uint8, hashLength uint32) string

GetStringHashWithArgon uses Argon2 version 0x13 to hash a plaintext password with a provided salt string and return hash as base64 string.

func JSONUtcTimestamp added in v2.1.0

func JSONUtcTimestamp() string

JSONUtcTimestamp quickly creates a string RFC3339 format in UTC

func JSONUtcTimestampFromTime added in v2.1.0

func JSONUtcTimestampFromTime(t time.Time) string

JSONUtcTimestampFromTime quickly creates a string RFC3339 format in UTC

func RandomBytes

func RandomBytes(size int) []byte

RandomBytes returns a RandomString converted to bytes.

func RandomString

func RandomString(size int) string

RandomString creates a new RandomSource to generate a RandomString unique per nanosecond.

func RandomStringFromSource

func RandomStringFromSource(size int, src rand.Source) string

RandomStringFromSource generates a Random string that should always be unique. Example RandSrc.) var src = rand.NewSource(time.Now().UnixNano()) Source: https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go

func ReadJSONFileToInterface

func ReadJSONFileToInterface(fileNamePath string) (interface{}, error)

ReadJSONFileToInterface opens a file.json and converts to interface{}.

func ReadPayload

func ReadPayload(buffer *bytes.Buffer, compression *CompressionConfig, encryption *EncryptionConfig) error

ReadPayload unencrypts and uncompresses payloads

func RepeatedBytes

func RepeatedBytes(size int, repeat int) []byte

RepeatedBytes generates a RandomString and then repeats it up to size.

func RepeatedRandomString

func RepeatedRandomString(size int, repeat int) string

RepeatedRandomString generates a RandomString and then repeats it up to size and repeat count.

Types

type ChannelHost

type ChannelHost struct {
	Channel       *amqp.Channel
	ID            uint64
	ConnectionID  uint64
	Ackable       bool
	CachedChannel bool
	Confirmations chan amqp.Confirmation
	Errors        chan *amqp.Error
	// contains filtered or unexported fields
}

ChannelHost is an internal representation of amqp.Channel.

func NewChannelHost

func NewChannelHost(
	connHost *ConnectionHost,
	id uint64,
	connectionID uint64,
	ackable, cached bool) (*ChannelHost, error)

NewChannelHost creates a simple ChannelHost wrapper for management by end-user developer.

func (*ChannelHost) Close

func (ch *ChannelHost) Close()

Close allows for manual close of Amqp Channel kept internally.

func (*ChannelHost) FlushConfirms

func (ch *ChannelHost) FlushConfirms()

FlushConfirms removes all previous confirmations pending processing.

func (*ChannelHost) MakeChannel

func (ch *ChannelHost) MakeChannel() (err error)

MakeChannel tries to create (or re-create) the channel from the ConnectionHost its attached to.

func (*ChannelHost) PauseForFlowControl

func (ch *ChannelHost) PauseForFlowControl()

PauseForFlowControl allows you to wait and sleep while receiving flow control messages.

type CompressionConfig

type CompressionConfig struct {
	Enabled bool   `json:"Enabled" yaml:"Enabled"`
	Type    string `json:"Type,omitempty" yaml:"Type,omitempty"`
}

CompressionConfig allows you to configuration symmetric key encryption based on options

type ConnectionHost

type ConnectionHost struct {
	Connection         *amqp.Connection
	ConnectionID       uint64
	CachedChannelCount uint64

	Errors   chan *amqp.Error
	Blockers chan amqp.Blocking
	// contains filtered or unexported fields
}

ConnectionHost is an internal representation of amqp.Connection.

func NewConnectionHost

func NewConnectionHost(
	uri string,
	connectionName string,
	connectionID uint64,
	heartbeatInterval time.Duration,
	connectionTimeout time.Duration,
	tlsConfig *TLSConfig) (*ConnectionHost, error)

NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.

func (*ConnectionHost) Connect

func (ch *ConnectionHost) Connect() bool

Connect tries to connect (or reconnect) to the provided properties of the host one time.

func (*ConnectionHost) ConnectWithErrorHandler added in v2.3.0

func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool

ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler.

func (*ConnectionHost) PauseOnFlowControl

func (ch *ConnectionHost) PauseOnFlowControl()

PauseOnFlowControl allows you to wait and sleep while receiving flow control messages. Sleeps for one second, repeatedly until the blocking has stopped.

type ConnectionPool

type ConnectionPool struct {
	Config PoolConfig
	// contains filtered or unexported fields
}

ConnectionPool houses the pool of RabbitMQ connections.

func NewConnectionPool

func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error)

NewConnectionPool creates hosting structure for the ConnectionPool.

func NewConnectionPoolWithErrorHandler added in v2.0.7

func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error)

NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool with an error handler.

func NewConnectionPoolWithHandlers added in v2.3.0

func NewConnectionPoolWithHandlers(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error)

NewConnectionPoolWithHandlers creates hosting structure for the ConnectionPool with an error and/or unhealthy handler.

func NewConnectionPoolWithUnhealthyHandler added in v2.3.0

func NewConnectionPoolWithUnhealthyHandler(config *PoolConfig, unhealthyHandler func(error)) (*ConnectionPool, error)

NewConnectionPoolWithUnhealthyHandler creates hosting structure for the ConnectionPool with an unhealthy handler.

func (*ConnectionPool) GetChannelFromPool

func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost

GetChannelFromPool gets a cached ackable channel from the Pool if they exist or creates a channel. A non-acked channel is always a transient channel. Blocking if Ackable is true and the cache is empty. If you want a transient Ackable channel (un-managed), use CreateChannel directly.

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)

GetConnection gets a connection based on whats in the ConnectionPool (blocking under bad network conditions). Flowcontrol (blocking) or transient network outages will pause here until cleared. Uses the SleepOnErrorInterval to pause between retries.

func (*ConnectionPool) GetTransientChannel

func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel

GetTransientChannel allows you create an unmanaged amqp Channel with the help of the ConnectionPool.

func (*ConnectionPool) ReturnChannel

func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)

ReturnChannel returns a Channel. If Channel is not a cached channel, it is simply closed here. If Cache Channel, we check if erred, new Channel is created instead and then returned to the cache.

func (*ConnectionPool) ReturnConnection

func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)

ReturnConnection puts the connection back in the queue and flag it for error. This helps maintain a Round Robin on Connections and their resources.

func (*ConnectionPool) Shutdown

func (cp *ConnectionPool) Shutdown()

Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.

type Consumer

type Consumer struct {
	Config         *ConsumerConfig
	ConnectionPool *ConnectionPool
	Enabled        bool
	QueueName      string
	ConsumerName   string
	// contains filtered or unexported fields
}

Consumer receives messages from a RabbitMQ location.

func NewConsumer

func NewConsumer(
	rconfig *RabbitSeasoning,
	cp *ConnectionPool,
	queuename string,
	consumerName string,
	autoAck bool,
	exclusive bool,
	noWait bool,
	args map[string]interface{},
	qosCountOverride int,
	sleepOnErrorInterval uint32,
	sleepOnIdleInterval uint32) (*Consumer, error)

NewConsumer creates a new Consumer to receive messages from a specific queuename.

func NewConsumerFromConfig

func NewConsumerFromConfig(config *ConsumerConfig, cp *ConnectionPool) *Consumer

NewConsumerFromConfig creates a new Consumer to receive messages from a specific queuename.

func (*Consumer) Errors

func (con *Consumer) Errors() <-chan error

Errors yields all the internal errs for consuming messages.

func (*Consumer) FlushErrors

func (con *Consumer) FlushErrors()

FlushErrors allows you to flush out all previous Errors.

func (*Consumer) FlushMessages

func (con *Consumer) FlushMessages()

FlushMessages allows you to flush out all previous Messages. WARNING: THIS WILL RESULT IN LOST MESSAGES.

func (*Consumer) FlushStop

func (con *Consumer) FlushStop()

FlushStop allows you to flush out all previous Stop signals.

func (*Consumer) Get

func (con *Consumer) Get(queueName string) (*amqp.Delivery, error)

Get gets a single message from any queue. Auto-Acknowledges.

func (*Consumer) GetBatch

func (con *Consumer) GetBatch(queueName string, batchSize int) ([]*amqp.Delivery, error)

GetBatch gets a group of messages from any queue. Auto-Acknowledges.

func (*Consumer) ReceivedMessages

func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage

ReceivedMessages yields all the internal messages ready for consuming.

func (*Consumer) StartConsuming

func (con *Consumer) StartConsuming()

StartConsuming starts the Consumer.

func (*Consumer) StartConsumingWithAction

func (con *Consumer) StartConsumingWithAction(action func(*ReceivedMessage))

StartConsumingWithAction starts the Consumer invoking a method on every ReceivedMessage.

func (*Consumer) Started added in v2.0.7

func (con *Consumer) Started() bool

Started allows you to determine if a consumer has started.

func (*Consumer) StopConsuming

func (con *Consumer) StopConsuming(immediate bool, flushMessages bool) error

StopConsuming allows you to signal stop to the consumer. Will stop on the consumer channelclose or responding to signal after getting all remaining deviveries. FlushMessages empties the internal buffer of messages received by queue. Ackable messages are still in RabbitMQ queue, while noAck messages will unfortunately be lost. Use wisely.

type ConsumerConfig

type ConsumerConfig struct {
	Enabled              bool                   `json:"Enabled" yaml:"Enabled"`
	QueueName            string                 `json:"QueueName" yaml:"QueueName"`
	ConsumerName         string                 `json:"ConsumerName" yaml:"ConsumerName"`
	AutoAck              bool                   `json:"AutoAck" yaml:"AutoAck"`
	Exclusive            bool                   `json:"Exclusive" yaml:"Exclusive"`
	NoWait               bool                   `json:"NoWait" yaml:"NoWait"`
	Args                 map[string]interface{} `json:"Args" yaml:"Args"`
	QosCountOverride     int                    `json:"QosCountOverride" yaml:"QosCountOverride"`         // if zero ignored
	SleepOnErrorInterval uint32                 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep on error
	SleepOnIdleInterval  uint32                 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"`   // sleep on idle
}

ConsumerConfig represents settings for configuring a consumer with ease.

type EncryptionConfig

type EncryptionConfig struct {
	Enabled           bool   `json:"Enabled" yaml:"Enabled"`
	Type              string `json:"Type,omitempty" yaml:"Type,omitempty"`
	Hashkey           []byte
	TimeConsideration uint32 `json:"TimeConsideration,omitempty" yaml:"TimeConsideration,omitempty"`
	MemoryMultiplier  uint32 `json:"" yaml:""`
	Threads           uint8  `json:"Threads,omitempty" yaml:"Threads,omitempty"`
}

EncryptionConfig allows you to configuration symmetric key encryption based on options

type Envelope

type Envelope struct {
	Exchange      string
	RoutingKey    string
	ContentType   string
	CorrelationID string
	Type          string
	Mandatory     bool
	Immediate     bool
	Headers       amqp.Table
	DeliveryMode  uint8
	Priority      uint8
}

Envelope contains all the address details of where a letter is going.

type ErrorMessage

type ErrorMessage struct {
	Code    int
	Reason  string
	Server  bool
	Recover bool
}

ErrorMessage allow for you to replay a message that was returned.

func NewErrorMessage

func NewErrorMessage(amqpError *amqp.Error) *ErrorMessage

NewErrorMessage creates a new ErrorMessage.

func (*ErrorMessage) Error

func (em *ErrorMessage) Error() string

Error allows you to quickly log the ErrorMessage struct as a string.

type Exchange

type Exchange struct {
	Name           string     `json:"Name" yaml:"Name"`
	Type           string     `json:"Type" yaml:"Type"` // "direct", "fanout", "topic", "headers"
	PassiveDeclare bool       `json:"PassiveDeclare" yaml:"PassiveDeclare"`
	Durable        bool       `json:"Durable" yaml:"Durable"`
	AutoDelete     bool       `json:"AutoDelete" yaml:"AutoDelete"`
	InternalOnly   bool       `json:"InternalOnly" yaml:"InternalOnly"`
	NoWait         bool       `json:"NoWait" yaml:"NoWait"`
	Args           amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

Exchange allows for you to create Exchange topology.

type ExchangeBinding

type ExchangeBinding struct {
	ExchangeName       string     `json:"ExchangeName" yaml:"ExchangeName"`
	ParentExchangeName string     `json:"ParentExchangeName" yaml:"ParentExchangeName"`
	RoutingKey         string     `json:"RoutingKey" yaml:"RoutingKey"`
	NoWait             bool       `json:"NoWait" yaml:"NoWait"`
	Args               amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

ExchangeBinding allows for you to create Bindings between an Exchange and Exchange.

type Letter

type Letter struct {
	LetterID   uuid.UUID
	RetryCount uint32
	Body       []byte
	Envelope   *Envelope
}

Letter contains the message body and address of where things are going.

func CreateLetter

func CreateLetter(exchangeName string, routingKey string, body []byte) *Letter

CreateLetter creates a simple letter for publishing.

func CreateMockLetter

func CreateMockLetter(exchangeName string, routingKey string, body []byte) *Letter

CreateMockLetter creates a mock letter for publishing.

func CreateMockRandomLetter

func CreateMockRandomLetter(routingKey string) *Letter

CreateMockRandomLetter creates a mock letter for publishing with random sizes and random Ids.

func CreateMockRandomWrappedBodyLetter

func CreateMockRandomWrappedBodyLetter(routingKey string) *Letter

CreateMockRandomWrappedBodyLetter creates a mock Letter for publishing with random sizes and random Ids.

type ModdedBody

type ModdedBody struct {
	Encrypted   bool   `json:"Encrypted"`
	EType       string `json:"EncryptionType,omitempty"`
	Compressed  bool   `json:"Compressed"`
	CType       string `json:"CompressionType,omitempty"`
	UTCDateTime string `json:"UTCDateTime"`
	Data        []byte `json:"Data"`
}

ModdedBody is a payload with modifications and indicators of what was modified.

type PoolConfig

type PoolConfig struct {
	ApplicationName      string     `json:"ApplicationName" yaml:"ApplicationName"`
	URI                  string     `json:"URI" yaml:"URI"`
	Heartbeat            uint32     `json:"Heartbeat" yaml:"Heartbeat"`
	ConnectionTimeout    uint32     `json:"ConnectionTimeout" yaml:"ConnectionTimeout"`
	SleepOnErrorInterval uint32     `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep length on errors
	MaxConnectionCount   uint64     `json:"MaxConnectionCount" yaml:"MaxConnectionCount"`     // number of connections to create in the pool
	MaxCacheChannelCount uint64     `json:"MaxCacheChannelCount" yaml:"MaxCacheChannelCount"` // number of channels to be cached in the pool
	TLSConfig            *TLSConfig `json:"TLSConfig" yaml:"TLSConfig"`                       // TLS settings for connection with AMQPS.
}

PoolConfig represents settings for creating/configuring pools.

type PublishConfirmation

type PublishConfirmation struct {
	DeliveryTag uint64 // Delivery Tag Id
	Acked       bool   // Acked Serverside
}

PublishConfirmation aids in guaranteed Deliverability.

func NewPublishConfirmation

func NewPublishConfirmation(confirmation *amqp.Confirmation) *PublishConfirmation

NewPublishConfirmation creates a new PublishConfirmation.

type PublishReceipt

type PublishReceipt struct {
	LetterID     uuid.UUID
	FailedLetter *Letter
	Success      bool
	Error        error
}

PublishReceipt is a way to monitor publishing success and to initiate a retry when using async publishing.

func (*PublishReceipt) ToString

func (not *PublishReceipt) ToString() string

ToString allows you to quickly log the PublishReceipt struct as a string.

type Publisher

type Publisher struct {
	Config         *RabbitSeasoning
	ConnectionPool *ConnectionPool
	// contains filtered or unexported fields
}

Publisher contains everything you need to publish a message.

func NewPublisher

func NewPublisher(
	cp *ConnectionPool,
	sleepOnIdleInterval time.Duration,
	sleepOnErrorInterval time.Duration,
	publishTimeOutDuration time.Duration) *Publisher

NewPublisher creates and configures a new Publisher.

func NewPublisherFromConfig

func NewPublisherFromConfig(
	config *RabbitSeasoning,
	cp *ConnectionPool) *Publisher

NewPublisherFromConfig creates and configures a new Publisher.

func (*Publisher) Publish

func (pub *Publisher) Publish(letter *Letter, skipReceipt bool)

Publish sends a single message to the address on the letter using a cached ChannelHost. Subscribe to PublishReceipts to see success and errors.

For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation

func (*Publisher) PublishReceipts

func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt

PublishReceipts yields all the success and failures during all publish events. Highly recommend susbscribing to this.

func (*Publisher) PublishWithConfirmation

func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration)

PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities.

This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)

func (*Publisher) PublishWithConfirmationContext

func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter *Letter)

PublishWithConfirmationContext sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)

func (*Publisher) PublishWithConfirmationContextError added in v2.1.4

func (pub *Publisher) PublishWithConfirmationContextError(ctx context.Context, letter *Letter) error

PublishWithConfirmationContextError sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)

func (*Publisher) PublishWithConfirmationError added in v2.1.4

func (pub *Publisher) PublishWithConfirmationError(letter *Letter, timeout time.Duration) error

PublishWithConfirmationError sends a single message to the address on the letter with confirmation capabilities.

This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)

func (*Publisher) PublishWithConfirmationTransient

func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration)

PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically

gets requeued for re-publish.

A confirmation failure keeps trying to publish (at least until timeout failure occurs.)

func (*Publisher) PublishWithError added in v2.1.4

func (pub *Publisher) PublishWithError(letter *Letter, skipReceipt bool) error

PublishWithError sends a single message to the address on the letter using a cached ChannelHost.

For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation

func (*Publisher) PublishWithTransient

func (pub *Publisher) PublishWithTransient(letter *Letter) error

PublishWithTransient sends a single message to the address on the letter using a transient (new) RabbitMQ channel. Subscribe to PublishReceipts to see success and errors. For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation

func (*Publisher) QueueLetter

func (pub *Publisher) QueueLetter(letter *Letter) bool

QueueLetter queues up a letter that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.

func (*Publisher) QueueLetters

func (pub *Publisher) QueueLetters(letters []*Letter) bool

QueueLetters allows you to bulk queue letters that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.

func (*Publisher) Shutdown

func (pub *Publisher) Shutdown(shutdownPools bool)

Shutdown cleanly shutdown the publisher and resets it's internal state.

func (*Publisher) StartAutoPublishing

func (pub *Publisher) StartAutoPublishing()

StartAutoPublishing starts the Publisher's auto-publishing capabilities.

type PublisherConfig

type PublisherConfig struct {
	AutoAck                bool   `json:"AutoAck" yaml:"AutoAck"`
	SleepOnIdleInterval    uint32 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"`
	SleepOnErrorInterval   uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"`
	PublishTimeOutInterval uint32 `json:"PublishTimeOutInterval" yaml:"PublishTimeOutInterval"`
	MaxRetryCount          uint32 `json:"MaxRetryCount" yaml:"MaxRetryCount"`
}

PublisherConfig represents settings for configuring global settings for all Publishers with ease.

type Queue

type Queue struct {
	Name           string     `json:"Name" yaml:"Name"`
	PassiveDeclare bool       `json:"PassiveDeclare" yaml:"PassiveDeclare"`
	Durable        bool       `json:"Durable" yaml:"Durable"`
	AutoDelete     bool       `json:"AutoDelete" yaml:"AutoDelete"`
	Exclusive      bool       `json:"Exclusive" yaml:"Exclusive"`
	NoWait         bool       `json:"NoWait" yaml:"NoWait"`
	Type           string     `json:"Type" yaml:"Type"`                     // classic or quorum, type of quorum disregards exclusive and enables durable properties when building from config
	Args           amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

Queue allows for you to create Queue topology.

type QueueBinding

type QueueBinding struct {
	QueueName    string     `json:"QueueName" yaml:"QueueName"`
	ExchangeName string     `json:"ExchangeName" yaml:"ExchangeName"`
	RoutingKey   string     `json:"RoutingKey" yaml:"RoutingKey"`
	NoWait       bool       `json:"NoWait" yaml:"NoWait"`
	Args         amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

QueueBinding allows for you to create Bindings between a Queue and Exchange.

type RabbitSeasoning

type RabbitSeasoning struct {
	EncryptionConfig  *EncryptionConfig          `json:"EncryptionConfig" yaml:"EncryptionConfig"`
	CompressionConfig *CompressionConfig         `json:"CompressionConfig" yaml:"CompressionConfig"`
	PoolConfig        *PoolConfig                `json:"PoolConfig" yaml:"PoolConfig"`
	ConsumerConfigs   map[string]*ConsumerConfig `json:"ConsumerConfigs" yaml:"ConsumerConfigs"`
	PublisherConfig   *PublisherConfig           `json:"PublisherConfig" yaml:"PublisherConfig"`
}

RabbitSeasoning represents the configuration values.

func ConvertJSONFileToConfig

func ConvertJSONFileToConfig(fileNamePath string) (*RabbitSeasoning, error)

ConvertJSONFileToConfig opens a file.json and converts to RabbitSeasoning.

type RabbitService

type RabbitService struct {
	Config         *RabbitSeasoning
	ConnectionPool *ConnectionPool
	Topologer      *Topologer
	Publisher      *Publisher
	// contains filtered or unexported fields
}

RabbitService is the struct for containing all you need for RabbitMQ access.

func NewRabbitService

func NewRabbitService(
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitService creates everything you need for a RabbitMQ communication service.

func NewRabbitServiceWithConnectionPool added in v2.3.0

func NewRabbitServiceWithConnectionPool(
	connectionPool *ConnectionPool,
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool.

func NewRabbitServiceWithPublisher added in v2.3.0

func NewRabbitServiceWithPublisher(
	publisher *Publisher,
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher.

func (*RabbitService) CentralErr

func (rs *RabbitService) CentralErr() <-chan error

CentralErr yields all the internal errs for sub-processes.

func (*RabbitService) GetConsumer

func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)

GetConsumer allows you to get the individual consumers stored in memory.

func (*RabbitService) GetConsumerConfig

func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)

GetConsumerConfig allows you to get the individual consumers' config stored in memory.

func (*RabbitService) Publish

func (rs *RabbitService) Publish(
	input interface{},
	exchangeName, routingKey, metadata string,
	wrapPayload bool,
	headers amqp.Table) error

Publish tries to publish directly without retry and data optionally wrapped in a ModdedLetter.

func (*RabbitService) PublishData

func (rs *RabbitService) PublishData(
	data []byte,
	exchangeName, routingKey string,
	headers amqp.Table) error

PublishData tries to publish.

func (*RabbitService) PublishLetter

func (rs *RabbitService) PublishLetter(letter *Letter) error

PublishLetter wraps around Publisher to simply Publish.

func (*RabbitService) PublishWithConfirmation

func (rs *RabbitService) PublishWithConfirmation(
	input interface{},
	exchangeName, routingKey, metadata string,
	wrapPayload bool,
	headers amqp.Table) error

PublishWithConfirmation tries to publish and wait for a confirmation.

func (*RabbitService) QueueLetter

func (rs *RabbitService) QueueLetter(letter *Letter) error

QueueLetter wraps around AutoPublisher to simply QueueLetter. Error indicates message was not queued.

func (*RabbitService) Shutdown

func (rs *RabbitService) Shutdown(stopConsumers bool)

Shutdown stops the service and shuts down the ChannelPool.

type ReceivedMessage

type ReceivedMessage struct {
	IsAckable     bool
	Body          []byte
	MessageID     string // LetterID
	ApplicationID string
	PublishDate   string
	Delivery      amqp.Delivery // Access everything.
}

ReceivedMessage allow for you to acknowledge, after processing the received payload, by its RabbitMQ tag and Channel pointer.

func NewReceivedMessage added in v2.1.0

func NewReceivedMessage(
	isAckable bool,
	delivery amqp.Delivery) *ReceivedMessage

NewReceivedMessage creates a new ReceivedMessage.

func (*ReceivedMessage) Acknowledge

func (msg *ReceivedMessage) Acknowledge() error

Acknowledge allows for you to acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server. Can't ack from a different channel.

func (*ReceivedMessage) Nack

func (msg *ReceivedMessage) Nack(requeue bool) error

Nack allows for you to negative acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.

func (*ReceivedMessage) Reject

func (msg *ReceivedMessage) Reject(requeue bool) error

Reject allows for you to reject on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.

type ReturnMessage

type ReturnMessage struct {
	ReplyCode  uint16 // reason
	ReplyText  string // description
	Exchange   string // basic.publish exchange
	RoutingKey string // basic.publish routing key

	// Properties
	ContentType     string                 // MIME content type
	ContentEncoding string                 // MIME content encoding
	Headers         map[string]interface{} // Application or header exchange table
	DeliveryMode    uint8                  // queue implementation use - non-persistent (1) or persistent (2)
	Priority        uint8                  // queue implementation use - 0 to 9
	CorrelationID   string                 // application use - correlation identifier
	ReplyTo         string                 // application use - address to to reply to (ex: RPC)
	Expiration      string                 // implementation use - message expiration spec
	MessageID       string                 // application use - message identifier
	Timestamp       time.Time              // application use - message timestamp
	Type            string                 // application use - message type name
	UserID          string                 // application use - creating user id
	AppID           string                 // application use - creating application

	Body []byte
}

ReturnMessage allow for you to replay a message that was returned.

func NewReturnMessage

func NewReturnMessage(amqpReturn *amqp.Return) *ReturnMessage

NewReturnMessage creates a new ReturnMessage.

type TLSConfig

type TLSConfig struct {
	EnableTLS         bool   `json:"EnableTLS" yaml:"EnableTLS"` // Use TLSConfig to create connections with AMQPS uri.
	PEMCertLocation   string `json:"PEMCertLocation" yaml:"PEMCertLocation"`
	LocalCertLocation string `json:"LocalCertLocation" yaml:"LocalCertLocation"`
	CertServerName    string `json:"CertServerName" yaml:"CertServerName"`
}

TLSConfig represents settings for configuring TLS.

type Topologer

type Topologer struct {
	ConnectionPool *ConnectionPool
}

Topologer allows you to build RabbitMQ topology backed by a ConnectionPool.

func NewTopologer

func NewTopologer(cp *ConnectionPool) *Topologer

NewTopologer builds you a new Topologer.

func (*Topologer) BindExchanges

func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error

BindExchanges loops thrrough and binds Exchanges to Exchanges - stops on first error.

func (*Topologer) BindQueues

func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error

BindQueues loops through and binds Queues to Exchanges - stops on first error.

func (*Topologer) BuildExchanges

func (top *Topologer) BuildExchanges(exchanges []*Exchange, ignoreErrors bool) error

BuildExchanges loops through and builds Exchanges - stops on first error.

func (*Topologer) BuildQueues

func (top *Topologer) BuildQueues(queues []*Queue, ignoreErrors bool) error

BuildQueues loops through and builds Queues - stops on first error.

func (*Topologer) BuildTopology added in v2.1.3

func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error

BuildTopology builds a topology based on a TopologyConfig - stops on first error.

func (*Topologer) CreateExchange

func (top *Topologer) CreateExchange(
	exchangeName string,
	exchangeType string,
	passiveDeclare, durable, autoDelete, internal, noWait bool,
	args map[string]interface{}) error

CreateExchange builds an Exchange topology.

func (*Topologer) CreateExchangeFromConfig

func (top *Topologer) CreateExchangeFromConfig(exchange *Exchange) error

CreateExchangeFromConfig builds an Exchange topology from a config Exchange element.

func (*Topologer) CreateQueue

func (top *Topologer) CreateQueue(
	queueName string,
	passiveDeclare bool,
	durable bool,
	autoDelete bool,
	exclusive bool,
	noWait bool,
	args map[string]interface{}) error

CreateQueue builds a Queue topology.

func (*Topologer) CreateQueueFromConfig

func (top *Topologer) CreateQueueFromConfig(queue *Queue) error

CreateQueueFromConfig builds a Queue topology from a config Exchange element.

func (*Topologer) ExchangeBind

func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error

ExchangeBind binds an exchange to an Exchange.

func (*Topologer) ExchangeDelete

func (top *Topologer) ExchangeDelete(
	exchangeName string,
	ifUnused, noWait bool) error

ExchangeDelete removes the exchange from the server.

func (*Topologer) ExchangeUnbind

func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, args map[string]interface{}) error

ExchangeUnbind removes the binding of an Exchange to an Exchange.

func (*Topologer) PurgeQueue

func (top *Topologer) PurgeQueue(queueName string, noWait bool) (int, error)

PurgeQueue removes all messages from the Queue that are not waiting to be Acknowledged and returns the count.

func (*Topologer) PurgeQueues

func (top *Topologer) PurgeQueues(queueNames []string, noWait bool) (int, error)

PurgeQueues purges each Queue provided.

func (*Topologer) QueueBind

func (top *Topologer) QueueBind(queueBinding *QueueBinding) error

QueueBind binds an Exchange to a Queue.

func (*Topologer) QueueDelete

func (top *Topologer) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

QueueDelete removes the queue from the server (and all bindings) and returns messages purged (count).

func (*Topologer) UnbindQueue

func (top *Topologer) UnbindQueue(queueName, routingKey, exchangeName string, args map[string]interface{}) error

UnbindQueue removes the binding of a Queue to an Exchange.

type TopologyConfig

type TopologyConfig struct {
	Exchanges        []*Exchange        `json:"Exchanges" yaml:"Exchanges"`
	Queues           []*Queue           `json:"Queues" yaml:"Queues"`
	QueueBindings    []*QueueBinding    `json:"QueueBindings" yaml:"QueueBindings"`
	ExchangeBindings []*ExchangeBinding `json:"ExchangeBindings" yaml:"ExchangeBindings"`
}

TopologyConfig allows you to build simple toplogies from a JSON file.

func ConvertJSONFileToTopologyConfig

func ConvertJSONFileToTopologyConfig(fileNamePath string) (*TopologyConfig, error)

ConvertJSONFileToTopologyConfig opens a file.json and converts to Topology.

type WrappedBody

type WrappedBody struct {
	LetterID       uuid.UUID   `json:"LetterID"`
	Body           *ModdedBody `json:"Body"`
	LetterMetadata string      `json:"LetterMetadata"`
}

WrappedBody is to go inside a Letter struct with indications of the body of data being modified (ex., compressed).

func ReadWrappedBodyFromJSONBytes

func ReadWrappedBodyFromJSONBytes(data []byte) (*WrappedBody, error)

ReadWrappedBodyFromJSONBytes simply read the bytes as a Letter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL