eventbus

package
v0.0.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Consume properties
	//
	// CONSUME_AUTO_ACK is false.
	// NOTE(review): presumably the auto-ack flag handed to the AMQP consume
	// call, which would mean deliveries need an explicit acknowledgement —
	// confirm against the consumer code.
	CONSUME_AUTO_ACK = false

	// Common properties
	//
	// NOTE(review): presumably the no-wait flag shared by AMQP calls that have
	// no dedicated constant of their own — confirm against the callers.
	NO_WAIT = false

	// Exchange properties
	//
	// EXCHANGE_BUFFER is derived from EXCHANGE_NAME by appending "-buffer", so
	// it evaluates to "nocloud-event-bus-buffer".
	// NOTE(review): the remaining values look like the arguments of an AMQP
	// exchange declaration of kind "topic" — confirm against the declaring code.
	EXCHANGE_NAME        = "nocloud-event-bus"
	EXCHANGE_BUFFER      = EXCHANGE_NAME + "-buffer"
	EXCHANGE_DURABLE     = true // essential for retention
	EXCHANGE_AUTO_DELETE = false
	EXCHANGE_INTERNAL    = false
	EXCHANGE_NO_WAIT     = false
	EXCHANGE_KIND        = "topic"

	// Queue properties
	//
	// Queues are marked durable, not auto-deleted and not exclusive.
	// NOTE(review): presumably the arguments of an AMQP queue declaration —
	// confirm against the declaring code.
	QUEUE_DURABLE     = true
	QUEUE_AUTO_DELETE = false
	QUEUE_EXCLUSIVE   = false

	// Qos properties
	//
	// The prefetch count is 1, the prefetch size is 0 and the global flag is
	// false.
	// NOTE(review): presumably passed to the channel's QoS call — confirm.
	PREFETCH_COUNT  = 1
	PREFETCH_SIZE   = 0
	PREFETCH_GLOBAL = false

	// Publish properties
	//
	// Both publish flags are false. PUBLISH_IMEDIATE is spelled with a single
	// "M"; the identifier is exported, so the spelling is kept as-is to avoid
	// breaking code that references it.
	PUBLISH_IMEDIATE  = false
	PUBLISH_MANDATORY = false
)
View Source
const (
	// TOPIC_FORMAT is a format string with two %s verbs separated by a dot.
	// NOTE(review): presumably used by Topic to build routing keys — confirm
	// against the implementation.
	TOPIC_FORMAT = "%s.%s"
)

Variables

View Source
var (
	// RabbitMQConn is a package-level string with no initial value set here.
	// NOTE(review): presumably the RabbitMQ connection address, assigned at
	// startup — confirm where it is written and read.
	RabbitMQConn string
)

Functions

func GetInstAccountHandler

func GetInstAccountHandler(ctx context.Context, event *pb.Event, db driver.Database) (*pb.Event, error)

func Topic

func Topic(msg interface{}) string

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection wraps amqp connection to handle reconnects

func NewConnection

func NewConnection(conn *amqp091.Connection) (*Connection, error)

func (*Connection) Channel

func (c *Connection) Channel() *amqp091.Channel

Get the existing channel if it is open; otherwise open a new channel.

func (*Connection) Send

func (c *Connection) Send(ctx context.Context, exchange string, event *pb.Event) error

Send event to given exchange

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus handles interservice communication through RabbitMQ

func NewEventBus

func NewEventBus(conn *amqp.Connection, logger *zap.Logger) (*EventBus, error)

func (*EventBus) Cancel

func (bus *EventBus) Cancel(ctx context.Context, req *pb.CancelRequest) error

func (*EventBus) List

func (bus *EventBus) List(ctx context.Context, req *pb.ConsumeRequest) ([]*pb.Event, error)

func (*EventBus) Pub

func (bus *EventBus) Pub(ctx context.Context, event *pb.Event) error

func (*EventBus) Sub

func (bus *EventBus) Sub(ctx context.Context, req *pb.ConsumeRequest) (<-chan *pb.Event, error)

func (*EventBus) Unsub

func (bus *EventBus) Unsub(req *pb.ConsumeRequest) error

type EventBusServer

type EventBusServer struct {
	pb.UnimplementedEventsServiceServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(logger *zap.Logger, conn *amqp.Connection, db driver.Database) *EventBusServer

func (*EventBusServer) Cancel

func (s *EventBusServer) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.Response, error)

func (*EventBusServer) Consume

func (*EventBusServer) List

func (s *EventBusServer) List(ctx context.Context, req *pb.ConsumeRequest) (*pb.Events, error)

func (*EventBusServer) ListenBusQueue

func (s *EventBusServer) ListenBusQueue(ctx context.Context)

func (*EventBusServer) Publish

func (s *EventBusServer) Publish(ctx context.Context, event *pb.Event) (*pb.Response, error)

type EventHandler

type EventHandler func(context.Context, *pb.Event, driver.Database) (*pb.Event, error)

type EventInfo

// EventInfo is a JSON-serializable record describing an event.
//
// The account, service and instance keys are always emitted; product, ip,
// next_payment_date and price are omitted from the JSON output when they hold
// their zero value.
// NOTE(review): NextPaymentDate is a float64 — presumably a Unix timestamp;
// confirm the unit against the code that fills it in.
type EventInfo struct {
	Account         string  `json:"account"`
	Service         string  `json:"service"`
	Instance        string  `json:"instance"`
	Product         string  `json:"product,omitempty"`
	Ip              string  `json:"ip,omitempty"`
	NextPaymentDate float64 `json:"next_payment_date,omitempty"`
	Price           float64 `json:"price,omitempty"`
}

type Exchange

type Exchange struct {
	Name string
	// contains filtered or unexported fields
}

func NewExchange

func NewExchange(conn *Connection, name string, t ExchangeType) (*Exchange, error)

func (*Exchange) Bind

func (e *Exchange) Bind(q *Queue, key string) error

Bind exchange to queue so that:

Exchange -> Queue

func (*Exchange) DeriveQueue

func (e *Exchange) DeriveQueue(name string) (*Queue, error)

Create a queue that is bound to the exchange

func (*Exchange) Send

func (e *Exchange) Send(ctx context.Context, event *pb.Event) error

Send event to the exchange

type ExchangeType

// ExchangeType enumerates the exchange variants accepted by NewExchange.
type ExchangeType int64
const (
	// DefaultExchange is the zero value of ExchangeType.
	DefaultExchange ExchangeType = iota
	// AlternateExchange has the value 1.
	// NOTE(review): presumably selects an AMQP alternate exchange — confirm
	// against NewExchange.
	AlternateExchange
)

type Queue

type Queue struct {
	amqp091.Queue
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(ch *Connection, name string, t QueueType) (*Queue, error)

func (*Queue) Cancel

func (q *Queue) Cancel(ctx context.Context, id string) error

func (*Queue) Consume

func (q *Queue) Consume() (<-chan *pb.Event, error)

Consume events from the queue

func (*Queue) List

func (q *Queue) List(ctx context.Context) ([]*pb.Event, error)

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, event *pb.Event) error

Send event to the default exchange with the routing key equal to the queue name

type QueueType

// QueueType enumerates the queue variants accepted by NewQueue.
type QueueType int64
const (
	// DefaultQueue is the zero value of QueueType.
	DefaultQueue QueueType = iota
	UniqueQueue            // Add suffix to queue name
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL