Documentation ¶
Index ¶
- Constants
- type CreateRMQHandlerTask
- type DeclareQueueTask
- type RMQConnectionData
- type RMQDeliveryCallback
- type RMQDeliveryHandler
- func (d *RMQDeliveryHandler) Accept() errs.APIError
- func (d *RMQDeliveryHandler) CheckResponseError() errs.APIError
- func (d *RMQDeliveryHandler) GetCSTX(handler *RMQHandler) cstx.CrossServiceTransaction
- func (d *RMQDeliveryHandler) GetCorrelationID() string
- func (d *RMQDeliveryHandler) GetHeader(headerName string) (interface{}, bool)
- func (d *RMQDeliveryHandler) GetMessageBody() []byte
- func (d *RMQDeliveryHandler) GetResponseRoutingKeyHeader() (string, errs.APIError)
- func (d *RMQDeliveryHandler) GetRoutingKey() string
- func (d *RMQDeliveryHandler) Reject(requeue bool) errs.APIError
- type RMQErrorCallback
- type RMQExchangeDeclareTask
- type RMQHandler
- func (r *RMQHandler) DeclareExchanges(exchangeTypes map[string]string) errs.APIError
- func (r *RMQHandler) DeleteQueues(queueNames map[string][]string) errs.APIError
- func (r *RMQHandler) IsConnectionAlive() bool
- func (r *RMQHandler) IsQueueExists(name string) (bool, errs.APIError)
- func (r *RMQHandler) NewRMQHandler() *RMQHandler
- func (r *RMQHandler) NewRMQWorker(task WorkerTask) (*RMQWorker, errs.APIError)
- func (h *RMQHandler) NewRequestHandler(task RequestHandlerTask) (*RequestHandler, errs.APIError)
- func (r *RMQHandler) PublishCSXTToExchange(task structs.PublishToExchangeTask, tx cstx.CrossServiceTransaction) errs.APIError
- func (r *RMQHandler) PublishCSXTToQueue(task structs.RMQPublishRequestTask, tx cstx.CrossServiceTransaction) errs.APIError
- func (r *RMQHandler) PublishToExchange(task structs.PublishToExchangeTask, additionalHeaders ...structs.RMQHeader) errs.APIError
- func (r *RMQHandler) PublishToQueue(task structs.RMQPublishRequestTask, additionalHeaders ...structs.RMQHeader) errs.APIError
- func (r *RMQHandler) RMQPublishToExchange(message interface{}, exchangeName, routingKey string, ...) errs.APIError
- func (r *RMQHandler) RMQPublishToQueue(task structs.RMQPublishRequestTask) errs.APIError
- func (r *RMQHandler) SendRMQResponse(task *RMQPublishResponseTask, errorMsg ...*constants.APIError) errs.APIError
- func (handler *RMQHandler) StartCSTXAcksConsumer() errs.APIError
- type RMQPublishResponseTask
- type RMQQueueDeclareTask
- type RMQTimeoutCallback
- type RMQWorker
- func (w *RMQWorker) AwaitFinish()
- func (w *RMQWorker) Finish()
- func (w *RMQWorker) GetID() string
- func (w *RMQWorker) GetName() string
- func (w *RMQWorker) Reset()
- func (w *RMQWorker) Serve() errs.APIError
- func (w *RMQWorker) SetCheckResponseErrors(check bool) *RMQWorker
- func (w *RMQWorker) SetConsumerTag(uniqueTag string) *RMQWorker
- func (w *RMQWorker) SetConsumerTagFromName() *RMQWorker
- func (w *RMQWorker) SetID(id string) *RMQWorker
- func (w *RMQWorker) SetName(name string) *RMQWorker
- func (w *RMQWorker) Stop()
- type RequestHandler
- type RequestHandlerResponse
- type RequestHandlerTask
- type WorkerTask
Constants ¶
const ( ExchangeTypeDirect = "direct" ExchangeTypeTopic = "topic" ExchangeTypeFanout = "fanout" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CreateRMQHandlerTask ¶ added in v1.25.0
type DeclareQueueTask ¶ added in v1.25.0
type DeclareQueueTask struct { Name string Durable bool AutoDelete bool // optional MessagesLifetime int64 MaxLength int64 DisableOverflow bool }
DeclareQueueTask - queue declare task data container
type RMQConnectionData ¶
type RMQConnectionData struct { User string `json:"user"` Password string `json:"password"` Host string `json:"host"` Port string `json:"port"` UseTLS bool `json:"tls"` }
RMQConnectionData - rmq connection data container
type RMQDeliveryCallback ¶
type RMQDeliveryCallback func(w *RMQWorker, deliveryHandler RMQDeliveryHandler)
RMQDeliveryCallback - RMQ delivery callback function
type RMQDeliveryHandler ¶
type RMQDeliveryHandler struct {
// contains filtered or unexported fields
}
RMQDeliveryHandler - RMQ delivery data container
func NewRMQDeliveryHandler ¶
func NewRMQDeliveryHandler(delivery amqp.Delivery) RMQDeliveryHandler
NewRMQDeliveryHandler - create new RMQ delivery handler
func (*RMQDeliveryHandler) Accept ¶
func (d *RMQDeliveryHandler) Accept() errs.APIError
Accept RMQ message delivery
func (*RMQDeliveryHandler) CheckResponseError ¶
func (d *RMQDeliveryHandler) CheckResponseError() errs.APIError
CheckResponseError - check RMQ response error
func (*RMQDeliveryHandler) GetCSTX ¶ added in v1.26.28
func (d *RMQDeliveryHandler) GetCSTX(handler *RMQHandler) cstx.CrossServiceTransaction
func (*RMQDeliveryHandler) GetCorrelationID ¶
func (d *RMQDeliveryHandler) GetCorrelationID() string
GetCorrelationID from RMQ delivery
func (*RMQDeliveryHandler) GetHeader ¶
func (d *RMQDeliveryHandler) GetHeader(headerName string) (interface{}, bool)
GetHeader from RMQ delivery headers. returns header value, is header exists (bool)
func (*RMQDeliveryHandler) GetMessageBody ¶
func (d *RMQDeliveryHandler) GetMessageBody() []byte
GetMessageBody from RMQ delivery
func (*RMQDeliveryHandler) GetResponseRoutingKeyHeader ¶
func (d *RMQDeliveryHandler) GetResponseRoutingKeyHeader() (string, errs.APIError)
GetResponseRoutingKeyHeader - get response routing key from delivery headers
func (*RMQDeliveryHandler) GetRoutingKey ¶
func (d *RMQDeliveryHandler) GetRoutingKey() string
GetRoutingKey from RMQ delivery
type RMQErrorCallback ¶ added in v1.22.14
type RMQErrorCallback func(w *RMQWorker, err *constants.APIError)
RMQErrorCallback - RMQ error callback function
type RMQExchangeDeclareTask ¶ added in v1.19.3
type RMQExchangeDeclareTask struct { ExchangeName string ExchangeType string MessagesLifetime int64 }
RMQExchangeDeclareTask - exchange declare task data container
type RMQHandler ¶
type RMQHandler struct {
// contains filtered or unexported fields
}
RMQHandler - RMQ connection handler
func NewRMQHandler ¶
func NewRMQHandler(task CreateRMQHandlerTask) (*RMQHandler, errs.APIError)
NewRMQHandler - create new RMQHandler
func (*RMQHandler) DeclareExchanges ¶
func (r *RMQHandler) DeclareExchanges(exchangeTypes map[string]string) errs.APIError
DeclareExchanges - declare RMQ exchanges list. exchange name -> exchange type
func (*RMQHandler) DeleteQueues ¶
func (r *RMQHandler) DeleteQueues(queueNames map[string][]string) errs.APIError
DeleteQueues - delete RMQ queues. map[manager name] -> array of queue names
func (*RMQHandler) IsConnectionAlive ¶ added in v1.29.1
func (r *RMQHandler) IsConnectionAlive() bool
IsConnectionAlive - check connection
func (*RMQHandler) IsQueueExists ¶ added in v1.23.17
func (r *RMQHandler) IsQueueExists(name string) (bool, errs.APIError)
IsQueueExists - is queue exists? /ᐠ。ꞈ。ᐟ\
func (*RMQHandler) NewRMQHandler ¶
func (r *RMQHandler) NewRMQHandler() *RMQHandler
NewRMQHandler - clone handler & open new RMQ channel
func (*RMQHandler) NewRMQWorker ¶
func (r *RMQHandler) NewRMQWorker(task WorkerTask) (*RMQWorker, errs.APIError)
NewRMQWorker - create new RMQ worker to receive messages
func (*RMQHandler) NewRequestHandler ¶ added in v1.18.3
func (h *RMQHandler) NewRequestHandler(task RequestHandlerTask) (*RequestHandler, errs.APIError)
NewRequestHandler - create new handler for one-time request
func (*RMQHandler) PublishCSXTToExchange ¶ added in v1.29.0
func (r *RMQHandler) PublishCSXTToExchange(task structs.PublishToExchangeTask, tx cstx.CrossServiceTransaction) errs.APIError
func (*RMQHandler) PublishCSXTToQueue ¶ added in v1.29.0
func (r *RMQHandler) PublishCSXTToQueue(task structs.RMQPublishRequestTask, tx cstx.CrossServiceTransaction) errs.APIError
func (*RMQHandler) PublishToExchange ¶ added in v1.26.23
func (r *RMQHandler) PublishToExchange(task structs.PublishToExchangeTask, additionalHeaders ...structs.RMQHeader) errs.APIError
PublishToExchange - publish message to exchangeю responseRoutingKey is optional to send requests to exchange
func (*RMQHandler) PublishToQueue ¶ added in v1.26.23
func (r *RMQHandler) PublishToQueue(task structs.RMQPublishRequestTask, additionalHeaders ...structs.RMQHeader) errs.APIError
PublishToQueue - send request to rmq queue
func (*RMQHandler) RMQPublishToExchange ¶
func (r *RMQHandler) RMQPublishToExchange( message interface{}, exchangeName, routingKey string, responseRoutingKey ...string, ) errs.APIError
RMQPublishToExchange - publish message to exchange. responseRoutingKey is optional to send requests to exchange. NOTE: should be replaced by PublishToExchange in later lib versions
func (*RMQHandler) RMQPublishToQueue ¶
func (r *RMQHandler) RMQPublishToQueue(task structs.RMQPublishRequestTask) errs.APIError
RMQPublishToQueue - send request to rmq queue. NOTE: should be replaced by PublishToExchange in later lib versions
func (*RMQHandler) SendRMQResponse ¶
func (r *RMQHandler) SendRMQResponse( task *RMQPublishResponseTask, errorMsg ...*constants.APIError, ) errs.APIError
SendRMQResponse - publish message to RMQ exchange
func (*RMQHandler) StartCSTXAcksConsumer ¶ added in v1.28.0
func (handler *RMQHandler) StartCSTXAcksConsumer() errs.APIError
type RMQPublishResponseTask ¶
type RMQPublishResponseTask struct { ExchangeName string ResponseRoutingKey string CorrelationID string MessageBody interface{} }
RMQPublishResponseTask - response for publish message to RMQ request
type RMQQueueDeclareTask ¶
type RMQQueueDeclareTask struct { QueueName string Durable bool AutoDelete bool FromExchangeName string RoutingKey string // optional MessagesLifetime int64 QueueLength int64 DisableOverflow bool }
RMQQueueDeclareTask - queue declare task data container
type RMQTimeoutCallback ¶
type RMQTimeoutCallback func(w *RMQWorker)
RMQTimeoutCallback - RMQ response timeout callback function
type RMQWorker ¶
RMQWorker - just RMQ worker
func (*RMQWorker) AwaitFinish ¶
func (w *RMQWorker) AwaitFinish()
AwaitFinish - wait for worker finished
func (*RMQWorker) Finish ¶ added in v1.26.14
func (w *RMQWorker) Finish()
Finish worker but continue listen messages
func (*RMQWorker) SetCheckResponseErrors ¶
SetCheckResponseErrors - determines whether the errors in the answers passed to headers will be checked
func (*RMQWorker) SetConsumerTag ¶ added in v1.15.2
SetConsumerTag - set worker unique consumer tag
func (*RMQWorker) SetConsumerTagFromName ¶ added in v1.15.2
SetConsumerTagFromName - assign a consumer tag to the worker based on its name and random ID
type RequestHandler ¶ added in v1.18.3
type RequestHandler struct { RMQH *RMQHandler Task RequestHandlerTask Worker *RMQWorker WorkerID string Response *RequestHandlerResponse LastError *constants.APIError Finished chan struct{} IsPaused bool }
RequestHandler - periodic request handler
func (*RequestHandler) DeleteQueues ¶ added in v1.26.6
func (r *RequestHandler) DeleteQueues() errs.APIError
func (*RequestHandler) Send ¶ added in v1.18.3
func (r *RequestHandler) Send(messageBody interface{}, responseRoutingKey string) (*RequestHandlerResponse, errs.APIError)
Send request (sync)
func (*RequestHandler) SetID ¶ added in v1.18.3
func (r *RequestHandler) SetID(id string) *RequestHandler
SetID for worker
func (*RequestHandler) Stop ¶ added in v1.26.10
func (r *RequestHandler) Stop()
Stop handler, cancel consumer
type RequestHandlerResponse ¶ added in v1.18.3
type RequestHandlerResponse struct {
ResponseBody []byte
}
RequestHandlerResponse - raw RMQ response data
func (*RequestHandlerResponse) Decode ¶ added in v1.18.3
func (s *RequestHandlerResponse) Decode(destination interface{}) errs.APIError
Decode response from JSON. pass pointer to struct or map in `destination`
type RequestHandlerTask ¶ added in v1.18.3
type RequestHandlerTask struct { // required ResponseFromExchangeName string RequestToQueueName string TempQueueName string AttemptsNumber int Timeout time.Duration // optional ExchangeInsteadOfQueue bool WorkerName string ForceQueueToDurable bool MethodFriendlyName string // the name of the operation performed by the vorker for the logs and errors }
RequestHandlerTask data
type WorkerTask ¶ added in v1.21.3
type WorkerTask struct { // required QueueName string RoutingKey string ISQueueDurable bool ISAutoDelete bool Callback RMQDeliveryCallback // callback to handle RMQ delivery // optional ID string // worker ID FromExchange string // exchange name to bind queue ExchangeType string // direct, topic, etc ConsumersCount int // default: 1 WorkerName string // worker name. default name when empty EnableRateLimiter bool // limit handle rmq messages rate MaxEventsPerSecond int // for limiter QueueLength int64 // how many maximum messages to keep in the queue MessagesLifetime int64 // milliseconds. 0 to disable limit DisableOverflow bool // disable queue overflow DisableCheckResponseErrors bool // if it is necessary to handle an error at the service level, not at the library level UseErrorCallback bool // handle worker errors with error handler ErrorCallback RMQErrorCallback // error handler callback Timeout time.Duration // timeout to limit worker time TimeoutCallback RMQTimeoutCallback // timeout callback DoNotStopOnTimeout bool ManualAck bool }
WorkerTask - new RMQ worker data