Documentation ¶
Overview ¶
Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview for an overview.
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers for the scheme "azuresb". The default URL opener will use a Service Bus Connection String based on the environment variable "SERVICEBUS_CONNECTION_STRING". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://github.com/eliben/gocdkx/concepts/urls/ for background information.
Message Delivery Semantics ¶
Azure ServiceBus supports at-least-once semantics in the default Peek-Lock mode; applications must call Message.Ack after processing a message, or it will be redelivered. However, it also supports a Receive-Delete mode, which essentially auto-acks a message when it is delivered, resulting in at-most-once semantics. Use SubscriberOptions.AckFuncForReceiveAndDelete to choose between the two. See https://godoc.org/github.com/eliben/gocdkx/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
As ¶
azuresb exposes the following types for As:
- Topic: *servicebus.Topic
- Subscription: *servicebus.Subscription
- Message.BeforeSend: *servicebus.Message
- Message: *servicebus.Message
- Error: common.Retryable
Example (AckFuncForReceiveAndDelete) ¶
package main import ( "context" "log" "os" servicebus "github.com/Azure/azure-service-bus-go" "github.com/eliben/gocdkx/pubsub" "github.com/eliben/gocdkx/pubsub/azuresb" ) func main() { ctx := context.Background() // See docs below on how to provision an Azure Service Bus Namespace and obtaining the connection string. // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues connString := os.Getenv("SERVICEBUS_CONNECTION_STRING") topicName := "test-topic" subscriptionName := "test-sub" if connString == "" { log.Fatal("Service Bus ConnectionString is not set") } // Construct a Service Bus Namespace from a SAS Token. // See https://godoc.org/github.com/Azure/azure-service-bus-go#Namespace. ns, err := azuresb.NewNamespaceFromConnectionString(connString) if err != nil { log.Fatal(err) } // Construct a Service Bus Topic for a topicName associated with a NameSpace. // See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic. sbTopic, err := azuresb.NewTopic(ns, topicName, nil) if err != nil { log.Fatal(err) } defer sbTopic.Close(ctx) // Construct Receiver to AutoDelete messages. // See https://godoc.org/github.com/Azure/azure-service-bus-go#SubscriptionWithReceiveAndDelete. var opts []servicebus.SubscriptionOption opts = append(opts, servicebus.SubscriptionWithReceiveAndDelete()) // Construct a Service Bus Subscription which is a child to a Service Bus Topic. // See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.NewSubscription. sbSub, err := azuresb.NewSubscription(sbTopic, subscriptionName, opts) if err != nil { log.Fatal(err) } defer sbSub.Close(ctx) // This package accommodates both kinds of systems. If your application uses // at-least-once providers, it should always call Message.Ack. If your application // only uses at-most-once providers (ReceiveAndDeleteMode), it may call Message.Ack, but does not need to. To avoid // calling message.Ack, set option.AckFuncForReceiveAndDelete to a no-op as shown below. // // For more information on Service Bus ReceiveMode, see https://godoc.org/github.com/Azure/azure-service-bus-go#SubscriptionWithReceiveAndDelete. noop := func() {} subOpts := &azuresb.SubscriptionOptions{ AckFuncForReceiveAndDelete: noop, } // Construct a *pubsub.Subscription for a given Service Bus NameSpace and Topic. s, err := azuresb.OpenSubscription(ctx, ns, sbTopic, sbSub, subOpts) if err != nil { log.Fatal(err) } defer s.Shutdown(ctx) // Construct a *pubsub.Topic. t, err := azuresb.OpenTopic(ctx, sbTopic, nil) if err != nil { log.Fatal(err) } defer t.Shutdown(ctx) // Send *pubsub.Message from *pubsub.Topic backed by Azure Service Bus. err = t.Send(ctx, &pubsub.Message{ Body: []byte("example message"), Metadata: map[string]string{ "Priority": "1", }, }) if err != nil { log.Fatal(err) } // Receive a message from the *pubsub.Subscription backed by Service Bus. msg, err := s.Receive(ctx) if err != nil { log.Fatal(err) } // Ack will redirect to the AckOverride option (if provided), otherwise the driver Ack will be called. msg.Ack() }
Output:
Example (OpenSubscription) ¶
package main import ( "context" "log" "github.com/eliben/gocdkx/pubsub" ) func main() { // This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#azure // import _ "github.com/eliben/gocdkx/pubsub/azuresb" // Variables set up elsewhere: ctx := context.Background() subscription, err := pubsub.OpenSubscription(ctx, "azuresb://mytopic?subscription=mysubscription") if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
Example (OpenTopic) ¶
package main import ( "context" "log" "github.com/eliben/gocdkx/pubsub" ) func main() { // This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#azure // import _ "github.com/eliben/gocdkx/pubsub/azuresb" // Variables set up elsewhere: ctx := context.Background() // OpenTopic creates a *pubsub.Topic from a URL. // This URL will open the topic "mytopic" using a connection string // from the environment variable SERVICEBUS_CONNECTION_STRING. topic, err := pubsub.OpenTopic(ctx, "azuresb://mytopic") if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Output:
Index ¶
- Constants
- Variables
- func NewNamespaceFromConnectionString(connectionString string) (*servicebus.Namespace, error)
- func NewSubscription(parentTopic *servicebus.Topic, subscriptionName string, ...) (*servicebus.Subscription, error)
- func NewTopic(ns *servicebus.Namespace, topicName string, opts []servicebus.TopicOption) (*servicebus.Topic, error)
- func OpenSubscription(ctx context.Context, parentNamespace *servicebus.Namespace, ...) (*pubsub.Subscription, error)
- func OpenTopic(ctx context.Context, sbTopic *servicebus.Topic, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Examples ¶
Constants ¶
const Scheme = "azuresb"
Scheme is the URL scheme azuresb registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
var Set = wire.NewSet( SubscriptionOptions{}, TopicOptions{}, URLOpener{}, )
Set holds Wire providers for this package.
Functions ¶
func NewNamespaceFromConnectionString ¶
func NewNamespaceFromConnectionString(connectionString string) (*servicebus.Namespace, error)
NewNamespaceFromConnectionString returns a *servicebus.Namespace from a Service Bus connection string. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewSubscription ¶
func NewSubscription(parentTopic *servicebus.Topic, subscriptionName string, opts []servicebus.SubscriptionOption) (*servicebus.Subscription, error)
NewSubscription returns a *servicebus.Subscription associated with a Service Bus Topic.
func NewTopic ¶
func NewTopic(ns *servicebus.Namespace, topicName string, opts []servicebus.TopicOption) (*servicebus.Topic, error)
NewTopic returns a *servicebus.Topic associated with a Service Bus Namespace.
func OpenSubscription ¶
func OpenSubscription(ctx context.Context, parentNamespace *servicebus.Namespace, parentTopic *servicebus.Topic, sbSubscription *servicebus.Subscription, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.
Example ¶
package main import ( "context" "log" "os" "github.com/eliben/gocdkx/pubsub/azuresb" ) func main() { // This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#azure-ctor // Variables set up elsewhere: ctx := context.Background() // Change these as needed for your application. serviceBusConnString := os.Getenv("SERVICEBUS_CONNECTION_STRING") const topicName = "test-topic" const subscriptionName = "test-subscription" // Connect to Azure Service Bus for the given subscription. busNamespace, err := azuresb.NewNamespaceFromConnectionString(serviceBusConnString) if err != nil { log.Fatal(err) } busTopic, err := azuresb.NewTopic(busNamespace, topicName, nil) if err != nil { log.Fatal(err) } defer busTopic.Close(ctx) busSub, err := azuresb.NewSubscription(busTopic, subscriptionName, nil) if err != nil { log.Fatal(err) } defer busSub.Close(ctx) // Construct a *pubsub.Subscription. subscription, err := azuresb.OpenSubscription(ctx, busNamespace, busTopic, busSub, nil) if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
func OpenTopic ¶
func OpenTopic(ctx context.Context, sbTopic *servicebus.Topic, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopic initializes a pubsub Topic on a given Service Bus Topic.
Example ¶
package main import ( "context" "log" "os" "github.com/eliben/gocdkx/pubsub/azuresb" ) func main() { // This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#azure-ctor // Variables set up elsewhere: ctx := context.Background() // Change these as needed for your application. connString := os.Getenv("SERVICEBUS_CONNECTION_STRING") topicName := "test-topic" if connString == "" { log.Fatal("Service Bus ConnectionString is not set") } // Connect to Azure Service Bus for the given topic. busNamespace, err := azuresb.NewNamespaceFromConnectionString(connString) if err != nil { log.Fatal(err) } busTopic, err := azuresb.NewTopic(busNamespace, topicName, nil) if err != nil { log.Fatal(err) } defer busTopic.Close(ctx) // Construct a *pubsub.Topic. topic, err := azuresb.OpenTopic(ctx, busTopic, nil) if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Output:
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct { // If nil, the subscription MUST be in Peek-Lock mode. The Ack method must be called on each message // to complete it, otherwise you run the risk of deadlettering messages. // If non-nil, the subscription MUST be in Receive-and-Delete mode, and this function will be called // whenever Ack is called on a message. // See the "At-most-once vs. At-least-once Delivery" section in the pubsub package documentation. AckFuncForReceiveAndDelete func() }
SubscriptionOptions will contain configuration for subscriptions.
type TopicOptions ¶
type TopicOptions struct{}
TopicOptions provides configuration options for an Azure SB Topic.
type URLOpener ¶
type URLOpener struct { // ConnectionString is the Service Bus connection string (required). // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues ConnectionString string // Options passed when creating the ServiceBus Topic/Subscription. ServiceBusTopicOptions []servicebus.TopicOption ServiceBusSubscriptionOptions []servicebus.SubscriptionOption // TopicOptions specifies the options to pass to OpenTopic. TopicOptions TopicOptions // SubscriptionOptions specifies the options to pass to OpenSubscription. SubscriptionOptions SubscriptionOptions }
URLOpener opens Azure Service Bus URLs like "azuresb://mytopic" for topics or "azuresb://mytopic?subscription=mysubscription" for subscriptions.
- The URL's host+path is used as the topic name.
- For subscriptions, the subscription name must be provided in the "subscription" query parameter.
No other query parameters are supported.
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.