messaging

package
v0.0.0-...-2e56a1b
Published: May 22, 2024 License: BSD-3-Clause Imports: 14 Imported by: 6

README

The standard message format for CACAO is the CloudEvent. The following is an example of a generic CloudEvent in CACAO for a mythical microservice:

{
    "specversion": "1.0",
    "type": "org.cyverse.events.MythicalCreated",
    "source": "https://gitlab.com/cyverse/mythical-microservice/mythical-85f9bbcd97-kttdg",
    "id": "cloudevent-c0usj6qljdkc1t9civvg",
    "time": "2020-02-28T12:13:39.4589254Z",
    "datacontenttype": "application/json; charset=utf-8",
    "data": {
        ...
    }
}
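
To make the envelope concrete, here is a minimal sketch that builds the same JSON shape using only the standard library. The `Envelope` type and the `newEnvelope` and `exampleEnvelope` helpers are hypothetical names introduced for illustration; the package itself works with `cloudevents.Event`, and the `{"name": "unicorn"}` payload is invented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Envelope mirrors the CloudEvent attributes listed in this README.
// It is a hypothetical stand-in, not a type exported by this package.
type Envelope struct {
	SpecVersion     string          `json:"specversion"`
	Type            string          `json:"type"`
	Source          string          `json:"source"`
	ID              string          `json:"id"`
	Time            string          `json:"time"`
	DataContentType string          `json:"datacontenttype"`
	Data            json.RawMessage `json:"data"`
}

// newEnvelope marshals payload to JSON and fills in the fixed attributes.
func newEnvelope(eventType, source, id string, created time.Time, payload interface{}) (Envelope, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, err
	}
	return Envelope{
		SpecVersion:     "1.0",
		Type:            eventType,
		Source:          source,
		ID:              id,
		Time:            created.UTC().Format(time.RFC3339),
		DataContentType: "application/json; charset=utf-8",
		Data:            body,
	}, nil
}

// exampleEnvelope builds an envelope from fixed inputs so the result is reproducible.
func exampleEnvelope() (Envelope, error) {
	created := time.Date(2020, time.February, 28, 12, 13, 39, 0, time.UTC)
	return newEnvelope(
		"org.cyverse.events.MythicalCreated",
		"https://gitlab.com/cyverse/mythical-microservice/mythical-85f9bbcd97-kttdg",
		"cloudevent-c0usj6qljdkc1t9civvg",
		created,
		map[string]string{"name": "unicorn"}, // invented payload
	)
}

func main() {
	env, err := exampleEnvelope()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	out, err := json.MarshalIndent(env, "", "    ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```

In real code, CreateCloudEvent (documented further down) is the intended way to build these messages.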

Attributes

specversion

specversion will currently always be "1.0"

type

type will contain the message type as related to the query operation or event. Per the CloudEvent spec, this value is prefixed with a reverse-DNS name starting with "org.cyverse" and maps directly to query channels and events. Examples include:

"org.cyverse.mythical.Get"
"org.cyverse.events.MythicalDeleted"
"org.cyverse.workspaces.List"
"org.cyverse.events.WorkspaceCreated"
"org.cyverse.users.Get"
"org.cyverse.users.List"
"org.cyverse.events.UserUpdated"

source

source will contain the URI for the (micro)service that created the message. If relevant, the Pod name should also be appended at the end of the URL, following the microservice name. Examples include:

https://gitlab.com/cyverse/mythical-microservice/{POD_NAME}
https://gitlab.com/cyverse/users-microservice/{POD_NAME}
https://gitlab.com/cyverse/wiretap-microservice/{POD_NAME}
https://gitlab.com/cyverse/nafigos/api-service/{POD_NAME}
https://gitlab.com/cyverse/nafigos/workspace-service/{POD_NAME}
https://gitlab.com/cyverse/nafigos/credential-service/{POD_NAME}
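
As a rough illustration of how such a source value might be assembled, the sketch below joins a service URL and a pod name. `sourceURI` is a hypothetical helper, and using `os.Hostname` to obtain the pod name is an assumption (inside a Kubernetes pod the hostname defaults to the pod name); the package may obtain it differently.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// sourceURI appends an optional pod name to a service URL,
// avoiding a doubled slash when the URL already ends with one.
func sourceURI(serviceURL, podName string) string {
	base := strings.TrimRight(serviceURL, "/")
	if podName == "" {
		return base
	}
	return base + "/" + podName
}

func main() {
	// Assumption: the hostname stands in for the pod name.
	// Outside Kubernetes this is simply the machine's hostname.
	podName, err := os.Hostname()
	if err != nil {
		podName = "" // fall back to the bare service URL
	}
	fmt.Println(sourceURI("https://gitlab.com/cyverse/mythical-microservice", podName))
}
```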

id

id will contain "cloudevent-" concatenated with an xid. For example:

cloudevent-9m4e2mr0ui3e8a215n4g
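
A quick way to sanity-check this format is a regular expression. The sketch below assumes the xid portion is the usual 20-character string drawn from the characters 0-9 and a-v, which is consistent with the examples in this README; `isCloudEventID` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"regexp"
)

// cloudEventIDPattern matches "cloudevent-" followed by 20 characters
// from the set 0-9, a-v (assumed shape of the xid suffix).
var cloudEventIDPattern = regexp.MustCompile(`^cloudevent-[0-9a-v]{20}$`)

// isCloudEventID reports whether id has the expected prefix and suffix shape.
func isCloudEventID(id string) bool {
	return cloudEventIDPattern.MatchString(id)
}

func main() {
	candidates := []string{
		"cloudevent-9m4e2mr0ui3e8a215n4g",
		"9m4e2mr0ui3e8a215n4g", // missing prefix
		"cloudevent-too-short",
	}
	for _, id := range candidates {
		fmt.Printf("%-34s valid=%v\n", id, isCloudEventID(id))
	}
}
```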

time

time will contain the message creation timestamp in ISO 8601 (RFC 3339) format, in the UTC timezone. In Go, this can be generated with the following code:

time.Now().UTC().Format(time.RFC3339)

An example of this format:

2021-03-02T05:11:12Z
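
The call to `UTC()` matters when the local clock is in another zone. The following sketch uses a fixed, invented timestamp in a UTC-7 zone to show the conversion; `formatTimestamp` and `exampleTimestamp` are hypothetical helper names.

```go
package main

import (
	"fmt"
	"time"
)

// formatTimestamp converts t to UTC and renders it in RFC 3339 form.
func formatTimestamp(t time.Time) string {
	return t.UTC().Format(time.RFC3339)
}

// exampleTimestamp formats a fixed local time (UTC-7) so the result is reproducible.
func exampleTimestamp() string {
	mst := time.FixedZone("MST", -7*60*60)
	local := time.Date(2021, time.March, 1, 22, 11, 12, 0, mst)
	return formatTimestamp(local)
}

func main() {
	fmt.Println(exampleTimestamp()) // 2021-03-02T05:11:12Z
	fmt.Println(formatTimestamp(time.Now()))
}
```

If fractional seconds are wanted, as in the sample message at the top of this README, `time.RFC3339Nano` keeps them.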

datacontenttype

datacontenttype will always be "application/json; charset=utf-8"

data

data will contain the JSON message body, structured according to type.

Documentation

Overview

Package messaging contains utility functions for dealing with CloudEvents and with a messaging service.

Constants

const (
	// DefaultNatsURL is a default nats URL
	DefaultNatsURL = "nats://nats:4222"
	// DefaultNatsClusterID is a default Cluster ID for cacao
	DefaultNatsClusterID = "cacao-cluster"
	// DefaultNatsMaxReconnect is default max reconnect trials
	DefaultNatsMaxReconnect = 6 // max times to reconnect within nats.connect()
	// DefaultNatsReconnectWait is a default delay for next reconnect
	DefaultNatsReconnectWait = 10 * time.Second // seconds to wait within nats.connect()
	// DefaultNatsRequestTimeout is default timeout for requests
	DefaultNatsRequestTimeout = 5 * time.Second // timeout for requests
)
const (
	// DefaultStanEventsTimeout is a default timeout
	DefaultStanEventsTimeout = 10 * time.Second // used to wait for event ops to complete; very conservative
	// DefaultStanAckWaitTime is a default wait time for ack wait
	DefaultStanAckWaitTime = 60 * time.Second
)

Variables

This section is empty.

Functions

func ConvertNats

func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)

ConvertNats converts NATS message to CloudEvents message

func ConvertStan

func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)

ConvertStan converts STAN message to CloudEvents message

func CreateCloudEvent

func CreateCloudEvent(data interface{}, eventType string, source string) (cloudevents.Event, error)

CreateCloudEvent takes any object, an eventType string, and a source string, and creates a resulting CloudEvent. This utility provides the following conveniences:

* uniformly assigns a new id of the format "cloudevent-" + xid
* sets the time to UTC
* generically marshals the data to JSON and appropriately assigns the cloudevent type
* creates or sets a transaction id, which can later be used to pair requests to responses or corresponding events

func CreateCloudEventWithTransactionID

func CreateCloudEventWithTransactionID(data interface{}, eventType string, source string, transactionID common.TransactionID) (cloudevents.Event, error)

CreateCloudEventWithTransactionID creates a CloudEvent with the given transactionID. The transactionID is optional.

func GetTransactionID

func GetTransactionID(ce *cloudevents.Event) common.TransactionID

GetTransactionID is a utility function that quickly reads the TransactionID from a CloudEvent.

func NewTransactionID

func NewTransactionID() common.TransactionID

NewTransactionID is a utility function that generates a CACAO-specific transaction ID of the form tid-<xid>. Replies and events generated from a specific request should include the transaction ID so that they are easier to match to the request.
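
The pairing idea can be sketched without any messaging infrastructure: remember each outstanding request under its transaction ID, then look the ID up when a reply or event arrives. The `pendingRequests` type below is hypothetical and the transaction IDs are invented placeholders; it only illustrates the matching step and is not how this package tracks requests.

```go
package main

import "fmt"

// transactionID is a local stand-in for common.TransactionID.
type transactionID string

// pendingRequests remembers outstanding requests by transaction ID.
// It is not safe for concurrent use.
type pendingRequests struct {
	byTID map[transactionID]string
}

func newPendingRequests() *pendingRequests {
	return &pendingRequests{byTID: make(map[transactionID]string)}
}

// track records a request description under its transaction ID.
func (p *pendingRequests) track(tid transactionID, description string) {
	p.byTID[tid] = description
}

// resolve returns the request matching tid and forgets it.
// The boolean is false when no request with that ID is pending.
func (p *pendingRequests) resolve(tid transactionID) (string, bool) {
	description, ok := p.byTID[tid]
	if ok {
		delete(p.byTID, tid)
	}
	return description, ok
}

func main() {
	pending := newPendingRequests()
	pending.track("tid-example-1", "create workspace")

	// Later, an event arrives carrying the same transaction ID.
	if request, ok := pending.resolve("tid-example-1"); ok {
		fmt.Println("event answers request:", request)
	}
	if _, ok := pending.resolve("tid-example-1"); !ok {
		fmt.Println("already resolved")
	}
}
```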

func SetTransactionID

func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)

SetTransactionID is a utility function that quickly writes the TransactionID to a CloudEvent.

Types

type EventObservable

type EventObservable interface {
	AddListener(common.EventType, common.TransactionID, Listener) (ListenerID, error)
	// AddListenerMultiEventType register a listener to a set of common.EventType
	AddListenerMultiEventType([]common.EventType, common.TransactionID, Listener) (ListenerID, error)
	RemoveListenerByID(id ListenerID)
}

EventObservable is an interface that allows dynamically adding and removing event listeners.

type EventSource

type EventSource struct {
	// contains filtered or unexported fields
}

EventSource dispatches events to registered listeners. Listeners can be registered dynamically.

func NewEventSource

func NewEventSource(natsConf NatsConfig, stanConf StanConfig) *EventSource

NewEventSource creates a new EventSource

func NewEventSourceFromStanConnection

func NewEventSourceFromStanConnection(conn *StanConnection) (*EventSource, error)

NewEventSourceFromStanConnection creates an EventSource from a StanConnection (without a subscription). The underlying STAN connection is re-used.

func (*EventSource) AddListener

func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)

AddListener adds a listener for an event. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). It returns a unique ID for the listener and any connection-related error. This automatically starts a STAN connection and subscription if one is not already active.

func (*EventSource) AddListenerMultiEventType

func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)

AddListenerMultiEventType adds a listener for a set of event types. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). It returns a unique ID for the listener and any connection-related error. If the listener has ListenOnce set, it is removed after being called once. This automatically starts a STAN connection and subscription if one is not already active.

func (*EventSource) RemoveListenerByID

func (es *EventSource) RemoveListenerByID(id ListenerID)

RemoveListenerByID removes a listener. This will disconnect & unsubscribe if the last listener is removed.

type Listener

type Listener struct {
	Callback ListenerCallback
	// if enabled, the listener will be removed after being called once.
	// default to false.
	ListenOnce bool
}

Listener is a callback registration for some EventType and/or TransactionID

type ListenerCallback

type ListenerCallback func(ev common.EventType, ce cloudevents.Event)

ListenerCallback is the callback function for a Listener

type ListenerID

type ListenerID string

ListenerID is the ID of listener in EventSource
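
The wildcard and ListenOnce rules described above can be illustrated with a small in-memory registry. This is a simplified sketch under stated assumptions, not the EventSource implementation: it has no STAN connection, is not safe for concurrent use, uses an integer ID where the package uses a string ListenerID, and every name in it (`registry`, `add`, `dispatch`, and so on) is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type (
	eventType     string // stand-in for common.EventType
	transactionID string // stand-in for common.TransactionID
	listenerID    int    // the package uses a string ListenerID
)

type listener struct {
	callback   func(ev eventType, tid transactionID)
	listenOnce bool
}

type entry struct {
	ev  eventType
	tid transactionID
	l   listener
}

type registry struct {
	nextID  listenerID
	entries map[listenerID]entry
}

func newRegistry() *registry {
	return &registry{entries: make(map[listenerID]entry)}
}

// add registers a listener; "" acts as a wildcard for either field, but not both.
func (r *registry) add(ev eventType, tid transactionID, l listener) (listenerID, error) {
	if ev == "" && tid == "" {
		return 0, errors.New("eventType and transactionID cannot both be wildcards")
	}
	r.nextID++
	r.entries[r.nextID] = entry{ev: ev, tid: tid, l: l}
	return r.nextID, nil
}

// remove drops a listener by ID; unknown IDs are ignored.
func (r *registry) remove(id listenerID) {
	delete(r.entries, id)
}

// dispatch calls every matching listener and returns how many were called.
// Listeners with listenOnce set are removed after their first call.
func (r *registry) dispatch(ev eventType, tid transactionID) int {
	called := 0
	for id, e := range r.entries {
		if (e.ev != "" && e.ev != ev) || (e.tid != "" && e.tid != tid) {
			continue
		}
		e.l.callback(ev, tid)
		called++
		if e.l.listenOnce {
			delete(r.entries, id) // deleting during range is safe in Go
		}
	}
	return called
}

func main() {
	r := newRegistry()
	_, err := r.add("org.cyverse.events.UserUpdated", "", listener{
		callback:   func(ev eventType, tid transactionID) { fmt.Println("got", ev, tid) },
		listenOnce: true,
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.dispatch("org.cyverse.events.UserUpdated", "tid-example-1")) // listener fires
	fmt.Println(r.dispatch("org.cyverse.events.UserUpdated", "tid-example-2")) // already removed
}
```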

type MockNatsConnection

type MockNatsConnection struct {
	Config                   *NatsConfig
	CommonSubjectRegex       *regexp.Regexp
	HandlerLock              sync.Mutex
	EventHandlers            map[common.QueryOp]QueryEventHandler
	DefaultEventHandler      QueryEventHandler
	CloudEventHandlers       map[common.QueryOp]QueryCloudEventHandler
	DefaultCloudEventHandler QueryCloudEventHandler
}

MockNatsConnection mocks a NATS connection and implements QueryEventService

func CreateMockNatsConnection

func CreateMockNatsConnection(config *NatsConfig, eventHandlerMappings []QueryEventHandlerMapping) (*MockNatsConnection, error)

CreateMockNatsConnection creates MockNatsConnection

func (*MockNatsConnection) AddCloudEventHandler

func (conn *MockNatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error

AddCloudEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives a cloudevent of an event, and returns a cloudevent.

func (*MockNatsConnection) AddDefaultCloudEventHandler

func (conn *MockNatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error

AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event, and returns a cloudevent.

func (*MockNatsConnection) AddDefaultEventHandler

func (conn *MockNatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error

AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event, and returns a data object (it is not required to be JSON-encoded).

func (*MockNatsConnection) AddEventHandler

func (conn *MockNatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error

AddEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives the subject and JSON data of an event, and returns a data object (it is not required to be JSON-encoded).

func (*MockNatsConnection) Disconnect

func (conn *MockNatsConnection) Disconnect() error

Disconnect disconnects Nats connection

func (*MockNatsConnection) Request

func (conn *MockNatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)

Request publishes a NATS event. It automatically wraps the data in a cloud event object, and peels (unwraps) it when it returns.

func (*MockNatsConnection) RequestCloudEvent

func (conn *MockNatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)

RequestCloudEvent publishes a NATS event; the request and response are exchanged as cloud event objects.

func (*MockNatsConnection) RequestWithTransactionID

func (conn *MockNatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error)

RequestWithTransactionID publishes a NATS event. The transactionID will be passed along to CreateCloudEvent if given. It automatically wraps the data in a cloud event object, and peels (unwraps) it when it returns.

type MockNatsMessage

type MockNatsMessage struct {
	Data         []byte
	ResponseData []byte
}

MockNatsMessage mocks Nats Msg

func (*MockNatsMessage) Respond

func (msg *MockNatsMessage) Respond(data []byte) error

Respond responds

type MockStanConnection

type MockStanConnection struct {
	NatsConfig               *NatsConfig
	StanConfig               *StanConfig
	HandlerLock              sync.Mutex
	EventHandlers            map[common.EventType]StreamingEventHandler
	DefaultEventHandler      StreamingEventHandler
	CloudEventHandlers       map[common.EventType]StreamingCloudEventHandler
	DefaultCloudEventHandler StreamingCloudEventHandler
}

MockStanConnection mocks a STAN connection and implements StreamingEventService

func CreateMockStanConnection

func CreateMockStanConnection(natsConfig *NatsConfig, stanConfig *StanConfig, eventHandlerMappings []StreamingEventHandlerMapping) (*MockStanConnection, error)

CreateMockStanConnection creates MockStanConnection

func (*MockStanConnection) AddCloudEventHandler

func (conn *MockStanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error

AddCloudEventHandler adds a new event handler function to a specified subject. eventHandler receives a cloudevent of an event.

func (*MockStanConnection) AddDefaultCloudEventHandler

func (conn *MockStanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error

AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event.

func (*MockStanConnection) AddDefaultEventHandler

func (conn *MockStanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error

AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event.

func (*MockStanConnection) AddEventHandler

func (conn *MockStanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error

AddEventHandler adds a new event handler function to a specified subject. eventHandler receives the subject and JSON data of an event.

func (*MockStanConnection) Disconnect

func (conn *MockStanConnection) Disconnect() error

Disconnect disconnects Stan connection

func (*MockStanConnection) Publish

func (conn *MockStanConnection) Publish(subject common.EventType, data interface{}) error

Publish publishes Stan event

func (*MockStanConnection) PublishCloudEvent

func (conn *MockStanConnection) PublishCloudEvent(ce *cloudevents.Event) error

PublishCloudEvent publishes Stan event

func (*MockStanConnection) PublishWithTransactionID

func (conn *MockStanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error

PublishWithTransactionID publishes Stan event

type NatsConfig

type NatsConfig struct {
	URL             string `envconfig:"NATS_URL" default:"nats://nats:4222"`
	QueueGroup      string `envconfig:"NATS_QUEUE_GROUP"`
	WildcardSubject string `envconfig:"NATS_WILDCARD_SUBJECT" default:"cyverse.>"` // WildcardSubject field is optional, only used for NATS Query
	ClientID        string `envconfig:"NATS_CLIENT_ID"`                            // While not strictly used by Nats, this is propagated into the cloudevent
	MaxReconnects   int    `envconfig:"NATS_MAX_RECONNECTS" default:"-1"`          // implementation should default to DefaultNatsMaxReconnect
	ReconnectWait   int    `envconfig:"NATS_RECONNECT_WAIT" default:"-1"`          // in seconds, implementation should default to
	RequestTimeout  int    `envconfig:"NATS_REQUEST_TIMEOUT" default:"-1"`
}

NatsConfig stores configurations used by both nats query channel and nats streaming
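
The struct tags above default the numeric fields to -1, and the field comments say the implementation should fall back to the Default* constants. One way that fallback could look is sketched below. Treating any negative value as "unset" is an assumption, and `intOrDefault` and `secondsOrDefault` are hypothetical helpers; the constants are local copies of the values listed under Constants.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the package defaults listed under Constants.
const (
	defaultMaxReconnect   = 6
	defaultReconnectWait  = 10 * time.Second
	defaultRequestTimeout = 5 * time.Second
)

// intOrDefault returns def when value is negative (assumed to mean "unset").
func intOrDefault(value, def int) int {
	if value < 0 {
		return def
	}
	return value
}

// secondsOrDefault converts a count of seconds to a Duration,
// returning def when seconds is negative (assumed to mean "unset").
func secondsOrDefault(seconds int, def time.Duration) time.Duration {
	if seconds < 0 {
		return def
	}
	return time.Duration(seconds) * time.Second
}

func main() {
	// Values as they would arrive when no environment variable is set.
	maxReconnects, reconnectWait, requestTimeout := -1, -1, 30

	fmt.Println(intOrDefault(maxReconnects, defaultMaxReconnect))       // falls back to 6
	fmt.Println(secondsOrDefault(reconnectWait, defaultReconnectWait))  // falls back to 10s
	fmt.Println(secondsOrDefault(requestTimeout, defaultRequestTimeout)) // explicit 30s is kept
}
```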

type NatsConnection

type NatsConnection struct {
	Config                   *NatsConfig
	Connection               *nats.Conn
	Subscription             *nats.Subscription
	HandlerLock              sync.Mutex
	EventHandlers            map[common.QueryOp]QueryEventHandler
	DefaultEventHandler      QueryEventHandler
	CloudEventHandlers       map[common.QueryOp]QueryCloudEventHandler
	DefaultCloudEventHandler QueryCloudEventHandler
}

NatsConnection contains Nats connection info

func ConnectNatsForService

func ConnectNatsForService(config *NatsConfig, eventHandlerMappings []QueryEventHandlerMapping) (*NatsConnection, error)

ConnectNatsForService connects to Nats

func ConnectNatsForServiceClient

func ConnectNatsForServiceClient(config *NatsConfig) (*NatsConnection, error)

ConnectNatsForServiceClient connects to Nats for service clients who send events

func (*NatsConnection) AddCloudEventHandler

func (conn *NatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error

AddCloudEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives a cloudevent of an event, and returns a cloudevent.
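
To show what "match the wildcard subject" means for the default value "cyverse.>", here is a small matcher following the general NATS wildcard rules: subjects are dot-separated tokens, "*" matches exactly one token, and ">" matches one or more trailing tokens. `subjectMatches` is a hypothetical helper that does not validate subjects, the sample subjects are invented, and the package may perform this check differently (for example with the CommonSubjectRegex field seen on the mock).

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style wildcard pattern.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, token := range p {
		if token == ">" {
			// ">" must be the last pattern token and needs at least one subject token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject is shorter than the pattern
		}
		if token != "*" && token != s[i] {
			return false
		}
	}
	// Without a trailing ">", the token counts must agree.
	return len(p) == len(s)
}

func main() {
	pattern := "cyverse.>"
	for _, subject := range []string{"cyverse.users.get", "cyverse", "other.users.get"} {
		fmt.Printf("%-20s matches %q: %v\n", subject, pattern, subjectMatches(pattern, subject))
	}
}
```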

func (*NatsConnection) AddDefaultCloudEventHandler

func (conn *NatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error

AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event, and returns a cloudevent.

func (*NatsConnection) AddDefaultEventHandler

func (conn *NatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error

AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event, and returns a data object (it is not required to be JSON-encoded).

func (*NatsConnection) AddEventHandler

func (conn *NatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error

AddEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives the subject and JSON data of an event, and returns a data object (it is not required to be JSON-encoded).

func (*NatsConnection) Disconnect

func (conn *NatsConnection) Disconnect() error

Disconnect disconnects Nats connection

func (*NatsConnection) Request

func (conn *NatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)

Request publishes a NATS event. It automatically wraps the data in a cloud event object, and peels (unwraps) it when it returns.

func (*NatsConnection) RequestCloudEvent

func (conn *NatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)

RequestCloudEvent publishes a NATS event; the request and response are exchanged as cloud event objects.

func (*NatsConnection) RequestWithTransactionID

func (conn *NatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error)

RequestWithTransactionID publishes a NATS event. The transactionID will be passed along to CreateCloudEvent if given. It automatically wraps the data in a cloud event object, and peels (unwraps) it when it returns.

type QueryCloudEventHandler

type QueryCloudEventHandler func(event *cloudevents.Event) ([]byte, error)

QueryCloudEventHandler is a function prototype for query event handlers

type QueryEventHandler

type QueryEventHandler func(subject common.QueryOp, transactionID common.TransactionID, jsonData []byte) ([]byte, error)

QueryEventHandler is a function prototype for query event handlers

type QueryEventHandlerMapping

type QueryEventHandlerMapping struct {
	Subject common.QueryOp // required, empty string for default handler

	// optional, set CloudEventHandler or EventHandler
	CloudEventHandler QueryCloudEventHandler
	EventHandler      QueryEventHandler
}

QueryEventHandlerMapping is a struct that maps query operation and query event handler
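
The "empty string for default handler" convention can be pictured as a lookup table with a fallback. The sketch below is a simplified, self-contained model of that idea: `router`, `handlerMapping`, and `queryHandler` are hypothetical stand-ins (the real handler types also carry a transaction ID or a CloudEvent), and the subjects are invented.

```go
package main

import (
	"errors"
	"fmt"
)

// queryHandler is a simplified stand-in for QueryEventHandler.
type queryHandler func(subject string, jsonData []byte) ([]byte, error)

// handlerMapping pairs a subject with a handler; an empty subject marks the default.
type handlerMapping struct {
	Subject string
	Handler queryHandler
}

type router struct {
	handlers map[string]queryHandler
	fallback queryHandler
}

// newRouter builds the lookup table, routing empty-subject mappings to the fallback.
func newRouter(mappings []handlerMapping) *router {
	r := &router{handlers: make(map[string]queryHandler)}
	for _, m := range mappings {
		if m.Subject == "" {
			r.fallback = m.Handler
			continue
		}
		r.handlers[m.Subject] = m.Handler
	}
	return r
}

// handle picks the handler registered for subject, or the default if there is none.
func (r *router) handle(subject string, jsonData []byte) ([]byte, error) {
	if h, ok := r.handlers[subject]; ok {
		return h(subject, jsonData)
	}
	if r.fallback != nil {
		return r.fallback(subject, jsonData)
	}
	return nil, errors.New("no handler for subject " + subject)
}

func main() {
	r := newRouter([]handlerMapping{
		{Subject: "cyverse.users.get", Handler: func(string, []byte) ([]byte, error) {
			return []byte(`{"handled_by":"users.get"}`), nil
		}},
		{Subject: "", Handler: func(subject string, _ []byte) ([]byte, error) {
			return []byte(`{"handled_by":"default"}`), nil
		}},
	})
	for _, subject := range []string{"cyverse.users.get", "cyverse.unknown.op"} {
		out, err := r.handle(subject, nil)
		fmt.Println(subject, string(out), err)
	}
}
```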

type QueryEventService

type QueryEventService interface {
	Disconnect() error

	AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error
	AddDefaultEventHandler(eventHandler QueryEventHandler) error
	AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error
	AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error
	Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)
	RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error)
	RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)
}

QueryEventService is an interface for a query event service (e.g., Nats)

type StanConfig

type StanConfig struct {
	ClusterID     string `envconfig:"NATS_CLUSTER_ID" default:"cacao-cluster"`
	DurableName   string `envconfig:"NATS_DURABLE_NAME"`
	EventsTimeout int    `envconfig:"NATS_EVENTS_TIMEOUT" default:"-1"`
}

StanConfig stores additional configurations used by nats streaming, used along with NatsConfig DefaultNatsReconnectWait

type StanConnection

type StanConnection struct {
	NatsConfig               *NatsConfig
	StanConfig               *StanConfig
	Connection               stan.Conn
	Subscription             stan.Subscription
	HandlerLock              sync.Mutex
	EventHandlers            map[common.EventType]StreamingEventHandler
	DefaultEventHandler      StreamingEventHandler
	CloudEventHandlers       map[common.EventType]StreamingCloudEventHandler
	DefaultCloudEventHandler StreamingCloudEventHandler
}

StanConnection contains Stan connection info

func ConnectStanForService

func ConnectStanForService(natsConfig *NatsConfig, stanConfig *StanConfig, eventHandlerMappings []StreamingEventHandlerMapping) (*StanConnection, error)

ConnectStanForService connects to Stan

func ConnectStanForServiceClient

func ConnectStanForServiceClient(natsConfig *NatsConfig, stanConfig *StanConfig) (*StanConnection, error)

ConnectStanForServiceClient connects to Stan for service clients who send events with no subscription

func (*StanConnection) AddCloudEventHandler

func (conn *StanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error

AddCloudEventHandler adds a new event handler function to a specified subject. eventHandler receives a cloudevent of an event.

func (*StanConnection) AddDefaultCloudEventHandler

func (conn *StanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error

AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event.

func (*StanConnection) AddDefaultEventHandler

func (conn *StanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error

AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event.

func (*StanConnection) AddEventHandler

func (conn *StanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error

AddEventHandler adds a new event handler function to a specified subject. eventHandler receives the subject and JSON data of an event.

func (*StanConnection) Disconnect

func (conn *StanConnection) Disconnect() error

Disconnect disconnects Stan connection

func (*StanConnection) Publish

func (conn *StanConnection) Publish(subject common.EventType, data interface{}) error

Publish publishes Stan event

func (*StanConnection) PublishCloudEvent

func (conn *StanConnection) PublishCloudEvent(ce *cloudevents.Event) error

PublishCloudEvent publishes Stan event

func (*StanConnection) PublishWithTransactionID

func (conn *StanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error

PublishWithTransactionID publishes a STAN event. The transactionID will be passed along to CreateCloudEvent.

type StreamingCloudEventHandler

type StreamingCloudEventHandler func(event *cloudevents.Event) error

StreamingCloudEventHandler is a function prototype for streaming event handlers

type StreamingEventHandler

type StreamingEventHandler func(subject common.EventType, transactionID common.TransactionID, jsonData []byte) error

StreamingEventHandler is a function prototype for streaming event handlers
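
A handler of this shape typically decodes jsonData into a payload struct and returns an error if decoding fails. The sketch below follows that pattern with local stand-in types; the `userUpdated` payload and its `username` field are invented for illustration, and `newUserUpdatedHandler` is a hypothetical constructor.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type (
	eventType     string // stand-in for common.EventType
	transactionID string // stand-in for common.TransactionID
)

// streamingHandler has the same shape as StreamingEventHandler, using the local types.
type streamingHandler func(subject eventType, tid transactionID, jsonData []byte) error

// userUpdated is an invented payload used only for this example.
type userUpdated struct {
	Username string `json:"username"`
}

// newUserUpdatedHandler returns a handler that decodes the payload
// and appends each username it sees to *seen.
func newUserUpdatedHandler(seen *[]string) streamingHandler {
	return func(subject eventType, tid transactionID, jsonData []byte) error {
		var payload userUpdated
		if err := json.Unmarshal(jsonData, &payload); err != nil {
			return fmt.Errorf("decoding %s (transaction %s): %w", subject, tid, err)
		}
		*seen = append(*seen, payload.Username)
		return nil
	}
}

func main() {
	var seen []string
	handler := newUserUpdatedHandler(&seen)

	err := handler("org.cyverse.events.UserUpdated", "tid-example-1", []byte(`{"username":"alice"}`))
	fmt.Println(seen, err)

	err = handler("org.cyverse.events.UserUpdated", "tid-example-2", []byte(`not json`))
	fmt.Println(err != nil) // malformed data is reported as an error
}
```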

type StreamingEventHandlerMapping

type StreamingEventHandlerMapping struct {
	Subject common.EventType // required, empty string for default handler

	// optional, set CloudEventHandler or EventHandler
	CloudEventHandler StreamingCloudEventHandler
	EventHandler      StreamingEventHandler
}

StreamingEventHandlerMapping is a struct that maps event type and event handler

type StreamingEventService

type StreamingEventService interface {
	Disconnect() error

	AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error
	AddDefaultEventHandler(eventHandler StreamingEventHandler) error
	AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error
	AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error
	Publish(subject common.EventType, data interface{}) error
	PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error
	PublishCloudEvent(ce *cloudevents.Event) error
}

StreamingEventService is an interface for a streaming event service (e.g., Stan)
