azure-service-bus-go: github.com/Azure/azure-service-bus-go Index | Examples | Files | Directories

package servicebus

import "github.com/Azure/azure-service-bus-go"

Code:

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/Azure/azure-service-bus-go"
)

// MessagePrinter is a message handler that writes each message body to standard output.
type MessagePrinter struct{}

// Handle prints the message payload as a string and then completes the message, returning whatever error
// Complete reports.
func (mp MessagePrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
    body := string(msg.Data)
    fmt.Println(body)

    return msg.Complete(ctx)
}

// main demonstrates auto-forwarding: it creates a target queue and a source queue configured to auto-forward to
// the target, sends one message to the source, and then receives one message from the target and prints it.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if len(connStr) == 0 {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println(err)
        return
    }

    // Set up both queue entities; the source is created with the target as its auto-forward destination.
    manager := ns.NewQueueManager()
    targetEntity, err := ensureQueue(ctx, manager, "AutoForwardTargetQueue")
    if err != nil {
        fmt.Println(err)
        return
    }

    forwardToTarget := servicebus.QueueEntityWithAutoForward(targetEntity)
    sourceEntity, err := ensureQueue(ctx, manager, "AutoForwardSourceQueue", forwardToTarget)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Send a single message to the source queue.
    sender, err := ns.NewQueue(sourceEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = sender.Close(ctx)
    }()

    outgoing := servicebus.NewMessageFromString("forward me to target!")
    err = sender.Send(ctx, outgoing)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Read one message from the target queue and print it.
    receiver, err := ns.NewQueue(targetEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = receiver.Close(ctx)
    }()

    err = receiver.ReceiveOne(ctx, MessagePrinter{})
    if err != nil {
        fmt.Println(err)
    }
}

// ensureQueue returns a freshly created queue entity named name. If a queue with that name can already be
// fetched it is deleted first, and the queue is then created with the supplied management options.
//
// The error from Put is returned without being printed here: every caller already reports the returned error,
// so printing it in this helper as well would emit the same error twice.
func ensureQueue(ctx context.Context, qm *servicebus.QueueManager, name string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {
    // Only the success of Get matters; the fetched entity itself is not needed.
    if _, err := qm.Get(ctx, name); err == nil {
        // Best effort: a failed delete is deliberately ignored and Put below is still attempted.
        _ = qm.Delete(ctx, name)
    }

    qe, err := qm.Put(ctx, name, opts...)
    if err != nil {
        return nil, err
    }

    return qe, nil
}

Code:

ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "MessageBatchingExample")
if err != nil {
    fmt.Println(err)
    return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
    fmt.Println(err)
    return
}
defer func() {
    _ = q.Close(ctx)
}()

msgs := make([]*servicebus.Message, 10)
for i := 0; i < 10; i++ {
    msgs[i] = servicebus.NewMessageFromString(fmt.Sprintf("foo %d", i))
}

batcher := servicebus.NewMessageBatchIterator(servicebus.StandardMaxMessageSizeInBytes, msgs...)
if err := q.SendBatch(ctx, batcher); err != nil {
    fmt.Println(err)
    return
}

for i := 0; i < 10; i++ {
    err := q.ReceiveOne(ctx, MessagePrinter{})
    if err != nil {
        fmt.Println(err)
        return
    }
}

Output:

foo 0
foo 1
foo 2
foo 3
foo 4
foo 5
foo 6
foo 7
foo 8
foo 9

Code:

// Bound the whole example to 40 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "DeadletterExample")
if err != nil {
    fmt.Println(err)
    return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
    fmt.Println(err)
    return
}
defer func() {
    _ = q.Close(ctx)
}()

if err := q.Send(ctx, servicebus.NewMessageFromString("foo")); err != nil {
    fmt.Println(err)
    return
}

// Abandon the message 10 times, simulating 10 failed attempts to process it. After the 10th time, the
// message will be placed in the Deadletter Queue.
for count := 0; count < 10; count++ {
    err = q.ReceiveOne(ctx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
        fmt.Printf("count: %d\n", count+1)
        return msg.Abandon(ctx)
    }))
    if err != nil {
        fmt.Println(err)
        return
    }
}

// Receive one message from the queue's deadletter queue. It should be the foo message.
qdl := q.NewDeadLetter()
// Register the close before receiving so the deadletter receiver is released even when ReceiveOne fails.
defer func() {
    _ = qdl.Close(ctx)
}()
if err := qdl.ReceiveOne(ctx, MessagePrinter{}); err != nil {
    fmt.Println(err)
    return
}

Output:

count: 1
count: 2
count: 3
count: 4
count: 5
count: 6
count: 7
count: 8
count: 9
count: 10
foo

Code:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "math/rand"
    "os"
    "time"

    "github.com/Azure/azure-service-bus-go"
)

// RecipeStep is a single step of a recipe; it is the JSON payload carried by the example's messages.
type RecipeStep struct {
    // Step is the step's number; the example numbers its steps starting at 1.
    Step  int    `json:"step,omitempty"`
    // Title is the step's short name.
    Title string `json:"title,omitempty"`
}

// main demonstrates message deferral: it sends five recipe steps from separate goroutines after random delays,
// receives and defers each one while recording its sequence number by step number, and then retrieves the
// deferred messages in step order and prints them.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if connStr == "" {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println(err)
        return
    }

    qm := ns.NewQueueManager()
    qe, err := ensureQueue(ctx, qm, "DeferExample")
    if err != nil {
        fmt.Println(err)
        return
    }

    q, err := ns.NewQueue(qe.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = q.Close(ctx)
    }()

    steps := []RecipeStep{
        {
            Step:  1,
            Title: "Shop",
        },
        {
            Step:  2,
            Title: "Unpack",
        },
        {
            Step:  3,
            Title: "Prepare",
        },
        {
            Step:  4,
            Title: "Cook",
        },
        {
            Step:  5,
            Title: "Eat",
        },
    }

    // Send every step from its own goroutine; the step is passed as an argument so each goroutine works on its
    // own copy.
    for _, step := range steps {
        go func(s RecipeStep) {
            j, err := json.Marshal(s)
            if err != nil {
                fmt.Println(err)
                return
            }

            msg := &servicebus.Message{
                Data:        j,
                ContentType: "application/json",
                Label:       "RecipeStep",
            }

            // Introduce a random delay (up to two seconds) before each message is sent to simulate
            // out-of-order sending.
            time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
            if err := q.Send(ctx, msg); err != nil {
                fmt.Println(err)
                return
            }
        }(step)
    }

    // sequenceByStepNumber maps a recipe step number to the sequence number of the message that carried it.
    sequenceByStepNumber := map[int]int64{}
    // collect and defer messages
    for i := 0; i < len(steps); i++ {
        err = q.ReceiveOne(ctx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
            var step RecipeStep
            if err := json.Unmarshal(msg.Data, &step); err != nil {
                return err
            }
            // NOTE(review): assumes SystemProperties and SequenceNumber are non-nil on received messages — confirm.
            sequenceByStepNumber[step.Step] = *msg.SystemProperties.SequenceNumber
            return msg.Defer(ctx)
        }))
        if err != nil {
            fmt.Println(err)
            return
        }
    }

    // Receive the deferred messages in step order (steps are numbered from 1, hence i+1), print and complete them.
    for i := 0; i < len(steps); i++ {
        err := q.ReceiveDeferred(ctx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
            var step RecipeStep
            if err := json.Unmarshal(msg.Data, &step); err != nil {
                return err
            }
            fmt.Printf("step: %d, %s\n", step.Step, step.Title)
            return msg.Complete(ctx)
        }), sequenceByStepNumber[i+1])
        if err != nil {
            fmt.Println(err)
            return
        }
    }

}

Code:

// Bound the whole example to 40 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if len(connStr) == 0 {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

// Create the queue with duplicate detection configured from a 30 second window.
window := 30 * time.Second
withDuplicateDetection := servicebus.QueueEntityWithDuplicateDetection(&window)
qe, err := ensureQueue(ctx, ns.NewQueueManager(), "DuplicateDetectionExample", withDuplicateDetection)
if err != nil {
    fmt.Println(err)
    return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
    fmt.Println(err)
    return
}
defer func() {
    _ = q.Close(ctx)
}()

guid, err := uuid.NewV4()
if err != nil {
    fmt.Println(err)
    return
}

msg := servicebus.NewMessageFromString("foo")
msg.ID = guid.String()

// Send the very same message, and therefore the same ID, two times.
for attempt := 1; attempt <= 2; attempt++ {
    err = q.Send(ctx, msg)
    if err != nil {
        fmt.Println(err)
        return
    }
}

// Only one message should come back from the queue. Receive runs in the background; an error whose text
// is "context canceled" is not reported.
go func() {
    err := q.Receive(ctx, MessagePrinter{})
    if err == nil || err.Error() == "context canceled" {
        return
    }
    fmt.Println(err)
}()

// Give the background receiver two seconds to print what it receives.
time.Sleep(2 * time.Second)

Output:

foo

Code:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    "github.com/Azure/azure-service-bus-go"
)

type (
    // Scientist is the JSON payload of each message sent by this example.
    Scientist struct {
        // Surname is the scientist's family name.
        Surname   string `json:"surname,omitempty"`
        // FirstName is the scientist's given name.
        FirstName string `json:"firstname,omitempty"`
    }
)

// main demonstrates browsing (peeking) messages: it creates the example queue, starts a goroutine that sends
// messages, starts a second goroutine one second later that peeks them, and then waits for the 10 second
// send/peek context to finish.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if connStr == "" {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println(err)
        return
    }

    qm := ns.NewQueueManager()
    qEntity, err := ensureQueue(ctx, qm, "MessageBrowseExample")
    if err != nil {
        fmt.Println(err)
        return
    }

    q, err := ns.NewQueue(qEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    // Close the queue client on exit so it is not leaked. It is closed with the outer ctx because the
    // send/peek context below has already finished by the time main returns.
    defer func() {
        _ = q.Close(ctx)
    }()

    // The inner cancel func gets its own name so it does not overwrite the outer cancel variable.
    txRxCtx, txRxCancel := context.WithTimeout(ctx, 10*time.Second)
    defer txRxCancel()
    go sendMessages(txRxCtx, q)
    time.Sleep(1 * time.Second) // wait a second to ensure a message has landed in the queue
    go peekMessages(txRxCtx, q)

    <-txRxCtx.Done() // wait for the context to finish
}

// sendMessages sends one JSON-encoded Scientist message per entry of a fixed list to q. Each message is
// given the "application/json" content type and a two minute TTL. On the first marshal or send error the
// error is printed and the remaining messages are not sent.
func sendMessages(ctx context.Context, q *servicebus.Queue) {
    scientists := []Scientist{
        {Surname: "Einstein", FirstName: "Albert"},
        {Surname: "Heisenberg", FirstName: "Werner"},
        {Surname: "Curie", FirstName: "Marie"},
        // Spelling corrected: the physicist's given name is "Stephen", not "Steven".
        {Surname: "Hawking", FirstName: "Stephen"},
        {Surname: "Newton", FirstName: "Isaac"},
        {Surname: "Bohr", FirstName: "Niels"},
        {Surname: "Faraday", FirstName: "Michael"},
        {Surname: "Galilei", FirstName: "Galileo"},
        {Surname: "Kepler", FirstName: "Johannes"},
        {Surname: "Kopernikus", FirstName: "Nikolaus"},
    }

    for _, scientist := range scientists {
        bits, err := json.Marshal(scientist)
        if err != nil {
            fmt.Println(err)
            return
        }

        // ttl is declared per iteration so every message points at its own duration value.
        ttl := 2 * time.Minute
        msg := servicebus.NewMessage(bits)
        msg.ContentType = "application/json"
        msg.TTL = &ttl
        if err := q.Send(ctx, msg); err != nil {
            fmt.Println(err)
            return
        }
    }
}

// peekMessages repeatedly peeks one message from q and prints the scientist it carries. After every peek the
// options for the next one are rebuilt from the sequence number of the message just seen. The loop ends when
// ctx is done, when PeekOne returns servicebus.ErrNoMessages (silently), or on any other error (after
// printing it).
func peekMessages(ctx context.Context, q *servicebus.Queue) {
    var peekOpts []servicebus.PeekOption
    for {
        // Stop as soon as the context is finished; otherwise fall through and peek.
        select {
        case <-ctx.Done():
            return
        default:
        }

        msg, err := q.PeekOne(ctx, peekOpts...)
        if err != nil {
            // Running out of messages means all done; anything else is reported.
            if _, drained := err.(servicebus.ErrNoMessages); !drained {
                fmt.Println(err)
            }
            return
        }

        var decoded Scientist
        if err := json.Unmarshal(msg.Data, &decoded); err != nil {
            fmt.Println(err)
            return
        }

        fromSeq := servicebus.PeekFromSequenceNumber(*msg.SystemProperties.SequenceNumber)
        peekOpts = []servicebus.PeekOption{fromSeq}
        fmt.Printf("Firstname: %s, Surname: %s\n", decoded.FirstName, decoded.Surname)
    }
}

Code:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    "github.com/Azure/azure-service-bus-go"
)

// StepSessionHandler handles the messages of one message session and remembers the session it was started
// with so that it can close that session itself.
type StepSessionHandler struct {
    messageSession *servicebus.MessageSession
}

// Start is invoked when a new session begins. It stores the session and announces its ID.
func (ssh *StepSessionHandler) Start(ms *servicebus.MessageSession) error {
    ssh.messageSession = ms
    fmt.Println("Begin session: ", *ms.SessionID())
    return nil
}

// Handle is invoked for every message received in the session. It decodes and prints the recipe step, closes
// the session once step 5 has been seen, and completes the message.
func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error {
    // finalStep is the step number after which the handler closes its session.
    const finalStep = 5

    var step RecipeStep
    err := json.Unmarshal(msg.Data, &step)
    if err != nil {
        fmt.Println(err)
        return err
    }

    fmt.Printf("  Step: %d, %s\n", step.Step, step.Title)

    if step.Step == finalStep {
        ssh.messageSession.Close()
    }

    return msg.Complete(ctx)
}

// End is invoked when the message session is closed. Service Bus will not end a message session on its own, so
// the handler has to know when to terminate its session.
func (ssh *StepSessionHandler) End() {
    sessionID := *ssh.messageSession.SessionID()
    fmt.Println("End session: ", sessionID)
    fmt.Println("")
}

// main demonstrates message sessions: it creates a queue that requires sessions, sends the recipe steps once
// per session ID, and then receives each session's messages with a StepSessionHandler.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if connStr == "" {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println(err)
        return
    }

    // Create a Service Bus Queue with required sessions enabled. This will ensure that all messages sent and received
    // are bound to a session.
    qm := ns.NewQueueManager()
    qEntity, err := ensureQueue(ctx, qm, "MessageSessionsExample", servicebus.QueueEntityWithRequiredSessions())
    if err != nil {
        fmt.Println(err)
        return
    }

    q, err := ns.NewQueue(qEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    // Close the queue client on exit so it is not leaked.
    defer func() {
        _ = q.Close(ctx)
    }()

    sessions := []string{"foo", "bar", "bazz", "buzz"}
    for _, session := range sessions {
        // send recipe steps
        // note that order is preserved within a given session
        sendSessionRecipeSteps(ctx, session, q)
    }

    // receive messages for each session
    // you can also call q.NewSession(nil) to receive from any available session
    for _, session := range sessions {
        // Hand the session a pointer to a per-iteration copy rather than to the range variable, which is
        // shared between iterations before Go 1.22.
        sessionID := session
        queueSession := q.NewSession(&sessionID)

        // Always close the session, even when receiving fails, so it is not leaked on the error path.
        recvErr := queueSession.ReceiveOne(ctx, new(StepSessionHandler))
        closeErr := queueSession.Close(ctx)
        if recvErr != nil {
            fmt.Println(recvErr)
            return
        }
        if closeErr != nil {
            fmt.Println(closeErr)
            return
        }
    }
}

// sendSessionRecipeSteps sends the five recipe steps, in order, to q as JSON messages bound to the session
// identified by sessionID. The first marshal or send error is printed and stops the remaining sends.
func sendSessionRecipeSteps(ctx context.Context, sessionID string, q *servicebus.Queue) {
    recipe := []RecipeStep{
        {Step: 1, Title: "Shop"},
        {Step: 2, Title: "Unpack"},
        {Step: 3, Title: "Prepare"},
        {Step: 4, Title: "Cook"},
        {Step: 5, Title: "Eat"},
    }

    for idx := range recipe {
        payload, err := json.Marshal(recipe[idx])
        if err != nil {
            fmt.Println(err)
            return
        }

        out := servicebus.NewMessage(payload)
        out.ContentType = "application/json"
        out.SessionID = &sessionID

        err = q.Send(ctx, out)
        if err != nil {
            fmt.Println(err)
            return
        }
    }
}

Code:

// Bound the whole example to 60 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

qm := ns.NewQueueManager()
prefetch1, err := ensureQueue(ctx, qm, "Prefetch1Example")
if err != nil {
    fmt.Println(err)
    return
}

prefetch1000, err := ensureQueue(ctx, qm, "Prefetch1000Example")
if err != nil {
    fmt.Println(err)
    return
}

// sendAndReceive sends messageCount messages to the named queue and then receives them back. opt configures
// the queue client (this example passes the prefetch count).
sendAndReceive := func(ctx context.Context, name string, opt servicebus.QueueOption) error {
    messageCount := 200
    q, err := ns.NewQueue(name, opt, servicebus.QueueWithReceiveAndDelete())
    if err != nil {
        return err
    }
    // Close the queue client when done so it is not leaked.
    defer func() {
        _ = q.Close(ctx)
    }()

    buffer := make([]byte, 1000)
    if _, err := rand.Read(buffer); err != nil {
        return err
    }

    for i := 0; i < messageCount; i++ {
        if err := q.Send(ctx, servicebus.NewMessage(buffer)); err != nil {
            return err
        }
    }

    innerCtx, cancel := context.WithCancel(ctx)
    // Always release innerCtx, including when Receive fails before the handler has cancelled it.
    defer cancel()

    count := 0
    err = q.Receive(innerCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
        count++
        // NOTE(review): this cancels after messageCount-1 messages, leaving the last message unreceived —
        // confirm whether that is intended.
        if count == messageCount-1 {
            defer cancel()
        }
        return msg.Complete(ctx)
    }))
    // Cancelling innerCtx is the expected way for Receive to stop, so that error is not a failure.
    if err != nil && err.Error() != "context canceled" {
        return err
    }
    return nil
}

// timed runs sendAndReceive in its own goroutine and returns a channel that yields the elapsed time. The
// channel is buffered so the goroutine never blocks on the send, and it is closed without a value when
// sendAndReceive fails, so the receives below cannot block forever.
timed := func(name string, opt servicebus.QueueOption) <-chan time.Duration {
    elapsed := make(chan time.Duration, 1)
    go func() {
        defer close(elapsed)
        start := time.Now()
        if err := sendAndReceive(ctx, name, opt); err != nil {
            fmt.Println(err)
            return
        }
        elapsed <- time.Since(start)
    }()
    return elapsed
}

// run send and receive concurrently and compare the times
totalPrefetch1 := timed(prefetch1.Name, servicebus.QueueWithPrefetchCount(1))
totalPrefetch1000 := timed(prefetch1000.Name, servicebus.QueueWithPrefetchCount(1000))

tp1, ok := <-totalPrefetch1
if !ok {
    // the run failed and its error has already been printed
    return
}
tp2, ok := <-totalPrefetch1000
if !ok {
    return
}

if tp1 > tp2 {
    fmt.Println("prefetch of 1000 took less time!")
}

Output:

prefetch of 1000 took less time!

Code:

package main

import (
    "context"
    "fmt"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Azure/azure-service-bus-go"
)

// PrioritySubscription describes one subscription the example creates on the topic.
type PrioritySubscription struct {
    // Name is the subscription name; the example also builds the rule name from it.
    Name         string
    // Expression is the SQL filter expression installed as the subscription's rule.
    Expression   string
    // MessageCount is how many messages the example receives from the subscription.
    MessageCount int
}

// PriorityMessage describes one message the example sends to the topic.
type PriorityMessage struct {
    // Body is the message body text.
    Body     string
    // Priority is sent as the message's "Priority" user property.
    Priority int
}

// PriorityPrinter is a message handler that prints received messages prefixed with a subscription name.
type PriorityPrinter struct {
    // SubName is the subscription name placed at the front of each printed line.
    SubName string
}

// Handle prints "<SubName>_<body>_<priority>" for the message and then completes it. The priority is read from
// the "Priority" user property; when that property is not an int64 a notice is printed and 0 is used.
func (pp PriorityPrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
    priority, isInt64 := msg.UserProperties["Priority"].(int64)
    if !isInt64 {
        fmt.Println("Priority is not an int64")
    }

    parts := []string{
        pp.SubName,
        string(msg.Data),
        strconv.Itoa(int(priority)),
    }
    fmt.Println(strings.Join(parts, "_"))

    return msg.Complete(ctx)
}

// main demonstrates priority subscriptions: it creates a topic with three subscriptions that each filter on the
// "Priority" user property, sends four messages with priorities 1 through 4 to the topic, and then receives and
// prints the expected number of messages from every subscription.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if connStr == "" {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println(err)
        return
    }

    // build the topic for sending priority messages
    tm := ns.NewTopicManager()
    topicEntity, err := ensureTopic(ctx, tm, "PrioritySubscriptionsTopic")
    if err != nil {
        fmt.Println(err)
        return
    }

    sm, err := ns.NewSubscriptionManager(topicEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }

    // build each priority subscription providing each with a SQL like expression to filter messages from the topic
    prioritySubs := []PrioritySubscription{
        {
            Name:         "Priority1",
            Expression:   "user.Priority=1",
            MessageCount: 1,
        },
        {
            Name:         "Priority2",
            Expression:   "user.Priority=2",
            MessageCount: 1,
        },
        {
            Name:         "PriorityGreaterThan2",
            Expression:   "user.Priority>2",
            MessageCount: 2,
        },
    }
    for _, s := range prioritySubs {
        subEntity, err := ensureSubscription(ctx, sm, s.Name)
        if err != nil {
            fmt.Println(err)
            return
        }

        // remove the default rule, which is the "TrueFilter" that accepts all messages
        err = sm.DeleteRule(ctx, subEntity.Name, "$Default")
        if err != nil {
            fmt.Println(err)
            return
        }

        // install the subscription's own SQL filter as a rule named "<subscription name>Rule"
        _, err = sm.PutRule(ctx, subEntity.Name, s.Name+"Rule", servicebus.SQLFilter{Expression: s.Expression})
        if err != nil {
            fmt.Println(err)
            return
        }
    }

    priorityMessages := []PriorityMessage{
        {
            Body:     "foo",
            Priority: 1,
        },
        {
            Body:     "bar",
            Priority: 2,
        },
        {
            Body:     "bazz",
            Priority: 3,
        },
        {
            Body:     "buzz",
            Priority: 4,
        },
    }
    topic, err := ns.NewTopic(topicEntity.Name)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = topic.Close(ctx)
    }()

    // send every message to the topic with its priority attached as the "Priority" user property
    for _, pMessage := range priorityMessages {
        msg := servicebus.NewMessageFromString(pMessage.Body)
        msg.UserProperties = map[string]interface{}{"Priority": pMessage.Priority}
        if err := topic.Send(ctx, msg); err != nil {
            fmt.Println(err)
            return
        }
    }

    // receive and print MessageCount messages from each subscription, then close the subscription client
    for _, s := range prioritySubs {
        sub, err := topic.NewSubscription(s.Name)
        if err != nil {
            fmt.Println(err)
            return
        }

        for i := 0; i < s.MessageCount; i++ {
            err := sub.ReceiveOne(ctx, PriorityPrinter{SubName: sub.Name})
            if err != nil {
                fmt.Println(err)
                return
            }
        }
        err = sub.Close(ctx)
        if err != nil {
            fmt.Println(err)
            return
        }
    }

}

// ensureTopic returns a freshly created topic entity named name. If a topic with that name can already be
// fetched it is deleted first, and the topic is then created with the supplied management options.
//
// The error from Put is returned without being printed here: the caller already reports the returned error,
// so printing it in this helper as well would emit the same error twice.
func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, opts ...servicebus.TopicManagementOption) (*servicebus.TopicEntity, error) {
    // Only the success of Get matters; the fetched entity itself is not needed.
    if _, err := tm.Get(ctx, name); err == nil {
        // Best effort: a failed delete is deliberately ignored and Put below is still attempted.
        _ = tm.Delete(ctx, name)
    }

    te, err := tm.Put(ctx, name, opts...)
    if err != nil {
        return nil, err
    }

    return te, nil
}

// ensureSubscription returns a freshly created subscription entity named name. If a subscription with that
// name can already be fetched it is deleted first, and the subscription is then created with the supplied
// management options.
//
// The error from Put is returned without being printed here: the caller already reports the returned error,
// so printing it in this helper as well would emit the same error twice.
func ensureSubscription(ctx context.Context, sm *servicebus.SubscriptionManager, name string, opts ...servicebus.SubscriptionManagementOption) (*servicebus.SubscriptionEntity, error) {
    // Only the success of Get matters; the fetched entity itself is not needed.
    if _, err := sm.Get(ctx, name); err == nil {
        // Best effort: a failed delete is deliberately ignored and Put below is still attempted.
        _ = sm.Delete(ctx, name)
    }

    subEntity, err := sm.Put(ctx, name, opts...)
    if err != nil {
        return nil, err
    }

    return subEntity, nil
}

Code:

// Bound the whole example to 10 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
q, err := ns.NewQueue("helloworld")
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}
// Close the queue client on exit so it is not leaked.
defer func() {
    _ = q.Close(ctx)
}()

err = q.Send(ctx, servicebus.NewMessageFromString("Hello, World!!!"))
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

// Receive a single message, print its body and complete it.
err = q.ReceiveOne(
    ctx,
    servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
        fmt.Println(string(message.Data))
        return message.Complete(ctx)
    }))
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

Output:

Hello, World!!!

Code:

package main

import (
    "context"
    "fmt"
    "github.com/Azure/azure-service-bus-go"
    "os"
    "time"
)

// main demonstrates scheduled messages: it purges the "scheduledmessages" queue, sends a message scheduled one
// minute into the future, and reports whether the message was received within 20 seconds of the scheduled time.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
    if connStr == "" {
        fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
        return
    }

    // Create a client to communicate with a Service Bus Namespace.
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
    if err != nil {
        fmt.Println("FATAL: ", err)
        return
    }

    // Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
    client, err := ns.NewQueue("scheduledmessages")
    if err != nil {
        fmt.Println("FATAL: ", err)
        return
    }
    // Close the queue client on exit so it is not leaked.
    defer func() {
        _ = client.Close(ctx)
    }()

    // purge all of the existing messages in the queue
    purgeMessages(ns)

    // The delay that we should schedule a message for.
    const waitTime = 1 * time.Minute
    // Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
    // on either side.
    const buffer = 20 * time.Second

    expectedTime := time.Now().Add(waitTime)
    msg := servicebus.NewMessageFromString("to the future!!")
    msg.ScheduleAt(expectedTime)

    err = client.Send(ctx, msg)
    if err != nil {
        fmt.Println("FATAL: ", err)
        return
    }

    err = client.ReceiveOne(
        ctx,
        servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
            // The message counts as on time when it arrives within buffer of expectedTime, on either side.
            received := time.Now()
            if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
                fmt.Println("Received when expected!")
            } else {
                fmt.Println("Received outside the expected window.")
            }
            return msg.Complete(ctx)
        }))
    if err != nil {
        fmt.Println("FATAL: ", err)
        return
    }
}

// purgeMessages drains the "scheduledmessages" queue by completing every message it receives until its
// 10 second purge context ends. Errors from Receive and Close are deliberately ignored (best effort); a
// failure to create the queue client is printed and ends the purge, because there is nothing to receive from.
func purgeMessages(ns *servicebus.Namespace) {
    client, err := ns.NewQueue("scheduledmessages")
    if err != nil {
        // Without this check the calls below would be made on a client that was never created.
        fmt.Println(err)
        return
    }

    purgeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Close with a fresh context: by the time the deferred close runs, purgeCtx has already been cancelled
    // or has expired.
    defer func() {
        closeCtx, closeCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer closeCancel()
        _ = client.Close(closeCtx)
    }()

    _ = client.Receive(purgeCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
        return msg.Complete(ctx)
    }))
}

Index

Examples

Package Files

action.go batch.go batch_disposition.go deadletter.go entity.go errors.go filter.go handler.go iterator.go message.go message_session.go mgmt.go namespace.go operation_constants.go queue.go queue_manager.go receiver.go rpc.go sender.go session.go subscription.go subscription_manager.go topic.go topic_manager.go tracing.go

Constants

const (
    // PeekLockMode causes a Receiver to peek at a message, lock it so no others can consume and have the queue wait for
    // the DispositionAction
    PeekLockMode ReceiveMode = 0
    // ReceiveAndDeleteMode causes a Receiver to pop messages off of the queue without waiting for DispositionAction
    ReceiveAndDeleteMode ReceiveMode = 1

    // DeadLetterQueueName is the name of the dead letter queue to be appended to the entity path
    DeadLetterQueueName = "$DeadLetterQueue"

    // TransferDeadLetterQueueName is the name of the transfer dead letter queue which is appended to the entity name to
    // build the full address of the transfer dead letter queue.
    TransferDeadLetterQueueName = "$Transfer/" + DeadLetterQueueName
)
const (

    // Version is the semantic version number
    Version = "0.9.1"
)

func IsErrNotFound Uses

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

type ActionDescriber Uses

type ActionDescriber interface {
    ToActionDescription() ActionDescription
}

ActionDescriber can transform itself into an ActionDescription

type ActionDescription Uses

type ActionDescription struct {
    Type                  string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
    SQLExpression         string `xml:"SqlExpression"`
    RequiresPreprocessing bool   `xml:"RequiresPreprocessing"`
    CompatibilityLevel    int    `xml:"CompatibilityLevel,omitempty"`
}

ActionDescription describes an action upon a message that matches a filter

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

type BaseEntityDescription Uses

type BaseEntityDescription struct {
    InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
    ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type BatchDispositionError Uses

type BatchDispositionError struct {
    Errors []DispositionError
}

BatchDispositionError is an error which returns a collection of DispositionError.

func (BatchDispositionError) Error Uses

func (bde BatchDispositionError) Error() string

type BatchDispositionIterator Uses

type BatchDispositionIterator struct {
    LockTokenIDs []*uuid.UUID
    Status       MessageStatus
    // contains filtered or unexported fields
}

BatchDispositionIterator provides an iterator over LockTokenIDs

func (*BatchDispositionIterator) Done Uses

func (bdi *BatchDispositionIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*BatchDispositionIterator) Next Uses

func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID)

Next iterates to the next LockToken

type BatchIterator Uses

type BatchIterator interface {
    Done() bool
    Next(messageID string, opts *BatchOptions) (*MessageBatch, error)
}

BatchIterator offers a simple mechanism for batching a list of messages

type BatchOptions Uses

type BatchOptions struct {
    SessionID *string
}

BatchOptions are optional information to add to a batch of messages

type Closer Uses

type Closer interface {
    Close(ctx context.Context) error
}

Closer provides the ability to close an entity

type CorrelationFilter Uses

type CorrelationFilter struct {
    CorrelationID    *string                `xml:"CorrelationId,omitempty"`
    MessageID        *string                `xml:"MessageId,omitempty"`
    To               *string                `xml:"To,omitempty"`
    ReplyTo          *string                `xml:"ReplyTo,omitempty"`
    Label            *string                `xml:"Label,omitempty"`
    SessionID        *string                `xml:"SessionId,omitempty"`
    ReplyToSessionID *string                `xml:"ReplyToSessionId,omitempty"`
    ContentType      *string                `xml:"ContentType,omitempty"`
    Properties       map[string]interface{} `xml:"Properties,omitempty"`
}

CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties. A common use is to match against the CorrelationId property, but the application can also choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any user-defined properties. A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.

func (CorrelationFilter) ToFilterDescription Uses

func (cf CorrelationFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the CorrelationFilter into a FilterDescription

type CountDetails Uses

type CountDetails struct {
    XMLName                        xml.Name `xml:"CountDetails"`
    ActiveMessageCount             *int32   `xml:"ActiveMessageCount,omitempty"`
    DeadLetterMessageCount         *int32   `xml:"DeadLetterMessageCount,omitempty"`
    ScheduledMessageCount          *int32   `xml:"ScheduledMessageCount,omitempty"`
    TransferDeadLetterMessageCount *int32   `xml:"TransferDeadLetterMessageCount,omitempty"`
    TransferMessageCount           *int32   `xml:"TransferMessageCount,omitempty"`
}

CountDetails has current active (and other) messages for queue/topic.

type DeadLetter Uses

type DeadLetter struct {
    // contains filtered or unexported fields
}

DeadLetter represents a dead letter queue in Azure Service Bus.

Azure Service Bus queues, topics and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func NewDeadLetter Uses

func NewDeadLetter(builder DeadLetterBuilder) *DeadLetter

NewDeadLetter constructs an instance of DeadLetter which represents a dead letter queue in Azure Service Bus

func (*DeadLetter) Close Uses

func (dl *DeadLetter) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*DeadLetter) ReceiveOne Uses

func (dl *DeadLetter) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the dead letter queue

type DeadLetterBuilder Uses

type DeadLetterBuilder interface {
    NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}

DeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's dead letter queue.

type DispositionAction Uses

type DispositionAction func(ctx context.Context) error

DispositionAction represents the action to notify Azure Service Bus of the Message's disposition

type DispositionError Uses

type DispositionError struct {
    LockTokenID *uuid.UUID
    // contains filtered or unexported fields
}

DispositionError is an error associated with a LockTokenID.

func (DispositionError) Error Uses

func (de DispositionError) Error() string

func (DispositionError) UnWrap Uses

func (de DispositionError) UnWrap() error

UnWrap will return the private error.

type Entity Uses

type Entity struct {
    Name string
    ID   string
}

Entity represents the most basic form of an Azure Service Bus entity.

func (Entity) TargetURI Uses

func (e Entity) TargetURI() string

TargetURI provides an absolute address to a target entity

type EntityManagementAddresser Uses

type EntityManagementAddresser interface {
    ManagementPath() string
}

EntityManagementAddresser describes the ability of an entity to provide an addressable path to its management endpoint

type EntityStatus Uses

type EntityStatus string

EntityStatus enumerates the values for entity status.

const (
    // Active is the EntityStatus with the string value "Active".
    Active EntityStatus = "Active"
    // Creating is the EntityStatus with the string value "Creating".
    Creating EntityStatus = "Creating"
    // Deleting is the EntityStatus with the string value "Deleting".
    Deleting EntityStatus = "Deleting"
    // Disabled is the EntityStatus with the string value "Disabled".
    Disabled EntityStatus = "Disabled"
    // ReceiveDisabled is the EntityStatus with the string value "ReceiveDisabled".
    ReceiveDisabled EntityStatus = "ReceiveDisabled"
    // Renaming is the EntityStatus with the string value "Renaming".
    Renaming EntityStatus = "Renaming"
    // Restoring is the EntityStatus with the string value "Restoring".
    Restoring EntityStatus = "Restoring"
    // SendDisabled is the EntityStatus with the string value "SendDisabled".
    SendDisabled EntityStatus = "SendDisabled"
    // Unknown is the EntityStatus with the string value "Unknown".
    Unknown EntityStatus = "Unknown"
)

type ErrAMQP Uses

type ErrAMQP rpc.Response

ErrAMQP indicates that the server communicated an AMQP error in a particular response; its underlying type is rpc.Response.

func (ErrAMQP) Error Uses

func (e ErrAMQP) Error() string

type ErrIncorrectType Uses

type ErrIncorrectType struct {
    Key          string
    ExpectedType reflect.Type
    ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrIncorrectType) Error Uses

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage Uses

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message that was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error Uses

func (e ErrMalformedMessage) Error() string

type ErrMissingField Uses

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error Uses

func (e ErrMissingField) Error() string

type ErrNoMessages Uses

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error Uses

func (e ErrNoMessages) Error() string

type ErrNotFound Uses

type ErrNotFound struct {
    EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error Uses

func (e ErrNotFound) Error() string

type FalseFilter Uses

type FalseFilter struct{}

FalseFilter represents an always-false SQL expression which will deny all messages

func (FalseFilter) ToFilterDescription Uses

func (ff FalseFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the FalseFilter into a FilterDescription

type FilterDescriber Uses

type FilterDescriber interface {
    ToFilterDescription() FilterDescription
}

FilterDescriber can transform itself into a FilterDescription

type FilterDescription Uses

type FilterDescription struct {
    XMLName xml.Name `xml:"Filter"`
    CorrelationFilter
    Type               string  `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
    SQLExpression      *string `xml:"SqlExpression,omitempty"`
    CompatibilityLevel int     `xml:"CompatibilityLevel,omitempty"`
}

FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.

Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. Each rule consists of a condition that selects particular messages and an action that annotates the selected message. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.

Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a filter condition for the rule, the applied filter is the true filter that enables all messages to be selected into the subscription. The default rule has no associated annotation action.

type Handler Uses

type Handler interface {
    Handle(context.Context, *Message) error
}

Handler exposes the functionality required to process a Service Bus message.

type HandlerFunc Uses

type HandlerFunc func(context.Context, *Message) error

HandlerFunc is a type converter that allows a func to be used as a `Handler`

func (HandlerFunc) Handle Uses

func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) error

Handle redirects this call to the func that was provided.

type ListenerHandle Uses

type ListenerHandle struct {
    // contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close Uses

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done Uses

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err Uses

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type MaxMessageSizeInBytes Uses

type MaxMessageSizeInBytes int

MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus

const (
    // StandardMaxMessageSizeInBytes is the maximum number of bytes in a message for the Standard tier
    StandardMaxMessageSizeInBytes MaxMessageSizeInBytes = 256000
    // PremiumMaxMessageSizeInBytes is the maximum number of bytes in a message for the Premium tier
    PremiumMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000
)

type Message Uses

type Message struct {
    ContentType      string
    CorrelationID    string
    Data             []byte
    DeliveryCount    uint32
    SessionID        *string
    GroupSequence    *uint32
    ID               string
    Label            string
    ReplyTo          string
    ReplyToGroupID   string
    To               string
    TTL              *time.Duration
    LockToken        *uuid.UUID
    SystemProperties *SystemProperties
    UserProperties   map[string]interface{}
    Format           uint32
    // contains filtered or unexported fields
}

Message is a Service Bus message to be sent or received

func NewMessage Uses

func NewMessage(data []byte) *Message

NewMessage builds a Message from a slice of data

func NewMessageFromString Uses

func NewMessageFromString(message string) *Message

NewMessageFromString builds a Message from a string message

func (*Message) Abandon Uses

func (m *Message) Abandon(ctx context.Context) error

Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*Message) AbandonAction Uses

func (m *Message) AbandonAction() DispositionAction

AbandonAction will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*Message) Complete Uses

func (m *Message) Complete(ctx context.Context) error

Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*Message) CompleteAction Uses

func (m *Message) CompleteAction() DispositionAction

CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*Message) DeadLetter Uses

func (m *Message) DeadLetter(ctx context.Context, err error) error

DeadLetter will notify Azure Service Bus the message failed and should not be re-queued

func (*Message) DeadLetterAction Uses

func (m *Message) DeadLetterAction(err error) DispositionAction

DeadLetterAction will notify Azure Service Bus the message failed and should not be re-queued

func (*Message) DeadLetterWithInfo Uses

func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition MessageErrorCondition, additionalData map[string]string) error

DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context

func (*Message) DeadLetterWithInfoAction Uses

func (m *Message) DeadLetterWithInfoAction(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction

DeadLetterWithInfoAction will notify Azure Service Bus the message failed and should not be re-queued with additional context

func (*Message) Defer Uses

func (m *Message) Defer(ctx context.Context) error

Defer will set aside the message for later processing

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*Message) GetKeyValues Uses

func (m *Message) GetKeyValues() map[string]interface{}

GetKeyValues implements tab.Carrier

func (*Message) ScheduleAt Uses

func (m *Message) ScheduleAt(t time.Time)

ScheduleAt will ensure Azure Service Bus delivers the message after the time specified (usually within 1 minute after the specified time)

func (*Message) Set Uses

func (m *Message) Set(key string, value interface{})

Set implements tab.Carrier

type MessageBatch Uses

type MessageBatch struct {
    *Message

    MaxSize MaxMessageSizeInBytes
    // contains filtered or unexported fields
}

MessageBatch represents a batch of messages to send to Service Bus in a single message

func NewMessageBatch Uses

func NewMessageBatch(maxSize MaxMessageSizeInBytes, messageID string, opts *BatchOptions) *MessageBatch

NewMessageBatch builds a new message batch with a default standard max message size

func (*MessageBatch) Add Uses

func (mb *MessageBatch) Add(m *Message) (bool, error)

Add adds a message to the batch if the message will not exceed the max size of the batch

func (*MessageBatch) Clear Uses

func (mb *MessageBatch) Clear()

Clear will zero out the batch size and clear the buffered messages

func (*MessageBatch) Size Uses

func (mb *MessageBatch) Size() int

Size is the number of bytes in the message batch

type MessageBatchIterator Uses

type MessageBatchIterator struct {
    Messages []*Message
    Cursor   int
    MaxSize  MaxMessageSizeInBytes
}

MessageBatchIterator provides an easy way to iterate over a slice of messages to reliably create batches

func NewMessageBatchIterator Uses

func NewMessageBatchIterator(maxBatchSize MaxMessageSizeInBytes, msgs ...*Message) *MessageBatchIterator

NewMessageBatchIterator wraps a slice of Message pointers to allow it to be made into a BatchIterator.

func (*MessageBatchIterator) Done Uses

func (mbi *MessageBatchIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*MessageBatchIterator) Next Uses

func (mbi *MessageBatchIterator) Next(messageID string, opts *BatchOptions) (*MessageBatch, error)

Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.

type MessageErrorCondition Uses

type MessageErrorCondition string

MessageErrorCondition represents a well-known collection of AMQP errors

// Named MessageErrorCondition values, each holding an "amqp:"-prefixed condition string.
// A MessageErrorCondition is the type of the condition argument accepted by
// DeadLetterWithInfo and DeadLetterWithInfoAction.
const (
    ErrorInternalError         MessageErrorCondition = "amqp:internal-error"
    ErrorNotFound              MessageErrorCondition = "amqp:not-found"
    ErrorUnauthorizedAccess    MessageErrorCondition = "amqp:unauthorized-access"
    ErrorDecodeError           MessageErrorCondition = "amqp:decode-error"
    ErrorResourceLimitExceeded MessageErrorCondition = "amqp:resource-limit-exceeded"
    ErrorNotAllowed            MessageErrorCondition = "amqp:not-allowed"
    ErrorInvalidField          MessageErrorCondition = "amqp:invalid-field"
    ErrorNotImplemented        MessageErrorCondition = "amqp:not-implemented"
    ErrorResourceLocked        MessageErrorCondition = "amqp:resource-locked"
    ErrorPreconditionFailed    MessageErrorCondition = "amqp:precondition-failed"
    ErrorResourceDeleted       MessageErrorCondition = "amqp:resource-deleted"
    ErrorIllegalState          MessageErrorCondition = "amqp:illegal-state"
)

Error Conditions

type MessageIterator Uses

type MessageIterator interface {
    Done() bool
    Next(context.Context) (*Message, error)
}

MessageIterator offers a simple mechanism for iterating over a list of messages.

Code:

subject := servicebus.AsMessageSliceIterator([]*servicebus.Message{
    servicebus.NewMessageFromString("hello"),
    servicebus.NewMessageFromString("world"),
})

for !subject.Done() {
    cursor, err := subject.Next(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(string(cursor.Data))
}

Output:

hello
world

type MessageSession Uses

type MessageSession struct {
    *Receiver
    // contains filtered or unexported fields
}

MessageSession represents and allows for interaction with a Service Bus Session.

func (*MessageSession) Close Uses

func (ms *MessageSession) Close()

Close communicates that the Handler receiving messages should no longer continue to be executed. This can happen when:

- A Handler recognizes that no further messages will come to this session.

- A Handler has given up on receiving more messages before a session. Future messages should be delegated to the next available session client.

func (*MessageSession) ListSessions Uses

func (ms *MessageSession) ListSessions(ctx context.Context) ([]byte, error)

ListSessions will list all of the sessions available

func (*MessageSession) LockedUntil Uses

func (ms *MessageSession) LockedUntil() time.Time

LockedUntil fetches the moment in time when the Session lock held by this Receiver will expire.

func (*MessageSession) RenewLock Uses

func (ms *MessageSession) RenewLock(ctx context.Context) error

RenewLock requests that the Service Bus Server renews this client's lock on an existing Session.

func (*MessageSession) SessionID Uses

func (ms *MessageSession) SessionID() *string

SessionID gets the unique identifier of the session being interacted with by this MessageSession.

func (*MessageSession) SetState Uses

func (ms *MessageSession) SetState(ctx context.Context, state []byte) error

SetState updates the current State associated with this Session.

func (*MessageSession) State Uses

func (ms *MessageSession) State(ctx context.Context) ([]byte, error)

State retrieves the current State associated with this Session. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#get-session-state

type MessageSliceIterator Uses

type MessageSliceIterator struct {
    Target []*Message
    Cursor int
}

MessageSliceIterator is a wrapper, which lets any slice of Message pointers be used as a MessageIterator.

func AsMessageSliceIterator Uses

func AsMessageSliceIterator(target []*Message) *MessageSliceIterator

AsMessageSliceIterator wraps a slice of Message pointers to allow it to be made into a MessageIterator.

func (MessageSliceIterator) Done Uses

func (ms MessageSliceIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*MessageSliceIterator) Next Uses

func (ms *MessageSliceIterator) Next(_ context.Context) (*Message, error)

Next fetches the Message in the slice at a position one larger than the last one accessed.

type MessageStatus Uses

type MessageStatus dispositionStatus

MessageStatus defines an acceptable Message disposition status.

const (
    // Complete exposes completedDisposition
    Complete MessageStatus = MessageStatus(completedDisposition)
    // Abort exposes abandonedDisposition
    Abort MessageStatus = MessageStatus(abandonedDisposition)
)

type MiddlewareFunc Uses

type MiddlewareFunc func(next RestHandler) RestHandler

MiddlewareFunc allows a consumer of the entity manager to inject handlers within the request / response pipeline

The example below adds the atom xml content type to the request, calls the next middleware and returns the result.

addAtomXMLContentType MiddlewareFunc = func(next RestHandler) RestHandler {

	return func(ctx context.Context, req *http.Request) (res *http.Response, e error) {
		if req.Method != http.MethodGet && req.Method != http.MethodHead {
			req.Header.Add("content-Type", "application/atom+xml;type=entry;charset=utf-8")
		}
		return next(ctx, req)
	}
}

func TraceReqAndResponseMiddleware Uses

func TraceReqAndResponseMiddleware() MiddlewareFunc

TraceReqAndResponseMiddleware will print the dump of the management request and response.

This should only be used for debugging or educational purposes.

type Namespace Uses

type Namespace struct {
    Name          string
    Suffix        string
    TokenProvider auth.TokenProvider
    Environment   azure.Environment
    // contains filtered or unexported fields
}

Namespace provides a simplified facade over the AMQP implementation of Azure Service Bus and is the entry point for using Queues, Topics and Subscriptions

func NewNamespace Uses

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) NewQueue Uses

func (ns *Namespace) NewQueue(name string, opts ...QueueOption) (*Queue, error)

NewQueue creates a new Queue Sender / Receiver

func (*Namespace) NewQueueManager Uses

func (ns *Namespace) NewQueueManager() *QueueManager

NewQueueManager creates a new QueueManager for a Service Bus Namespace

func (*Namespace) NewReceiver Uses

func (ns *Namespace) NewReceiver(ctx context.Context, entityPath string, opts ...ReceiverOption) (*Receiver, error)

NewReceiver creates a new Service Bus message listener given an AMQP client and an entity path

func (*Namespace) NewSender Uses

func (ns *Namespace) NewSender(ctx context.Context, entityPath string, opts ...SenderOption) (*Sender, error)

NewSender creates a new Service Bus message Sender given an AMQP client and entity path

func (*Namespace) NewSubscriptionManager Uses

func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)

NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Namespace

func (*Namespace) NewTopic Uses

func (ns *Namespace) NewTopic(name string, opts ...TopicOption) (*Topic, error)

NewTopic creates a new Topic Sender

func (*Namespace) NewTopicManager Uses

func (ns *Namespace) NewTopicManager() *TopicManager

NewTopicManager creates a new TopicManager for a Service Bus Namespace

type NamespaceOption Uses

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithConnectionString Uses

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

func NamespaceWithTLSConfig Uses

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithUserAgent Uses

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket Uses

func NamespaceWithWebSocket() NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

Code:

const queueName = "wssQueue"

// The connection string is read from the environment; without it nothing below can run.
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a Service Bus Namespace using a connection string over wss:// on port 443
ns, err := servicebus.NewNamespace(
    servicebus.NamespaceWithConnectionString(connStr),
    servicebus.NamespaceWithWebSocket(),
)
if err != nil {
    fmt.Println(err)
    return
}

// Bound every call below (queue management, send and receive) to 10 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Make sure the queue exists before creating a client for it.
qm := ns.NewQueueManager()
if _, err := ensureQueue(ctx, qm, queueName); err != nil {
    fmt.Println(err)
    return
}

client, err := ns.NewQueue(queueName)
if err != nil {
    fmt.Println(err)
    return
}
// Release the queue client when the example ends. The close error is deliberately
// ignored: the example is finished and there is nothing left to do with it.
defer func() {
    _ = client.Close(ctx)
}()

// Send a message to the queue. If sending fails there is nothing to receive,
// so stop instead of waiting on the receive until the context times out.
if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
    fmt.Println(err)
    return
}

// Receive the message from the queue
if err := client.ReceiveOne(ctx, MessagePrinter{}); err != nil {
    fmt.Println(err)
}

Output:

Hello World!!!

type PeekOption Uses

type PeekOption func(*peekIterator) error

PeekOption allows customization of parameters when querying a Service Bus entity for messages without committing to processing them.

func PeekFromSequenceNumber Uses

func PeekFromSequenceNumber(seq int64) PeekOption

PeekFromSequenceNumber adds a filter to the Peek operation, so that no messages with a Sequence Number less than 'seq' are returned.

func PeekWithPageSize Uses

func PeekWithPageSize(pageSize int) PeekOption

PeekWithPageSize adjusts how many messages are fetched at once while peeking from the server.

type Queue Uses

type Queue struct {
    // contains filtered or unexported fields
}

Queue represents a Service Bus Queue entity, which offers First In, First Out (FIFO) message delivery to one or more competing consumers. That is, messages are typically expected to be received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.

Code:

const queueName = "myqueue"

// The namespace is addressed through a connection string taken from the environment.
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if len(connStr) == 0 {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println(err)
    return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Look the queue up first; a "not found" error is tolerated, any other error is fatal.
manager := ns.NewQueueManager()
entity, err := manager.Get(ctx, queueName)
if !(err == nil || servicebus.IsErrNotFound(err)) {
    fmt.Println(err)
    return
}

// No entity came back, so create the queue.
if entity == nil {
    if _, err := manager.Put(ctx, queueName); err != nil {
        fmt.Println(err)
        return
    }
}

queue, err := ns.NewQueue(queueName)
if err != nil {
    fmt.Println(err)
    return
}

fmt.Println(queue.Name)

Output:

myqueue

Code:

// Allow 1m40s overall: the message is scheduled one minute out, and the extra
// time covers the management calls around it.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

client, err := ns.NewQueue("schedulewithqueue")
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}
// Release the queue client when the example ends. The close error is deliberately
// ignored: the example is finished and there is nothing left to do with it.
defer func() {
    _ = client.Close(ctx)
}()

// The delay that we should schedule a message for.
const waitTime = 1 * time.Minute
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")

// ScheduleAt returns the values that CancelScheduled needs to cancel the scheduled message(s).
scheduled, err := client.ScheduleAt(ctx, expectedTime, msg)
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

// Cancel everything that was just scheduled.
err = client.CancelScheduled(ctx, scheduled...)
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

fmt.Println("All Messages Scheduled and Cancelled")

Output:

All Messages Scheduled and Cancelled

Code:

// This example publishes three messages (adjective, noun, numeric suffix) to each of five sessions and then
// receives the sessions one at a time, printing the concatenated parts of each session.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Setup the required clients for communicating with Service Bus.                                                 //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
    return
}

ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

client, err := ns.NewQueue("receivesession")
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}
// Release the queue's connection when the example finishes. The error is deliberately ignored: the example is
// already done and there is nothing useful left to do with it.
defer func() {
    _ = client.Close(ctx)
}()

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Publish five sessions' worth of data.                                                                          //
//                                                                                                                //
// The sessions are deliberately interleaved to demonstrate consumption semantics.                                //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const numSessions = 5
adjectives := []string{"Doltish", "Foolish", "Juvenile"}
nouns := []string{"Automaton", "Luddite", "Monkey", "Neanderthal"}

// seed chosen arbitrarily, see https://en.wikipedia.org/wiki/Taxicab_number
generator := rand.New(rand.NewSource(1729))

sessionIDs := make([]string, numSessions)

// Establish a set of sessions
for i := range sessionIDs {
    rawSessionID, err := uuid.NewV4()
    if err != nil {
        fmt.Println("FATAL: ", err)
        return
    }
    sessionIDs[i] = rawSessionID.String()
}

// publishToEach sends one message to every session, in session order. The body of each message is produced by
// calling next immediately before the send, so the random generator is consumed in the same order as the
// sessions are visited.
publishToEach := func(next func() string) error {
    for i := range sessionIDs {
        msg := servicebus.NewMessageFromString(next())
        msg.SessionID = &sessionIDs[i]
        if err := client.Send(ctx, msg); err != nil {
            return err
        }
    }
    return nil
}

// One pass per message part: every session gets its adjective first, then its noun, then its numeric suffix.
parts := []func() string{
    func() string { return adjectives[generator.Intn(len(adjectives))] },
    func() string { return nouns[generator.Intn(len(nouns))] },
    func() string { return fmt.Sprintf("%02d", generator.Intn(100)) },
}
for _, next := range parts {
    if err := publishToEach(next); err != nil {
        fmt.Println("FATAL: ", err)
        return
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Receive and process the previously published sessions.                                                         //
//                                                                                                                //
// The order the sessions are received in is not guaranteed, so the expected output must be "Unordered output".   //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
for i := 0; i < numSessions; i++ {
    handler := &SessionPrinter{}
    qs := client.NewSession(nil)

    // Close the session explicitly on every iteration instead of deferring: a defer inside the loop would keep
    // every session open until the whole example returns.
    receiveErr := qs.ReceiveOne(ctx, handler)
    closeErr := qs.Close(ctx)
    if receiveErr != nil {
        fmt.Println("FATAL: ", receiveErr)
        return
    }
    if closeErr != nil {
        fmt.Println("FATAL: ", closeErr)
        return
    }
}

// Unordered output:
// FoolishMonkey63
// FoolishLuddite05
// JuvenileMonkey80
// JuvenileLuddite84
// FoolishLuddite68

Output:

FoolishMonkey63
FoolishLuddite05
JuvenileMonkey80
JuvenileLuddite84
FoolishLuddite68

func (*Queue) Close Uses

func (q *Queue) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Queue) NewDeadLetter Uses

func (q *Queue) NewDeadLetter() *DeadLetter

NewDeadLetter creates an entity that represents the dead letter sub queue of the queue

Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func (*Queue) NewDeadLetterReceiver Uses

func (q *Queue) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewDeadLetterReceiver builds a receiver for the Queue's dead letter queue

func (*Queue) NewReceiver Uses

func (q *Queue) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)

NewReceiver will create a new Receiver for receiving messages off of a queue

func (*Queue) NewSender Uses

func (q *Queue) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)

NewSender will create a new Sender for sending messages to the queue

func (*Queue) NewSession Uses

func (q *Queue) NewSession(sessionID *string) *QueueSession

NewSession will create a new session based receiver and sender for the queue

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Queue) NewTransferDeadLetter Uses

func (q *Queue) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (*Queue) NewTransferDeadLetterReceiver Uses

func (q *Queue) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Queue's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (*Queue) Receive Uses

func (q *Queue) Receive(ctx context.Context, handler Handler) error

Receive subscribes for messages sent to the Queue. If the messages are not within a session, messages will arrive unordered.

Handler must call a disposition action such as Complete, Abandon, or Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

Code:

// This example receives messages from a queue for up to ten minutes, printing and completing each one.

// Define a function that should be executed when a message is received.
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
    fmt.Println(string(msg.Data))
    return msg.Complete(ctx)
}

// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
    // Report setup failures instead of returning silently, so a bad connection string is visible.
    fmt.Println("FATAL: ", err)
    return
}

client, err := ns.NewQueue("myqueue")
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}
// Release the underlying connection when done. A fresh context is used because the receive context may already
// have expired by the time Receive returns. The close error is ignored on purpose: this is best-effort cleanup.
defer func() {
    closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer closeCancel()
    _ = client.Close(closeCtx)
}()

// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

if err := client.Receive(ctx, printMessage); err != nil {
    fmt.Println("FATAL: ", err)
}

Code:

// Set concurrent number
const concurrentNum = 5
// Define msg chan
msgChan := make(chan *servicebus.Message, concurrentNum)
// Define a function that should be executed when a message is received.
var concurrentHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
    msgChan <- msg
    return nil
}

// Define msg workers
for i := 0; i < concurrentNum; i++ {
    go func() {
        for msg := range msgChan {

            ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
            defer cancel()

            fmt.Println(string(msg.Data))
            msg.Complete(ctx)
        }
    }()
}

// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
    close(msgChan)
    return
}

// Init queue client with prefetch count
client, err := ns.NewQueue("myqueue", servicebus.QueueWithPrefetchCount(concurrentNum))
if err != nil {
    close(msgChan)
    return
}

// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

if err := client.Receive(ctx, concurrentHandler); err != nil {
    fmt.Println("FATAL: ", err)
}
// Close the message chan
close(msgChan)

func (*Queue) ReceiveOne Uses

func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

Handler must call a disposition action such as Complete, Abandon, or Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

func (*Queue) Send Uses

func (q *Queue) Send(ctx context.Context, msg *Message) error

Send sends messages to the Queue

Code:

// This example sends a single "Hello World!!!" message to a queue.

// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
    // Report setup failures instead of returning silently, so a bad connection string is visible.
    fmt.Println("FATAL: ", err)
    return
}

client, err := ns.NewQueue("myqueue")
if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

// Create a context to limit how long we will try to send, then push the message over the wire.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Release the underlying connection when done. Registered after cancel so that it runs first, while ctx is
// still usable. The close error is ignored on purpose: this is best-effort cleanup.
defer func() {
    _ = client.Close(ctx)
}()

if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
    fmt.Println("FATAL: ", err)
}

func (*Queue) SendBatch Uses

func (q *Queue) SendBatch(ctx context.Context, iterator BatchIterator) error

SendBatch sends a batch of messages to the Queue

type QueueDescription Uses

// QueueDescription is the content type for Queue management requests.
// Every field carries an `omitempty` XML tag, and all fields other than XMLName and the embedded
// BaseEntityDescription are pointers.
type QueueDescription struct {
    XMLName xml.Name `xml:"QueueDescription"`
    BaseEntityDescription
    LockDuration                        *string       `xml:"LockDuration,omitempty"`               // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
    MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`         // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
    RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
    RequiresSession                     *bool         `xml:"RequiresSession,omitempty"` // RequiresSession - NOTE(review): presumably whether senders and receivers must use sessionIDs (see QueueEntityWithRequiredSessions) — TODO confirm.
    DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
    DeadLetteringOnMessageExpiration    *bool         `xml:"DeadLetteringOnMessageExpiration,omitempty"`    // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
    DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
    MaxDeliveryCount                    *int32        `xml:"MaxDeliveryCount,omitempty"`                    // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. The default value is 10.
    EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
    SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
    MessageCount                        *int64        `xml:"MessageCount,omitempty"`                        // MessageCount - The number of messages in the queue.
    IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
    Status                              *EntityStatus `xml:"Status,omitempty"`
    CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
    UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
    SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
    AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"` // AutoDeleteOnIdle - NOTE(review): presumably the idle interval after which the queue is deleted (see QueueEntityWithAutoDeleteOnIdle) — TODO confirm format.
    EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"` // EnablePartitioning - NOTE(review): presumably whether the queue is partitioned (see QueueEntityWithPartitioning) — TODO confirm.
    EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
    CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
    ForwardTo                           *string       `xml:"ForwardTo,omitempty"` // ForwardTo - NOTE(review): presumably the entity that messages are auto-forwarded to (see QueueEntityWithAutoForward) — TODO confirm.
    ForwardDeadLetteredMessagesTo       *string       `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
}

QueueDescription is the content type for Queue management requests

type QueueEntity Uses

type QueueEntity struct {
    *QueueDescription
    *Entity
}

QueueEntity is the Azure Service Bus description of a Queue for management activities

type QueueManagementOption Uses

type QueueManagementOption func(*QueueDescription) error

QueueManagementOption represents named configuration options for queue mutation

func QueueEntityWithAutoDeleteOnIdle Uses

func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption

QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func QueueEntityWithAutoForward Uses

func QueueEntityWithAutoForward(target Targetable) QueueManagementOption

QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithDeadLetteringOnMessageExpiration Uses

func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption

QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue

func QueueEntityWithDuplicateDetection Uses

func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption

QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func QueueEntityWithForwardDeadLetteredMessagesTo Uses

func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption

QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithLockDuration Uses

func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption

QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func QueueEntityWithMaxDeliveryCount Uses

func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption

QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message

func QueueEntityWithMaxSizeInMegabytes Uses

func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption

QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 to 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 GB (1 * 1024 MB).

size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku

func QueueEntityWithMessageTimeToLive Uses

func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption

QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func QueueEntityWithPartitioning Uses

func QueueEntityWithPartitioning() QueueManagementOption

QueueEntityWithPartitioning ensures the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues, with the trade-off of requiring one of the following to ensure FIFO message retrieval:

SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.

PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.

MessageId. If the queue has the RequiresDuplicateDetection property set to true, then the MessageId property serves as the partition key if neither the SessionId nor the PartitionKey property is set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages.

func QueueEntityWithRequiredSessions Uses

func QueueEntityWithRequiredSessions() QueueManagementOption

QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs

type QueueManager Uses

type QueueManager struct {
    // contains filtered or unexported fields
}

QueueManager provides CRUD functionality for Service Bus Queues

func (*QueueManager) Delete Uses

func (qm *QueueManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Queue entity by name

func (QueueManager) Execute Uses

func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*QueueManager) Get Uses

func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)

Get fetches a Service Bus Queue entity by name

func (*QueueManager) List Uses

func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error)

List fetches all of the queues for a Service Bus Namespace

func (QueueManager) Post Uses

func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*QueueManager) Put Uses

func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)

Put creates or updates a Service Bus Queue

func (QueueManager) TokenProvider Uses

func (em QueueManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (QueueManager) Use Uses

func (em QueueManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type QueueOption Uses

type QueueOption func(*Queue) error

QueueOption represents named options for assisting Queue message handling

func QueueWithPrefetchCount Uses

func QueueWithPrefetchCount(prefetch uint32) QueueOption

QueueWithPrefetchCount configures the queue to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.

func QueueWithReceiveAndDelete Uses

func QueueWithReceiveAndDelete() QueueOption

QueueWithReceiveAndDelete configures a queue to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type QueueSession Uses

type QueueSession struct {
    // contains filtered or unexported fields
}

QueueSession wraps Service Bus session functionality over a Queue

func NewQueueSession Uses

func NewQueueSession(builder SendAndReceiveBuilder, sessionID *string) *QueueSession

NewQueueSession creates a new session sender and receiver to communicate with a Service Bus queue.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*QueueSession) Close Uses

func (qs *QueueSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*QueueSession) ManagementPath Uses

func (qs *QueueSession) ManagementPath() string

ManagementPath provides an addressable path to the Entity management endpoint

func (*QueueSession) ReceiveDeferred Uses

func (qs *QueueSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*QueueSession) ReceiveOne Uses

func (qs *QueueSession) ReceiveOne(ctx context.Context, handler SessionHandler) error

ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.

Handler must call a disposition action such as Complete, Abandon, or Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (*QueueSession) Send Uses

func (qs *QueueSession) Send(ctx context.Context, msg *Message) error

Send the message to the queue within a session

func (*QueueSession) SessionID Uses

func (qs *QueueSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type ReceiveBuilder Uses

type ReceiveBuilder interface {
    ReceiverBuilder
    // contains filtered or unexported methods
}

ReceiveBuilder is a ReceiverBuilder and EntityManagementAddresser

type ReceiveMode Uses

type ReceiveMode int

ReceiveMode represents the behavior when consuming a message from a queue

type ReceiveOner Uses

type ReceiveOner interface {
    Closer
    ReceiveOne(ctx context.Context, handler Handler) error
}

ReceiveOner provides the ability to receive and handle events

type Receiver Uses

type Receiver struct {
    Name string

    DefaultDisposition DispositionAction
    Closed             bool
    // contains filtered or unexported fields
}

Receiver provides connection, session and link handling for receiving from an entity path

func (*Receiver) Close Uses

func (r *Receiver) Close(ctx context.Context) error

Close will close the AMQP session and link of the Receiver

func (*Receiver) Listen Uses

func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle

Listen starts a listener for messages sent to the entity path

func (*Receiver) ReceiveOne Uses

func (r *Receiver) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the link

func (*Receiver) Recover Uses

func (r *Receiver) Recover(ctx context.Context) error

Recover will attempt to close the current session and link, then rebuild them

type ReceiverBuilder Uses

type ReceiverBuilder interface {
    NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
}

ReceiverBuilder describes the ability of an entity to build receiver links

type ReceiverOption Uses

type ReceiverOption func(receiver *Receiver) error

ReceiverOption provides a structure for configuring receivers

func ReceiverWithPrefetchCount Uses

func ReceiverWithPrefetchCount(prefetch uint32) ReceiverOption

ReceiverWithPrefetchCount configures the receiver to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.

func ReceiverWithReceiveMode Uses

func ReceiverWithReceiveMode(mode ReceiveMode) ReceiverOption

ReceiverWithReceiveMode configures a Receiver to use the specified receive mode

func ReceiverWithSession Uses

func ReceiverWithSession(sessionID *string) ReceiverOption

ReceiverWithSession configures a Receiver to use a session

type RestHandler Uses

type RestHandler func(ctx context.Context, req *http.Request) (*http.Response, error)

RestHandler is used to transform a request and response within the http pipeline

type RuleDescription Uses

type RuleDescription struct {
    XMLName xml.Name `xml:"RuleDescription"`
    BaseEntityDescription
    CreatedAt *date.Time         `xml:"CreatedAt,omitempty"`
    Filter    FilterDescription  `xml:"Filter"`
    Action    *ActionDescription `xml:"Action,omitempty"`
}

RuleDescription is the content type for Subscription Rule management requests

type RuleEntity Uses

type RuleEntity struct {
    *RuleDescription
    *Entity
}

RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities

type SQLAction Uses

type SQLAction struct {
    Expression string
}

SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A SQLAction supports a subset of the SQL-92 standard.

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLAction) ToActionDescription Uses

func (sf SQLAction) ToActionDescription() ActionDescription

ToActionDescription will transform the SQLAction into an ActionDescription

type SQLFilter Uses

type SQLFilter struct {
    Expression string
}

SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A SQLFilter supports a subset of the SQL-92 standard.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLFilter) ToFilterDescription Uses

func (sf SQLFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the SQLFilter into a FilterDescription

type SendAndReceiveBuilder Uses

type SendAndReceiveBuilder interface {
    ReceiveBuilder
    SenderBuilder
}

SendAndReceiveBuilder is a ReceiverBuilder, SenderBuilder and EntityManagementAddresser

type SendOption Uses

type SendOption func(event *Message) error

SendOption provides a way to customize a message on sending

type Sender Uses

type Sender struct {
    Name string
    // contains filtered or unexported fields
}

Sender provides connection, session and link handling for sending to an entity path

func (*Sender) Close Uses

func (s *Sender) Close(ctx context.Context) error

Close will close the AMQP connection, session and link of the Sender

func (*Sender) Recover Uses

func (s *Sender) Recover(ctx context.Context) error

Recover will attempt to close the current session and link, then rebuild them

func (*Sender) Send Uses

func (s *Sender) Send(ctx context.Context, msg *Message, opts ...SendOption) error

Send will send a message to the entity path with options

This will retry sending the message if the server responds with a busy error.

func (*Sender) String Uses

func (s *Sender) String() string

type SenderBuilder Uses

type SenderBuilder interface {
    NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
}

SenderBuilder describes the ability of an entity to build sender links

type SenderOption Uses

type SenderOption func(*Sender) error

SenderOption provides a way to customize a Sender

func SenderWithSession Uses

func SenderWithSession(sessionID *string) SenderOption

SenderWithSession configures the message to send with a specific session and sequence. By default, a Sender has a default session (uuid.NewV4()) and sequence generator.

type SessionHandler Uses

type SessionHandler interface {
    Handler

    // Start is called when a Receiver is informed that it has acquired a lock on a Service Bus Session.
    Start(*MessageSession) error

    // End is called when a Receiver is informed that the last message of a Session has been passed to it.
    End()
}

SessionHandler exposes a manner of handling a group of messages together. Instances of SessionHandler should be passed to a Receiver such as a Queue or Subscription.

func NewSessionHandler Uses

func NewSessionHandler(base Handler, start func(*MessageSession) error, end func()) SessionHandler

NewSessionHandler is a type converter that allows three funcs to be tied together into a type that fulfills the SessionHandler interface.

type Subscription Uses

type Subscription struct {
    Topic *Topic
    // contains filtered or unexported fields
}

Subscription represents a Service Bus Subscription entity, which is used to receive topic messages. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (*Subscription) Close Uses

func (s *Subscription) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Subscription) NewDeadLetter Uses

func (s *Subscription) NewDeadLetter() *DeadLetter

NewDeadLetter creates an entity that represents the dead letter sub queue of the subscription

Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func (*Subscription) NewDeadLetterReceiver Uses

func (s *Subscription) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewDeadLetterReceiver builds a receiver for the Subscription's dead letter queue

func (*Subscription) NewReceiver Uses

func (s *Subscription) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)

NewReceiver will create a new Receiver for receiving messages off of the subscription

func (*Subscription) NewSession Uses

func (s *Subscription) NewSession(sessionID *string) *SubscriptionSession

NewSession will create a new session based receiver for the subscription

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Subscription) NewTransferDeadLetter Uses

func (s *Subscription) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the subscription

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (*Subscription) NewTransferDeadLetterReceiver Uses

func (s *Subscription) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Subscription's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (Subscription) Peek Uses

func (re Subscription) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error)

Peek fetches a list of Messages from the Service Bus broker without acquiring a lock or committing to a disposition. The messages are delivered as close to sequence order as possible.

The MessageIterator that is returned has the following properties:

- Messages are fetched from the server in pages. Page size is configurable with PeekOptions.
- The MessageIterator will always return "false" for Done().
- When Next() is called, it will return either: a slice of messages and no error, nil with an error related to being unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that there are currently no messages in the queue with a sequence ID larger than previously viewed ones.

func (Subscription) PeekOne Uses

func (re Subscription) PeekOne(ctx context.Context, options ...PeekOption) (*Message, error)

PeekOne fetches a single Message from the Service Bus broker without acquiring a lock or committing to a disposition.

func (*Subscription) Receive Uses

func (s *Subscription) Receive(ctx context.Context, handler Handler) error

Receive subscribes for messages sent to the Subscription

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (Subscription) ReceiveDeferred Uses

func (re Subscription) ReceiveDeferred(ctx context.Context, handler Handler, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (Subscription) ReceiveDeferredWithMode Uses

func (re Subscription) ReceiveDeferredWithMode(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferredWithMode will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*Subscription) ReceiveOne Uses

func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

func (Subscription) RenewLocks Uses

func (re Subscription) RenewLocks(ctx context.Context, messages ...*Message) error

RenewLocks renews the locks on messages provided

func (Subscription) SendBatchDisposition Uses

func (re Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error

SendBatchDisposition updates the LockTokenIDs to the disposition status.

type SubscriptionDescription Uses

type SubscriptionDescription struct {
    XMLName xml.Name `xml:"SubscriptionDescription"`
    BaseEntityDescription
    LockDuration                              *string       `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
    RequiresSession                           *bool         `xml:"RequiresSession,omitempty"`
    DefaultMessageTimeToLive                  *string       `xml:"DefaultMessageTimeToLive,omitempty"`         // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
    DeadLetteringOnMessageExpiration          *bool         `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
    DeadLetteringOnFilterEvaluationExceptions *bool         `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"`
    MessageCount                              *int64        `xml:"MessageCount,omitempty"`            // MessageCount - The number of messages in the queue.
    MaxDeliveryCount                          *int32        `xml:"MaxDeliveryCount,omitempty"`        // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
    EnableBatchedOperations                   *bool         `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
    Status                                    *EntityStatus `xml:"Status,omitempty"`
    CreatedAt                                 *date.Time    `xml:"CreatedAt,omitempty"`
    UpdatedAt                                 *date.Time    `xml:"UpdatedAt,omitempty"`
    AccessedAt                                *date.Time    `xml:"AccessedAt,omitempty"`
    AutoDeleteOnIdle                          *string       `xml:"AutoDeleteOnIdle,omitempty"`
    ForwardTo                                 *string       `xml:"ForwardTo,omitempty"`                     // ForwardTo - absolute URI of the entity to forward messages
    ForwardDeadLetteredMessagesTo             *string       `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
    CountDetails                              *CountDetails `xml:"CountDetails,omitempty"`
}

SubscriptionDescription is the content type for Subscription management requests

type SubscriptionEntity Uses

type SubscriptionEntity struct {
    *SubscriptionDescription
    *Entity
}

SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities

type SubscriptionManagementOption Uses

type SubscriptionManagementOption func(*SubscriptionDescription) error

SubscriptionManagementOption represents named options for assisting Subscription creation

func SubscriptionWithAutoDeleteOnIdle Uses

func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption

SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func SubscriptionWithAutoForward Uses

func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption

SubscriptionWithAutoForward configures the subscription to automatically forward messages to the specified entity path

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithBatchedOperations Uses

func SubscriptionWithBatchedOperations() SubscriptionManagementOption

SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.

func SubscriptionWithDeadLetteringOnMessageExpiration Uses

func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption

SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue

func SubscriptionWithForwardDeadLetteredMessagesTo Uses

func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption

SubscriptionWithForwardDeadLetteredMessagesTo configures the subscription to automatically forward dead letter messages to the specified target entity.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithLockDuration Uses

func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption

SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func SubscriptionWithMessageTimeToLive Uses

func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption

SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func SubscriptionWithRequiredSessions Uses

func SubscriptionWithRequiredSessions() SubscriptionManagementOption

SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs

type SubscriptionManager Uses

type SubscriptionManager struct {
    Topic *Topic
    // contains filtered or unexported fields
}

SubscriptionManager provides CRUD functionality for Service Bus Subscription

func (*SubscriptionManager) Delete Uses

func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Subscription entity by name

func (*SubscriptionManager) DeleteRule Uses

func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error

DeleteRule will delete a rule on the subscription

func (SubscriptionManager) Execute Uses

func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*SubscriptionManager) Get Uses

func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error)

Get fetches a Service Bus Subscription entity by name

func (*SubscriptionManager) List Uses

func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error)

List fetches all of the Subscriptions for a Service Bus Topic

func (*SubscriptionManager) ListRules Uses

func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)

ListRules returns the slice of subscription filter rules

By default when the subscription is created, there exists a single "true" filter which matches all messages.

func (SubscriptionManager) Post Uses

func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*SubscriptionManager) Put Uses

func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error)

Put creates or updates a Service Bus Subscription

func (*SubscriptionManager) PutRule Uses

func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)

PutRule creates a new Subscription rule to filter messages from the topic

func (*SubscriptionManager) PutRuleWithAction Uses

func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error)

PutRuleWithAction creates a new Subscription rule to filter messages from the topic and then perform an action

func (SubscriptionManager) TokenProvider Uses

func (em SubscriptionManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (SubscriptionManager) Use Uses

func (em SubscriptionManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type SubscriptionOption Uses

type SubscriptionOption func(*Subscription) error

SubscriptionOption configures the Subscription Azure Service Bus client

func SubscriptionWithPrefetchCount Uses

func SubscriptionWithPrefetchCount(prefetch uint32) SubscriptionOption

SubscriptionWithPrefetchCount configures the subscription to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail, and the message will be re-queued and processed again.

func SubscriptionWithReceiveAndDelete Uses

func SubscriptionWithReceiveAndDelete() SubscriptionOption

SubscriptionWithReceiveAndDelete configures a subscription to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type SubscriptionSession Uses

type SubscriptionSession struct {
    // contains filtered or unexported fields
}

SubscriptionSession wraps Service Bus session functionality over a Subscription

func NewSubscriptionSession Uses

func NewSubscriptionSession(builder ReceiveBuilder, sessionID *string) *SubscriptionSession

NewSubscriptionSession creates a new session receiver to receive from a Service Bus subscription.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*SubscriptionSession) Close Uses

func (ss *SubscriptionSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*SubscriptionSession) ManagementPath Uses

func (ss *SubscriptionSession) ManagementPath() string

ManagementPath provides an addressable path to the Entity management endpoint

func (*SubscriptionSession) ReceiveDeferred Uses

func (ss *SubscriptionSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*SubscriptionSession) ReceiveOne Uses

func (ss *SubscriptionSession) ReceiveOne(ctx context.Context, handler SessionHandler) error

ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (*SubscriptionSession) SessionID Uses

func (ss *SubscriptionSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type SystemProperties Uses

type SystemProperties struct {
    LockedUntil            *time.Time `mapstructure:"x-opt-locked-until"`
    SequenceNumber         *int64     `mapstructure:"x-opt-sequence-number"`
    PartitionID            *int16     `mapstructure:"x-opt-partition-id"`
    PartitionKey           *string    `mapstructure:"x-opt-partition-key"`
    EnqueuedTime           *time.Time `mapstructure:"x-opt-enqueued-time"`
    DeadLetterSource       *string    `mapstructure:"x-opt-deadletter-source"`
    ScheduledEnqueueTime   *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
    EnqueuedSequenceNumber *int64     `mapstructure:"x-opt-enqueue-sequence-number"`
    ViaPartitionKey        *string    `mapstructure:"x-opt-via-partition-key"`
}

SystemProperties are used to store properties that are set by the system.

type Targetable Uses

type Targetable interface {
    TargetURI() string
}

Targetable provides the ability to forward messages to the entity

type Topic Uses

type Topic struct {
    // contains filtered or unexported fields
}

Topic represents a Service Bus Topic entity. In contrast to queues, in which each message is processed by a single consumer, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. This is useful for scaling to very large numbers of recipients: each published message is made available to each subscription registered with the topic. Messages are sent to a topic and delivered to one or more associated subscriptions, depending on filter rules that can be set on a per-subscription basis. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages are sent to a topic in the same way they are sent to a queue, but messages are not received from the topic directly. Instead, they are received from subscriptions. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (Topic) CancelScheduled Uses

func (se Topic) CancelScheduled(ctx context.Context, seq ...int64) error

CancelScheduled allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet been enqueued.

func (*Topic) Close Uses

func (t *Topic) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Topic) NewSender Uses

func (t *Topic) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)

NewSender will create a new Sender for sending messages to the topic

func (*Topic) NewSession Uses

func (t *Topic) NewSession(sessionID *string) *TopicSession

NewSession will create a new session based sender for the topic

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Topic) NewSubscription Uses

func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subscription, error)

NewSubscription creates a new Topic Subscription client

func (*Topic) NewSubscriptionManager Uses

func (t *Topic) NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic

func (*Topic) NewTransferDeadLetter Uses

func (t *Topic) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the topic

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (*Topic) NewTransferDeadLetterReceiver Uses

func (t *Topic) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Topic's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func (Topic) ScheduleAt Uses

func (se Topic) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages ...*Message) ([]int64, error)

ScheduleAt will send a batch of messages to a Topic, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.

func (*Topic) Send Uses

func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error

Send sends messages to the Topic

func (*Topic) SendBatch Uses

func (t *Topic) SendBatch(ctx context.Context, iterator BatchIterator) error

SendBatch sends a batch of messages to the Topic

type TopicDescription Uses

type TopicDescription struct {
    XMLName xml.Name `xml:"TopicDescription"`
    BaseEntityDescription
    DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
    MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`                  // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
    RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"`          // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
    DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
    EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
    SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
    FilteringMessagesBeforePublishing   *bool         `xml:"FilteringMessagesBeforePublishing,omitempty"`
    IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
    Status                              *EntityStatus `xml:"Status,omitempty"`
    CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
    UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
    SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
    AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"`
    EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"`
    EnableSubscriptionPartitioning      *bool         `xml:"EnableSubscriptionPartitioning,omitempty"`
    EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
    CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
}

TopicDescription is the content type for Topic management requests

type TopicEntity Uses

type TopicEntity struct {
    *TopicDescription
    *Entity
}

TopicEntity is the Azure Service Bus description of a Topic for management activities

type TopicManagementOption Uses

type TopicManagementOption func(*TopicDescription) error

TopicManagementOption represents named options for assisting Topic creation

func TopicWithAutoDeleteOnIdle Uses

func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption

TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func TopicWithBatchedOperations Uses

func TopicWithBatchedOperations() TopicManagementOption

TopicWithBatchedOperations configures the topic to batch server-side operations.

func TopicWithDuplicateDetection Uses

func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption

TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func TopicWithExpress Uses

func TopicWithExpress() TopicManagementOption

TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.

func TopicWithMaxSizeInMegabytes Uses

func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption

TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (from 1 * 1024 to 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 GB (1 * 1024 MB).

size must be between 1024 and 5 * 1024 for the Standard SKU, and may be up to 80 * 1024 for the Premium SKU.

func TopicWithMessageTimeToLive Uses

func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption

TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func TopicWithOrdering Uses

func TopicWithOrdering() TopicManagementOption

TopicWithOrdering configures the topic to support ordering of messages.

func TopicWithPartitioning Uses

func TopicWithPartitioning() TopicManagementOption

TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.

type TopicManager Uses

type TopicManager struct {
    // contains filtered or unexported fields
}

TopicManager provides CRUD functionality for Service Bus Topics

func (*TopicManager) Delete Uses

func (tm *TopicManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (TopicManager) Execute Uses

func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*TopicManager) Get Uses

func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)

Get fetches a Service Bus Topic entity by name

func (*TopicManager) List Uses

func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error)

List fetches all of the Topics for a Service Bus Namespace

func (TopicManager) Post Uses

func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*TopicManager) Put Uses

func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)

Put creates or updates a Service Bus Topic

func (TopicManager) TokenProvider Uses

func (em TopicManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (TopicManager) Use Uses

func (em TopicManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware stack

type TopicOption Uses

type TopicOption func(*Topic) error

TopicOption represents named options for assisting Topic message handling

type TopicSession Uses

type TopicSession struct {
    // contains filtered or unexported fields
}

TopicSession wraps Service Bus session functionality over a Topic

func NewTopicSession Uses

func NewTopicSession(builder SenderBuilder, sessionID *string) *TopicSession

NewTopicSession creates a new session sender to send to a Service Bus topic.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*TopicSession) Close Uses

func (ts *TopicSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*TopicSession) Send Uses

func (ts *TopicSession) Send(ctx context.Context, msg *Message) error

Send the message to the topic within a session

func (*TopicSession) SessionID Uses

func (ts *TopicSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type TransferDeadLetter Uses

type TransferDeadLetter struct {
    // contains filtered or unexported fields
}

TransferDeadLetter represents a transfer dead letter queue in Azure Service Bus.

Messages will be sent to the transfer dead-letter queue under the following conditions:

- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.

func NewTransferDeadLetter Uses

func NewTransferDeadLetter(builder TransferDeadLetterBuilder) *TransferDeadLetter

NewTransferDeadLetter constructs an instance of TransferDeadLetter which represents a transfer dead letter queue in Azure Service Bus

func (*TransferDeadLetter) Close Uses

func (dl *TransferDeadLetter) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*TransferDeadLetter) ReceiveOne Uses

func (dl *TransferDeadLetter) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the transfer dead letter queue

type TransferDeadLetterBuilder Uses

type TransferDeadLetterBuilder interface {
    NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}

TransferDeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's transfer dead letter queue.

type TrueFilter Uses

type TrueFilter struct{}

TrueFilter represents an always-true SQL expression which will accept all messages

func (TrueFilter) ToFilterDescription Uses

func (tf TrueFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the TrueFilter into a FilterDescription

Directories

Path | Synopsis
atom
internal/test

Package servicebus imports 33 packages (graph) and is imported by 4 packages. Updated 2019-09-10. Refresh now. Tools for package owners.