azbus

package
v0.14.0
Published: Apr 16, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

const (
	// RenewalTime is how often we want to renew the message peek lock.
	//
	// Inspection of the topics and subscription shows that the PeekLock timeout is one minute.
	//
	// This clarifies the peeklock duration as 60s: https://github.com/MicrosoftDocs/azure-docs/issues/106047
	//
	// "The default lock duration is indeed 1 minute, we will get this updated in our documentation."
	// "As for your question about RenewLock, it's best to set the lock duration to something higher than your normal"
	// "processing time, so you don't have to call the RenewLock. Note that the maximum value is 5 minutes, so you will"
	// "need to call RenewLock if you want to have this longer. Also note that having a longer lock duration then needed"
	// "has some implications as well, f.e. when your client stops working, the message will only become available again"
	// "after the lock duration has passed."
	//
	// An analysis of elapsed times when processing msgs shows no message takes longer than 10s to process during our
	// normal test suites.
	//
	// Set to 50 seconds, well within the 60 seconds peek lock timeout
	RenewalTime = 50 * time.Second
)

Variables

var (
	ErrConnectionLost     = errors.New("connection lost")
	ErrLockLost           = errors.New("lock lost")
	ErrUnauthorizedAccess = errors.New("unauthorized")
	ErrTimeout            = errors.New("timeout")
)

The Azure package expects the user to inspect errors like so:

    var servicebusError *azservicebus.Error
    if errors.As(err, &servicebusError) && servicebusError.Code == azservicebus.CodeUnauthorizedAccess {
        ...

which is rather clumsy.

This code maps the internal code to an actual error so one can write:

    if errors.Is(err, azbus.ErrConnectionLost) {
        ...

which is easier and more idiomatic.

var (
	ErrMessageOversized = errors.New("message is too large")
)
var (
	ErrNoHandler = errors.New("no handler defined")
)
var (
	ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)

Set a timeout for processing the message. This should be no later than the message lock time. It is quite surprising that the Azure Service Bus package does not add a deadline to the context passed to the message handler.

NB: this has no effect, as cancellation is removed from the context in order to benefit from the Azure SDK for Go retry logic, which increases reliability.

Inspection of logs shows that the deadline is always 60s in the future which we will never exceed.

The use of the context returned here is problematic. Inspection of code that uses it shows that submethods called do not generally obey cancellation - they do not even have a context.Context as first argument.

Code that follows from calling this method should be wrapped in a select statement that terminates when the timeout expires - i.e. waits on ctx.Done(). Even this is not bulletproof as it is unclear how to terminate any of these submethods.

Probably the best solution is to remove this entirely and rely on RenewMessageLock. If it does time out then it is too late anyway, as the peek lock will already have been released.

For the time being we impose a timeout, as it is safe.

Functions

func EnableAzureLogging

func EnableAzureLogging(log Logger)

First attempt at incorporating azure logging. The EventSender option does not appear to work. TODO: Generalise this for any azure downstream package.

func NewAzbusError

func NewAzbusError(err error) error

func OutMessageProperties added in v0.13.3

func OutMessageProperties(o *OutMessage) map[string]any

func OutMessageSetProperty added in v0.13.3

func OutMessageSetProperty(o *OutMessage, k string, v any)

OutMessageSetProperty adds a key-value pair to the OutMessage.

func ReceivedProperties added in v0.13.3

func ReceivedProperties(r *ReceivedMessage) map[string]any

func ReceivedSetProperty added in v0.13.3

func ReceivedSetProperty(r *ReceivedMessage, k string, v any)

ReceivedSetProperty adds a key-value pair to the ReceivedMessage.

Types

type AZAdminClient

type AZAdminClient struct {
	ConnectionString string
	// contains filtered or unexported fields
}

AZAdminClient provides access to the administrative client for the message bus. Services that self manage subscriptions are the exceptional case and co-ordination with devops is required before using this mechanism.

func NewAZAdminClient

func NewAZAdminClient(log Logger, connectionString string) AZAdminClient

func (*AZAdminClient) EnsureSubscriptionRule

func (c *AZAdminClient) EnsureSubscriptionRule(
	ctx context.Context,
	topicName, subscriptionName string,
	ruleName string,
	ruleString string,
) error

EnsureSubscriptionRule ensures the named rule is set on the subscription and creates it from the supplied filter if not. Note: when the ruleName exists, we do not attempt to check that the supplied filter matches the existing filter.

func (*AZAdminClient) GetQueueMaxMessageSize

func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error)

func (*AZAdminClient) GetTopicMaxMessageSize

func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error)

func (*AZAdminClient) Open

func (c *AZAdminClient) Open() (*azadmin.Client, error)

Open connects and returns the Azure admin Client interface that allows creation of topics etc. Note that creation is cached.

type AZClient

type AZClient struct {
	// ConnectionString contains all the details necessary to connect,
	// authenticate and authorize a client for communicating with azure servicebus.
	ConnectionString string
	// contains filtered or unexported fields
}

func NewAZClient

func NewAZClient(connectionString string) AZClient

type Disposition added in v0.10.3

type Disposition int

Disposition describes the eventual demise of the message after processing by the client. Upstream is notified whether the message can be deleted, deadlettered or will be reprocessed later.

const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)

func (Disposition) String added in v0.10.3

func (d Disposition) String() string

type Handler

type Handler interface {
	Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error)
	Open() error
	Close()
}

Handler processes a ReceivedMessage.

type Logger

type Logger = logger.Logger

type MsgReceiver

type MsgReceiver interface {
	// Listener interface
	Listen() error
	Shutdown(context.Context) error

	GetAZClient() AZClient
	String() string
}

type MsgSender

type MsgSender interface {
	Open() error
	Close(context.Context)

	Send(context.Context, *OutMessage) error
	String() string

	GetAZClient() AZClient
}

type OutMessage

type OutMessage = azservicebus.Message

OutMessage abstracts the output message interface.

func NewOutMessage

func NewOutMessage(data []byte) *OutMessage

We don't use With-style options as this is executed in the hot path.

type ReceivedMessage

type ReceivedMessage = azservicebus.ReceivedMessage

type Receiver

type Receiver struct {
	Cfg ReceiverConfig
	// contains filtered or unexported fields
}

Receiver receives messages from a queue or topic subscription.

func NewReceiver

func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver

NewReceiver creates a new Receiver that will process a number of messages simultaneously. Each handler executes in its own goroutine.

func (*Receiver) GetAZClient

func (r *Receiver) GetAZClient() AZClient

func (*Receiver) Listen

func (r *Receiver) Listen() error

The following 2 methods satisfy the startup.Listener interface.

func (*Receiver) Shutdown

func (r *Receiver) Shutdown(ctx context.Context) error

func (*Receiver) String

func (r *Receiver) String() string

String - returns string representation of receiver.

type ReceiverConfig

type ReceiverConfig struct {
	ConnectionString string

	// TopicOrQueueName is the name of the queue or topic.
	TopicOrQueueName string

	// SubscriptionName is the name of the topic subscription.
	// If blank then messages are received from a Queue.
	SubscriptionName string

	// See azbus/receiver.go
	RenewMessageLock bool
	RenewMessageTime time.Duration

	// If a deadletter receiver then this is true
	Deadletter bool
}

ReceiverConfig is the configuration for receiving from an Azure Service Bus queue or topic subscription.

type ReceiverOption

type ReceiverOption func(*Receiver)

func WithHandlers added in v0.10.3

func WithHandlers(h ...Handler) ReceiverOption

WithHandlers supplies the handlers that the Receiver uses to process messages.

func WithRenewalTime

func WithRenewalTime(t int) ReceiverOption

WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less than the peek lock timeout. For example: the default peek lock timeout is 60s and the default renewal time is 50s.

Note! Only use this if you know what you're doing and you require custom timeout behaviour. The peek lock timeout is specified in terraform configs currently, as it is a property of subscriptions or queues.

type Sender

type Sender struct {
	Cfg SenderConfig
	// contains filtered or unexported fields
}

Sender sends messages to a queue or topic.

func NewSender

func NewSender(log Logger, cfg SenderConfig) *Sender

NewSender creates a new Sender.

func (*Sender) Close

func (s *Sender) Close(ctx context.Context)

func (*Sender) GetAZClient

func (s *Sender) GetAZClient() AZClient

func (*Sender) Open

func (s *Sender) Open() error

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, message *OutMessage) error

Send submits a message to the queue. Ignores cancellation.

func (*Sender) String

func (s *Sender) String() string

type SenderConfig

type SenderConfig struct {
	ConnectionString string

	// Name is the name of the queue or topic to send to.
	TopicOrQueueName string
}

SenderConfig is the configuration for sending to an Azure Service Bus queue or topic.
