rmqworker

package module
v1.36.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: MIT Imports: 16 Imported by: 0

README

rmqworker-lib

Description

Библиотека, позволяющая работать с RMQ:

  1. покдлючение к RabbitMQ;
  2. поддержка соединения, переподключение;
  3. получение сообщений из очереди с помощью воркера;
  4. воркер для получения ответов на запросы;

Documentation

Index

Constants

View Source
const (
	ExchangeTypeDirect = "direct"
	ExchangeTypeTopic  = "topic"
	ExchangeTypeFanout = "fanout"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CreateRMQHandlerTask added in v1.25.0

type CreateRMQHandlerTask struct {
	Data                    RMQConnectionData
	UseErrorCallback        bool
	ConnectionErrorCallback func(err errs.APIError)
	Logger                  *zap.Logger
}

type DeclareQueueTask added in v1.25.0

type DeclareQueueTask struct {
	Name       string
	Durable    bool
	AutoDelete bool

	// optional
	MessagesLifetime int64
	MaxLength        int64
	DisableOverflow  bool
}

DeclareQueueTask - queue declare task data container

type RMQConnectionData

type RMQConnectionData struct {
	User     string `json:"user"`
	Password string `json:"password"`
	Host     string `json:"host"`
	Port     string `json:"port"`
	UseTLS   bool   `json:"tls"`
}

RMQConnectionData - rmq connection data container

type RMQDeliveryCallback

type RMQDeliveryCallback func(w *RMQWorker, deliveryHandler RMQDeliveryHandler)

RMQDeliveryCallback - RMQ delivery callback function

type RMQDeliveryHandler

type RMQDeliveryHandler struct {
	// contains filtered or unexported fields
}

RMQDeliveryHandler - RMQ delivery data container

func NewRMQDeliveryHandler

func NewRMQDeliveryHandler(delivery amqp.Delivery) RMQDeliveryHandler

NewRMQDeliveryHandler - create new RMQ delivery handler

func (*RMQDeliveryHandler) Accept

func (d *RMQDeliveryHandler) Accept() errs.APIError

Accept RMQ message delivery

func (*RMQDeliveryHandler) CheckResponseError

func (d *RMQDeliveryHandler) CheckResponseError() errs.APIError

CheckResponseError - check RMQ response error

func (*RMQDeliveryHandler) GetCSTX added in v1.26.28

func (*RMQDeliveryHandler) GetCorrelationID

func (d *RMQDeliveryHandler) GetCorrelationID() string

GetCorrelationID from RMQ delivery

func (*RMQDeliveryHandler) GetHeader

func (d *RMQDeliveryHandler) GetHeader(headerName string) (interface{}, bool)

GetHeader from RMQ delivery headers. returns header value, is header exists (bool)

func (*RMQDeliveryHandler) GetMessageBody

func (d *RMQDeliveryHandler) GetMessageBody() []byte

GetMessageBody from RMQ delivery

func (*RMQDeliveryHandler) GetResponseRoutingKeyHeader

func (d *RMQDeliveryHandler) GetResponseRoutingKeyHeader() (string, errs.APIError)

GetResponseRoutingKeyHeader - get response routing key from delivery headers

func (*RMQDeliveryHandler) GetRoutingKey

func (d *RMQDeliveryHandler) GetRoutingKey() string

GetRoutingKey from RMQ delivery

func (*RMQDeliveryHandler) Reject added in v1.23.14

func (d *RMQDeliveryHandler) Reject(requeue bool) errs.APIError

Reject RMQ message delivery

type RMQErrorCallback added in v1.22.14

type RMQErrorCallback func(w *RMQWorker, err *constants.APIError)

RMQErrorCallback - RMQ error callback function

type RMQExchangeDeclareTask added in v1.19.3

type RMQExchangeDeclareTask struct {
	ExchangeName     string
	ExchangeType     string
	MessagesLifetime int64
}

RMQExchangeDeclareTask - exchange declare task data container

type RMQHandler

type RMQHandler struct {
	// contains filtered or unexported fields
}

RMQHandler - RMQ connection handler

func NewRMQHandler

func NewRMQHandler(task CreateRMQHandlerTask) (*RMQHandler, errs.APIError)

NewRMQHandler - create new RMQHandler

func (*RMQHandler) DeclareExchanges

func (r *RMQHandler) DeclareExchanges(exchangeTypes map[string]string) errs.APIError

DeclareExchanges - declare RMQ exchanges list. exchange name -> exchange type

func (*RMQHandler) DeleteQueues

func (r *RMQHandler) DeleteQueues(queueNames map[string][]string) errs.APIError

DeleteQueues - delete RMQ queues. map[manager name] -> array of queue names

func (*RMQHandler) IsConnectionAlive added in v1.29.1

func (r *RMQHandler) IsConnectionAlive() bool

IsConnectionAlive - check connection

func (*RMQHandler) IsQueueExists added in v1.23.17

func (r *RMQHandler) IsQueueExists(name string) (bool, errs.APIError)

IsQueueExists - is queue exists? /ᐠ。ꞈ。ᐟ\

func (*RMQHandler) NewRMQHandler

func (r *RMQHandler) NewRMQHandler() *RMQHandler

NewRMQHandler - clone handler & open new RMQ channel

func (*RMQHandler) NewRMQWorker

func (r *RMQHandler) NewRMQWorker(task WorkerTask) (*RMQWorker, errs.APIError)

NewRMQWorker - create new RMQ worker to receive messages

func (*RMQHandler) NewRequestHandler added in v1.18.3

func (h *RMQHandler) NewRequestHandler(task RequestHandlerTask) (*RequestHandler, errs.APIError)

NewRequestHandler - create new handler for one-time request

func (*RMQHandler) PublishCSXTToExchange added in v1.29.0

func (*RMQHandler) PublishCSXTToQueue added in v1.29.0

func (*RMQHandler) PublishToExchange added in v1.26.23

func (r *RMQHandler) PublishToExchange(task structs.PublishToExchangeTask, additionalHeaders ...structs.RMQHeader) errs.APIError

PublishToExchange - publish message to exchangeю responseRoutingKey is optional to send requests to exchange

func (*RMQHandler) PublishToQueue added in v1.26.23

func (r *RMQHandler) PublishToQueue(task structs.RMQPublishRequestTask, additionalHeaders ...structs.RMQHeader) errs.APIError

PublishToQueue - send request to rmq queue

func (*RMQHandler) RMQPublishToExchange

func (r *RMQHandler) RMQPublishToExchange(
	message interface{},
	exchangeName,
	routingKey string,
	responseRoutingKey ...string,
) errs.APIError

RMQPublishToExchange - publish message to exchange. responseRoutingKey is optional to send requests to exchange. NOTE: should be replaced by PublishToExchange in later lib versions

func (*RMQHandler) RMQPublishToQueue

func (r *RMQHandler) RMQPublishToQueue(task structs.RMQPublishRequestTask) errs.APIError

RMQPublishToQueue - send request to rmq queue. NOTE: should be replaced by PublishToExchange in later lib versions

func (*RMQHandler) SendRMQResponse

func (r *RMQHandler) SendRMQResponse(
	task *RMQPublishResponseTask,
	errorMsg ...*constants.APIError,
) errs.APIError

SendRMQResponse - publish message to RMQ exchange

func (*RMQHandler) StartCSTXAcksConsumer added in v1.28.0

func (handler *RMQHandler) StartCSTXAcksConsumer() errs.APIError

type RMQPublishResponseTask

type RMQPublishResponseTask struct {
	ExchangeName       string
	ResponseRoutingKey string
	CorrelationID      string
	MessageBody        interface{}
}

RMQPublishResponseTask - response for publish message to RMQ request

type RMQQueueDeclareTask

type RMQQueueDeclareTask struct {
	QueueName        string
	Durable          bool
	AutoDelete       bool
	FromExchangeName string
	RoutingKey       string

	// optional
	MessagesLifetime int64
	QueueLength      int64
	DisableOverflow  bool
}

RMQQueueDeclareTask - queue declare task data container

type RMQTimeoutCallback

type RMQTimeoutCallback func(w *RMQWorker)

RMQTimeoutCallback - RMQ response timeout callback function

type RMQWorker

type RMQWorker struct {
	Logger *zap.Logger
	// contains filtered or unexported fields
}

RMQWorker - just RMQ worker

func (*RMQWorker) AwaitFinish

func (w *RMQWorker) AwaitFinish()

AwaitFinish - wait for worker finished

func (*RMQWorker) Finish added in v1.26.14

func (w *RMQWorker) Finish()

Finish worker but continue listen messages

func (*RMQWorker) GetID

func (w *RMQWorker) GetID() string

GetID - get worker ID

func (*RMQWorker) GetName

func (w *RMQWorker) GetName() string

GetName - get worker name

func (*RMQWorker) Reset

func (w *RMQWorker) Reset()

Reset worker channels

func (*RMQWorker) Serve

func (w *RMQWorker) Serve() errs.APIError

Serve - start consumer(s)

func (*RMQWorker) SetCheckResponseErrors

func (w *RMQWorker) SetCheckResponseErrors(check bool) *RMQWorker

SetCheckResponseErrors - determines whether the errors in the answers passed to headers will be checked

func (*RMQWorker) SetConsumerTag added in v1.15.2

func (w *RMQWorker) SetConsumerTag(uniqueTag string) *RMQWorker

SetConsumerTag - set worker unique consumer tag

func (*RMQWorker) SetConsumerTagFromName added in v1.15.2

func (w *RMQWorker) SetConsumerTagFromName() *RMQWorker

SetConsumerTagFromName - assign a consumer tag to the worker based on its name and random ID

func (*RMQWorker) SetID

func (w *RMQWorker) SetID(id string) *RMQWorker

SetID - set RMQ worker ID

func (*RMQWorker) SetName

func (w *RMQWorker) SetName(name string) *RMQWorker

SetName - set RMQ worker name for logs

func (*RMQWorker) Stop

func (w *RMQWorker) Stop()

Stop RMQ messages listen

type RequestHandler added in v1.18.3

type RequestHandler struct {
	RMQH   *RMQHandler
	Task   RequestHandlerTask
	Worker *RMQWorker

	WorkerID  string
	Response  *RequestHandlerResponse
	LastError *constants.APIError

	Finished chan struct{}
	IsPaused bool
}

RequestHandler - periodic request handler

func (*RequestHandler) DeleteQueues added in v1.26.6

func (r *RequestHandler) DeleteQueues() errs.APIError

func (*RequestHandler) Send added in v1.18.3

func (r *RequestHandler) Send(messageBody interface{}, responseRoutingKey string) (*RequestHandlerResponse, errs.APIError)

Send request (sync)

func (*RequestHandler) SetID added in v1.18.3

func (r *RequestHandler) SetID(id string) *RequestHandler

SetID for worker

func (*RequestHandler) Stop added in v1.26.10

func (r *RequestHandler) Stop()

Stop handler, cancel consumer

type RequestHandlerResponse added in v1.18.3

type RequestHandlerResponse struct {
	ResponseBody []byte
}

RequestHandlerResponse - raw RMQ response data

func (*RequestHandlerResponse) Decode added in v1.18.3

func (s *RequestHandlerResponse) Decode(destination interface{}) errs.APIError

Decode response from JSON. pass pointer to struct or map in `destination`

type RequestHandlerTask added in v1.18.3

type RequestHandlerTask struct {
	// required
	ResponseFromExchangeName string
	RequestToQueueName       string
	TempQueueName            string
	AttemptsNumber           int
	Timeout                  time.Duration

	// optional
	ExchangeInsteadOfQueue bool
	WorkerName             string
	ForceQueueToDurable    bool
	MethodFriendlyName     string // the name of the operation performed by the vorker for the logs and errors
}

RequestHandlerTask data

type WorkerTask added in v1.21.3

type WorkerTask struct {
	// required
	QueueName      string
	RoutingKey     string
	ISQueueDurable bool
	ISAutoDelete   bool
	Callback       RMQDeliveryCallback // callback to handle RMQ delivery

	// optional
	ID                         string             // worker ID
	FromExchange               string             // exchange name to bind queue
	ExchangeType               string             // direct, topic, etc
	ConsumersCount             int                // default: 1
	WorkerName                 string             // worker name. default name when empty
	EnableRateLimiter          bool               // limit handle rmq messages rate
	MaxEventsPerSecond         int                // for limiter
	QueueLength                int64              // how many maximum messages to keep in the queue
	MessagesLifetime           int64              // milliseconds. 0 to disable limit
	DisableOverflow            bool               // disable queue overflow
	DisableCheckResponseErrors bool               // if it is necessary to handle an error at the service level, not at the library level
	UseErrorCallback           bool               // handle worker errors with error handler
	ErrorCallback              RMQErrorCallback   // error handler callback
	Timeout                    time.Duration      // timeout to limit worker time
	TimeoutCallback            RMQTimeoutCallback // timeout callback
	DoNotStopOnTimeout         bool
	ManualAck                  bool
}

WorkerTask - new RMQ worker data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL