Documentation ¶
Overview ¶
an implementation of MetadataService which uses in memory lookups
a service providing RabbitMQ metadata (queues, exchanges, connections...).
Index ¶
- Constants
- Variables
- func BindExchangeToExchange(session Session, sourceExchange, key, targetExchange string, args amqp.Table) error
- func BindQueueToExchange(session Session, queueName, key, exchangeName string, args amqp.Table) error
- func CreateExchange(session Session, exchangeName, exchangeType string, durable, autoDelete bool, ...) error
- func CreateQueue(session Session, queueName string, durable, autoDelete, exclusive bool, ...) error
- func DialTLS(uri string, tlsConfig *tls.Config) (*amqp.Connection, error)
- func DiscoverBindingsForExchange(ctx context.Context, rabbitAPIClient *RabbitHTTPClient, ...) ([]string, error)
- func EnsureAMQPTable(m interface{}) interface{}
- func MergeTables(first, second amqp.Table) amqp.Table
- func PurgeQueue(session Session, queueName string) (int, error)
- func RemoveExchange(session Session, exchangeName string, ifUnused bool) error
- func RemoveQueue(session Session, queueName string, ifUnused, ifEmpty bool) error
- func SimpleAmqpConnector(amqpURL *url.URL, tlsConfig *tls.Config, run func(session Session) error) error
- func ToAMQPTable(headers KeyValueMap) amqp.Table
- func UnbindQueueFromExchange(session Session, queueName, key, exchangeName string, args amqp.Table) error
- type AmqpConnector
- type AmqpPublish
- type AmqpSubscriber
- type AmqpSubscriberConfig
- type AmqpTap
- type AmqpWorkerFunc
- type BrokerInfo
- type ChannelDetails
- type ConnectionDetails
- type DialFunc
- type ExchangeConfiguration
- type Fanin
- type InMemoryMetadataService
- func (s InMemoryMetadataService) AllBindingsForExchange(vhost, name string) []*RabbitBinding
- func (s InMemoryMetadataService) AllChannelsForConnection(vhost, name string) []*RabbitChannel
- func (s InMemoryMetadataService) AllConsumersForChannel(vhost, name string) []*RabbitConsumer
- func (s InMemoryMetadataService) Bindings() []RabbitBinding
- func (s InMemoryMetadataService) Channels() []RabbitChannel
- func (s InMemoryMetadataService) Connections() []RabbitConnection
- func (s InMemoryMetadataService) Consumers() []RabbitConsumer
- func (s InMemoryMetadataService) Exchanges() []RabbitExchange
- func (s InMemoryMetadataService) FindChannelByName(vhost, name string) *RabbitChannel
- func (s InMemoryMetadataService) FindConnectionByName(vhost, name string) *RabbitConnection
- func (s InMemoryMetadataService) FindExchangeByName(vhost, name string) *RabbitExchange
- func (s InMemoryMetadataService) FindQueueByName(vhost, name string) *RabbitQueue
- func (s InMemoryMetadataService) FindVhostByName(vhost string) *RabbitVhost
- func (s InMemoryMetadataService) Overview() RabbitOverview
- func (s InMemoryMetadataService) Queues() []RabbitQueue
- func (s InMemoryMetadataService) Vhosts() []RabbitVhost
- type KeyValueMap
- type Logger
- type MetadataService
- type OptInt
- type PublishChannel
- type PublishError
- type PublishErrorChannel
- type PublishErrorReason
- type PublishMessage
- type RabbitBinding
- type RabbitChannel
- type RabbitConnection
- type RabbitConsumer
- type RabbitExchange
- type RabbitHTTPClient
- func (s *RabbitHTTPClient) Bindings(ctx context.Context) ([]RabbitBinding, error)
- func (s *RabbitHTTPClient) BrokerInfo(ctx context.Context) (BrokerInfo, error)
- func (s *RabbitHTTPClient) Channels(ctx context.Context) ([]RabbitChannel, error)
- func (s *RabbitHTTPClient) CloseConnection(ctx context.Context, conn, reason string) error
- func (s *RabbitHTTPClient) Connections(ctx context.Context) ([]RabbitConnection, error)
- func (s *RabbitHTTPClient) Consumers(ctx context.Context) ([]RabbitConsumer, error)
- func (s *RabbitHTTPClient) Exchanges(ctx context.Context) ([]RabbitExchange, error)
- func (s *RabbitHTTPClient) Overview(ctx context.Context) (RabbitOverview, error)
- func (s *RabbitHTTPClient) Queues(ctx context.Context) ([]RabbitQueue, error)
- func (s *RabbitHTTPClient) Vhosts(ctx context.Context) ([]RabbitVhost, error)
- type RabbitOverview
- type RabbitQueue
- type RabbitVhost
- type ReconnectAction
- type Routing
- type Session
- type SubscribeError
- type SubscribeErrorChannel
- type SubscribeErrorReason
- type TapChannel
- type TapConfiguration
- type TapMessage
Constants ¶
const (
FailEarly = true
)
const PrefetchCount = 1
const PrefetchSize = 0
Variables ¶
var Dialer = net.Dial
Functions ¶
func BindExchangeToExchange ¶
func BindExchangeToExchange(session Session, sourceExchange, key, targetExchange string, args amqp.Table) error
BindExchangeToExchange binds the given source exchange to the given target exchange using the given routing key.
func BindQueueToExchange ¶
func BindQueueToExchange(session Session, queueName, key, exchangeName string, args amqp.Table) error
BindQueueToExchange binds the given queue to the given exchange.
func CreateExchange ¶
func CreateExchange(session Session, exchangeName, exchangeType string, durable, autoDelete bool, args amqp.Table) error
CreateExchange creates a new exchange on the given channel.
func CreateQueue ¶
func CreateQueue(session Session, queueName string, durable, autoDelete, exclusive bool, args amqp.Table) error
CreateQueue creates a new queue. TODO(JD): get rid of bool types.
func DialTLS ¶
DialTLS is a wrapper for amqp.DialTLS that supports EXTERNAL auth for mTLS. It can be removed once https://github.com/streadway/amqp/pull/121 gets merged.
func DiscoverBindingsForExchange ¶
func DiscoverBindingsForExchange(ctx context.Context, rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error)
DiscoverBindingsForExchange returns a string list of routing-keys that are used by the given exchange and broker. This list can be used to auto-tap to all queues on a given exchange
func EnsureAMQPTable ¶
func EnsureAMQPTable(m interface{}) interface{}
EnsureAMQPTable returns an object where all map[string]interface{} are replaced by amqp.Table{} so it is compatible with the amqp libs type system when it comes to passing headers, which expects (nested) amqp.Table structures.
See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227
func MergeTables ¶
MergeTables merges the given amqp.Tables, the second one overriding the values of the first one.
func PurgeQueue ¶
PurgeQueue clears a queue. Returns number of elements purged
func RemoveExchange ¶
RemoveExchange removes an exchange on the given channel.
func RemoveQueue ¶
RemoveQueue removes a queue
func SimpleAmqpConnector ¶
func SimpleAmqpConnector(amqpURL *url.URL, tlsConfig *tls.Config, run func(session Session) error) error
SimpleAmqpConnector opens an AMQP connection and channel, and calls a function with the channel as argument. Use this function for simple, one-shot operations like creation of queues, exchanges etc.
func ToAMQPTable ¶
func ToAMQPTable(headers KeyValueMap) amqp.Table
ToAMQPTable converts a KeyValueMap to an amqp.Table, trying to infer data types: - integers - timestamps in RFC3339 format - strings (default)
Types ¶
type AmqpConnector ¶
type AmqpConnector struct {
// contains filtered or unexported fields
}
AmqpConnector manages the connection to the AMQP broker and automatically reconnects after connection losses.
func NewAmqpConnector ¶
NewAmqpConnector creates a new AmqpConnector object.
func (*AmqpConnector) Connect ¶
func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) error
Connect (re-)establishes the connection to RabbitMQ broker.
type AmqpPublish ¶
type AmqpPublish struct {
// contains filtered or unexported fields
}
AmqpPublish allows to send to a RabbitMQ exchange.
func NewAmqpPublish ¶
func NewAmqpPublish(url *url.URL, tlsConfig *tls.Config, mandatory, confirms bool, logger Logger) *AmqpPublish
NewAmqpPublish returns a new AmqpPublish object associated with the RabbitMQ broker denoted by the url parameter.
func (*AmqpPublish) EstablishConnection ¶
func (s *AmqpPublish) EstablishConnection( ctx context.Context, publishChannel PublishChannel, errorChannel PublishErrorChannel) error
EstablishConnection sets up the connection to the broker
type AmqpSubscriber ¶
type AmqpSubscriber struct {
// contains filtered or unexported fields
}
AmqpSubscriber allows subscribing to queues.
func NewAmqpSubscriber ¶
func NewAmqpSubscriber(config AmqpSubscriberConfig, url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpSubscriber
NewAmqpSubscriber returns a new AmqpSubscriber object associated with the RabbitMQ broker denoted by the url parameter.
func (*AmqpSubscriber) EstablishSubscription ¶
func (s *AmqpSubscriber) EstablishSubscription( ctx context.Context, queueName string, tapCh TapChannel, errCh SubscribeErrorChannel) error
EstablishSubscription sets up the connection to the broker and sets up the tap, which is bound to the provided consumer function. Typically this function is run as a go-routine.
queueName is the queue to subscribe to. tapCh is where the consumed messages are sent to. errCh is the channel where errors are sent to.
type AmqpSubscriberConfig ¶
AmqpSubscriberConfig stores configuration of the subscriber
type AmqpTap ¶
type AmqpTap struct { *AmqpSubscriber // contains filtered or unexported fields }
AmqpTap allows tapping a RabbitMQ exchange.
func NewAmqpTap ¶
NewAmqpTap returns a new AmqpTap object associated with the RabbitMQ broker denoted by the uri parameter.
func (*AmqpTap) EstablishTap ¶
func (s *AmqpTap) EstablishTap( ctx context.Context, exchangeConfigList []ExchangeConfiguration, tapCh TapChannel, errorCh SubscribeErrorChannel) error
EstablishTap sets up the connection to the broker and sets up the tap, which is bound to the provided consumer function. Typically this function is run as a go-routine.
type AmqpWorkerFunc ¶
type AmqpWorkerFunc func(ctx context.Context, session Session) (ReconnectAction, error)
An AmqpWorkerFunc does the actual work after the connection is established. If the worker returns Reconnect, the caller should re-connect to the broker. If the worker returns NoReconnect, the caller should finish its processing. The worker must return with NoReconnect if a ShutdownMessage is received via shutdownChan, otherwise with Reconnect.
type BrokerInfo ¶
type BrokerInfo struct { Overview RabbitOverview Connections []RabbitConnection Exchanges []RabbitExchange Queues []RabbitQueue Consumers []RabbitConsumer Bindings []RabbitBinding Channels []RabbitChannel Vhosts []RabbitVhost }
BrokerInfo represents the state of various RabbitMQ resources as returned by the RabbitMQ REST API.
type ChannelDetails ¶
type ChannelDetails struct { PeerHost string `json:"peer_host"` PeerPort OptInt `json:"peer_port"` ConnectionName string `json:"connection_name"` User string `json:"user"` Number int `json:"number"` Node string `json:"node"` Name string `json:"name"` }
ChannelDetails models channel_details in RabbitConsumer.
func (*ChannelDetails) UnmarshalJSON ¶
func (d *ChannelDetails) UnmarshalJSON(data []byte) error
UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API returning "[]" instead of null. To make sure deserialization does not break, we catch this case, and return an empty ChannelDetails struct. see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424
type ConnectionDetails ¶
type ConnectionDetails struct { PeerHost string `json:"peer_host"` PeerPort OptInt `json:"peer_port"` Name string `json:"name"` }
func (*ConnectionDetails) UnmarshalJSON ¶
func (d *ConnectionDetails) UnmarshalJSON(data []byte) error
UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API returning "[]" instead of null. To make sure deserialization does not break, we catch this case, and return an empty ConnectionDetails struct. see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424
type ExchangeConfiguration ¶
type ExchangeConfiguration struct { // Exchange is the name of the exchange to bind to Exchange string // BindingKey is the binding key to use. The key depends on the the type // of exchange being tapped (e.g. direct, topic). BindingKey string }
ExchangeConfiguration holds exchange and bindingkey for a single tap
func NewExchangeConfiguration ¶
func NewExchangeConfiguration(exchangeAndBindingStr string) (*ExchangeConfiguration, error)
NewExchangeConfiguration returns a pointer to a newly created ExchangeConfiguration object
type Fanin ¶
type Fanin struct { Ch chan interface{} // contains filtered or unexported fields }
Fanin allows doing a select ("fan-in") on a set of channels.
type InMemoryMetadataService ¶
type InMemoryMetadataService struct {
// contains filtered or unexported fields
}
func NewInMemoryMetadataService ¶
func NewInMemoryMetadataService(brokerInfo BrokerInfo) *InMemoryMetadataService
func (InMemoryMetadataService) AllBindingsForExchange ¶
func (s InMemoryMetadataService) AllBindingsForExchange(vhost, name string) []*RabbitBinding
func (InMemoryMetadataService) AllChannelsForConnection ¶
func (s InMemoryMetadataService) AllChannelsForConnection(vhost, name string) []*RabbitChannel
func (InMemoryMetadataService) AllConsumersForChannel ¶
func (s InMemoryMetadataService) AllConsumersForChannel(vhost, name string) []*RabbitConsumer
func (InMemoryMetadataService) Bindings ¶
func (s InMemoryMetadataService) Bindings() []RabbitBinding
func (InMemoryMetadataService) Channels ¶
func (s InMemoryMetadataService) Channels() []RabbitChannel
func (InMemoryMetadataService) Connections ¶
func (s InMemoryMetadataService) Connections() []RabbitConnection
func (InMemoryMetadataService) Consumers ¶
func (s InMemoryMetadataService) Consumers() []RabbitConsumer
func (InMemoryMetadataService) Exchanges ¶
func (s InMemoryMetadataService) Exchanges() []RabbitExchange
func (InMemoryMetadataService) FindChannelByName ¶
func (s InMemoryMetadataService) FindChannelByName(vhost, name string) *RabbitChannel
func (InMemoryMetadataService) FindConnectionByName ¶
func (s InMemoryMetadataService) FindConnectionByName(vhost, name string) *RabbitConnection
func (InMemoryMetadataService) FindExchangeByName ¶
func (s InMemoryMetadataService) FindExchangeByName(vhost, name string) *RabbitExchange
func (InMemoryMetadataService) FindQueueByName ¶
func (s InMemoryMetadataService) FindQueueByName(vhost, name string) *RabbitQueue
func (InMemoryMetadataService) FindVhostByName ¶
func (s InMemoryMetadataService) FindVhostByName(vhost string) *RabbitVhost
func (InMemoryMetadataService) Overview ¶
func (s InMemoryMetadataService) Overview() RabbitOverview
func (InMemoryMetadataService) Queues ¶
func (s InMemoryMetadataService) Queues() []RabbitQueue
func (InMemoryMetadataService) Vhosts ¶
func (s InMemoryMetadataService) Vhosts() []RabbitVhost
type KeyValueMap ¶
KeyValueMap is a string -> string map used to store key value pairs defined on the command line.
type Logger ¶
type Logger interface { Debugf(format string, a ...interface{}) Errorf(format string, a ...interface{}) Infof(format string, a ...interface{}) }
Logger is the logging interface used by this package. We don't want this package to depend on a specific logging framework, so a logger is injected by the client code.
type MetadataService ¶
type MetadataService interface { Overview() RabbitOverview Connections() []RabbitConnection Exchanges() []RabbitExchange Queues() []RabbitQueue Consumers() []RabbitConsumer Bindings() []RabbitBinding Channels() []RabbitChannel Vhosts() []RabbitVhost FindQueueByName(vhost, name string) *RabbitQueue FindExchangeByName(vhost, name string) *RabbitExchange FindChannelByName(vhost, name string) *RabbitChannel FindConnectionByName(vhost, name string) *RabbitConnection FindVhostByName(vhost string) *RabbitVhost AllChannelsForConnection(vhost, name string) []*RabbitChannel AllConsumersForChannel(vhost, name string) []*RabbitConsumer AllBindingsForExchange(vhost, name string) []*RabbitBinding }
type OptInt ¶
type OptInt int
func (*OptInt) UnmarshalJSON ¶
UnmarshalJSON is a workaround to deserialize int attributes in the RabbitMQ API which are sometimes returned as strings, (i.e. the value "undefined").
type PublishChannel ¶
type PublishChannel chan *PublishMessage
PublishChannel is a channel for PublishMessage message objects
type PublishError ¶
type PublishError struct { Reason PublishErrorReason // Publishing stores the original message, if available (AckTimeout, Nack, // PublishFailed) Message *PublishMessage // ReturnedMessage stores the returned message in case of PublishErrorReturned ReturnedMessage *amqp.Return // Cause holds the error when a ChannelError happened Cause error }
PublishError is sent back through the error channel when there are problems during the publishing of messages.
func (*PublishError) Error ¶
func (s *PublishError) Error() string
type PublishErrorChannel ¶
type PublishErrorChannel chan *PublishError
type PublishErrorReason ¶
type PublishErrorReason int
const ( PublishErrorAckTimeout PublishErrorReason = iota PublishErrorNack PublishErrorPublishFailed PublishErrorReturned PublishErrorChannelError )
type PublishMessage ¶
type PublishMessage struct { Routing Routing Publishing *amqp.Publishing }
PublishMessage is a message to be published by AmqpPublish via a PublishChannel
type RabbitBinding ¶
type RabbitBinding struct { Source string `json:"source"` Vhost string `json:"vhost"` Destination string `json:"destination"` DestinationType string `json:"destination_type"` RoutingKey string `json:"routing_key"` Arguments map[string]interface{} `json:"arguments,omitempty"` PropertiesKey string `json:"properties_key"` }
RabbitBinding models the /bindings resource of the rabbitmq http api
func (RabbitBinding) IsExchangeToExchange ¶
func (s RabbitBinding) IsExchangeToExchange() bool
IsExchangeToExchange returns true if this is an exchange-to-exchange binding
type RabbitChannel ¶
type RabbitChannel struct { ReductionsDetails struct { Rate float64 `json:"rate"` } `json:"reductions_details"` Reductions int `json:"reductions"` MessageStats struct { ReturnUnroutableDetails struct { Rate float64 `json:"rate"` } `json:"return_unroutable_details"` ReturnUnroutable int `json:"return_unroutable"` ConfirmDetails struct { Rate float64 `json:"rate"` } `json:"confirm_details"` Confirm int `json:"confirm"` PublishDetails struct { Rate float64 `json:"rate"` } `json:"publish_details"` Publish int `json:"publish"` Ack int `json:"ack"` AckDetails struct { Rate float64 `json:"rate"` } `json:"ack_details"` Deliver int `json:"deliver"` DeliverDetails struct { Rate float64 `json:"rate"` } `json:"deliver_details"` DeliverGet int `json:"deliver_get"` DeliverGetDetails struct { Rate float64 `json:"rate"` } `json:"deliver_get_details"` DeliverNoAck int `json:"deliver_no_ack"` DeliverNoAckDetails struct { Rate float64 `json:"rate"` } `json:"deliver_no_ack_details"` Get int `json:"get"` GetDetails struct { Rate float64 `json:"rate"` } `json:"get_details"` GetEmpty int `json:"get_empty"` GetEmptyDetails struct { Rate float64 `json:"rate"` } `json:"get_empty_details"` GetNoAck int `json:"get_no_ack"` GetNoAckDetails struct { Rate float64 `json:"rate"` } `json:"get_no_ack_details"` Redeliver int `json:"redeliver"` RedeliverDetails struct { Rate float64 `json:"rate"` } `json:"redeliver_details"` } `json:"message_stats"` Vhost string `json:"vhost"` User string `json:"user"` Number int `json:"number"` Name string `json:"name"` Node string `json:"node"` ConnectionDetails ConnectionDetails `json:"connection_details"` GarbageCollection struct { MinorGcs int `json:"minor_gcs"` FullsweepAfter int `json:"fullsweep_after"` MinHeapSize int `json:"min_heap_size"` MinBinVheapSize int `json:"min_bin_vheap_size"` MaxHeapSize int `json:"max_heap_size"` } `json:"garbage_collection"` State string `json:"state"` GlobalPrefetchCount int `json:"global_prefetch_count"` PrefetchCount int 
`json:"prefetch_count"` AcksUncommitted int `json:"acks_uncommitted"` MessagesUncommitted int `json:"messages_uncommitted"` MessagesUnconfirmed int `json:"messages_unconfirmed"` MessagesUnacknowledged int `json:"messages_unacknowledged"` ConsumerCount int `json:"consumer_count"` Confirm bool `json:"confirm"` Transactional bool `json:"transactional"` IdleSince string `json:"idle_since"` }
RabbitChannel models the /channels resource of the rabbitmq http api
type RabbitConnection ¶
type RabbitConnection struct { ReductionsDetails struct { Rate float64 `json:"rate"` } `json:"reductions_details"` Reductions int `json:"reductions"` RecvOctDetails struct { Rate float64 `json:"rate"` } `json:"recv_oct_details"` RecvOct int `json:"recv_oct"` SendOctDetails struct { Rate float64 `json:"rate"` } `json:"send_oct_details"` SendOct int `json:"send_oct"` ConnectedAt int64 `json:"connected_at"` ClientProperties struct { Product string `json:"product"` Version string `json:"version"` ConnectionName string `json:"connection_name"` Capabilities struct { ConnectionBlocked bool `json:"connection.blocked"` ConsumerCancelNotify bool `json:"consumer_cancel_notify"` } `json:"capabilities"` } `json:"client_properties"` ChannelMax int `json:"channel_max"` FrameMax int `json:"frame_max"` Timeout int `json:"timeout"` Vhost string `json:"vhost"` User string `json:"user"` Protocol string `json:"protocol"` SslHash interface{} `json:"ssl_hash"` SslCipher interface{} `json:"ssl_cipher"` SslKeyExchange interface{} `json:"ssl_key_exchange"` SslProtocol interface{} `json:"ssl_protocol"` AuthMechanism string `json:"auth_mechanism"` PeerCertValidity interface{} `json:"peer_cert_validity"` PeerCertIssuer interface{} `json:"peer_cert_issuer"` PeerCertSubject interface{} `json:"peer_cert_subject"` Ssl bool `json:"ssl"` PeerHost string `json:"peer_host"` Host string `json:"host"` PeerPort int `json:"peer_port"` Port int `json:"port"` Name string `json:"name"` Node string `json:"node"` Type string `json:"type"` GarbageCollection struct { MinorGcs int `json:"minor_gcs"` FullsweepAfter int `json:"fullsweep_after"` MinHeapSize int `json:"min_heap_size"` MinBinVheapSize int `json:"min_bin_vheap_size"` MaxHeapSize int `json:"max_heap_size"` } `json:"garbage_collection"` Channels int `json:"channels"` State string `json:"state"` SendPend int `json:"send_pend"` SendCnt int `json:"send_cnt"` RecvCnt int `json:"recv_cnt"` }
RabbitConnection models the /connections resource of the rabbitmq http api
type RabbitConsumer ¶
type RabbitConsumer struct { // Arguments []interface{} `json:"arguments"` PrefetchCount int `json:"prefetch_count"` AckRequired bool `json:"ack_required"` Active bool `json:"active"` ActivityStatus string `json:"activity_status"` Exclusive bool `json:"exclusive"` ConsumerTag string `json:"consumer_tag"` // see WORKAROUND above ChannelDetails ChannelDetails `json:"channel_details,omitempty"` Queue struct { Vhost string `json:"vhost"` Name string `json:"name"` } `json:"queue"` }
RabbitConsumer models the /consumers resource of the rabbitmq http api
type RabbitExchange ¶
type RabbitExchange struct { Name string `json:"name"` Vhost string `json:"vhost"` Type string `json:"type"` Durable bool `json:"durable"` AutoDelete bool `json:"auto_delete"` Internal bool `json:"internal"` Arguments map[string]interface{} `json:"arguments,omitempty"` MessageStats struct { PublishOut int `json:"publish_out"` PublishOutDetails struct { Rate float64 `json:"rate"` } `json:"publish_out_details"` PublishIn int `json:"publish_in"` PublishInDetails struct { Rate float64 `json:"rate"` } `json:"publish_in_details"` } `json:"message_stats,omitempty"` }
RabbitExchange models the /exchanges resource of the rabbitmq http api
type RabbitHTTPClient ¶
type RabbitHTTPClient struct {
// contains filtered or unexported fields
}
RabbitHTTPClient is a minimal client to the rabbitmq management REST api. It implements only functions needed by this tool (i.e. GET on some of the resources). The message structs were generated using json-to-go (https://mholt.github.io/json-to-go/).
func NewRabbitHTTPClient ¶
func NewRabbitHTTPClient(url *url.URL, tlsConfig *tls.Config) *RabbitHTTPClient
NewRabbitHTTPClient returns a new instance of a RabbitHTTPClient. url is the base API URL of the REST server.
func (*RabbitHTTPClient) Bindings ¶
func (s *RabbitHTTPClient) Bindings(ctx context.Context) ([]RabbitBinding, error)
Bindings returns the /bindings resource of the RabbitMQ REST API
func (*RabbitHTTPClient) BrokerInfo ¶
func (s *RabbitHTTPClient) BrokerInfo(ctx context.Context) (BrokerInfo, error)
BrokerInfo gets all resources of the broker in parallel
func (*RabbitHTTPClient) Channels ¶
func (s *RabbitHTTPClient) Channels(ctx context.Context) ([]RabbitChannel, error)
Channels returns the /channels resource of the RabbitMQ REST API
func (*RabbitHTTPClient) CloseConnection ¶
func (s *RabbitHTTPClient) CloseConnection(ctx context.Context, conn, reason string) error
CloseConnection closes a connection by DELETING the associated resource
func (*RabbitHTTPClient) Connections ¶
func (s *RabbitHTTPClient) Connections(ctx context.Context) ([]RabbitConnection, error)
Connections returns the /connections resource of the RabbitMQ REST API
func (*RabbitHTTPClient) Consumers ¶
func (s *RabbitHTTPClient) Consumers(ctx context.Context) ([]RabbitConsumer, error)
Consumers returns the /consumers resource of the RabbitMQ REST API
func (*RabbitHTTPClient) Exchanges ¶
func (s *RabbitHTTPClient) Exchanges(ctx context.Context) ([]RabbitExchange, error)
Exchanges returns the /exchanges resource of the RabbitMQ REST API
func (*RabbitHTTPClient) Overview ¶
func (s *RabbitHTTPClient) Overview(ctx context.Context) (RabbitOverview, error)
Overview returns the /overview resource of the RabbitMQ REST API
func (*RabbitHTTPClient) Queues ¶
func (s *RabbitHTTPClient) Queues(ctx context.Context) ([]RabbitQueue, error)
Queues returns the /queues resource of the RabbitMQ REST API
func (*RabbitHTTPClient) Vhosts ¶
func (s *RabbitHTTPClient) Vhosts(ctx context.Context) ([]RabbitVhost, error)
Vhosts returns the /vhosts resource of the RabbitMQ REST API
type RabbitOverview ¶
type RabbitOverview struct { ManagementVersion string `json:"management_version"` RatesMode string `json:"rates_mode"` ExchangeTypes []struct { Name string `json:"name"` Description string `json:"description"` Enabled bool `json:"enabled"` } `json:"exchange_types"` RabbitmqVersion string `json:"rabbitmq_version"` ClusterName string `json:"cluster_name"` ErlangVersion string `json:"erlang_version"` ErlangFullVersion string `json:"erlang_full_version"` MessageStats struct { DiskReads int `json:"disk_reads"` DiskReadsDetails struct { Rate float64 `json:"rate"` } `json:"disk_reads_details"` DiskWrites int `json:"disk_writes"` DiskWritesDetails struct { Rate float64 `json:"rate"` } `json:"disk_writes_details"` } `json:"message_stats"` QueueTotals struct { MessagesReady int `json:"messages_ready"` MessagesReadyDetails struct { Rate float64 `json:"rate"` } `json:"messages_ready_details"` MessagesUnacknowledged int `json:"messages_unacknowledged"` MessagesUnacknowledgedDetails struct { Rate float64 `json:"rate"` } `json:"messages_unacknowledged_details"` Messages int `json:"messages"` MessagesDetails struct { Rate float64 `json:"rate"` } `json:"messages_details"` } `json:"queue_totals"` ObjectTotals struct { Consumers int `json:"consumers"` Queues int `json:"queues"` Exchanges int `json:"exchanges"` Connections int `json:"connections"` Channels int `json:"channels"` } `json:"object_totals"` StatisticsDbEventQueue int `json:"statistics_db_event_queue"` Node string `json:"node"` Listeners []struct { Node string `json:"node"` Protocol string `json:"protocol"` IPAddress string `json:"ip_address"` Port int `json:"port"` } `json:"listeners"` Contexts []struct { Node string `json:"node"` Description string `json:"description"` Path string `json:"path"` Port string `json:"port"` Ssl string `json:"ssl"` } `json:"contexts"` }
RabbitOverview models the /overview resource of the rabbitmq http api
type RabbitQueue ¶
type RabbitQueue struct { MessagesDetails struct { Rate float64 `json:"rate"` } `json:"messages_details"` Messages int `json:"messages"` MessagesUnacknowledgedDetails struct { Rate float64 `json:"rate"` } `json:"messages_unacknowledged_details"` MessagesUnacknowledged int `json:"messages_unacknowledged"` MessagesReadyDetails struct { Rate float64 `json:"rate"` } `json:"messages_ready_details"` MessagesReady int `json:"messages_ready"` ReductionsDetails struct { Rate float64 `json:"rate"` } `json:"reductions_details"` Reductions int `json:"reductions"` Node string `json:"node"` Arguments struct { } `json:"arguments"` Exclusive bool `json:"exclusive"` AutoDelete bool `json:"auto_delete"` Durable bool `json:"durable"` Vhost string `json:"vhost"` Name string `json:"name"` Type string `json:"type"` MessageBytesPagedOut int `json:"message_bytes_paged_out"` MessagesPagedOut int `json:"messages_paged_out"` BackingQueueStatus struct { Mode string `json:"mode"` Q1 int `json:"q1"` Q2 int `json:"q2"` // Delta []interface{} `json:"delta"` Q3 int `json:"q3"` Q4 int `json:"q4"` Len int `json:"len"` // TargetRAMCount int `json:"target_ram_count"` // string or int -> need further research here when attr is in need ("infinity") NextSeqID int `json:"next_seq_id"` AvgIngressRate float64 `json:"avg_ingress_rate"` AvgEgressRate float64 `json:"avg_egress_rate"` AvgAckIngressRate float64 `json:"avg_ack_ingress_rate"` AvgAckEgressRate float64 `json:"avg_ack_egress_rate"` } `json:"backing_queue_status"` // HeadMessageTimestamp interface{} `json:"head_message_timestamp"` MessageBytesPersistent int `json:"message_bytes_persistent"` MessageBytesRAM int `json:"message_bytes_ram"` MessageBytesUnacknowledged int `json:"message_bytes_unacknowledged"` MessageBytesReady int `json:"message_bytes_ready"` MessageBytes int `json:"message_bytes"` MessagesPersistent int `json:"messages_persistent"` MessagesUnacknowledgedRAM int `json:"messages_unacknowledged_ram"` MessagesReadyRAM int 
`json:"messages_ready_ram"` MessagesRAM int `json:"messages_ram"` GarbageCollection struct { MinorGcs int `json:"minor_gcs"` FullsweepAfter int `json:"fullsweep_after"` MinHeapSize int `json:"min_heap_size"` MinBinVheapSize int `json:"min_bin_vheap_size"` MaxHeapSize int `json:"max_heap_size"` } `json:"garbage_collection"` State string `json:"state"` // RecoverableSlaves interface{} `json:"recoverable_slaves"` Consumers int `json:"consumers"` // ExclusiveConsumerTag interface{} `json:"exclusive_consumer_tag"` // Policy interface{} `json:"policy"` ConsumerUtilisation float64 `json:"consumer_utilisation"` // TODO use custom marshaller and parse into time.Time IdleSince string `json:"idle_since"` Memory int `json:"memory"` }
RabbitQueue models the /queues resource of the rabbitmq http api
type RabbitVhost ¶
type RabbitVhost struct { // ClusterState struct { // Rabbit1A92B8526E33 string `json:"rabbit@1a92b8526e33"` // } `json:"cluster_state"` Description string `json:"description"` MessageStats struct { Ack int `json:"ack"` AckDetails struct { Rate float64 `json:"rate"` } `json:"ack_details"` Confirm int `json:"confirm"` ConfirmDetails struct { Rate float64 `json:"rate"` } `json:"confirm_details"` Deliver int `json:"deliver"` DeliverDetails struct { Rate float64 `json:"rate"` } `json:"deliver_details"` DeliverGet int `json:"deliver_get"` DeliverGetDetails struct { Rate float64 `json:"rate"` } `json:"deliver_get_details"` DeliverNoAck int `json:"deliver_no_ack"` DeliverNoAckDetails struct { Rate float64 `json:"rate"` } `json:"deliver_no_ack_details"` DropUnroutable int `json:"drop_unroutable"` DropUnroutableDetails struct { Rate float64 `json:"rate"` } `json:"drop_unroutable_details"` Get int `json:"get"` GetDetails struct { Rate float64 `json:"rate"` } `json:"get_details"` GetEmpty int `json:"get_empty"` GetEmptyDetails struct { Rate float64 `json:"rate"` } `json:"get_empty_details"` GetNoAck int `json:"get_no_ack"` GetNoAckDetails struct { Rate float64 `json:"rate"` } `json:"get_no_ack_details"` Publish int `json:"publish"` PublishDetails struct { Rate float64 `json:"rate"` } `json:"publish_details"` Redeliver int `json:"redeliver"` RedeliverDetails struct { Rate float64 `json:"rate"` } `json:"redeliver_details"` ReturnUnroutable int `json:"return_unroutable"` ReturnUnroutableDetails struct { Rate float64 `json:"rate"` } `json:"return_unroutable_details"` } `json:"message_stats"` Messages int `json:"messages"` MessagesDetails struct { Rate float64 `json:"rate"` } `json:"messages_details"` MessagesReady int `json:"messages_ready"` MessagesReadyDetails struct { Rate float64 `json:"rate"` } `json:"messages_ready_details"` MessagesUnacknowledged int `json:"messages_unacknowledged"` MessagesUnacknowledgedDetails struct { Rate float64 `json:"rate"` } 
`json:"messages_unacknowledged_details"` Metadata struct { Description string `json:"description"` Tags []interface{} `json:"tags"` } `json:"metadata"` Name string `json:"name"` RecvOct int `json:"recv_oct"` RecvOctDetails struct { Rate float64 `json:"rate"` } `json:"recv_oct_details"` SendOct int `json:"send_oct"` SendOctDetails struct { Rate float64 `json:"rate"` } `json:"send_oct_details"` Tags []interface{} `json:"tags"` Tracing bool `json:"tracing"` }
RabbitVhost models the /vhosts resource of the rabbitmq http api
type ReconnectAction ¶
type ReconnectAction int
ReconnectAction signals if connection should be reconnected or not.
type Routing ¶
type Routing struct {
// contains filtered or unexported fields
}
Routing describes where a message should be published
type Session ¶
type Session struct { *amqp.Connection *amqp.Channel }
Session composes an amqp.Connection with an amqp.Channel
func (*Session) NewChannel ¶
NewChannel opens a new Channel on the connection. Call this when the current channel got closed due to errors.
type SubscribeError ¶
type SubscribeError struct { Reason SubscribeErrorReason // Cause holds the error when a ChannelError happened Cause error }
SubscribeError is sent back through the error channel when there are problems during the subscription of messages.
func (*SubscribeError) Error ¶
func (s *SubscribeError) Error() string
type SubscribeErrorChannel ¶
type SubscribeErrorChannel chan *SubscribeError
type SubscribeErrorReason ¶
type SubscribeErrorReason int
const (
SubscribeErrorChannelError SubscribeErrorReason = iota
)
type TapConfiguration ¶
type TapConfiguration struct { AMQPURL *url.URL Exchanges []ExchangeConfiguration }
TapConfiguration holds the set of ExchangeConfigurations to tap to for a single RabbitMQ host.
func NewTapConfiguration ¶
func NewTapConfiguration(amqpURL *url.URL, exchangesAndBindings string) (*TapConfiguration, error)
NewTapConfiguration returns a TapConfiguration object for a RabbitMQ broker specified by a URI and a list of exchanges and bindings in the form of "exchange:binding,exchange:binding". Returns the configuration object or an error if parsing failed.
type TapMessage ¶
TapMessage objects are passed through a tapChannel from tap to client
func NewTapMessage ¶
func NewTapMessage(message *amqp.Delivery, ts time.Time) TapMessage
NewTapMessage constructs a new TapMessage
Source Files ¶
- amqp_connector.go
- amqp_message_loop.go
- amqp_simple_connector.go
- amqp_table.go
- dial.go
- dial_func.go
- dial_tls.go
- discovery.go
- exchange.go
- fanin.go
- key_value.go
- log.go
- publish.go
- queue.go
- rabbitmq_api_model.go
- rabbitmq_inmemory_metadata_service.go
- rabbitmq_metadata_service.go
- rabbitmq_rest_client.go
- routing.go
- session.go
- subscribe.go
- tap.go
- tap_configuration.go