Documentation ¶
Index ¶
- Constants
- Variables
- func EnableAzureLogging(log Logger)
- func NewAzbusError(err error) error
- func OutMessageProperties(o *OutMessage) map[string]any
- func OutMessageSetProperty(o *OutMessage, k string, v any)
- func ReceivedProperties(r *ReceivedMessage) map[string]any
- func ReceivedSetProperty(r *ReceivedMessage, k string, v any)
- type AZAdminClient
- func (c *AZAdminClient) EnsureSubscriptionRule(ctx context.Context, topicName, subscriptionName string, ruleName string, ...) error
- func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error)
- func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error)
- func (c *AZAdminClient) Open() (*azadmin.Client, error)
- type AZClient
- type Disposition
- type Handler
- type Logger
- type MsgReceiver
- type MsgSender
- type OutMessage
- type ReceivedMessage
- type Receiver
- type ReceiverConfig
- type ReceiverOption
- type Sender
- type SenderConfig
Constants ¶
const (
	// RenewalTime is how often we want to renew the message peek lock.
	//
	// Inspection of the topics and subscription shows that the PeekLock timeout is one minute.
	//
	// This clarifies the peeklock duration as 60s: https://github.com/MicrosoftDocs/azure-docs/issues/106047
	//
	// "The default lock duration is indeed 1 minute, we will get this updated in our documentation."
	// "As for your question about RenewLock, it's best to set the lock duration to something higher than your normal"
	// "processing time, so you don't have to call the RenewLock. Note that the maximum value is 5 minutes, so you will"
	// "need to call RenewLock if you want to have this longer. Also note that having a longer lock duration then needed"
	// "has some implications as well, f.e. when your client stops working, the message will only become available again"
	// "after the lock duration has passed."
	//
	// An analysis of elapsed times when processing msgs shows no message takes longer than 10s to process during our
	// normal test suites.
	//
	// Set to 50 seconds, well within the 60 seconds peek lock timeout
	RenewalTime = 50 * time.Second
)
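The renewal cadence described in the comment can be pictured as a ticker loop that renews the lock until processing finishes. This is a minimal sketch rather than the package's implementation: `renewFunc`, `renewLoop` and `countRenewals` are hypothetical names, and the callback stands in for whatever call actually renews the lock. The durations are scaled down to milliseconds so the example runs quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// renewFunc is a hypothetical stand-in for the call that renews a peek lock.
type renewFunc func(ctx context.Context) error

// renewLoop calls renew once per interval until ctx is done or renew fails.
// It returns nil when ctx ends and the renew error otherwise.
func renewLoop(ctx context.Context, interval time.Duration, renew renewFunc) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := renew(ctx); err != nil {
				return err
			}
		}
	}
}

// countRenewals runs renewLoop for runForMs milliseconds with the given
// interval and reports how many renewals were attempted.
func countRenewals(intervalMs, runForMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(runForMs)*time.Millisecond)
	defer cancel()
	n := 0
	_ = renewLoop(ctx, time.Duration(intervalMs)*time.Millisecond, func(context.Context) error {
		n++
		return nil
	})
	return n
}

func main() {
	// Scaled down: a 50ms "renewal time" against 220ms of processing.
	fmt.Println("renewals:", countRenewals(50, 220))
}
```

If the renewal interval is longer than the processing time, the loop never fires, which matches the observation that most messages complete well inside a single lock period.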
Variables ¶
var (
	ErrConnectionLost = errors.New("connection lost")
	ErrLockLost       = errors.New("lock lost")
	ErrTimeout        = errors.New("timeout")
)
The Azure package expects the user to inspect errors like so:
	var servicebusError *azservicebus.Error
	if errors.As(err, &servicebusError) && servicebusError.Code == azservicebus.CodeUnauthorizedAccess { ...
which is rather clumsy.
This code maps the internal code to an actual error so one can write:
	if errors.Is(err, azbus.ErrConnectionLost) { ...
which is easier and more idiomatic.
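One way such a mapping could work is to wrap the original error in a type whose `Is` method matches a sentinel. The sketch below is an illustration under assumptions, not the package's code: `code`, `serviceError`, `mappedError` and `mapError` are hypothetical stand-ins, and the code strings are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// code and serviceError are hypothetical stand-ins for an SDK error that
// carries a machine-readable code.
type code string

const (
	codeConnectionLost code = "connlost"
	codeLockLost       code = "locklost"
	codeTimeout        code = "timeout"
)

type serviceError struct {
	Code code
	Msg  string
}

func (e *serviceError) Error() string { return e.Msg }

var (
	ErrConnectionLost = errors.New("connection lost")
	ErrLockLost       = errors.New("lock lost")
	ErrTimeout        = errors.New("timeout")
)

// mappedError pairs a sentinel with the original error so that both
// errors.Is (against the sentinel) and errors.As (against the cause) work.
type mappedError struct {
	sentinel error
	cause    error
}

func (e *mappedError) Error() string        { return e.sentinel.Error() + ": " + e.cause.Error() }
func (e *mappedError) Is(target error) bool { return target == e.sentinel }
func (e *mappedError) Unwrap() error        { return e.cause }

// mapError wraps err with the sentinel that matches its code. Errors without
// a recognised code are returned unchanged.
func mapError(err error) error {
	var se *serviceError
	if !errors.As(err, &se) {
		return err
	}
	switch se.Code {
	case codeConnectionLost:
		return &mappedError{sentinel: ErrConnectionLost, cause: err}
	case codeLockLost:
		return &mappedError{sentinel: ErrLockLost, cause: err}
	case codeTimeout:
		return &mappedError{sentinel: ErrTimeout, cause: err}
	}
	return err
}

func main() {
	err := mapError(&serviceError{Code: codeConnectionLost, Msg: "link detached"})
	fmt.Println(errors.Is(err, ErrConnectionLost), err)
}
```

Keeping the cause reachable through `Unwrap` means callers who still need the original coded error can recover it with `errors.As`.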
var (
ErrMessageOversized = errors.New("message is too large")
)
var (
ErrNoHandler = errors.New("no handler defined")
)
var (
ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)
Set a timeout for processing the message; this should be no later than the message lock time. It is quite surprising that the Azure Service Bus package does not add a deadline to the context passed to the message handler.
NB: this has no effect, as cancellation is removed in order to get the Azure SDK for Go retry logic, which increases reliability.
Inspection of logs shows that the deadline is always 60s in the future, which we will never exceed.
The use of the context returned here is problematic. Inspection of code that uses it shows that the submethods called do not generally obey cancellation; they do not even take a context.Context as their first argument.
Code that follows from calling this method should be wrapped in a select statement that terminates when the timeout expires, i.e. waits on ctx.Done(). Even this is not bulletproof, as it is unclear how to terminate any of these submethods.
Probably the best solution is to remove this entirely and rely on RenewMessageLock. If it does time out then it is too late anyway, as the peek lock will already have been released.
For the time being we impose a timeout as it is safe.
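The select-based wrapping suggested above might look like the following sketch. It is an assumption-laden illustration: `runWithDeadline` and `timedOut` are hypothetical helpers, and the work function stands in for handler code that takes no context. It also shows the caveat from the text: on timeout the work is abandoned, not stopped.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var ErrPeekLockTimeout = errors.New("peeklock deadline reached")

// runWithDeadline runs work in its own goroutine and waits for either its
// result or the deadline. On timeout the goroutine is not terminated; it is
// only abandoned, because work has no way to observe cancellation.
func runWithDeadline(ctx context.Context, timeout time.Duration, work func() error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Buffered so the abandoned goroutine can still send and exit.
	done := make(chan error, 1)
	go func() { done <- work() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ErrPeekLockTimeout
	}
}

// timedOut reports whether work lasting workMs milliseconds exceeds a
// deadline of timeoutMs milliseconds.
func timedOut(timeoutMs, workMs int) bool {
	err := runWithDeadline(context.Background(), time.Duration(timeoutMs)*time.Millisecond, func() error {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		return nil
	})
	return errors.Is(err, ErrPeekLockTimeout)
}

func main() {
	fmt.Println("slow handler timed out:", timedOut(20, 200))
	fmt.Println("fast handler timed out:", timedOut(200, 1))
}
```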
Functions ¶
func EnableAzureLogging ¶
func EnableAzureLogging(log Logger)
First attempt at incorporating azure logging. The EventSender option does not appear to work. TODO: Generalise this for any azure downstream package.
func NewAzbusError ¶
func OutMessageProperties ¶ added in v0.13.3
func OutMessageProperties(o *OutMessage) map[string]any
func OutMessageSetProperty ¶ added in v0.13.3
func OutMessageSetProperty(o *OutMessage, k string, v any)
SetProperty adds a key-value pair to OutMessage and can be chained.
func ReceivedProperties ¶ added in v0.13.3
func ReceivedProperties(r *ReceivedMessage) map[string]any
func ReceivedSetProperty ¶ added in v0.13.3
func ReceivedSetProperty(r *ReceivedMessage, k string, v any)
SetProperty adds a key-value pair to Message and can be chained.
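As a rough illustration of what property helpers of this shape need to handle, the sketch below uses a hypothetical `message` type whose properties live in a map that is nil on a fresh message. The type and the lower-case helper names are assumptions for the example, not the package's code.

```go
package main

import "fmt"

// message is a hypothetical stand-in carrying only the field this sketch needs.
type message struct {
	ApplicationProperties map[string]any
}

// setProperty stores k=v on the message, allocating the map on first use so
// that callers never write to a nil map.
func setProperty(m *message, k string, v any) {
	if m.ApplicationProperties == nil {
		m.ApplicationProperties = make(map[string]any)
	}
	m.ApplicationProperties[k] = v
}

// properties returns the message's property map, which may be nil if nothing
// has been set.
func properties(m *message) map[string]any {
	return m.ApplicationProperties
}

func main() {
	m := &message{}
	setProperty(m, "tenant", "acme")
	setProperty(m, "attempt", 1)
	fmt.Println(properties(m))
}
```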
Types ¶
type AZAdminClient ¶
type AZAdminClient struct {
	ConnectionString string
	// contains filtered or unexported fields
}
AZAdminClient provides access to the administrative client for the message bus. Services that self manage subscriptions are the exceptional case and co-ordination with devops is required before using this mechanism.
func NewAZAdminClient ¶
func NewAZAdminClient(log Logger, connectionString string) AZAdminClient
func (*AZAdminClient) EnsureSubscriptionRule ¶
func (c *AZAdminClient) EnsureSubscriptionRule(
	ctx context.Context,
	topicName, subscriptionName string,
	ruleName string,
	ruleString string,
) error
EnsureSubscriptionRule ensures the named rule is set on the subscription and creates it from the supplied filter if not. Note: when the ruleName exists, we do not attempt to check that the supplied filter matches the existing filter.
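The consequence of matching on the rule name only can be shown with a small in-memory model. This is a sketch of the described semantics, not the admin client's code: `ruleStore` and `ensureRule` are hypothetical, and a plain map stands in for the subscription's rules.

```go
package main

import "fmt"

// ruleStore is a hypothetical in-memory stand-in for a subscription's rules,
// mapping rule name to filter expression.
type ruleStore map[string]string

// ensureRule creates the rule if the name is absent and reports whether it
// did so. An existing rule is left untouched, even if its filter differs from
// the one supplied.
func ensureRule(rules ruleStore, ruleName, filter string) (created bool) {
	if _, exists := rules[ruleName]; exists {
		return false
	}
	rules[ruleName] = filter
	return true
}

func main() {
	rules := ruleStore{}
	fmt.Println(ensureRule(rules, "tenant-filter", "tenant = 'a'"))
	// Same name, different filter: nothing changes.
	fmt.Println(ensureRule(rules, "tenant-filter", "tenant = 'b'"))
	fmt.Println(rules["tenant-filter"])
}
```

Under these semantics, changing a filter requires a new rule name or removing the old rule first.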
func (*AZAdminClient) GetQueueMaxMessageSize ¶
func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error)
func (*AZAdminClient) GetTopicMaxMessageSize ¶
func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error)
type AZClient ¶
type AZClient struct {
	// ConnectionString contains all the details necessary to connect,
	// authenticate and authorize a client for communicating with azure servicebus.
	ConnectionString string
	// contains filtered or unexported fields
}
func NewAZClient ¶
type Disposition ¶ added in v0.10.3
type Disposition int
Disposition describes the eventual demise of the message after processing by the client. Upstream is notified whether the message can be deleted, deadlettered or will be reprocessed later.
const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)
func (Disposition) String ¶ added in v0.10.3
func (d Disposition) String() string
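A String method on an iota-based type is typically a switch over the constants. The sketch below redeclares the type locally with the same constant order as above; the label strings are assumptions, since the actual output of the package's String method is not shown here.

```go
package main

import "fmt"

// Disposition is redeclared locally with the same constant order as the
// documented type so the example is self-contained.
type Disposition int

const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)

// String returns a label suitable for logging. The labels are illustrative
// assumptions, not the package's actual strings.
func (d Disposition) String() string {
	switch d {
	case DeadletterDisposition:
		return "DeadLetter"
	case AbandonDisposition:
		return "Abandon"
	case RescheduleDisposition:
		return "Reschedule"
	case CompleteDisposition:
		return "Complete"
	}
	return fmt.Sprintf("Disposition(%d)", int(d))
}

func main() {
	for d := DeadletterDisposition; d <= CompleteDisposition; d++ {
		fmt.Printf("%d=%s\n", int(d), d)
	}
}
```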
type Handler ¶
type Handler interface {
	Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error)
	Open() error
	Close()
}
Handler processes a ReceivedMessage.
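A minimal implementation of an interface with this shape could look like the sketch below. `Disposition` and `ReceivedMessage` are redeclared as local stand-ins so the example compiles on its own, and `countingHandler` and `process` are hypothetical. In particular, calling Open and Close around every message is an assumption made for brevity; a real receiver may open its handlers once.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the package's types.
type Disposition int

const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)

type ReceivedMessage struct{ Body []byte }

type Handler interface {
	Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error)
	Open() error
	Close()
}

// countingHandler completes non-empty messages and dead-letters empty ones.
type countingHandler struct{ handled int }

func (h *countingHandler) Open() error { return nil }
func (h *countingHandler) Close()      {}

func (h *countingHandler) Handle(ctx context.Context, m *ReceivedMessage) (Disposition, context.Context, error) {
	h.handled++
	if len(m.Body) == 0 {
		return DeadletterDisposition, ctx, errors.New("empty message body")
	}
	return CompleteDisposition, ctx, nil
}

// process drives one message through Open, Handle and Close.
func process(h Handler, body []byte) (Disposition, error) {
	if err := h.Open(); err != nil {
		return AbandonDisposition, err
	}
	defer h.Close()
	d, _, err := h.Handle(context.Background(), &ReceivedMessage{Body: body})
	return d, err
}

func main() {
	h := &countingHandler{}
	d, err := process(h, []byte("hello"))
	fmt.Println(d == CompleteDisposition, err)
	d, err = process(h, nil)
	fmt.Println(d == DeadletterDisposition, err)
}
```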
type MsgReceiver ¶
type OutMessage ¶
type OutMessage = azservicebus.Message
OutMessage abstracts the output message interface.
func NewOutMessage ¶
func NewOutMessage(data []byte) *OutMessage
We don't use With-style options as this is executed in the hot path.
type ReceivedMessage ¶
type ReceivedMessage = azservicebus.ReceivedMessage
type Receiver ¶
type Receiver struct {
	Cfg ReceiverConfig
	// contains filtered or unexported fields
}
Receiver to receive messages on a queue
func NewReceiver ¶
func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver
NewReceiver creates a new Receiver that will process a number of messages simultaneously. Each handler executes in its own goroutine.
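The one-goroutine-per-handler arrangement can be sketched with a sync.WaitGroup. This is a generic illustration, not the Receiver's code: `handleAll` is a hypothetical helper, and strings stand in for messages.

```go
package main

import (
	"fmt"
	"sync"
)

// handleAll runs handle for every message in its own goroutine and waits for
// all of them to finish. Each goroutine writes only to its own slot in
// results, so no extra locking is needed.
func handleAll(msgs []string, handle func(string) string) []string {
	results := make([]string, len(msgs))
	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		go func(i int, m string) {
			defer wg.Done()
			results[i] = handle(m)
		}(i, m)
	}
	wg.Wait()
	return results
}

func main() {
	out := handleAll([]string{"a", "b", "c"}, func(s string) string {
		return "handled " + s
	})
	fmt.Println(out)
}
```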
func (*Receiver) GetAZClient ¶
type ReceiverConfig ¶
type ReceiverConfig struct {
	ConnectionString string

	// Name is the name of the queue or topic
	TopicOrQueueName string

	// Subscription is the name of the topic subscription.
	// If blank then messages are received from a Queue.
	SubscriptionName string

	// See azbus/receiver.go
	RenewMessageLock bool
	RenewMessageTime time.Duration

	// If a deadletter receiver then this is true
	Deadletter bool
}
ReceiverConfig configuration for an azure servicebus queue
type ReceiverOption ¶
type ReceiverOption func(*Receiver)
func WithRenewalTime ¶
func WithRenewalTime(t int) ReceiverOption
WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less than the peek lock timeout. For example: the default peek lock timeout is 60s and the default renewal time is 50s.
Note! Only use this if you know what you're doing and you require custom timeout behaviour. The peek lock timeout is specified in terraform configs currently, as it is a property of subscriptions or queues.
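The option follows the functional-options pattern, which can be sketched as below. The lower-case `receiver`, `receiverOption`, `withRenewalTime` and `newReceiver` are hypothetical stand-ins, and treating the int argument as whole seconds is an assumption; the documented signature does not state the unit.

```go
package main

import (
	"fmt"
	"time"
)

// defaultRenewalTime mirrors the documented 50s default.
const defaultRenewalTime = 50 * time.Second

// receiver is a hypothetical stand-in holding only the renewal setting.
type receiver struct {
	renewalTime time.Duration
}

type receiverOption func(*receiver)

// withRenewalTime overrides the renewal interval.
// ASSUMPTION: t is interpreted as whole seconds.
func withRenewalTime(t int) receiverOption {
	return func(r *receiver) {
		r.renewalTime = time.Duration(t) * time.Second
	}
}

// newReceiver applies the default first and then each option in order.
func newReceiver(opts ...receiverOption) *receiver {
	r := &receiver{renewalTime: defaultRenewalTime}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newReceiver().renewalTime)
	fmt.Println(newReceiver(withRenewalTime(30)).renewalTime)
}
```

Because the default is set before the options run, callers who pass no option keep the 50s renewal time.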
type Sender ¶
type Sender struct {
	Cfg SenderConfig
	// contains filtered or unexported fields
}
Sender to send or receive messages on a queue or topic
func NewSender ¶
func NewSender(log Logger, cfg SenderConfig) *Sender
NewSender creates a new client
func (*Sender) GetAZClient ¶
type SenderConfig ¶
type SenderConfig struct {
	ConnectionString string

	// Name is the name of the queue or topic to send to.
	TopicOrQueueName string
}
SenderConfig configuration for an azure servicebus namespace and queue