pubsub

package
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2021 License: MIT Imports: 8 Imported by: 0

README

Pub Sub

Pub Subs provide a common way to interact with different message bus implementations to achieve reliable, high-scale scenarios based on event-driven async communications, while allowing users to opt-in to advanced capabilities using defined metadata.

Currently supported pub-subs are:

  • Hazelcast
  • Redis Streams
  • NATS
  • Kafka
  • Azure Service Bus
  • RabbitMQ
  • Azure Event Hubs
  • GCP Pub/Sub
  • MQTT

Implementing a new Pub Sub

A compliant pub sub needs to implement the following interface:

type PubSub interface {
	Init(metadata Metadata) error
	Publish(req *PublishRequest) error
	Subscribe(req SubscribeRequest, handler func(msg *NewMessage) error) error
}
Message TTL (or Time To Live)

Message Time to live is implemented by default in Dapr. A publishing application can set the expiration of individual messages by publishing it with the ttlInSeconds metadata. Components that support message TTL should parse this metadata attribute. For components that do not implement this feature in Dapr, the runtime will automatically populate the expiration attribute in the CloudEvent object if ttlInSeconds is present - in this case, Dapr will expire the message when a Dapr subscriber is about to consume an expired message. The expiration attribute is handled by Dapr runtime as a convenience to subscribers, dropping expired messages without invoking subscribers' endpoint. Subscriber applications that don't use Dapr, need to handle this attribute and implement the expiration logic.

If the pub sub component implementation can handle message TTL natively without relying on Dapr, consume the ttlInSeconds metadata in the component implementation for the Publish function. Also, implement the Features() function so the Dapr runtime knows that it should not add the expiration attribute to events.

Example:

import contrib_metadata "github.com/r-c-correa/components-contrib/metadata"

//...

func (c *MyComponent) Publish(req *pubsub.PublishRequest) error {
	//...
	ttl, hasTTL, _ := contrib_metadata.TryGetTTL(req.Metadata)
	if hasTTL {
		//... handle ttl for component.
	}
	//...
	return nil
}

func (c *MyComponent) Features() []pubsub.Feature {
	// Tip: cache this list into a private property.
	// Simply return nil if component does not implement any addition features.
	return []pubsub.Feature{pubsub.FeatureMessageTTL}
}

For pub sub components that support TTL per topic or queue but not per message, there are some design choices:

  • Configure the TTL for the topic or queue as usual. Optionally, implement topic or queue provisioning in the Init() method, using the component configuration's metadata to determine the topic or queue TTL.
  • Let Dapr runtime handle ttlInSeconds for messages that want to expire earlier than the topic's or queue's TTL. So, applications can still benefit from TTL per message via Dapr for this scenario.

Note: as per the CloudEvent spec, timestamps (like expiration) are formatted using RFC3339.

Documentation

Index

Constants

View Source
const (
	// DefaultCloudEventType is the default event type for an Dapr published event
	DefaultCloudEventType = "com.dapr.event.sent"
	// CloudEventsSpecVersion is the specversion used by Dapr for the cloud events implementation
	CloudEventsSpecVersion = "1.0"
	// DefaultCloudEventSource is the default event source
	DefaultCloudEventSource = "Dapr"
	// DefaultCloudEventDataContentType is the default content-type for the data attribute
	DefaultCloudEventDataContentType = "text/plain"
	TraceIDField                     = "traceid"
	TopicField                       = "topic"
	PubsubField                      = "pubsubname"
	ExpirationField                  = "expiration"
	DataContentTypeField             = "datacontenttype"
	DataField                        = "data"
	DataBase64Field                  = "data_base64"
	SpecVersionField                 = "specversion"
	TypeField                        = "type"
	SourceField                      = "source"
	IDField                          = "id"
	SubjectField                     = "subject"
)

Variables

This section is empty.

Functions

func ApplyMetadata

func ApplyMetadata(cloudEvent map[string]interface{}, componentFeatures []Feature, metadata map[string]string)

ApplyMetadata will process metadata to modify the cloud event based on the component's feature set.

func FromCloudEvent

func FromCloudEvent(cloudEvent []byte, topic, pubsub, traceID string) (map[string]interface{}, error)

FromCloudEvent returns a map representation of an existing cloudevents JSON

func FromRawPayload

func FromRawPayload(data []byte, topic, pubsub string) map[string]interface{}

FromRawPayload returns a CloudEvent for a raw payload on subscriber's end.

func HasExpired

func HasExpired(cloudEvent map[string]interface{}) bool

HasExpired determines if the current cloud event has expired.

func NewCloudEventsEnvelope

func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string, dataContentType string, data []byte, traceID string) map[string]interface{}

NewCloudEventsEnvelope returns a map representation of a cloudevents JSON

Types

type AppResponse

type AppResponse struct {
	Status AppResponseStatus `json:"status"`
}

AppResponse is the object describing the response from user code after a pubsub event

type AppResponseStatus

type AppResponseStatus string

AppResponseStatus represents a status of a PubSub response.

const (
	// Success means the message is received and processed correctly.
	Success AppResponseStatus = "SUCCESS"
	// Retry means the message is received but could not be processed and must be retried.
	Retry AppResponseStatus = "RETRY"
	// Drop means the message is received but should not be processed.
	Drop AppResponseStatus = "DROP"
)

type ConcurrencyMode

type ConcurrencyMode string

ConcurrencyMode is a pub/sub metadata setting that allows to specify whether messages are delivered in a serial or parallel execution

const (
	// ConcurrencyKey is the metadata key name for ConcurrencyMode
	ConcurrencyKey                 = "concurrencyMode"
	Single         ConcurrencyMode = "single"
	Parallel       ConcurrencyMode = "parallel"
)

func Concurrency

func Concurrency(metadata map[string]string) (ConcurrencyMode, error)

Concurrency takes a metadata object and returns the ConcurrencyMode configured. Default is Parallel

type Feature

type Feature string

Feature names a feature that can be implemented by PubSub components.

const (
	// FeatureMessageTTL is the feature to handle message TTL.
	FeatureMessageTTL Feature = "MESSAGE_TTL"
)

func (Feature) IsPresent

func (f Feature) IsPresent(features []Feature) bool

IsPresent checks if a given feature is present in the list.

type Handler

type Handler func(ctx context.Context, msg *NewMessage) error

Handler is the handler used to invoke the app handler

type Metadata

type Metadata struct {
	Properties map[string]string `json:"properties"`
}

Metadata represents a set of message-bus specific properties

type NewMessage

type NewMessage struct {
	Data     []byte            `json:"data"`
	Topic    string            `json:"topic"`
	Metadata map[string]string `json:"metadata"`
}

NewMessage is an event arriving from a message bus instance

type PubSub

type PubSub interface {
	Init(metadata Metadata) error
	Features() []Feature
	Publish(req *PublishRequest) error
	Subscribe(req SubscribeRequest, handler Handler) error
	Close() error
}

PubSub is the interface for message buses

type PublishRequest

type PublishRequest struct {
	Data       []byte            `json:"data"`
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata"`
}

PublishRequest is the request to publish a message

type SubscribeRequest

type SubscribeRequest struct {
	Topic    string            `json:"topic"`
	Metadata map[string]string `json:"metadata"`
}

SubscribeRequest is the request to subscribe to a topic

Directories

Path Synopsis
aws
azure
gcp
Package natsstreaming implements NATS Streaming pubsub component
Package natsstreaming implements NATS Streaming pubsub component

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL