kumnats

package module
v2.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Redis key name constants.
const (
	// DeadMessagesRedisKey is the key name "nats:dead-messages".
	// NOTE(review): presumably the default Redis key for dead messages, which
	// WithDeadMessageRedisKey can replace — confirm against the implementation.
	DeadMessagesRedisKey = "nats:dead-messages"
	// FailedMessagesRedisKey is the key name "nats:failed-messages".
	// NOTE(review): presumably the default Redis key for failed messages, which
	// WithFailedMessageRedisKey can replace — confirm against the implementation.
	FailedMessagesRedisKey = "nats:failed-messages"
)

Variables

View Source
// Sentinel errors of this package. Every message carries the "kumnatserr: " prefix.
var (
	// ErrBadUnmarshalResult is given when the result of unmarshalling a message's Data is not as intended.
	ErrBadUnmarshalResult = errors.New("kumnatserr: bad unmarshal result")
	// ErrCastingPayloadToStruct is given when a payload could not be cast to the specified struct.
	ErrCastingPayloadToStruct = errors.New("kumnatserr: failed to cast payload to specified struct")
	// ErrGiveUpProcessingMessagePayload is given when a message's payload (data) has already been
	// processed x times and has failed every time.
	ErrGiveUpProcessingMessagePayload = errors.New("kumnatserr: give up processing message payload")
	// ErrNilMessagePayload is given when a message's payload (data) is nil.
	ErrNilMessagePayload = errors.New("kumnatserr: nil message payload given")
)

Functions

func NewNATSMessageHandler added in v2.2.0

func NewNATSMessageHandler(payload MessagePayload, retryAttempts int, retryInterval time.Duration, lambda func(payload MessagePayload) error) stan.MsgHandler

NewNATSMessageHandler is a wrapper to standardize how we handle NATS messages. Payload (arg 0) should always be empty when the function is called; the payload will later parse its data from msg.Data.

func NewNATSMessageWithErrorHandler added in v2.6.0

func NewNATSMessageWithErrorHandler(payload MessagePayload, retryAttempts int, retryInterval time.Duration, msgHandler MessageHandler, errHandler MessageHandler) stan.MsgHandler

NewNATSMessageWithErrorHandler is a wrapper to standardize how we handle NATS messages. Payload (arg 0) should always be empty when the function is called; the payload will later parse its data from msg.Data.

Types

type AuditLogMessage added in v2.8.0

// AuditLogMessage is a JSON-tagged struct describing a single audit log entry.
// The tags map every field to a snake_case key; OldData and NewData carry
// omitempty, so encoding/json leaves them out when they are empty strings.
// NOTE(review): the meaning of the individual fields (e.g. whether
// AuditedChanges, OldData and NewData hold serialized JSON) is inferred from
// their names only — confirm with the producers of these messages.
type AuditLogMessage struct {
	ServiceName    string    `json:"service_name"`
	UserID         int64     `json:"user_id"`
	AuditableType  string    `json:"auditable_type"`
	AuditableID    string    `json:"auditable_id"`
	Action         string    `json:"action"`
	AuditedChanges string    `json:"audited_changes"`
	OldData        string    `json:"old_data,omitempty"`
	NewData        string    `json:"new_data,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
}

AuditLogMessage :nodoc:

func (*AuditLogMessage) ParseFromBytes added in v2.8.0

func (m *AuditLogMessage) ParseFromBytes(data []byte) (err error)

ParseFromBytes implementation of AuditLogMessage

type EventType

// EventType is a string-based type. It is the type of the Type field on both
// NatsEvent and NatsMessage.
type EventType string

EventType :nodoc:

type Logger

// Logger is the logging interface accepted by WithLogger. Only error-level
// methods are required.
type Logger interface {
	// Error takes a variadic list of values to log.
	Error(args ...interface{})
	// Errorf takes a format string followed by its arguments.
	Errorf(format string, args ...interface{})
}

Logger :nodoc:

type MessageHandler added in v2.6.0

// MessageHandler is a function that receives a MessagePayload and returns an
// error. NewNATSMessageWithErrorHandler takes two of them: msgHandler and
// errHandler.
type MessageHandler func(payload MessagePayload) (err error)

MessageHandler type

type MessagePayload added in v2.2.0

// MessagePayload is implemented by types that expose ParseFromBytes. In this
// package *AuditLogMessage, *NatsMessage and *NatsMessageWithOldData declare
// that method.
type MessagePayload interface {
	// ParseFromBytes takes raw bytes and returns an error.
	// NOTE(review): presumably it fills the receiver from data (the handler
	// constructors mention parsing msg.Data) — confirm in the implementations.
	ParseFromBytes(data []byte) error
}

MessagePayload :nodoc:

type NATS

// NATS is the connection interface returned by NewNATSWithCallback and handed
// to a NatsCallback. The subscription methods use callback, option and
// subscription types from the stan package.
type NATS interface {
	// Publish takes a subject and a raw byte value.
	Publish(subject string, value []byte) error
	// SafePublish has the same signature as Publish.
	// NOTE(review): how it differs from Publish is not visible here —
	// presumably it keeps messages that fail to publish (see the failed-message
	// Redis options) for a later retry; confirm in the implementation.
	SafePublish(subject string, value []byte) error
	// Subscribe registers cb for subject with optional stan subscription options.
	Subscribe(subject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error)
	// QueueSubscribe is like Subscribe but additionally takes a queue group name.
	QueueSubscribe(subject, queueGroup string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error)
	// Close returns an error.
	// NOTE(review): presumably closes the underlying connection — confirm.
	Close() error
}

NATS :nodoc:

func NewNATSWithCallback

func NewNATSWithCallback(clusterID, clientID, url string, fn NatsCallback, stanOptions []stan.Option, options ...Option) (NATS, error)

NewNATSWithCallback IMPORTANT! Do not pass stan.NatsURL or stan.SetConnectionLostHandler as options.

type NatsCallback

// NatsCallback is a function that receives a NATS connection. It is the type
// of the fn argument of NewNATSWithCallback.
// NOTE(review): when and how often the callback is invoked is not visible
// here — confirm in the implementation.
type NatsCallback func(conn NATS)

NatsCallback :nodoc:

type NatsEvent added in v2.7.0

// NatsEvent holds an ID, a user ID and an EventType. The methods GetID,
// GetUserID and GetType are declared on *NatsEvent, and
// NatsEventMessage.WithEvent accepts a *NatsEvent.
type NatsEvent struct {
	ID     int64
	UserID int64
	Type   EventType
}

NatsEvent :nodoc:

func (*NatsEvent) GetID added in v2.7.0

func (n *NatsEvent) GetID() int64

GetID :nodoc:

func (*NatsEvent) GetType added in v2.7.0

func (n *NatsEvent) GetType() EventType

GetType :nodoc:

func (*NatsEvent) GetUserID added in v2.7.0

func (n *NatsEvent) GetUserID() int64

GetUserID :nodoc:

type NatsEventMessage added in v2.7.0

// NatsEventMessage is a builder-style type: NewNatsEventMessage creates one,
// the chainable WithEvent, WithBody, WithOldBody and WithRequest methods each
// return *NatsEventMessage, and Build returns a byte slice and an error.
// NOTE(review): how the With* methods convert their arguments into the fields
// below, and when Error is set, is not visible here — confirm in the
// implementation.
type NatsEventMessage struct {
	NatsEvent *NatsEvent
	Body      string
	OldBody   string
	Request   []byte
	Error     error
}

NatsEventMessage :nodoc:

func NewNatsEventMessage added in v2.7.0

func NewNatsEventMessage() *NatsEventMessage

NewNatsEventMessage :nodoc:

func (*NatsEventMessage) Build added in v2.7.0

func (n *NatsEventMessage) Build() (data []byte, err error)

Build :nodoc:

func (*NatsEventMessage) WithBody added in v2.7.0

func (n *NatsEventMessage) WithBody(body interface{}) *NatsEventMessage

WithBody :nodoc:

func (*NatsEventMessage) WithEvent added in v2.7.0

func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage

WithEvent :nodoc:

func (*NatsEventMessage) WithOldBody added in v2.7.0

func (n *NatsEventMessage) WithOldBody(body interface{}) *NatsEventMessage

WithOldBody :nodoc:

func (*NatsEventMessage) WithRequest added in v2.7.0

func (n *NatsEventMessage) WithRequest(req proto.Message) *NatsEventMessage

WithRequest :nodoc:

type NatsMessage

// NatsMessage is a JSON-tagged message struct. It declares ParseFromBytes on
// its pointer receiver and is embedded by NatsMessageWithOldData.
type NatsMessage struct {
	ID     int64     `json:"id"`
	UserID int64     `json:"user_id"`
	Type   EventType `json:"type"`
	Time   string    `json:"time"`
	// Body is a deprecated field, kept for now for backwards compatibility.
	// Its omitempty tag leaves it out of the JSON output when it is empty.
	// NOTE(review): Request below appears to be its replacement — confirm.
	Body string `json:"body,omitempty"`

	// Request is one of the new fields.
	Request []byte `json:"request"`
}

NatsMessage :nodoc:

func (*NatsMessage) ParseFromBytes added in v2.2.0

func (m *NatsMessage) ParseFromBytes(data []byte) (err error)

ParseFromBytes implementation of NatsMessage

type NatsMessageWithOldData added in v2.4.0

// NatsMessageWithOldData embeds NatsMessage and adds an OldData string. It
// declares its own ParseFromBytes on the pointer receiver.
type NatsMessageWithOldData struct {
	NatsMessage
	// OldData carries omitempty, so it is left out of the JSON output when empty.
	OldData string `json:"old_data,omitempty"`
}

NatsMessageWithOldData :nodoc:

func (*NatsMessageWithOldData) ParseFromBytes added in v2.4.0

func (n *NatsMessageWithOldData) ParseFromBytes(data []byte) (err error)

ParseFromBytes implementation of NatsMessageWithOldData

type Option

// Option is a function that receives an *Options and returns an error. Values
// are produced by the With* constructors (WithLogger, WithRedis, ...) and
// passed to NewNATSWithCallback through its variadic options parameter.
type Option func(*Options) error

Option :nodoc:

func WithDeadMessageRedisKey added in v2.1.0

func WithDeadMessageRedisKey(key string) Option

WithDeadMessageRedisKey :nodoc:

func WithFailedMessagePublishInterval

func WithFailedMessagePublishInterval(seconds uint64) Option

WithFailedMessagePublishInterval :nodoc:

func WithFailedMessageRedisKey

func WithFailedMessageRedisKey(key string) Option

WithFailedMessageRedisKey :nodoc:

func WithLogger

func WithLogger(logger Logger) Option

WithLogger :nodoc:

func WithReconnectInterval

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval :nodoc:

func WithRedis

func WithRedis(conn *redigo.Pool) Option

WithRedis :nodoc:

type Options

// Options is the struct that Option functions receive. None of its fields are
// shown in this documentation view.
type Options struct {
	// contains filtered or unexported fields
}

Options :nodoc:

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL