consumer

package
v0.0.0-...-0b84189 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrConsumerClose             = errors.New("failed to close rabbitmq consumer")
	ErrValueDeserialization      = errors.New("failed to deserialize value")
	ErrMessageHandler            = errors.New("message handler failed")
	ErrAck                       = errors.New("failed to acknowledge message")
	ErrConsumerRetryLimitReached = errors.New("consumer retry limit reached")
)
View Source
var (
	ErrMessageErrorHandler = errors.New("message error handler failed processing")
)

These errors are the result of a RabbitMQ failure and should be considered fatal/critical.

Functions

This section is empty.

Types

type JSONDeserializer

type JSONDeserializer[T any] struct{}

func NewJsonValueDeserializer

func NewJsonValueDeserializer[T any]() (*JSONDeserializer[T], error)

func (JSONDeserializer[T]) Close

func (JSONDeserializer[T]) Close()

func (JSONDeserializer[T]) Deserialize

func (JSONDeserializer[T]) Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)

func (JSONDeserializer[T]) DeserializeInto

func (JSONDeserializer[T]) DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error

type MessageErrorHandler

type MessageErrorHandler interface {
	OnError(ctx context.Context, raw *amqp.Delivery) error
}

type MessageHandler

type MessageHandler interface {
	OnReceive(ctx context.Context, key, value interface{}) error
}

type NilDeserializer

type NilDeserializer struct{}

func (NilDeserializer) Close

func (NilDeserializer) Close()

func (NilDeserializer) Deserialize

func (NilDeserializer) Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)

func (NilDeserializer) DeserializeInto

func (NilDeserializer) DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error

type NoOpMessageErrorHandler

type NoOpMessageErrorHandler struct{}

func (*NoOpMessageErrorHandler) OnError

func (n *NoOpMessageErrorHandler) OnError(ctx context.Context, raw *amqp.Delivery) error

type NoOpMessageHandler

type NoOpMessageHandler struct{}

func (*NoOpMessageHandler) OnReceive

func (n *NoOpMessageHandler) OnReceive(ctx context.Context, key, value interface{}) error

type ProtoJsonDeserializer

type ProtoJsonDeserializer[T proto.Message] struct{}

func NewProtoJsonValueDeserializer

func NewProtoJsonValueDeserializer[T proto.Message]() (*ProtoJsonDeserializer[T], error)

func (ProtoJsonDeserializer[T]) Close

func (ProtoJsonDeserializer[T]) Close()

func (ProtoJsonDeserializer[T]) Deserialize

func (ProtoJsonDeserializer[T]) Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)

func (ProtoJsonDeserializer[T]) DeserializeInto

func (ProtoJsonDeserializer[T]) DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error

type ProtobufDeserializer

type ProtobufDeserializer[T proto.Message] struct{}

func NewProtobufValueDeserializer

func NewProtobufValueDeserializer[T proto.Message]() (*ProtobufDeserializer[T], error)

func (ProtobufDeserializer[T]) Close

func (ProtobufDeserializer[T]) Close()

func (ProtobufDeserializer[T]) Deserialize

func (ProtobufDeserializer[T]) Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)

func (ProtobufDeserializer[T]) DeserializeInto

func (ProtobufDeserializer[T]) DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error

type RabbitConsumer

type RabbitConsumer struct {
	ValueDeserializer ValueDeserializer
	Tracer            oteltrace.Tracer
	TracePropagator   propagation.TextMapPropagator
	Metrics           *metrics.ConsumerMetrics
	Topic             string
	Ready             chan bool

	MessageHandler      MessageHandler
	MessageErrorHandler MessageErrorHandler
	ProcessingDelay     time.Duration
	// contains filtered or unexported fields
}

func NewRabbitConsumer

func NewRabbitConsumer(opts ...RabbitConsumerOption) (*RabbitConsumer, error)

func (*RabbitConsumer) Close

func (rc *RabbitConsumer) Close() error

func (*RabbitConsumer) Run

func (rc *RabbitConsumer) Run(ctx context.Context) error

type RabbitConsumerOption

type RabbitConsumerOption func(*RabbitConsumer)

func WithConnectionString

func WithConnectionString(connectionString string) RabbitConsumerOption

func WithConsumerArgs

func WithConsumerArgs(args amqp.Table) RabbitConsumerOption

func WithConsumerAutoAck

func WithConsumerAutoAck(autoAck bool) RabbitConsumerOption

func WithConsumerExclusive

func WithConsumerExclusive(exclusive bool) RabbitConsumerOption

func WithConsumerName

func WithConsumerName(consumerName string) RabbitConsumerOption

func WithConsumerNoLocal

func WithConsumerNoLocal(noLocal bool) RabbitConsumerOption

func WithConsumerNoWait

func WithConsumerNoWait(noWait bool) RabbitConsumerOption

func WithExchangeArgs

func WithExchangeArgs(args amqp.Table) RabbitConsumerOption

func WithExchangeAutoDelete

func WithExchangeAutoDelete(autoDelete bool) RabbitConsumerOption

func WithExchangeDurable

func WithExchangeDurable(durable bool) RabbitConsumerOption

func WithExchangeInternal

func WithExchangeInternal(internal bool) RabbitConsumerOption

func WithExchangeName

func WithExchangeName(exchangeName string) RabbitConsumerOption

func WithExchangeNoWait

func WithExchangeNoWait(noWait bool) RabbitConsumerOption

func WithExchangeType

func WithExchangeType(exchangeType string) RabbitConsumerOption

func WithHost

func WithHost(host string) RabbitConsumerOption

func WithMessageErrorHandler

func WithMessageErrorHandler(handler MessageErrorHandler) RabbitConsumerOption

func WithMessageHandler

func WithMessageHandler(handler MessageHandler) RabbitConsumerOption

func WithPort

func WithPort(port string) RabbitConsumerOption

func WithPrefetchCount

func WithPrefetchCount(prefetchCount int) RabbitConsumerOption

func WithProcessingDelay

func WithProcessingDelay(delay time.Duration) RabbitConsumerOption

func WithProtocol

func WithProtocol(protocol string) RabbitConsumerOption

func WithQueueArgs

func WithQueueArgs(args amqp.Table) RabbitConsumerOption

func WithQueueAutoDelete

func WithQueueAutoDelete(autoDelete bool) RabbitConsumerOption

func WithQueueDurable

func WithQueueDurable(durable bool) RabbitConsumerOption

func WithQueueExclusive

func WithQueueExclusive(exclusive bool) RabbitConsumerOption

func WithQueueName

func WithQueueName(queueName string) RabbitConsumerOption

func WithQueueNoBind

func WithQueueNoBind(noBind bool) RabbitConsumerOption

func WithQueueNoWait

func WithQueueNoWait(noWait bool) RabbitConsumerOption

func WithSuppressProcessingErrors

func WithSuppressProcessingErrors(suppress bool) RabbitConsumerOption

func WithTopic

func WithTopic(topic string) RabbitConsumerOption

func WithTracePropagator

func WithTracePropagator(propagator propagation.TextMapPropagator) RabbitConsumerOption

func WithTracer

func WithTracer(tracer oteltrace.Tracer) RabbitConsumerOption

func WithUsername

func WithUsername(username string) RabbitConsumerOption

func WithVHost

func WithVHost(vhost string) RabbitConsumerOption

func WithValueDeserializer

func WithValueDeserializer(valueDeserializer ValueDeserializer) RabbitConsumerOption

type StringDeserializer

type StringDeserializer struct{}

func (StringDeserializer) Close

func (StringDeserializer) Close()

func (StringDeserializer) Deserialize

func (StringDeserializer) Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)

func (StringDeserializer) DeserializeInto

func (StringDeserializer) DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error

type ValueDeserializer

type ValueDeserializer interface {
	Deserialize(ctx context.Context, topic string, payload []byte) (interface{}, error)
	DeserializeInto(ctx context.Context, topic string, payload []byte, msg interface{}) error
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL