hevent

package module
v0.0.0-...-81c1d68 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2022 License: MIT Imports: 11 Imported by: 1

README

hexa event implement events feature for Hexa

Install

go get github.com/kamva/hexa-event

Supported message queues and event streaming platforms :

  • Kafka & Kafka outbox pattern (supports deduplication)
  • Pulsar
  • Nats streaming

Known errors :

  • when two time subscribe with same subscription name with subscription type : pulsar.Exclusive on pulsar driver, you get this error :
server error: ConsumerBusy: Exclusive consumer is already connected

Proposal:

  • Remove the HandlerContext as first param and get error as return param of EventHandler, if you got an error, so return negative signal and log the error, otherwise return positive signal to the event broker.
  • Remove the err param as lat para of each handler, if occured error, so just log it and send nack, because we should not get any error in our app, if we get error on an event, so we don't need to call to the handler, we need to fix it.
  • Transactional outbox: we need to implement another emitter for each each driver which instead of emit, it prepare the message to emit later. we provide the hexa Context with its propagators, maybe we need to provide some data with that event which is avaiable in that time like hexa Context, we should convert message to raw message and do what we need to do to send a message, then store it in that driver's store.
  • After that we also need to another emitter interface which gets each document and emit that event directory. we also have another option to recreate event and emit it using noraml emiter, but this is not good, we don't need to recreate the event, also we have some data like propaged hexa Context which we realy don't need to reacreate it to emit event, some emittters like pulsar emitter add some data on sending event like "send_time", we should implement another emitter for db events, which does not use the normal emitter interface, because it need to get the events. we use this or some other thing which gets an struct which return db doc and on get new doc, it should emit it.

Todo:

  • Add support of protocol buffer to send events.
  • Add Extra []interface{} option to the SubscriptionOptions to get more features on each subscription relative to each driver. remove list of options in consumerGenerator(we can generate without a consumer generator or simple consumer generator) [Accepted].
  • Implement nats-streaming driver
  • Implement a new background process to remove old messages in the kafka outbox pattern.
  • Implement Mock driver
  • Write Tests.
  • Implement mock
  • Add badges to readme.
  • CI.

Implementation notes:

  • If you need to handle message deduplication, you should add following keys to your hexa context:
    • HEXA_EVENT_ID: A unique ID for the current event.
    • HEXA_EVENT_HANDLER_ACTION_NAME: The action's name that your event handler want to do. we use this value to do not conflict deduplication when you listen to one event multiple times for multiple actions in one microservice.
    • HEXA_ROOT_EVENT_ID: Its id of the base event in retry events (the root event's id). for the root event itself is equal to the eventId.
    • HEXA_ROOT_EVENT_HANDLER_ACTION_NAME: The action's name of the root event (the base event of the retry events)

Documentation

Overview

Package hevent open telemetry keys

Index

Constants

View Source
const (
	// HexaEventID is id of the event we use as key in hexa Context.
	HexaEventID = "HEXA_EVENT_ID"
	// HexaEventHandlerActionName is action's name that event handler wants to do.
	HexaEventHandlerActionName = "HEXA_EVENT_HANDLER_ACTION_NAME"
	// HexaRootEventID is id of the root event for retry events.
	HexaRootEventID = "HEXA_ROOT_EVENT_ID"
	// HexaRootEventHandlerActionName is action's name of the root event for retry events.
	HexaRootEventHandlerActionName = "HEXA_ROOT_EVENT_HANDLER_ACTION_NAME"
)
View Source
const (
	HeaderKeyReplyChannel   = "_reply_channel"
	HeaderKeyPayloadEncoder = "_payload_encoder" // the message body.
)
View Source
const Version = "1.0.0"

Version is the package current version.

Variables

View Source
var (
	MessagingActionName     = attribute.Key("messaging.action.name")
	MessagingRootActionName = attribute.Key("messaging.action.root.name")
)

open telemetry attribute keys:

View Source
var (
	MessagingWithError = attribute.Key("messaging.with_error")
)

Functions

func SubscribeMulti

func SubscribeMulti(r Receiver, options ...*SubscriptionOptions) error

SubscribeMulti subscribes to multiple channel.

Types

type Decoder

type Decoder interface {
	// Decode decodes payload to the provided value.
	Decode(val interface{}) error
}

Decoder is event payload decoder.

type Emitter

type Emitter interface {
	// Emit sends event to the channel.
	// context can be nil.
	// dont forget to validate the event here.
	Emit(context.Context, *Event) (msgID string, err error)
	hexa.Shutdownable
}

Emitter is the interface to emit events

type Encoder

type Encoder interface {
	// Name returns the Encoder name.
	Name() string
	Encode(interface{}) ([]byte, error)
	Decoder([]byte) Decoder
}

Encoder encode and decode the event payload.

func NewJsonEncoder

func NewJsonEncoder() Encoder

NewJsonEncoder returns new instance of the json encoder.

func NewProtobufEncoder

func NewProtobufEncoder() Encoder

NewProtobufEncoder returns new instance of the protobuf encoder.

type Event

type Event struct {
	Key          string // required, can use to specify partition number.(see pulsar docs)
	Channel      string
	ReplyChannel string // optional (use if need to reply the response)
	// It will encode using either protobuf,json,... encoder(relative to config of emitter).
	// Dont forget that your emitter encoder and event receivers decoder should match with each other.
	Payload interface{}
}

Event is the event to send.

func (Event) Validate

func (e Event) Validate() error

type EventHandler

type EventHandler func(HandlerContext, Message, error) error

EventHandler handle events. pulsar and hestan implementations just log returned error, in kafka if you return error, it will push event to the retry or DLQ topic.

func RecoverMiddleware

func RecoverMiddleware(h EventHandler) EventHandler

RecoverMiddleware is a event handler middleware which recover panic error.

func WithMiddlewares

func WithMiddlewares(h EventHandler, middlewares ...Middleware) EventHandler

WithMiddlewares adds middlewares to the handler too.

type HandlerContext

type HandlerContext interface {
	context.Context
	// Ack get the message and send ack.
	Ack()
	// Nack gets the message and send negative ack.
	Nack()
}

HandlerContext is the context that pass to the message handler.

type Message

type Message struct {
	// Primary is not the RawMessage. its the driver's
	// raw message.
	Primary interface{}

	Headers map[string][]byte

	CorrelationId string
	ReplyChannel  string
	Payload       Decoder
}

Message is the message that provide to event handler.

func (Message) Validate

func (m Message) Validate() error

type Middleware

type Middleware func(handler EventHandler) EventHandler

type RawMessage

type RawMessage struct {
	Headers map[string][]byte `json:"header,omitempty"`
	Payload []byte            `json:"payload"`
}

RawMessage is the message sent by emitter, we will convert RawMessage to message and then pass it to the event handler. Note: Some event drivers (kafkabox & hafka) do not push the marshaled RawMessage as the event value, they send RawMessage's headers in the headers section and RawMessage's payload in the Payload section of the event, so if you want to define extra fields in addition to Headers and Payload in the RawMessage, please be careful.

func (RawMessage) Validate

func (e RawMessage) Validate() error

type RawMessageConverter

type RawMessageConverter interface {
	EventToRaw(c context.Context, e *Event) (*RawMessage, error)
	// RawMsgToMessage converts the raw message to a message.
	// primary is the primary driver's message that its receiver will get.
	RawMsgToMessage(c context.Context, raw *RawMessage, primary interface{}) (context.Context, Message, error)
}

type Receiver

type Receiver interface {
	// Subscribe subscribe to the provided channel
	Subscribe(channel string, h EventHandler) error

	// SubscribeWithOptions subscribe by options.
	SubscribeWithOptions(*SubscriptionOptions) error

	hexa.Runnable     // to start receiving events.
	hexa.Shutdownable // to close connections and shutdown the server.
}

type SubscriptionOptions

type SubscriptionOptions struct {
	// Channel specify the channel name you will subscribe on.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	Channel string

	// Channels contains name of channels which we want to subscribe.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	Channels []string

	// ChannelsPattern is the pattern you will use to subscribe on all channels
	// which match with this pattern.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	ChannelsPattern string

	// Handler is the event handler.
	Handler EventHandler
	// contains filtered or unexported fields
}

SubscriptionOptions contains options to subscribe to one or multiple channels.

func NewSubscriptionOptions

func NewSubscriptionOptions(channel string, handler EventHandler) *SubscriptionOptions

NewSubscriptionOptions returns new instance of the subscription options.

func (*SubscriptionOptions) Extra

func (so *SubscriptionOptions) Extra() []interface{}

Extra returns the extra data of the subscription options.

func (*SubscriptionOptions) Validate

func (so *SubscriptionOptions) Validate() error

func (*SubscriptionOptions) WithExtra

func (so *SubscriptionOptions) WithExtra(extra ...interface{}) *SubscriptionOptions

WithExtra add Extra data to the subscription options.

Directories

Path Synopsis
examples
Package hestan (hexa stan) in implementation of Nats-streaming broker for hexa SDK using stan client library of NATS.
Package hestan (hexa stan) in implementation of Nats-streaming broker for hexa SDK using stan client library of NATS.
Package hexapulsar implements hexa events.
Package hexapulsar implements hexa events.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL