azure-event-hubs-go: github.com/Azure/azure-event-hubs-go

package eventhub

import "github.com/Azure/azure-event-hubs-go"

Package eventhub provides functionality for interacting with Azure Event Hubs.

Package Files

amqp_mgmt.go batch.go errors.go event.go http_mgmt.go hub.go namespace.go receiver.go sender.go session.go tracing.go version.go

Constants

const (
    // DefaultConsumerGroup is the default name for an event stream consumer group
    DefaultConsumerGroup = "$Default"
)
const (
    // MsftVendor is the Microsoft vendor identifier
    MsftVendor = "com.microsoft"
)
const (
    // Version is the semantic version number
    Version = "2.0.0"
)

func ApplyComponentInfo

func ApplyComponentInfo(span tab.Spanner)

ApplyComponentInfo applies eventhub library and network info to the span

type BaseEntityDescription

type BaseEntityDescription struct {
    InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
    ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type BatchIterator

type BatchIterator interface {
    Done() bool
    Next(messageID string, opts *BatchOptions) (*EventBatch, error)
}

BatchIterator offers a simple mechanism for batching a list of events

type BatchOption

type BatchOption func(opt *BatchOptions) error

BatchOption provides a way to configure `BatchOptions`

func BatchWithMaxSizeInBytes

func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption

BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
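
BatchOption follows Go's functional options pattern: each option is a function that mutates a BatchOptions value and may return an error. The following is a minimal sketch of how such options can be applied, not the library's own code. The types are local stand-ins that mirror the declarations on this page, while applyBatchOptions, the default of 1000000 bytes used as a starting value, and the rejection of non-positive sizes are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type MaxMessageSizeInBytes uint

type BatchOptions struct {
	MaxSize MaxMessageSizeInBytes
}

type BatchOption func(opt *BatchOptions) error

// BatchWithMaxSizeInBytes is re-implemented here for illustration only.
// Rejecting non-positive sizes is an assumption, not documented behavior.
func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption {
	return func(opt *BatchOptions) error {
		if sizeInBytes <= 0 {
			return errors.New("max size must be positive")
		}
		opt.MaxSize = MaxMessageSizeInBytes(sizeInBytes)
		return nil
	}
}

// applyBatchOptions is a hypothetical helper: it starts from a default
// and applies each option in order, stopping at the first error.
func applyBatchOptions(opts ...BatchOption) (*BatchOptions, error) {
	bo := &BatchOptions{MaxSize: 1000000}
	for _, o := range opts {
		if err := o(bo); err != nil {
			return nil, err
		}
	}
	return bo, nil
}

func main() {
	bo, err := applyBatchOptions(BatchWithMaxSizeInBytes(256 * 1024))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(bo.MaxSize)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.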

type BatchOptions

type BatchOptions struct {
    MaxSize MaxMessageSizeInBytes
}

BatchOptions are optional information to add to a batch of messages

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It does not indicate that no more messages will arrive in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type Event

type Event struct {
    Data         []byte
    PartitionKey *string
    Properties   map[string]interface{}
    ID           string

    SystemProperties *SystemProperties
    // contains filtered or unexported fields
}

Event is an Event Hubs message to be sent or received

func NewEvent

func NewEvent(data []byte) *Event

NewEvent builds an Event from a slice of data

func NewEventFromString

func NewEventFromString(message string) *Event

NewEventFromString builds an Event from a string message

func (*Event) Get

func (e *Event) Get(key string) (interface{}, bool)

Get will fetch a property from the event

func (*Event) GetCheckpoint

func (e *Event) GetCheckpoint() persist.Checkpoint

GetCheckpoint returns the checkpoint information on the Event

func (*Event) GetKeyValues

func (e *Event) GetKeyValues() map[string]interface{}

GetKeyValues implements tab.Carrier

func (*Event) Set

func (e *Event) Set(key string, value interface{})

Set implements tab.Carrier

type EventBatch

type EventBatch struct {
    *Event

    MaxSize MaxMessageSizeInBytes
    // contains filtered or unexported fields
}

EventBatch is a batch of Event Hubs messages to be sent

func NewEventBatch

func NewEventBatch(eventID string, opts *BatchOptions) *EventBatch

NewEventBatch builds a new event batch

func (*EventBatch) Add

func (eb *EventBatch) Add(e *Event) (bool, error)

Add adds a message to the batch if the message will not exceed the max size of the batch

func (*EventBatch) Clear

func (eb *EventBatch) Clear()

Clear will zero out the batch size and clear the buffered messages

func (*EventBatch) Size

func (eb *EventBatch) Size() int

Size is the number of bytes in the message batch

type EventBatchIterator

type EventBatchIterator struct {
    Events []*Event
    Cursor int
}

EventBatchIterator provides an easy way to iterate over a slice of events to reliably create batches

func NewEventBatchIterator

func NewEventBatchIterator(events ...*Event) *EventBatchIterator

NewEventBatchIterator wraps a slice of `Event` pointers to allow it to be made into an `EventBatchIterator`.

func (*EventBatchIterator) Done

func (ebi *EventBatchIterator) Done() bool

Done reports whether all messages have been iterated over, that is, whether no messages remain.

func (*EventBatchIterator) Next

func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventBatch, error)

Next builds the next batch of messages from the message slice, starting at the position after the last message accessed.
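
The cursor-based batching described for EventBatchIterator can be sketched as follows. This is an illustrative approximation with local stand-in types, not the library's implementation: it measures an event by len(Data) only, whereas a real batch must also account for encoding overhead, and the error returned for an oversized single event is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// event and iterator are local stand-ins for Event and EventBatchIterator.
type event struct{ Data []byte }

type iterator struct {
	Events []*event
	Cursor int
}

// Done reports whether every event has been placed into a batch.
func (it *iterator) Done() bool { return it.Cursor >= len(it.Events) }

// Next collects events starting at Cursor until adding one more
// would push the batch past maxSize bytes.
func (it *iterator) Next(maxSize int) ([]*event, error) {
	if it.Done() {
		return nil, errors.New("no events remaining")
	}
	var batch []*event
	size := 0
	for !it.Done() {
		e := it.Events[it.Cursor]
		if size+len(e.Data) > maxSize {
			if len(batch) == 0 {
				// A single event that can never fit would loop forever otherwise.
				return nil, fmt.Errorf("event of %d bytes exceeds max batch size %d", len(e.Data), maxSize)
			}
			break
		}
		batch = append(batch, e)
		size += len(e.Data)
		it.Cursor++
	}
	return batch, nil
}

func main() {
	it := &iterator{Events: []*event{
		{Data: []byte("aaaa")}, {Data: []byte("bbbb")}, {Data: []byte("cc")},
	}}
	for !it.Done() {
		batch, err := it.Next(8)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("batch of", len(batch), "events")
	}
}
```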

type Handler

type Handler func(ctx context.Context, event *Event) error

Handler is the function signature for any receiver of events

type Hub

type Hub struct {
    // contains filtered or unexported fields
}

Hub provides the ability to send and receive Event Hub messages

Code:

ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
    return
}

hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
if err != nil {
    fmt.Println(err)
    return
}

hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
if err != nil {
    fmt.Println(err)
    return
}

// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name)
if err != nil {
    fmt.Println(err)
    return
}

err = hub.Send(ctx, eventhub.NewEventFromString("Hello World!"))
if err != nil {
    fmt.Println(err)
    return
}

exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
    text := string(event.Data)
    fmt.Println(text)
    exit <- struct{}{}
    return nil
}

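for _, partitionID := range *hubEntity.PartitionIDs {
    // Receive returns once the listener is connected; report setup errors instead of dropping them
    if _, err = hub.Receive(ctx, partitionID, handler); err != nil {
        fmt.Println(err)
        return
    }
}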

// wait for the first handler to get called with "Hello World!"
<-exit
err = hub.Close(ctx)
if err != nil {
    fmt.Println(err)
    return
}

Output:

Hello World!

Code:

ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
if connStr == "" {
    fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
    return
}

hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
if err != nil {
    fmt.Println(err)
    return
}

hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
if err != nil {
    fmt.Println(err)
    return
}

// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr+";EntityPath="+hubEntity.Name, eventhub.HubWithWebSocketConnection())
if err != nil {
    fmt.Println(err)
    return
}

err = hub.Send(ctx, eventhub.NewEventFromString("this message was sent and received via web socket!!"))
if err != nil {
    fmt.Println(err)
    return
}

exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
    text := string(event.Data)
    fmt.Println(text)
    exit <- struct{}{}
    return nil
}

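for _, partitionID := range *hubEntity.PartitionIDs {
    // Receive returns once the listener is connected; report setup errors instead of dropping them
    if _, err = hub.Receive(ctx, partitionID, handler); err != nil {
        fmt.Println(err)
        return
    }
}

// wait for the first handler to get called with the web socket message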
<-exit
err = hub.Close(ctx)
if err != nil {
    fmt.Println(err)
    return
}

Output:

this message was sent and received via web socket!!

func NewHub

func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)

NewHub creates a new Event Hub client for sending and receiving messages

func NewHubFromConnectionString

func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)

NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string formatted like the following:

Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
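
The connection string above is a semicolon-separated list of Key=Value pairs. As a rough sketch of how such a string can be taken apart (the library has its own parser, and parseConnStr and its choice of required keys are hypothetical), note that the value is split on the first "=" only, so a key ending in "=" survives intact.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnStr is a hypothetical helper that splits "Key=Value;Key=Value"
// pairs into a map. Treating Endpoint, SharedAccessKeyName and
// SharedAccessKey as required is an assumption made for this sketch.
func parseConnStr(connStr string) (map[string]string, error) {
	parts := map[string]string{}
	for _, segment := range strings.Split(connStr, ";") {
		if segment == "" {
			continue // tolerate a trailing semicolon
		}
		// Split on the first "=" only: base64 keys often end in "=".
		kv := strings.SplitN(segment, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed segment %q", segment)
		}
		parts[kv[0]] = kv[1]
	}
	for _, key := range []string{"Endpoint", "SharedAccessKeyName", "SharedAccessKey"} {
		if parts[key] == "" {
			return nil, fmt.Errorf("missing %q", key)
		}
	}
	return parts, nil
}

func main() {
	connStr := "Endpoint=sb://namespace.servicebus.windows.net/;" +
		"SharedAccessKeyName=RootManageSharedAccessKey;" +
		"SharedAccessKey=superSecret1234=;EntityPath=hubName"
	parts, err := parseConnStr(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(parts["Endpoint"], parts["EntityPath"])
}
```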

func NewHubFromEnvironment

func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)

NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables

Expected Environment Variables:

- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_NAME" the name of the Event Hub instance

This method depends on NewHubWithNamespaceNameAndEnvironment, which attempts to build a token provider from environment variables. If it is unable to build an AAD token provider, it falls back to a SAS token provider. If neither can be built, it returns an error.

SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider

1) Expected Environment Variables:
  - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
  - "EVENTHUB_KEY_NAME" the name of the Event Hub key
  - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

2) Expected Environment Variable:
  - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
  "AZURE_CLIENT_SECRET"

2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
  "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"

3. Managed Service Identity (MSI): attempt to authenticate via MSI

The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
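
When a hub fails to build from the environment, it can help to see which of the credential sets listed above are actually complete. The following diagnostic sketch is not part of the library: completeCredentialSets and the set labels are hypothetical, and it deliberately does not model the order in which the library tries providers. MSI is omitted because it is not configured through these variables.

```go
package main

import (
	"fmt"
	"os"
)

// credentialSet names a group of environment variables that must all be set.
type credentialSet struct {
	name string
	vars []string
}

// documentedSets lists the variable groups named in the text above.
var documentedSets = []credentialSet{
	{"AAD client credentials", []string{"AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"}},
	{"AAD client certificate", []string{"AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH", "AZURE_CERTIFICATE_PASSWORD"}},
	{"SAS key name and value", []string{"EVENTHUB_NAMESPACE", "EVENTHUB_KEY_NAME", "EVENTHUB_KEY_VALUE"}},
	{"SAS connection string", []string{"EVENTHUB_CONNECTION_STRING"}},
}

// completeCredentialSets (hypothetical) returns the names of the sets whose
// variables are all present and non-empty according to lookup.
func completeCredentialSets(lookup func(string) (string, bool)) []string {
	var complete []string
	for _, set := range documentedSets {
		ok := true
		for _, name := range set.vars {
			if v, found := lookup(name); !found || v == "" {
				ok = false
				break
			}
		}
		if ok {
			complete = append(complete, set.name)
		}
	}
	return complete
}

func main() {
	sets := completeCredentialSets(os.LookupEnv)
	if len(sets) == 0 {
		fmt.Println("no complete credential set found in the environment")
		return
	}
	for _, s := range sets {
		fmt.Println("complete:", s)
	}
}
```

Passing the lookup function as a parameter keeps the check testable without touching the real process environment.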

func NewHubWithNamespaceNameAndEnvironment

func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)

NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages, using the supplied namespace and name and a token provider built from environment variables. If it is unable to build an AAD token provider, it falls back to a SAS token provider. If neither can be built, it returns an error.

SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider

1) Expected Environment Variables:
  - "EVENTHUB_KEY_NAME" the name of the Event Hub key
  - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

2) Expected Environment Variable:
  - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
  "AZURE_CLIENT_SECRET"

2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
  "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"

3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP
  and port. See: adal.GetMSIVMEndpoint()

The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.

func (*Hub) Close

func (h *Hub) Close(ctx context.Context) error

Close drains and closes all of the existing senders, receivers and connections

func (*Hub) GetPartitionInformation

func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)

GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node

func (*Hub) GetRuntimeInformation

func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)

GetRuntimeInformation fetches runtime information from the Event Hub management node

func (*Hub) Receive

func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)

Receive subscribes to messages sent to the provided partition.

The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.

If Receive encounters an initial error setting up the connection, an error will be returned.

If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes methods which will help manage the life span of the receiver.

ListenerHandle.Close(ctx) closes the receiver

ListenerHandle.Done() signals the consumer when the receiver has stopped

ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from

func (*Hub) Send

func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error

Send sends an event to the Event Hub

Send will retry sending the message for as long as the context allows

func (*Hub) SendBatch

func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error

SendBatch sends a batch of events to the Hub

type HubDescription

type HubDescription struct {
    XMLName                  xml.Name               `xml:"EventHubDescription"`
    MessageRetentionInDays   *int32                 `xml:"MessageRetentionInDays,omitempty"`
    SizeInBytes              *int64                 `xml:"SizeInBytes,omitempty"`
    Status                   *eventhub.EntityStatus `xml:"Status,omitempty"`
    CreatedAt                *date.Time             `xml:"CreatedAt,omitempty"`
    UpdatedAt                *date.Time             `xml:"UpdatedAt,omitempty"`
    PartitionCount           *int32                 `xml:"PartitionCount,omitempty"`
    PartitionIDs             *[]string              `xml:"PartitionIds>string,omitempty"`
    EntityAvailabilityStatus *string                `xml:"EntityAvailabilityStatus,omitempty"`
    BaseEntityDescription
}

HubDescription is the content type for Event Hub management requests

type HubEntity

type HubEntity struct {
    *HubDescription
    Name string
}

HubEntity is the Azure Event Hub description of a Hub for management activities

type HubManagementOption

type HubManagementOption func(description *HubDescription) error

HubManagementOption provides structure for configuring new Event Hubs

func HubWithMessageRetentionInDays

func HubWithMessageRetentionInDays(days int32) HubManagementOption

HubWithMessageRetentionInDays configures an Event Hub to retain messages for that number of days

func HubWithPartitionCount

func HubWithPartitionCount(count int32) HubManagementOption

HubWithPartitionCount configures an Event Hub to have the specified number of partitions. More partitions allow more throughput.

type HubManager

type HubManager struct {
    // contains filtered or unexported fields
}

HubManager provides CRUD functionality for Event Hubs

func NewHubManagerFromAzureEnvironment

func NewHubManagerFromAzureEnvironment(namespace string, tokenProvider auth.TokenProvider, env azure.Environment) (*HubManager, error)

NewHubManagerFromAzureEnvironment builds a HubManager from an Event Hub namespace, a SAS or AAD token provider, and an Azure Environment

func NewHubManagerFromConnectionString

func NewHubManagerFromConnectionString(connStr string) (*HubManager, error)

NewHubManagerFromConnectionString builds a HubManager from an Event Hub connection string

func (*HubManager) Delete

func (hm *HubManager) Delete(ctx context.Context, name string) error

Delete deletes an Event Hub entity by name

func (HubManager) Execute

func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)

Execute performs an HTTP request given an HTTP method, path and body

func (*HubManager) Get

func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)

Get fetches an Event Hubs Hub entity by name

func (*HubManager) List

func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)

List fetches all of the Hubs for an Event Hubs Namespace

func (HubManager) Post

func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*HubManager) Put

func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)

Put creates or updates an Event Hubs Hub

type HubOption

type HubOption func(h *Hub) error

HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see HubManagementOption.

func HubWithEnvironment

func HubWithEnvironment(env azure.Environment) HubOption

HubWithEnvironment configures the Hub to use the specified environment.

By default, the Hub instance will use the Azure US Public cloud environment

func HubWithOffsetPersistence

func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption

HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.

func HubWithPartitionedSender

func HubWithPartitionedSender(partitionID string) HubOption

HubWithPartitionedSender configures the Hub instance to send to a specific Event Hub partition

func HubWithUserAgent

func HubWithUserAgent(userAgent string) HubOption

HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server

This option can be specified multiple times to add additional segments.

Max user agent length is specified by the const maxUserAgentLen.

func HubWithWebSocketConnection

func HubWithWebSocketConnection() HubOption

HubWithWebSocketConnection configures the Hub to use a WebSocket connection (wss://) rather than a plain AMQP connection (amqps://)

type HubPartitionRuntimeInformation

type HubPartitionRuntimeInformation struct {
    HubPath                 string    `mapstructure:"name"`
    PartitionID             string    `mapstructure:"partition"`
    BeginningSequenceNumber int64     `mapstructure:"begin_sequence_number"`
    LastSequenceNumber      int64     `mapstructure:"last_enqueued_sequence_number"`
    LastEnqueuedOffset      string    `mapstructure:"last_enqueued_offset"`
    LastEnqueuedTimeUtc     time.Time `mapstructure:"last_enqueued_time_utc"`
}

HubPartitionRuntimeInformation provides management node information about a given Event Hub partition

type HubRuntimeInformation

type HubRuntimeInformation struct {
    Path           string    `mapstructure:"name"`
    CreatedAt      time.Time `mapstructure:"created_at"`
    PartitionCount int       `mapstructure:"partition_count"`
    PartitionIDs   []string  `mapstructure:"partition_ids"`
}

HubRuntimeInformation provides management node information about a given Event Hub instance

type ListenerHandle

type ListenerHandle struct {
    // contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type Manager

type Manager interface {
    GetRuntimeInformation(context.Context) (HubRuntimeInformation, error)
    GetPartitionInformation(context.Context, string) (HubPartitionRuntimeInformation, error)
}

Manager provides the ability to query management node information about a node

type MaxMessageSizeInBytes

type MaxMessageSizeInBytes uint

MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus

const (
    // DefaultMaxMessageSizeInBytes is the maximum number of bytes in an event (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas)
    DefaultMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000
)

type PartitionedReceiver

type PartitionedReceiver interface {
    Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}

PartitionedReceiver provides the ability to receive messages from a given partition

type ReceiveOption

type ReceiveOption func(receiver *receiver) error

ReceiveOption provides a structure for configuring receivers

func ReceiveFromTimestamp

func ReceiveFromTimestamp(t time.Time) ReceiveOption

ReceiveFromTimestamp configures the receiver to start receiving from a specific point in time in the event stream

func ReceiveWithConsumerGroup

func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption

ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group

func ReceiveWithEpoch

func ReceiveWithEpoch(epoch int64) ReceiveOption

ReceiveWithEpoch configures the receiver to use an epoch. Specifying an epoch for a receiver will cause any receiver with a lower epoch value to be disconnected from the message broker. If a receiver attempts to start with a lower epoch than the broker currently knows for a given partition, the broker will respond with an error on initiation of the receive request.

Ownership enforcement: Once you have created an epoch-based receiver, you cannot create a non-epoch receiver for the same consumer group / partition combo until all receivers for that combo are closed.

Ownership stealing: If a receiver with higher epoch value is created for a consumer group / partition combo, any older epoch receiver to that combo will be force closed.
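
The epoch rules above can be modeled as a small state machine per consumer group / partition combo. The following is a toy simulation for reasoning about the rules, not broker or library code. The partition type and its methods are hypothetical. Two behaviors are assumptions because the text above does not cover them: receivers with an equal epoch are allowed to coexist, and a new epoch receiver displaces existing non-epoch receivers.

```go
package main

import (
	"errors"
	"fmt"
)

// partition models one consumer group / partition combo on a pretend broker.
type partition struct {
	epochOwned bool  // true once an epoch-based receiver has claimed the combo
	epoch      int64 // highest epoch accepted so far
	receivers  int   // currently connected receivers
}

// attachWithEpoch returns how many existing receivers were force closed.
func (p *partition) attachWithEpoch(epoch int64) (int, error) {
	if p.epochOwned && epoch < p.epoch {
		return 0, fmt.Errorf("epoch %d is lower than current epoch %d", epoch, p.epoch)
	}
	if p.epochOwned && epoch == p.epoch {
		// Assumption: equal epochs coexist; the text above does not say.
		p.receivers++
		return 0, nil
	}
	// Ownership stealing: a higher epoch disconnects everyone already attached.
	closed := p.receivers
	p.epochOwned, p.epoch, p.receivers = true, epoch, 1
	return closed, nil
}

// attachWithoutEpoch enforces ownership: non-epoch receivers are refused
// while any epoch-based receiver is still connected.
func (p *partition) attachWithoutEpoch() error {
	if p.epochOwned && p.receivers > 0 {
		return errors.New("combo is owned by an epoch-based receiver")
	}
	p.epochOwned = false
	p.receivers++
	return nil
}

// closeAll simulates every receiver on the combo being closed.
func (p *partition) closeAll() { p.receivers = 0 }

func main() {
	p := &partition{}
	fmt.Println(p.attachWithEpoch(1))    // first owner
	fmt.Println(p.attachWithEpoch(2))    // steals ownership from epoch 1
	fmt.Println(p.attachWithEpoch(1))    // rejected: lower epoch
	fmt.Println(p.attachWithoutEpoch())  // rejected: epoch receiver still active
	p.closeAll()
	fmt.Println(p.attachWithoutEpoch())  // allowed once all receivers are closed
}
```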

func ReceiveWithLatestOffset

func ReceiveWithLatestOffset() ReceiveOption

ReceiveWithLatestOffset configures the receiver to start at the latest offset in the event stream, so that only newly enqueued events are received

func ReceiveWithPrefetchCount

func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption

ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount

func ReceiveWithStartingOffset

func ReceiveWithStartingOffset(offset string) ReceiveOption

ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream

type SendOption

type SendOption func(event *Event) error

SendOption provides a way to customize a message on sending

func SendWithMessageID

func SendWithMessageID(messageID string) SendOption

SendWithMessageID configures the message with a message ID

type Sender

type Sender interface {
    Send(ctx context.Context, event *Event, opts ...SendOption) error
    SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
}

Sender provides the ability to send messages

type SystemProperties

type SystemProperties struct {
    SequenceNumber *int64     `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message
    EnqueuedTime   *time.Time `mapstructure:"x-opt-enqueued-time"`   // time the message landed in the message queue
    Offset         *int64     `mapstructure:"x-opt-offset"`
    PartitionID    *int16     `mapstructure:"x-opt-partition-id"`
    PartitionKey   *string    `mapstructure:"x-opt-partition-key"`
}

SystemProperties are used to store properties that are set by the system.

Directories

Path            Synopsis
atom            Package atom contains base data structures for use in the Azure Event Hubs management HTTP API
eph             Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.
internal/test   Package test is an internal package to handle common test setup
persist         Package persist provides abstract structures for checkpoint persistence.
storage         Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.

Package eventhub imports 36 packages and is imported by 5 packages. Updated 2019-06-06.