go-shuttle

module
v0.8.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2022 License: MIT

README

go-shuttle

go-shuttle serves as a wrapper around the azure-service-bus-go SDK to facilitate the implementation of a pub-sub pattern on Azure using service bus.

NOTE: This library is in early development and should be considered experimental. The api is still moving and can change. We do have breaking changes in v0.*. Use at your own risks.

Conventions & Assumptions

Currently we are assuming that both the publisher and the listener will both use this azure-pub-sub library. This is because the listener assumes that the struct type of the body is in the header of the message it receives. This addition is done automatically when using the publisher of this library via reflection. This is done so that the library user can easily filter out certain event types. Specifically this is what the message should look like:

{
  "data": "<some data>",
  "userProperties": {
     "type": "<name of the struct type>" // used for subscription filters
  }
}

This is enforced by the fact that the listener handler's function signature expects the messageType to be there:

type Handle func(ctx context.Context, msg *message.Message) message.Handler

If the type field from userProperties is missing, the listener handler will automatically throw an error saying it is not supported.

In the future we will support raw listener handlers that don't have this restriction to allow for more publisher flexibility.

Listener Examples

To start receiving messages, you need to create a Listener, and start listening. Creating the Listener creates the connections and initialized the token provider. You start receiving messages when you call Listen(...) and pass a message handler.

Initializing a listener with a Service Bus connection string
l, err := listener.New(listener.WithConnectionString(serviceBusConnectionString))
Initializing a listener with an adal.ServicePrincipalToken

This is useful when the consumer needs to control the creation of the token or when multiple publishers/listeners in a single process can share the same token. It allows to reduce the number of request to refresh the token since the cache is shared.

"github.com/Azure/go-autorest/autorest/adal"

spt, err := adal.NewServicePrincipalTokenFromMSIWithIdentityResourceID(...)
if err != nil {
    // handle
}
l, err := listener.New(listener.WithToken(sbNamespaceName, spt))
Initializing a listener using Managed Identity

To configure using managed identity with Service Bus, refer to this link. Note if you see errors for certain operation, you may have an RBAC issue. Refer to the built-in RBAC roles here.

Using user assigned managed identity
l, _ := listener.New(listener.WithManagedIdentityClientID(serviceBusNamespaceName, managedIdentityClientID))

Or using the resource id:

l, _ := listener.New(listener.WithManagedIdentityResourceID(serviceBusNamespaceName, managedIdentityResourceID))
Using system assigned managed identity

Keep the clientID parameter empty

l, _ := listener.New(listener.WithManagedIdentityClientID(serviceBusNamespaceName, ""))
defer listener.Close(context.Background(()) // stop receiving messages
Start listening : Subscribe to a topic
The Handler

The Handler is a func that takes in a context and the message, and returns another Handler type, represents the result of the handling.

handler := message.HandlerFunc(func(ctx context.Context, msg *message.Message) message.Handler {
    err := DoSomething(ctx, msg)
    if err != nil {
        return msg.Error(err) //trace the error, and abandon the message. message will be retried
    }
    return msg.Complete() // handling successful. remove message from topic
})

// listen blocks and handle messages from the topic
err := l.Listen(ctx, handler, topicName)

// Note that err occurs when calling l.Close(), because it closes the context, to shut down the listener.
// This is expected as it is the only way to get out of the blocking Lister call.

Postponed handling of message

In some cases, your message handler can detect that it is not ready to process the message, and needs to retry later:

handler := message.HandlerFunc(func(ctx context.Context, msg *message.Message) message.Handler {
    // This is currently a delayed abandon so it can not be longer than the lock duration (max of 5 minutes) and effects your dequeue count.
    return msg.RetryLater(4*time.Minute)
})

// listen blocks and handle messages from the topic
err := l.Listen(ctx, handler, topicName)

Notes:

  • RetryLater simply waits for the given duration before abandoning. So, if you are using RetryLater you probably want to set WithSubscriptionDetails, especially maxDelivery as each call to RetryLater will up the delivery count by 1
  • You also likely want to enable Automatic Lock renewal. since the retry later just holds the message in memory, it could lose the peek lock while you wait
Start Listening
err := l.Listen(ctx, handler, topicName)
Subscribe to a topic with a client-supplied name
err := l.Listen(
    ctx,
    handler,
    topicName,
    listener.SetSubscriptionName("subName"),
)
Subscribe to a topic with a filter
sqlFilter := fmt.Sprintf("destinationId LIKE '%s'", "test")
err := l.Listen(
    ctx,
    handle,
    topicName,
    listener.SetSubscriptionFilter(
        "testFilter",
        servicebus.SQLFilter{Expression: sqlFilter},
    ),
)
Listen sample with error check and Close()
l, err := listener.New(listener.WithManagedIdentityResourceID(serviceBusNamespaceName, managedIdentityResourceID))
if err != nil {
    return err
}
...
if err := l.Listen(ctx, handler, topicName); err != nil {
    return err
}
defer func() {
    err := l.Close(ctx)
    if err != nil {
        log.Errorf("failed to close listener: %s", err)
    }
}
Enable message lock renewal

This will renew the lock on each message every 30 seconds until the message is Completed or Abandoned.

renewInterval := 30 * time.Second
if err := l.Listen(ctx, handler, topicName, listener.WithMessageLockAutoRenewal(renewInterval)); err != nil {
    return err
}
Concurrent message handling

There 2 aspects to concurrent handling: Prefetch and MaxConcurrency values :

renewInterval := 30 * time.Second
err := l.Listen(ctx, handler, topicName, 
    listener.WithPrefetchCount(50),
    listener.WithMaxConcurrency(50),
    listener.WithMessageLockAutoRenewal(10 * time.Second))

if err != nil {
    return err
}

Both settings are well documented on Azure Service Bus doc. Read carefully this doc to understand the consequences of using these features : https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch

Prefetch

Azure service bus implements the AMQP 1.0 protocol. Prefetch translates into the LinkCredit setting on an AMQP connection to understand how prefetch works, you need to understand amqp flow control via link credit : https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide#flow-control

TL;DR: AMQP works as a streaming protocol. in the case of azure service bus, broker pushes messages to you. so Prefetch is a misnomer, it is not batching. It tells the broker how many messages can be pushed to your worker without being completed or abandoned. Or in other words, how many messages can be worked on at the same time by the receiver.

MaxConcurrency

MaxConcurrency defines how many concurrent message can be worked on. i.e : When prefetch is higher than MaxConcurrency, you will have some messages sitting in memory on the receiver's side until a worker picks it up. This improves performance there always is a message ready to be processed when a handler is done, but it has to be carefully balanced with the Message Lock Duration and the usual message processing time.
For more info : https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch

Publisher Examples

Initializing a publisher with a Service Bus connection string
var (
    ctx       = context.Background()
    topicName = "topic"
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.PublisherWithConnectionString(serviceBusConnectionString),
)
Initializing a publisher using Managed Identity

To configure using managed identity with Service Bus, refer to this link. Note if you see errors for certain operation, you may have an RBAC issue. Refer to the built-in RBAC roles here.

Using user assigned managed identity

Using Identity ClientID

var (
    ctx       = context.Background()
    topicName = "topic"
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.WithManagedIdentityClientID(serviceBusNamespaceName, managedIdentityClientID),
)

Using Identity ResourceID

var (
    ctx       = context.Background()
    topicName = "topic"
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.WithManagedIdentityResourceID(serviceBusNamespaceName, managedIdentityResourceID),
)
Using system assigned managed identity

Keep the clientID parameter empty

var (
    ctx       = context.Background()
    topicName = "topic"
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.WithManagedIdentityClientID(serviceBusNamespaceName, ""),
)
Initializing a publisher with a header
var (
    ctx       = context.Background()
    topicName = "topic"
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.WithConnectionString(serviceBusConnectionString),
    // msgs will have a header with the name "headerName" and value from the msg body field "Id"
    publisher.SetDefaultHeader("headerName", "Id"),
)
Initializing a publisher with duplication detection

Duplication detection cannot be enabled on Service Bus topics that already exist. Please think about what capabilities you would like on the Service Bus topic up front at creation time.

Note that you need to use this feature in conjunction with setting a messageID on each message you send. Refer to the Publishing a message with a message ID section on how to do this.

var (
    ctx       = context.Background()
    topicName = "topic"

    dupeDetectionWindow = 5 * time.Minute
)

pub, _ := publisher.New(
    ctx,
    topicName,
    publisher.WithConnectionString(serviceBusConnectionString),
    publisher.SetDuplicateDetection(&dupeDetectionWindow), // if window is null then a default of 30 seconds is used
)
Publishing a message
cmd := &SomethingHappenedEvent{
    Id: uuid.New(),
    SomeStringField: "value",
}
// by default the msg header will have "type" == "SomethingHappenedEvent"
err := pub.Publish(ctx, cmd)
Publishing a message with a delay
cmd := &SomethingHappenedEvent{
    Id: uuid.New(),
    SomeStringField: "value",
}
err := pub.Publish(
    ctx,
    cmd,
    publisher.SetMessageDelay(5*time.Second),
)
Publishing a message with a message ID

The duplication detection feature requires messages to have a messageID, as messageID is the key ServiceBus will de-dupe on. Refer to the Initializing a publisher with duplication detection section.

cmd := &SomethingHappenedEvent{
    Id: uuid.New(),
    SomeStringField: "value",
}
messageID := "someMessageIDWithBusinessContext"
err := pub.Publish(
    ctx,
    cmd,
    publisher.SetMessageID(messageID),
)

Dev environment and integration tests

  1. copy the .env.template to a .env at the root of the repository
  2. fill in the environment variable in the .env file
  3. run make test-setup. that will create the necessary azure resources.
  4. run make integration. <- build & push image + start integration test run on ACI

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL