Documentation ¶
Overview ¶
Package messaging -- this file contains utility functions for dealing with CloudEvents
Package messaging contains utility functions for dealing with a messaging service ¶
Package messaging contains utility functions for dealing with a messaging service ¶
Package messaging contains utility functions for dealing with a messaging service ¶
Package messaging contains utility functions for dealing with a messaging service ¶
Package messaging contains utility functions for dealing with a messaging service
Index ¶
- Constants
- func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)
- func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)
- func CreateCloudEvent(data interface{}, eventType string, source string) (cloudevents.Event, error)
- func CreateCloudEventWithTransactionID(data interface{}, eventType string, source string, ...) (cloudevents.Event, error)
- func GetTransactionID(ce *cloudevents.Event) common.TransactionID
- func NewTransactionID() common.TransactionID
- func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)
- type EventObservable
- type EventSource
- func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, ...) (ListenerID, error)
- func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, ...) (ListenerID, error)
- func (es *EventSource) RemoveListenerByID(id ListenerID)
- type Listener
- type ListenerCallback
- type ListenerID
- type MockNatsConnection
- func (conn *MockNatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error
- func (conn *MockNatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error
- func (conn *MockNatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error
- func (conn *MockNatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error
- func (conn *MockNatsConnection) Disconnect() error
- func (conn *MockNatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)
- func (conn *MockNatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)
- func (conn *MockNatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, ...) ([]byte, error)
- type MockNatsMessage
- type MockStanConnection
- func (conn *MockStanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error
- func (conn *MockStanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error
- func (conn *MockStanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error
- func (conn *MockStanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error
- func (conn *MockStanConnection) Disconnect() error
- func (conn *MockStanConnection) Publish(subject common.EventType, data interface{}) error
- func (conn *MockStanConnection) PublishCloudEvent(ce *cloudevents.Event) error
- func (conn *MockStanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error
- type NatsConfig
- type NatsConnection
- func (conn *NatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error
- func (conn *NatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error
- func (conn *NatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error
- func (conn *NatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error
- func (conn *NatsConnection) Disconnect() error
- func (conn *NatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)
- func (conn *NatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)
- func (conn *NatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, ...) ([]byte, error)
- type QueryCloudEventHandler
- type QueryEventHandler
- type QueryEventHandlerMapping
- type QueryEventService
- type StanConfig
- type StanConnection
- func (conn *StanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error
- func (conn *StanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error
- func (conn *StanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error
- func (conn *StanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error
- func (conn *StanConnection) Disconnect() error
- func (conn *StanConnection) Publish(subject common.EventType, data interface{}) error
- func (conn *StanConnection) PublishCloudEvent(ce *cloudevents.Event) error
- func (conn *StanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error
- type StreamingCloudEventHandler
- type StreamingEventHandler
- type StreamingEventHandlerMapping
- type StreamingEventService
Constants ¶
const ( // DefaultNatsURL is a default nats URL DefaultNatsURL = "nats://nats:4222" // DefaultNatsClusterID is a default Cluster ID for cacao DefaultNatsClusterID = "cacao-cluster" // DefaultNatsMaxReconnect is default max reconnect trials DefaultNatsMaxReconnect = 6 // max times to reconnect within nats.connect() // DefaultNatsReconnectWait is a default delay for next reconnect DefaultNatsReconnectWait = 10 * time.Second // seconds to wait within nats.connect() // DefaultNatsRequestTimeout is default timeout for requests DefaultNatsRequestTimeout = 5 * time.Second // timeout for requests )
const ( // DefaultStanEventsTimeout is a default timeout DefaultStanEventsTimeout = 10 * time.Second // used to wait for event ops to complete, is very conservative // DefaultStanAckWaitTime is a default wait time for ack wait DefaultStanAckWaitTime = 60 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func ConvertNats ¶
func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)
ConvertNats converts NATS message to CloudEvents message
func ConvertStan ¶
func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)
ConvertStan converts STAN message to CloudEvents message
func CreateCloudEvent ¶
func CreateCloudEvent(data interface{}, eventType string, source string) (cloudevents.Event, error)
CreateCloudEvent takes any object, an eventType string, and a source string, and creates the resulting CloudEvent. This utility provides the following conveniences:
* uniformly assigns a new id of the format "cloudevent-" + xid
* sets the time to UTC
* generically marshals the data to JSON and appropriately assigns the cloudevent type
* creates or sets a transaction id, which can later be used to pair requests to responses or corresponding events
func CreateCloudEventWithTransactionID ¶
func CreateCloudEventWithTransactionID(data interface{}, eventType string, source string, transactionID common.TransactionID) (cloudevents.Event, error)
CreateCloudEventWithTransactionID creates a CloudEvent with the given transactionID; the transactionID is optional.
func GetTransactionID ¶
func GetTransactionID(ce *cloudevents.Event) common.TransactionID
GetTransactionID is a utility function that quickly reads the TransactionID from a CloudEvent.
func NewTransactionID ¶
func NewTransactionID() common.TransactionID
NewTransactionID is a utility function that generates a CACAO-specific transaction ID of the form tid-<xid>. Replies and events generated from a specific request should include the transaction ID for easier matching of requests.
func SetTransactionID ¶
func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)
SetTransactionID is a utility function that quickly writes the TransactionID to a CloudEvent.
Types ¶
type EventObservable ¶
type EventObservable interface { AddListener(common.EventType, common.TransactionID, Listener) (ListenerID, error) // AddListenerMultiEventType register a listener to a set of common.EventType AddListenerMultiEventType([]common.EventType, common.TransactionID, Listener) (ListenerID, error) RemoveListenerByID(id ListenerID) }
EventObservable is an interface that allows dynamically adding and removing event listeners.
type EventSource ¶
type EventSource struct {
// contains filtered or unexported fields
}
EventSource dispatches events to registered listeners. Listeners can be registered dynamically.
func NewEventSource ¶
func NewEventSource(natsConf NatsConfig, stanConf StanConfig) *EventSource
NewEventSource creates a new EventSource
func NewEventSourceFromStanConnection ¶
func NewEventSourceFromStanConnection(conn *StanConnection) (*EventSource, error)
NewEventSourceFromStanConnection creates an EventSource from a StanConnection (w/o subscription); the underlying STAN connection is re-used.
func (*EventSource) AddListener ¶
func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)
AddListener adds a listener for an event. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). Returns a unique ID of the listener and any connection-related error. This will automatically start a STAN connection & subscription if one has not already been started.
func (*EventSource) AddListenerMultiEventType ¶
func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)
AddListenerMultiEventType adds a listener for a set of event types. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). Returns a unique ID of the listener and any connection-related error. If the listener has ListenOnce set, it will be removed after being called once. This will automatically start a STAN connection & subscription if one has not already been started.
func (*EventSource) RemoveListenerByID ¶
func (es *EventSource) RemoveListenerByID(id ListenerID)
RemoveListenerByID removes a listener. This will disconnect & unsubscribe if the last listener is removed.
type Listener ¶
type Listener struct { Callback ListenerCallback // if enabled, then listener will be removed after being called once. // defaults to false. ListenOnce bool }
Listener is callback registration for some EventType and/or TransactionID
type ListenerCallback ¶
type ListenerCallback func(ev common.EventType, ce cloudevents.Event)
ListenerCallback is the callback function for a Listener
type MockNatsConnection ¶
type MockNatsConnection struct { Config *NatsConfig CommonSubjectRegex *regexp.Regexp HandlerLock sync.Mutex EventHandlers map[common.QueryOp]QueryEventHandler DefaultEventHandler QueryEventHandler CloudEventHandlers map[common.QueryOp]QueryCloudEventHandler DefaultCloudEventHandler QueryCloudEventHandler }
MockNatsConnection mocks a Nats connection and implements QueryEventService.
func CreateMockNatsConnection ¶
func CreateMockNatsConnection(config *NatsConfig, eventHandlerMappings []QueryEventHandlerMapping) (*MockNatsConnection, error)
CreateMockNatsConnection creates MockNatsConnection
func (*MockNatsConnection) AddCloudEventHandler ¶
func (conn *MockNatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error
AddCloudEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives a cloudevent of an event, and returns a cloudevent.
func (*MockNatsConnection) AddDefaultCloudEventHandler ¶
func (conn *MockNatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error
AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event, and returns a cloudevent.
func (*MockNatsConnection) AddDefaultEventHandler ¶
func (conn *MockNatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error
AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event, and returns a data object (not required to JSONify).
func (*MockNatsConnection) AddEventHandler ¶
func (conn *MockNatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error
AddEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives the subject and JSON data of an event, and returns a data object (not required to JSONify).
func (*MockNatsConnection) Disconnect ¶
func (conn *MockNatsConnection) Disconnect() error
Disconnect disconnects Nats connection
func (*MockNatsConnection) Request ¶
func (conn *MockNatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)
Request publishes a Nats event. It automatically wraps the data with cloud objects, and unwraps the response when it returns.
func (*MockNatsConnection) RequestCloudEvent ¶
func (conn *MockNatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)
RequestCloudEvent publishes Nats event, request and response via cloud event objects
func (*MockNatsConnection) RequestWithTransactionID ¶
func (conn *MockNatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error)
RequestWithTransactionID publishes a Nats event. The transactionID will be passed along to CreateCloudEvent if given. It automatically wraps the data with cloud objects, and unwraps the response when it returns.
type MockNatsMessage ¶
MockNatsMessage mocks Nats Msg
func (*MockNatsMessage) Respond ¶
func (msg *MockNatsMessage) Respond(data []byte) error
Respond responds
type MockStanConnection ¶
type MockStanConnection struct { NatsConfig *NatsConfig StanConfig *StanConfig HandlerLock sync.Mutex EventHandlers map[common.EventType]StreamingEventHandler DefaultEventHandler StreamingEventHandler CloudEventHandlers map[common.EventType]StreamingCloudEventHandler DefaultCloudEventHandler StreamingCloudEventHandler }
MockStanConnection mocks a Stan connection and implements StreamingEventService.
func CreateMockStanConnection ¶
func CreateMockStanConnection(natsConfig *NatsConfig, stanConfig *StanConfig, eventHandlerMappings []StreamingEventHandlerMapping) (*MockStanConnection, error)
CreateMockStanConnection creates MockStanConnection
func (*MockStanConnection) AddCloudEventHandler ¶
func (conn *MockStanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error
AddCloudEventHandler adds a new event handler function to a specified subject. eventHandler receives a cloudevent of an event.
func (*MockStanConnection) AddDefaultCloudEventHandler ¶
func (conn *MockStanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error
AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event.
func (*MockStanConnection) AddDefaultEventHandler ¶
func (conn *MockStanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error
AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event.
func (*MockStanConnection) AddEventHandler ¶
func (conn *MockStanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error
AddEventHandler adds a new event handler function to a specified subject. eventHandler receives the subject and JSON data of an event.
func (*MockStanConnection) Disconnect ¶
func (conn *MockStanConnection) Disconnect() error
Disconnect disconnects Stan connection
func (*MockStanConnection) Publish ¶
func (conn *MockStanConnection) Publish(subject common.EventType, data interface{}) error
Publish publishes Stan event
func (*MockStanConnection) PublishCloudEvent ¶
func (conn *MockStanConnection) PublishCloudEvent(ce *cloudevents.Event) error
PublishCloudEvent publishes Stan event
func (*MockStanConnection) PublishWithTransactionID ¶
func (conn *MockStanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error
PublishWithTransactionID publishes Stan event
type NatsConfig ¶
type NatsConfig struct { URL string `envconfig:"NATS_URL" default:"nats://nats:4222"` QueueGroup string `envconfig:"NATS_QUEUE_GROUP"` WildcardSubject string `envconfig:"NATS_WILDCARD_SUBJECT" default:"cyverse.>"` // WildcardSubject field is optional, only used for NATS Query ClientID string `envconfig:"NATS_CLIENT_ID"` // While not strictly used by Nats, this is propagated into the cloudevent MaxReconnects int `envconfig:"NATS_MAX_RECONNECTS" default:"-1"` // implementation should default to DefaultNatsMaxReconnect ReconnectWait int `envconfig:"NATS_RECONNECT_WAIT" default:"-1"` // in seconds, implementation should default to DefaultNatsReconnectWait RequestTimeout int `envconfig:"NATS_REQUEST_TIMEOUT" default:"-1"` }
NatsConfig stores configurations used by both nats query channel and nats streaming
type NatsConnection ¶
type NatsConnection struct { Config *NatsConfig Connection *nats.Conn Subscription *nats.Subscription HandlerLock sync.Mutex EventHandlers map[common.QueryOp]QueryEventHandler DefaultEventHandler QueryEventHandler CloudEventHandlers map[common.QueryOp]QueryCloudEventHandler DefaultCloudEventHandler QueryCloudEventHandler }
NatsConnection contains Nats connection info
func ConnectNatsForService ¶
func ConnectNatsForService(config *NatsConfig, eventHandlerMappings []QueryEventHandlerMapping) (*NatsConnection, error)
ConnectNatsForService connects to Nats
func ConnectNatsForServiceClient ¶
func ConnectNatsForServiceClient(config *NatsConfig) (*NatsConnection, error)
ConnectNatsForServiceClient connects to Nats for service clients who send events
func (*NatsConnection) AddCloudEventHandler ¶
func (conn *NatsConnection) AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error
AddCloudEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives a cloudevent of an event, and returns a cloudevent.
func (*NatsConnection) AddDefaultCloudEventHandler ¶
func (conn *NatsConnection) AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error
AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event, and returns a cloudevent.
func (*NatsConnection) AddDefaultEventHandler ¶
func (conn *NatsConnection) AddDefaultEventHandler(eventHandler QueryEventHandler) error
AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event, and returns a data object (not required to JSONify).
func (*NatsConnection) AddEventHandler ¶
func (conn *NatsConnection) AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error
AddEventHandler adds a new event handler function to a specified subject. The subject must match the wildcard subject specified in NatsConfig.WildcardSubject. eventHandler receives the subject and JSON data of an event, and returns a data object (not required to JSONify).
func (*NatsConnection) Disconnect ¶
func (conn *NatsConnection) Disconnect() error
Disconnect disconnects Nats connection
func (*NatsConnection) Request ¶
func (conn *NatsConnection) Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error)
Request publishes a Nats event. It automatically wraps the data with cloud objects, and unwraps the response when it returns.
func (*NatsConnection) RequestCloudEvent ¶
func (conn *NatsConnection) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error)
RequestCloudEvent publishes Nats event, request and response via cloud event objects
func (*NatsConnection) RequestWithTransactionID ¶
func (conn *NatsConnection) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error)
RequestWithTransactionID publishes a Nats event. The transactionID will be passed along to CreateCloudEvent if given. It automatically wraps the data with cloud objects, and unwraps the response when it returns.
type QueryCloudEventHandler ¶
type QueryCloudEventHandler func(event *cloudevents.Event) ([]byte, error)
QueryCloudEventHandler is a function prototype for query event handlers
type QueryEventHandler ¶
type QueryEventHandler func(subject common.QueryOp, transactionID common.TransactionID, jsonData []byte) ([]byte, error)
QueryEventHandler is a function prototype for query event handlers
type QueryEventHandlerMapping ¶
type QueryEventHandlerMapping struct { Subject common.QueryOp // required, empty string for default handler // optional, set CloudEventHandler or EventHandler CloudEventHandler QueryCloudEventHandler EventHandler QueryEventHandler }
QueryEventHandlerMapping is a struct that maps query operation and query event handler
type QueryEventService ¶
type QueryEventService interface { Disconnect() error AddEventHandler(subject common.QueryOp, eventHandler QueryEventHandler) error AddDefaultEventHandler(eventHandler QueryEventHandler) error AddCloudEventHandler(subject common.QueryOp, eventHandler QueryCloudEventHandler) error AddDefaultCloudEventHandler(eventHandler QueryCloudEventHandler) error Request(ctx context.Context, subject common.QueryOp, data interface{}) ([]byte, error) RequestWithTransactionID(ctx context.Context, subject common.QueryOp, data interface{}, transactionID common.TransactionID) ([]byte, error) RequestCloudEvent(ctx context.Context, ce *cloudevents.Event) ([]byte, error) }
QueryEventService is an interface for a query event service (e.g., Nats)
type StanConfig ¶
type StanConfig struct { ClusterID string `envconfig:"NATS_CLUSTER_ID" default:"cacao-cluster"` DurableName string `envconfig:"NATS_DURABLE_NAME"` EventsTimeout int `envconfig:"NATS_EVENTS_TIMEOUT" default:"-1"` }
StanConfig stores additional configurations used by nats streaming; it is used along with NatsConfig.
type StanConnection ¶
type StanConnection struct { NatsConfig *NatsConfig StanConfig *StanConfig Connection stan.Conn Subscription stan.Subscription HandlerLock sync.Mutex EventHandlers map[common.EventType]StreamingEventHandler DefaultEventHandler StreamingEventHandler CloudEventHandlers map[common.EventType]StreamingCloudEventHandler DefaultCloudEventHandler StreamingCloudEventHandler }
StanConnection contains Stan connection info
func ConnectStanForService ¶
func ConnectStanForService(natsConfig *NatsConfig, stanConfig *StanConfig, eventHandlerMappings []StreamingEventHandlerMapping) (*StanConnection, error)
ConnectStanForService connects to Stan
func ConnectStanForServiceClient ¶
func ConnectStanForServiceClient(natsConfig *NatsConfig, stanConfig *StanConfig) (*StanConnection, error)
ConnectStanForServiceClient connects to Stan for service clients who send events with no subscription
func (*StanConnection) AddCloudEventHandler ¶
func (conn *StanConnection) AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error
AddCloudEventHandler adds a new event handler function to a specified subject. eventHandler receives a cloudevent of an event.
func (*StanConnection) AddDefaultCloudEventHandler ¶
func (conn *StanConnection) AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error
AddDefaultCloudEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives a cloudevent of an event.
func (*StanConnection) AddDefaultEventHandler ¶
func (conn *StanConnection) AddDefaultEventHandler(eventHandler StreamingEventHandler) error
AddDefaultEventHandler adds a default event handler function. The handler is called when there is no matching handler for the subject. eventHandler receives the subject and JSON data of an event.
func (*StanConnection) AddEventHandler ¶
func (conn *StanConnection) AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error
AddEventHandler adds a new event handler function to a specified subject. eventHandler receives the subject and JSON data of an event.
func (*StanConnection) Disconnect ¶
func (conn *StanConnection) Disconnect() error
Disconnect disconnects Stan connection
func (*StanConnection) Publish ¶
func (conn *StanConnection) Publish(subject common.EventType, data interface{}) error
Publish publishes Stan event
func (*StanConnection) PublishCloudEvent ¶
func (conn *StanConnection) PublishCloudEvent(ce *cloudevents.Event) error
PublishCloudEvent publishes Stan event
func (*StanConnection) PublishWithTransactionID ¶
func (conn *StanConnection) PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error
PublishWithTransactionID publishes a Stan event. The transactionID will be passed along to CreateCloudEvent.
type StreamingCloudEventHandler ¶
type StreamingCloudEventHandler func(event *cloudevents.Event) error
StreamingCloudEventHandler is a function prototype for streaming event handlers
type StreamingEventHandler ¶
type StreamingEventHandler func(subject common.EventType, transactionID common.TransactionID, jsonData []byte) error
StreamingEventHandler is a function prototype for streaming event handlers
type StreamingEventHandlerMapping ¶
type StreamingEventHandlerMapping struct { Subject common.EventType // required, empty string for default handler // optional, set CloudEventHandler or EventHandler CloudEventHandler StreamingCloudEventHandler EventHandler StreamingEventHandler }
StreamingEventHandlerMapping is a struct that maps event type and event handler
type StreamingEventService ¶
type StreamingEventService interface { Disconnect() error AddEventHandler(subject common.EventType, eventHandler StreamingEventHandler) error AddDefaultEventHandler(eventHandler StreamingEventHandler) error AddCloudEventHandler(subject common.EventType, eventHandler StreamingCloudEventHandler) error AddDefaultCloudEventHandler(eventHandler StreamingCloudEventHandler) error Publish(subject common.EventType, data interface{}) error PublishWithTransactionID(subject common.EventType, data interface{}, transactionID common.TransactionID) error PublishCloudEvent(ce *cloudevents.Event) error }
StreamingEventService is an interface for a streaming event service (e.g., Stan)