event

package
v0.8.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2024 License: MIT Imports: 15 Imported by: 4

Documentation

Index

Constants

View Source
// Package-level tunables.
// NOTE(review): the purposes below are inferred from the names only; confirm
// them against the producer implementation.
const (
	// PublishChannelSize is 1024; presumably the capacity of the producer's
	// internal publish channel.
	PublishChannelSize   = 1024
	// RetryMinDelay is five seconds; presumably the minimum wait before a
	// failed publish is retried.
	RetryMinDelay        = 5 * time.Second
	// PublishLogSampleRate is 0.1; presumably the fraction of publishes that
	// get logged.
	PublishLogSampleRate = 0.1
	// MaxRetries is 3; presumably the retry limit that leads to
	// ErrMaxRetriesReached.
	MaxRetries           = 3
)

Variables

View Source
// Sentinel errors for the AMQP producer; compare against them with errors.Is.
// NOTE(review): the exact conditions under which each one is returned are not
// visible here; the descriptions restate the error messages.
var (
	// ErrProducerShuttingDown reports that the producer is shutting down.
	ErrProducerShuttingDown = errors.New("amqp: producer shutting down")
	// ErrProducerClosed reports that the producer has been closed.
	ErrProducerClosed       = errors.New("amqp: producer closed")
	// ErrMaxRetriesReached reports that publishing hit the maximum number of
	// retries.
	ErrMaxRetriesReached    = errors.New("amqp: publish max retries reached")
	// ErrRetryQueueFull reports that the retry queue is full.
	ErrRetryQueueFull       = errors.New("amqp: retry queue full")
)
View Source
// ByteCapacity is a package-level zero-value stream.ByteCapacity.
// NOTE(review): presumably re-exported so callers can build byte sizes without
// importing the stream package themselves — confirm.
var ByteCapacity = stream.ByteCapacity{}
View Source
// ErrConsumerClosed is the sentinel error reporting that the consumer has been
// closed.
var ErrConsumerClosed = errors.New("amqp: consumer closed")

Functions

func IsUnprocessableMessageErr added in v0.5.3

func IsUnprocessableMessageErr(err error) bool

func NewConsumerOptions

func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions

func ParseStreamOptions

func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)

func TimestampOffset

func TimestampOffset(t time.Time) stream.OffsetSpecification

func UnprocessableMsgErr added in v0.6.0

func UnprocessableMsgErr(reason error) error

UnprocessableMsgErr wraps reason, if it is not nil, in an unprocessable message error so that the consumer does not requeue the message.

Types

type AMQPChanOps added in v0.4.12

// AMQPChanOps is the set of channel operations used for message traffic:
// publishing a message and starting a consumer. It is the value an
// AMQPConnectFunc returns.
// NOTE(review): the signatures look like those of the underlying amqp channel
// type — confirm.
type AMQPChanOps interface {
	// Publish sends msg to exchange with routing key key.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// Consume starts consuming from queue and returns a receive-only channel
	// of deliveries.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

type AMQPChanSetup added in v0.4.12

// AMQPChanSetup is the set of topology operations (exchanges, queues, bindings
// and QoS) handed to the setup callback accepted by NewAMQPConnectFunc.
// NOTE(review): the signatures look like those of the underlying amqp channel
// type — confirm.
type AMQPChanSetup interface {
	// Exchange operations.
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDelete(name string, ifUnused, noWait bool) error
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
	// Queue operations.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
	QueueInspect(name string) (amqp.Queue, error)
	QueueUnbind(name, key, exchange string, args amqp.Table) error
	// Qos sets prefetch limits.
	Qos(prefetchCount, prefetchSize int, global bool) error
}

type AMQPClient added in v0.4.12

// AMQPClient combines AMQPProducer and AMQPConsumer in one interface. Both
// embedded interfaces declare the same Shutdown(context.Context) error method.
type AMQPClient interface {
	AMQPProducer
	AMQPConsumer
}

func NewAMQPClient added in v0.4.12

func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)

type AMQPConnectFunc added in v0.3.0

// AMQPConnectFunc takes a context, a URI, a confirmation channel and a
// close-error channel, and returns the AMQPChanOps to use or an error.
// NOTE(review): presumably it dials the broker and registers confirms and
// closed for notifications — confirm against NewAMQPConnectFunc.
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)

func NewAMQPConnectFunc added in v0.3.0

func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc

type AMQPConsumer added in v0.4.12

// AMQPConsumer consumes messages from AMQP queues and can be shut down.
type AMQPConsumer interface {
	// Consume passes messages from queue to handler.
	// NOTE(review): concurrency is presumably the number of deliveries handled
	// in parallel; whether the call blocks is not visible here — confirm.
	Consume(queue string, concurrency int, handler AMQPMessageHandler) error
	// Shutdown stops the consumer.
	// NOTE(review): presumably the context bounds how long shutdown may take —
	// confirm.
	Shutdown(context.Context) error
}

func NewAMQPConsumer added in v0.4.12

func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)

type AMQPMessage added in v0.3.0

// AMQPMessage describes one message to publish through an AMQPProducer: where
// it goes, what it carries, and how its delivery is confirmed.
type AMQPMessage struct {
	// Exchange and Key of message in the AMQP protocol.
	Exchange, Key string
	// Body is the payload of the message.
	// NOTE(review): how a non-byte Body is encoded is not visible here.
	Body interface{}
	// Persistent reports whether this message should be persisted in durable
	// storage so that it is not lost on broker restarts.
	Persistent bool
	// Mandatory means that if the message cannot be routed to a queue, the
	// broker should return it to the sender. In other words, the broker will
	// try to put the message in at least one queue, and if there's no queue
	// bound to receive the message it will fail the publishing.
	Mandatory bool
	// ResultChan receives the result message from the publish operation. Used
	// to guarantee delivery of messages to the broker through confirmation.
	ResultChan chan<- PublishResult
	// WaitResult simplifies waiting for the result of a publish operation. If
	// true, `Publish` will only return after confirmation has been received for
	// the specific message. Cannot be specified together with a `ResultChan`.
	WaitResult bool
}

type AMQPMessageHandler added in v0.4.12

// AMQPMessageHandler handles one amqp.Delivery and reports failure through its
// error result.
// NOTE(review): per the UnprocessableMsgErr documentation, returning an
// unprocessable message error keeps the consumer from requeueing the message;
// how other errors are treated is not visible here — confirm.
type AMQPMessageHandler func(amqp.Delivery) error

AMQPMessageHandler is a function that will be called for each message received.

type AMQPProducer added in v0.3.0

// AMQPProducer publishes AMQPMessage values and can be shut down.
type AMQPProducer interface {
	// Publish publishes msg. Per the AMQPMessage field documentation, when
	// msg.WaitResult is true it returns only after confirmation for that
	// message has been received.
	Publish(ctx context.Context, msg AMQPMessage) error
	// Shutdown stops the producer.
	// NOTE(review): presumably the context bounds how long shutdown may take —
	// confirm.
	Shutdown(context.Context) error
}

func NewAMQPProducer added in v0.3.0

func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)

type BindingArgs

// BindingArgs holds the parameters of one binding; StreamOptions carries a
// slice of them in its Bindings field.
// NOTE(review): presumably each entry binds the stream to Exchange using
// routing key Key — confirm.
type BindingArgs struct {
	// Key is the binding (routing) key.
	Key      string
	// Exchange is the name of the exchange involved in the binding.
	Exchange string
	// Args are additional binding arguments.
	Args     amqp.Table
}

type ConsumeOptions

// ConsumeOptions configures a StreamConsumer.Consume or ConsumeChan call. It
// embeds *StreamOptions and *stream.ConsumerOptions, so their fields and
// methods are promoted onto it; both are pointers and may be nil.
type ConsumeOptions struct {
	// Stream is the name of the stream to consume from.
	Stream string
	*StreamOptions
	*stream.ConsumerOptions
	// Whether to memorize the message offset in the stream and use it on
	// re-connections to continue from the last read message.
	MemorizeOffset bool
}

type Handler

// Handler is the callback interface accepted by StreamConsumer.Consume.
type Handler interface {
	// HandleMessage handles one StreamMessage. It returns no value, so a
	// failure cannot be reported through the return.
	HandleMessage(msg StreamMessage)
}

type PublishResult added in v0.4.13

// PublishResult is the outcome of a publish operation; it is the element type
// of AMQPMessage.ResultChan.
type PublishResult struct {
	// Message is the message the result refers to.
	Message AMQPMessage
	// Error is the publish error.
	// NOTE(review): presumably nil on success — confirm.
	Error   error
}

type RawStreamOptions

// RawStreamOptions is the unparsed form of stream options;
// ParseStreamOptions turns it into a *stream.StreamOptions.
// NOTE(review): the accepted format of the two size strings is not visible
// here — confirm against ParseStreamOptions.
type RawStreamOptions struct {
	// MaxLengthBytes is the maximum stream length, as a string.
	MaxLengthBytes      string
	// MaxSegmentSizeBytes is the maximum segment size, as a string.
	MaxSegmentSizeBytes string
	// MaxAge is the maximum age, as a duration.
	MaxAge              time.Duration
}

type SimpleProducer added in v0.4.12

// SimpleProducer is the single-method publishing interface returned by
// NewAMQPExchangeProducer and NewAMQPQueueProducer.
type SimpleProducer interface {
	// Publish publishes body under key.
	// NOTE(review): persistent presumably has the same meaning as
	// AMQPMessage.Persistent — confirm.
	Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}

func NewAMQPExchangeProducer added in v0.0.3

func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)

func NewAMQPQueueProducer added in v0.0.3

func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)

type StreamConsumer

// StreamConsumer consumes messages from streams, either through a channel or
// through a Handler callback.
type StreamConsumer interface {
	// ConsumeChan returns a receive-only channel of messages for the stream
	// described by opts.
	ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
	// Consume passes messages for the stream described by opts to handler.
	// NOTE(review): whether this call blocks until ctx is done is not visible
	// here — confirm.
	Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
	// CheckConnection reports a connection problem as an error.
	CheckConnection() error
	// Close closes the consumer.
	Close() error
}

func NewStreamConsumer

func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)

type StreamMessage

// StreamMessage is one message delivered by a StreamConsumer. It embeds the
// stream.ConsumerContext and the *streamAmqp.Message, so the fields and
// methods of both are promoted onto it.
type StreamMessage struct {
	stream.ConsumerContext
	*streamAmqp.Message
}

type StreamOptions

// StreamOptions extends the embedded stream.StreamOptions with a list of
// bindings.
// NOTE(review): presumably the bindings are applied when the stream is
// declared — confirm.
type StreamOptions struct {
	stream.StreamOptions
	// Bindings lists the bindings associated with the stream.
	Bindings []BindingArgs
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL