messagebus

package
v5.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultQueueExpire value for queue auto expire
	DefaultQueueExpire = 30000
	// DefaultMessageExpire value message expire
	DefaultMessageExpire = 30000
)
View Source
const DefaultReconnectionAttemts = 5

DefaultReconnectionAttemts is the default number of reconnection attempts. It implements a hard-coded fault tolerance for a starting NATS cluster.

View Source
const DefaultReconnectionWait = 5 * time.Second

DefaultReconnectionWait is the default waiting time between each reconnection attempt

Variables

This section is empty.

Functions

This section is empty.

Types

type AnnounceHandler

type AnnounceHandler func(o *proxy.Announcement)

AnnounceHandler handles announce messages

type Client

type Client interface {
	Connect() error
	Close()

	SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error)
	SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error)

	PublishPing(topic string) error
	Request(topic string, req *proxy.Request) (*proxy.Response, error)
	MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error)
	MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error)

	TimeoutCount() int64
	GetWildcardString(w WildcardType) string
}

Client defines the functions used on ari-proxy client

type Config

type Config struct {
	URL            string
	TimeoutRetries int
	RequestTimeout time.Duration
}

Config has general configuration for MessageBus

type EventHandler

type EventHandler func(b []byte)

EventHandler handles event messages

type NatsBus

type NatsBus struct {
	Config Config
	Log    log15.Logger
	// contains filtered or unexported fields
}

NatsBus is the MessageBus implementation for NATS

func NewNatsBus

func NewNatsBus(config Config, options ...OptionNatsFunc) *NatsBus

NewNatsBus creates a NatsBus

func (*NatsBus) Close

func (n *NatsBus) Close()

Close closes the connection

func (*NatsBus) Connect

func (n *NatsBus) Connect() error

Connect creates a NATS connection

func (*NatsBus) GetWildcardString

func (n *NatsBus) GetWildcardString(w WildcardType) string

GetWildcardString returns wildcard based on type

func (*NatsBus) MultipleRequest

func (n *NatsBus) MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error)

MultipleRequest sends a request message to multiple consumers

func (*NatsBus) MultipleRequestReturnFirstGoodResponse

func (n *NatsBus) MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error)

MultipleRequestReturnFirstGoodResponse sends a request message to multiple consumers and returns the first good response

func (*NatsBus) PublishAnnounce

func (n *NatsBus) PublishAnnounce(topic string, msg *proxy.Announcement) error

PublishAnnounce sends announce message

func (*NatsBus) PublishEvent

func (n *NatsBus) PublishEvent(topic string, msg ari.Event) error

PublishEvent sends event message

func (*NatsBus) PublishPing

func (n *NatsBus) PublishPing(topic string) error

PublishPing sends ping message

func (*NatsBus) PublishResponse

func (n *NatsBus) PublishResponse(topic string, msg *proxy.Response) error

PublishResponse sends response message

func (*NatsBus) Request

func (n *NatsBus) Request(topic string, req *proxy.Request) (*proxy.Response, error)

Request sends a request message

func (*NatsBus) SubscribeAnnounce

func (n *NatsBus) SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error)

SubscribeAnnounce subscribes to announce messages

func (*NatsBus) SubscribeCreateRequest

func (n *NatsBus) SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error)

SubscribeCreateRequest subscribes to create request messages

func (*NatsBus) SubscribeEvent

func (n *NatsBus) SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error)

SubscribeEvent subscribes to event messages

func (*NatsBus) SubscribePing

func (n *NatsBus) SubscribePing(topic string, callback PingHandler) (Subscription, error)

SubscribePing subscribes to ping messages

func (*NatsBus) SubscribeRequest

func (n *NatsBus) SubscribeRequest(topic string, callback RequestHandler) (Subscription, error)

SubscribeRequest subscribes to request messages

func (*NatsBus) SubscribeRequests

func (n *NatsBus) SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error)

SubscribeRequests subscribes to request messages using multiple topics

func (*NatsBus) TimeoutCount

func (n *NatsBus) TimeoutCount() int64

TimeoutCount is the amount of times the communication times out

type NatsMSubscription

type NatsMSubscription struct {
	Subscriptions []*nats.Subscription
}

NatsMSubscription handles multiple subscriptions with the same handler

func (*NatsMSubscription) Unsubscribe

func (n *NatsMSubscription) Unsubscribe() error

Unsubscribe removes the multiple subscriptions

type OptionNatsFunc

type OptionNatsFunc func(n *NatsBus)

OptionNatsFunc options for NATS

func WithNatsConn

func WithNatsConn(nconn *nats.EncodedConn) OptionNatsFunc

WithNatsConn binds an existing NATS connection

type OptionRabbitmqFunc

type OptionRabbitmqFunc func(n *RabbitmqBus)

OptionRabbitmqFunc options for RabbitMQ

func WithRabbitmqConn

func WithRabbitmqConn(rconn *amqp091.Connection) OptionRabbitmqFunc

WithRabbitmqConn binds an existing RabbitMQ connection

type PingHandler

type PingHandler func()

PingHandler handles ping messages

type RabbitmqBus

type RabbitmqBus struct {
	Config Config
	Log    log15.Logger
	// contains filtered or unexported fields
}

RabbitmqBus is MessageBus implementation for RabbitMQ

func NewRabbitmqBus

func NewRabbitmqBus(config Config, options ...OptionRabbitmqFunc) *RabbitmqBus

NewRabbitmqBus creates a RabbitmqBus

func (*RabbitmqBus) Close

func (r *RabbitmqBus) Close()

Close closes the connection

func (*RabbitmqBus) Connect

func (r *RabbitmqBus) Connect() error

Connect creates a RabbitMQ connection

func (*RabbitmqBus) GetWildcardString

func (r *RabbitmqBus) GetWildcardString(w WildcardType) string

GetWildcardString returns wildcard based on type

func (*RabbitmqBus) MultipleRequest

func (r *RabbitmqBus) MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error)

MultipleRequest sends a request message to multiple consumers

func (*RabbitmqBus) MultipleRequestReturnFirstGoodResponse

func (r *RabbitmqBus) MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error)

MultipleRequestReturnFirstGoodResponse sends a request message to multiple consumers and returns the first good response

func (*RabbitmqBus) PublishAnnounce

func (r *RabbitmqBus) PublishAnnounce(topic string, msg *proxy.Announcement) error

PublishAnnounce sends announce message

func (*RabbitmqBus) PublishEvent

func (r *RabbitmqBus) PublishEvent(topic string, msg ari.Event) error

PublishEvent sends event message

func (*RabbitmqBus) PublishPing

func (r *RabbitmqBus) PublishPing(topic string) error

PublishPing sends ping message

func (*RabbitmqBus) PublishResponse

func (r *RabbitmqBus) PublishResponse(topic string, msg *proxy.Response) error

PublishResponse sends response message

func (*RabbitmqBus) Request

func (r *RabbitmqBus) Request(topic string, req *proxy.Request) (*proxy.Response, error)

Request sends a request message

func (*RabbitmqBus) SubscribeAnnounce

func (r *RabbitmqBus) SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error)

SubscribeAnnounce subscribes to announce messages

func (*RabbitmqBus) SubscribeCreateRequest

func (r *RabbitmqBus) SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error)

SubscribeCreateRequest subscribes to create request messages

func (*RabbitmqBus) SubscribeEvent

func (r *RabbitmqBus) SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error)

SubscribeEvent subscribes to event messages

func (*RabbitmqBus) SubscribePing

func (r *RabbitmqBus) SubscribePing(topic string, callback PingHandler) (Subscription, error)

SubscribePing subscribes to ping messages

func (*RabbitmqBus) SubscribeRequest

func (r *RabbitmqBus) SubscribeRequest(topic string, callback RequestHandler) (Subscription, error)

SubscribeRequest subscribes to request messages

func (*RabbitmqBus) SubscribeRequests

func (r *RabbitmqBus) SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error)

SubscribeRequests subscribes to request messages using multiple topics

func (*RabbitmqBus) TimeoutCount

func (r *RabbitmqBus) TimeoutCount() int64

TimeoutCount is the amount of times the communication times out

type RequestHandler

type RequestHandler func(subject string, reply string, req *proxy.Request)

RequestHandler handles request messages

type ResponseHandler

type ResponseHandler func(req *proxy.Response)

ResponseHandler handles response messages

type RmqSubscription

type RmqSubscription struct {
	Topics       []string
	Queue        string
	Exchange     string
	ExchangeKind string
	QueueArgs    amqp091.Table
	// contains filtered or unexported fields
}

RmqSubscription handles a RabbitMQ subscription

func (*RmqSubscription) Unsubscribe

func (rs *RmqSubscription) Unsubscribe() error

Unsubscribe removes the subscription

type Server

type Server interface {
	Connect() error
	Close()

	SubscribePing(topic string, callback PingHandler) (Subscription, error)
	SubscribeRequest(topic string, callback RequestHandler) (Subscription, error)
	SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error)
	SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error)
	PublishResponse(topic string, msg *proxy.Response) error
	PublishAnnounce(topic string, msg *proxy.Announcement) error
	PublishEvent(topic string, msg ari.Event) error
}

Server defines the functions used on ari-proxy server

type Subscription

type Subscription interface {
	Unsubscribe() error
}

Subscription defines subscription interface

type Type

type Type int

Type is the type of MessageBus (RabbitMQ / NATS)

const (
	TypeUnknown  Type = iota // unknown type
	TypeNats                 // NATS type
	TypeRabbitmq             // RabbitMQ type
)

types

func GetType

func GetType(url string) Type

GetType identifies the message bus type from a URL

type WildcardType

type WildcardType int

WildcardType used to identify wildcards used on routing keys on message bus

const (
	WildcardUndefined       WildcardType = iota // undefined type
	WildcardOneWord                             // one word like pre.*.post
	WildcardZeroOrMoreWords                     // zero or more words like pre.>
)

wildcard types

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL