rabtap

package
v1.39.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2023 License: GPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

an implementation of MetadataService which uses in memory lookups

a service providing RabbitMQ metadata (queues, exchanges, connections...).

Index

Constants

View Source
const (
	FailEarly = true
)
View Source
const PrefetchCount = 1
View Source
const PrefetchSize = 0

Variables

View Source
var Dialer = net.Dial

Functions

func BindExchangeToExchange

func BindExchangeToExchange(session Session,
	sourceExchange, key, targetExchange string, args amqp.Table) error

BindExchangeToExchange binds the given queue to the given exchange.

func BindQueueToExchange

func BindQueueToExchange(session Session,
	queueName, key, exchangeName string, args amqp.Table) error

BindQueueToExchange binds the given queue to the given exchange.

func CreateExchange

func CreateExchange(session Session, exchangeName, exchangeType string,
	durable, autoDelete bool, args amqp.Table) error

CreateExchange creates a new echange on the given channel

func CreateQueue

func CreateQueue(session Session, queueName string,
	durable, autoDelete, exclusive bool, args amqp.Table) error

CreateQueue creates a new queue TODO(JD) get rid of bool types

func DialTLS

func DialTLS(uri string, tlsConfig *tls.Config) (*amqp.Connection, error)

DialTLS is a Wrapper for amqp.DialTLS that supports EXTERNAL auth for mtls can be removed when https://github.com/streadway/amqp/pull/121 gets some day merged.

func DiscoverBindingsForExchange

func DiscoverBindingsForExchange(ctx context.Context, rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error)

DiscoverBindingsForExchange returns a string list of routing-keys that are used by the given exchange and broker. This list can be used to auto-tap to all queues on a given exchange

func EnsureAMQPTable

func EnsureAMQPTable(m interface{}) interface{}

EnsureAMQPTable returns an object where all map[string]interface{} are replaced by amqp.Table{} so it is compatible with the amqp libs type system when it comes to passing headers, which expects (nested) amqp.Table structures.

See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227

func MergeTables

func MergeTables(first, second amqp.Table) amqp.Table

MergeTable merges the given amqp.Table's, the second one overrideing the values of the first one

func PurgeQueue

func PurgeQueue(session Session, queueName string) (int, error)

PurgeQueue clears a queue. Returns number of elements purged

func RemoveExchange

func RemoveExchange(session Session,
	exchangeName string, ifUnused bool) error

RemoveExchange removes a echange on the given channel

func RemoveQueue

func RemoveQueue(session Session,
	queueName string, ifUnused, ifEmpty bool) error

RemoveQueue removes a queue

func SimpleAmqpConnector

func SimpleAmqpConnector(amqpURL *url.URL, tlsConfig *tls.Config,
	run func(session Session) error) error

SimpleAmqpConnector opens an AMQP connection and channel, and calls a function with the channel as argument. Use this function for simple, one-shot operations like creation of queues, exchanges etc.

func ToAMQPTable

func ToAMQPTable(headers KeyValueMap) amqp.Table

ToAMQPTable converts a KeyValueMap to an amqp.Table, trying to infer data types: - integers - timestamps in RFC3339 format - strings (default)

func UnbindQueueFromExchange

func UnbindQueueFromExchange(session Session,
	queueName, key, exchangeName string, args amqp.Table) error

UnbindQueueFromExchange unbinds a queue from an exchange

Types

type AmqpConnector

type AmqpConnector struct {
	// contains filtered or unexported fields
}

AmqpConnector manages the connection to the amqp broker and automatically reconnects after connections losses

func NewAmqpConnector

func NewAmqpConnector(url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpConnector

NewAmqpConnector creates a new AmqpConnector object.

func (*AmqpConnector) Connect

func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) error

Connect (re-)establishes the connection to RabbitMQ broker.

type AmqpPublish

type AmqpPublish struct {
	// contains filtered or unexported fields
}

AmqpPublish allows to send to a RabbitMQ exchange.

func NewAmqpPublish

func NewAmqpPublish(url *url.URL, tlsConfig *tls.Config,
	mandatory, confirms bool, logger Logger) *AmqpPublish

NewAmqpPublish returns a new AmqpPublish object associated with the RabbitMQ broker denoted by the uri parameter.

func (*AmqpPublish) EstablishConnection

func (s *AmqpPublish) EstablishConnection(
	ctx context.Context,
	publishChannel PublishChannel,
	errorChannel PublishErrorChannel) error

EstablishConnection sets up the connection to the broker

type AmqpSubscriber

type AmqpSubscriber struct {
	// contains filtered or unexported fields
}

AmqpSubscriber allows to tap to subscribe to queues

func NewAmqpSubscriber

func NewAmqpSubscriber(config AmqpSubscriberConfig, url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpSubscriber

NewAmqpSubscriber returns a new AmqpSubscriber object associated with the RabbitMQ broker denoted by the uri parameter.

func (*AmqpSubscriber) EstablishSubscription

func (s *AmqpSubscriber) EstablishSubscription(
	ctx context.Context,
	queueName string,
	tapCh TapChannel,
	errCh SubscribeErrorChannel) error

EstablishSubscription sets up the connection to the broker and sets up the tap, which is bound to the provided consumer function. Typically this function is run as a go-routine.

queueName is the queue to subscribe to. tapCh is where the consumed messages are sent to. errCh is the channel where errors are sent to.

type AmqpSubscriberConfig

type AmqpSubscriberConfig struct {
	Exclusive bool
	Args      amqp.Table
}

AmqpSubscriberConfig stores configuration of the subscriber

type AmqpTap

type AmqpTap struct {
	*AmqpSubscriber
	// contains filtered or unexported fields
}

AmqpTap allows to tap to an RabbitMQ exchange.

func NewAmqpTap

func NewAmqpTap(url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpTap

NewAmqpTap returns a new AmqpTap object associated with the RabbitMQ broker denoted by the uri parameter.

func (*AmqpTap) EstablishTap

func (s *AmqpTap) EstablishTap(
	ctx context.Context,
	exchangeConfigList []ExchangeConfiguration,
	tapCh TapChannel,
	errorCh SubscribeErrorChannel) error

EstablishTap sets up the connection to the broker and sets up the tap, which is bound to the provided consumer function. Typically this function is run as a go-routine.

type AmqpWorkerFunc

type AmqpWorkerFunc func(ctx context.Context, session Session) (ReconnectAction, error)

An AmqpWorkerFunc does the actual work after the connection is established. If the worker returns true, the caller should re-connect to the broker. If the worker returne false, the caller should finish its processing. The worker must return with NoReconnect if a ShutdownMessage is received via shutdownChan, otherwise with Reconnect.

type BrokerInfo

type BrokerInfo struct {
	Overview    RabbitOverview
	Connections []RabbitConnection
	Exchanges   []RabbitExchange
	Queues      []RabbitQueue
	Consumers   []RabbitConsumer
	Bindings    []RabbitBinding
	Channels    []RabbitChannel
	Vhosts      []RabbitVhost
}

BrokerInfo represents the state of various RabbitMQ ressources as returned by the RabbitMQ REST API

type ChannelDetails

type ChannelDetails struct {
	PeerHost       string `json:"peer_host"`
	PeerPort       OptInt `json:"peer_port"`
	ConnectionName string `json:"connection_name"`
	User           string `json:"user"`
	Number         int    `json:"number"`
	Node           string `json:"node"`
	Name           string `json:"name"`
}

ChannelDetails model channel_details in RabbitConsumer

func (*ChannelDetails) UnmarshalJSON

func (d *ChannelDetails) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API returning "[]" instead of null. To make sure deserialization does not break, we catch this case, and return an empty ChannelDetails struct. see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424

type ConnectionDetails

type ConnectionDetails struct {
	PeerHost string `json:"peer_host"`
	PeerPort OptInt `json:"peer_port"`
	Name     string `json:"name"`
}

func (*ConnectionDetails) UnmarshalJSON

func (d *ConnectionDetails) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API returning "[]" instead of null. To make sure deserialization does not break, we catch this case, and return an empty ChannelDetails struct. see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424

type DialFunc added in v1.39.0

type DialFunc func(network, addr string) (net.Conn, error)

type ExchangeConfiguration

type ExchangeConfiguration struct {
	// Exchange is the name of the exchange to bind to
	Exchange string
	// BindingKey is the binding key to use. The key depends on the the type
	// of exchange being tapped (e.g. direct, topic).
	BindingKey string
}

ExchangeConfiguration holds exchange and bindingkey for a single tap

func NewExchangeConfiguration

func NewExchangeConfiguration(exchangeAndBindingStr string) (*ExchangeConfiguration, error)

NewExchangeConfiguration returns a pointer to a newly created ExchangeConfiguration object

type Fanin

type Fanin struct {
	Ch chan interface{}
	// contains filtered or unexported fields
}

Fanin allows to do a select ("fan-in") on an set of channels

func NewFanin

func NewFanin(channels []interface{}) *Fanin

NewFanin creates a new Fanin object

func (*Fanin) Alive

func (s *Fanin) Alive() bool

Alive returns true if the fanin is running

func (*Fanin) Stop

func (s *Fanin) Stop() error

Stop stops the fanin go-routine

type InMemoryMetadataService

type InMemoryMetadataService struct {
	// contains filtered or unexported fields
}

func NewInMemoryMetadataService

func NewInMemoryMetadataService(brokerInfo BrokerInfo) *InMemoryMetadataService

func (InMemoryMetadataService) AllBindingsForExchange

func (s InMemoryMetadataService) AllBindingsForExchange(vhost, name string) []*RabbitBinding

func (InMemoryMetadataService) AllChannelsForConnection

func (s InMemoryMetadataService) AllChannelsForConnection(vhost, name string) []*RabbitChannel

func (InMemoryMetadataService) AllConsumersForChannel

func (s InMemoryMetadataService) AllConsumersForChannel(vhost, name string) []*RabbitConsumer

func (InMemoryMetadataService) Bindings

func (s InMemoryMetadataService) Bindings() []RabbitBinding

func (InMemoryMetadataService) Channels

func (s InMemoryMetadataService) Channels() []RabbitChannel

func (InMemoryMetadataService) Connections

func (s InMemoryMetadataService) Connections() []RabbitConnection

func (InMemoryMetadataService) Consumers

func (s InMemoryMetadataService) Consumers() []RabbitConsumer

func (InMemoryMetadataService) Exchanges

func (s InMemoryMetadataService) Exchanges() []RabbitExchange

func (InMemoryMetadataService) FindChannelByName

func (s InMemoryMetadataService) FindChannelByName(vhost, name string) *RabbitChannel

func (InMemoryMetadataService) FindConnectionByName

func (s InMemoryMetadataService) FindConnectionByName(vhost, name string) *RabbitConnection

func (InMemoryMetadataService) FindExchangeByName

func (s InMemoryMetadataService) FindExchangeByName(vhost, name string) *RabbitExchange

func (InMemoryMetadataService) FindQueueByName

func (s InMemoryMetadataService) FindQueueByName(vhost, name string) *RabbitQueue

func (InMemoryMetadataService) FindVhostByName

func (s InMemoryMetadataService) FindVhostByName(vhost string) *RabbitVhost

func (InMemoryMetadataService) Overview

func (InMemoryMetadataService) Queues

func (s InMemoryMetadataService) Queues() []RabbitQueue

func (InMemoryMetadataService) Vhosts

func (s InMemoryMetadataService) Vhosts() []RabbitVhost

type KeyValueMap

type KeyValueMap map[string]string

KeyValueMap is a string -> string map used to store key value pairs defined on the command line.

type Logger

type Logger interface {
	Debugf(format string, a ...interface{})
	Errorf(format string, a ...interface{})
	Infof(format string, a ...interface{})
}

we don't want to be this package dependent on a logging framework. So a logger is injected from the client code.

type MetadataService

type MetadataService interface {
	Overview() RabbitOverview
	Connections() []RabbitConnection
	Exchanges() []RabbitExchange
	Queues() []RabbitQueue
	Consumers() []RabbitConsumer
	Bindings() []RabbitBinding
	Channels() []RabbitChannel
	Vhosts() []RabbitVhost

	FindQueueByName(vhost, name string) *RabbitQueue
	FindExchangeByName(vhost, name string) *RabbitExchange
	FindChannelByName(vhost, name string) *RabbitChannel
	FindConnectionByName(vhost, name string) *RabbitConnection
	FindVhostByName(vhost string) *RabbitVhost

	AllChannelsForConnection(vhost, name string) []*RabbitChannel
	AllConsumersForChannel(vhost, name string) []*RabbitConsumer
	AllBindingsForExchange(vhost, name string) []*RabbitBinding
}

type OptInt

type OptInt int

func (*OptInt) UnmarshalJSON

func (d *OptInt) UnmarshalJSON(data []byte) error

UnmarshalJSON is a workaround to deserialize int attributes in the RabbitMQ API which are sometimes returned as strings, (i.e. the value "undefined").

type PublishChannel

type PublishChannel chan *PublishMessage

PublishChannel is a channel for PublishMessage message objects

type PublishError

type PublishError struct {
	Reason PublishErrorReason
	// Publishing stores the original message, if available (AckTimeout, Nack,
	// PublishFailed)
	Message *PublishMessage
	// ReturnedMessage stores the returned message in case of PublishErrorReturned
	ReturnedMessage *amqp.Return
	// Cause holds the error when a ChannelError happened
	Cause error
}

PublishError is sent back trough the error channel when there are problems during the publishing of messages

func (*PublishError) Error

func (s *PublishError) Error() string

type PublishErrorChannel

type PublishErrorChannel chan *PublishError

type PublishErrorReason

type PublishErrorReason int
const (
	PublishErrorAckTimeout PublishErrorReason = iota
	PublishErrorNack
	PublishErrorPublishFailed
	PublishErrorReturned
	PublishErrorChannelError
)

type PublishMessage

type PublishMessage struct {
	Routing    Routing
	Publishing *amqp.Publishing
}

PublishMessage is a message to be published by AmqpPublish via a PublishChannel

type RabbitBinding

type RabbitBinding struct {
	Source          string                 `json:"source"`
	Vhost           string                 `json:"vhost"`
	Destination     string                 `json:"destination"`
	DestinationType string                 `json:"destination_type"`
	RoutingKey      string                 `json:"routing_key"`
	Arguments       map[string]interface{} `json:"arguments,omitempty"`
	PropertiesKey   string                 `json:"properties_key"`
}

RabbitBinding models the /bindings resource of the rabbitmq http api

func (RabbitBinding) IsExchangeToExchange

func (s RabbitBinding) IsExchangeToExchange() bool

IsExchangeToExchange returns true if this is an exchange-to-exchange binding

type RabbitChannel

type RabbitChannel struct {
	ReductionsDetails struct {
		Rate float64 `json:"rate"`
	} `json:"reductions_details"`
	Reductions   int `json:"reductions"`
	MessageStats struct {
		ReturnUnroutableDetails struct {
			Rate float64 `json:"rate"`
		} `json:"return_unroutable_details"`
		ReturnUnroutable int `json:"return_unroutable"`
		ConfirmDetails   struct {
			Rate float64 `json:"rate"`
		} `json:"confirm_details"`
		Confirm        int `json:"confirm"`
		PublishDetails struct {
			Rate float64 `json:"rate"`
		} `json:"publish_details"`
		Publish    int `json:"publish"`
		Ack        int `json:"ack"`
		AckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"ack_details"`
		Deliver        int `json:"deliver"`
		DeliverDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_details"`
		DeliverGet        int `json:"deliver_get"`
		DeliverGetDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_get_details"`
		DeliverNoAck        int `json:"deliver_no_ack"`
		DeliverNoAckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_no_ack_details"`
		Get        int `json:"get"`
		GetDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_details"`
		GetEmpty        int `json:"get_empty"`
		GetEmptyDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_empty_details"`
		GetNoAck        int `json:"get_no_ack"`
		GetNoAckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_no_ack_details"`
		Redeliver        int `json:"redeliver"`
		RedeliverDetails struct {
			Rate float64 `json:"rate"`
		} `json:"redeliver_details"`
	} `json:"message_stats"`
	Vhost             string            `json:"vhost"`
	User              string            `json:"user"`
	Number            int               `json:"number"`
	Name              string            `json:"name"`
	Node              string            `json:"node"`
	ConnectionDetails ConnectionDetails `json:"connection_details"`
	GarbageCollection struct {
		MinorGcs        int `json:"minor_gcs"`
		FullsweepAfter  int `json:"fullsweep_after"`
		MinHeapSize     int `json:"min_heap_size"`
		MinBinVheapSize int `json:"min_bin_vheap_size"`
		MaxHeapSize     int `json:"max_heap_size"`
	} `json:"garbage_collection"`
	State                  string `json:"state"`
	GlobalPrefetchCount    int    `json:"global_prefetch_count"`
	PrefetchCount          int    `json:"prefetch_count"`
	AcksUncommitted        int    `json:"acks_uncommitted"`
	MessagesUncommitted    int    `json:"messages_uncommitted"`
	MessagesUnconfirmed    int    `json:"messages_unconfirmed"`
	MessagesUnacknowledged int    `json:"messages_unacknowledged"`
	ConsumerCount          int    `json:"consumer_count"`
	Confirm                bool   `json:"confirm"`
	Transactional          bool   `json:"transactional"`
	IdleSince              string `json:"idle_since"`
}

RabbitChannel models the /channels resource of the rabbitmq http api

type RabbitConnection

type RabbitConnection struct {
	ReductionsDetails struct {
		Rate float64 `json:"rate"`
	} `json:"reductions_details"`
	Reductions     int `json:"reductions"`
	RecvOctDetails struct {
		Rate float64 `json:"rate"`
	} `json:"recv_oct_details"`
	RecvOct        int `json:"recv_oct"`
	SendOctDetails struct {
		Rate float64 `json:"rate"`
	} `json:"send_oct_details"`
	SendOct          int   `json:"send_oct"`
	ConnectedAt      int64 `json:"connected_at"`
	ClientProperties struct {
		Product        string `json:"product"`
		Version        string `json:"version"`
		ConnectionName string `json:"connection_name"`
		Capabilities   struct {
			ConnectionBlocked    bool `json:"connection.blocked"`
			ConsumerCancelNotify bool `json:"consumer_cancel_notify"`
		} `json:"capabilities"`
	} `json:"client_properties"`
	ChannelMax        int         `json:"channel_max"`
	FrameMax          int         `json:"frame_max"`
	Timeout           int         `json:"timeout"`
	Vhost             string      `json:"vhost"`
	User              string      `json:"user"`
	Protocol          string      `json:"protocol"`
	SslHash           interface{} `json:"ssl_hash"`
	SslCipher         interface{} `json:"ssl_cipher"`
	SslKeyExchange    interface{} `json:"ssl_key_exchange"`
	SslProtocol       interface{} `json:"ssl_protocol"`
	AuthMechanism     string      `json:"auth_mechanism"`
	PeerCertValidity  interface{} `json:"peer_cert_validity"`
	PeerCertIssuer    interface{} `json:"peer_cert_issuer"`
	PeerCertSubject   interface{} `json:"peer_cert_subject"`
	Ssl               bool        `json:"ssl"`
	PeerHost          string      `json:"peer_host"`
	Host              string      `json:"host"`
	PeerPort          int         `json:"peer_port"`
	Port              int         `json:"port"`
	Name              string      `json:"name"`
	Node              string      `json:"node"`
	Type              string      `json:"type"`
	GarbageCollection struct {
		MinorGcs        int `json:"minor_gcs"`
		FullsweepAfter  int `json:"fullsweep_after"`
		MinHeapSize     int `json:"min_heap_size"`
		MinBinVheapSize int `json:"min_bin_vheap_size"`
		MaxHeapSize     int `json:"max_heap_size"`
	} `json:"garbage_collection"`
	Channels int    `json:"channels"`
	State    string `json:"state"`
	SendPend int    `json:"send_pend"`
	SendCnt  int    `json:"send_cnt"`
	RecvCnt  int    `json:"recv_cnt"`
}

RabbitConnection models the /connections resource of the rabbitmq http api

type RabbitConsumer

type RabbitConsumer struct {
	//	Arguments      []interface{} `json:"arguments"`
	PrefetchCount  int    `json:"prefetch_count"`
	AckRequired    bool   `json:"ack_required"`
	Active         bool   `json:"active"`
	ActivityStatus string `json:"activity_status"`
	Exclusive      bool   `json:"exclusive"`
	ConsumerTag    string `json:"consumer_tag"`
	// see WORKAROUND above
	ChannelDetails ChannelDetails `json:"channel_details,omitempty"`
	Queue          struct {
		Vhost string `json:"vhost"`
		Name  string `json:"name"`
	} `json:"queue"`
}

RabbitConsumer models the /consumers resource of the rabbitmq http api

type RabbitExchange

type RabbitExchange struct {
	Name         string                 `json:"name"`
	Vhost        string                 `json:"vhost"`
	Type         string                 `json:"type"`
	Durable      bool                   `json:"durable"`
	AutoDelete   bool                   `json:"auto_delete"`
	Internal     bool                   `json:"internal"`
	Arguments    map[string]interface{} `json:"arguments,omitempty"`
	MessageStats struct {
		PublishOut        int `json:"publish_out"`
		PublishOutDetails struct {
			Rate float64 `json:"rate"`
		} `json:"publish_out_details"`
		PublishIn        int `json:"publish_in"`
		PublishInDetails struct {
			Rate float64 `json:"rate"`
		} `json:"publish_in_details"`
	} `json:"message_stats,omitempty"`
}

RabbitExchange models the /exchanges resource of the rabbitmq http api

type RabbitHTTPClient

type RabbitHTTPClient struct {
	// contains filtered or unexported fields
}

RabbitHTTPClient is a minimal client to the rabbitmq management REST api. It implements only functions needed by this tool (i.e. GET on some of the resources). The messages structs were generated using json-to-go ( https://mholt.github.io/json-to-go/).

func NewRabbitHTTPClient

func NewRabbitHTTPClient(url *url.URL, tlsConfig *tls.Config) *RabbitHTTPClient

NewRabbitHTTPClient returns a new instance of an RabbitHTTPClient. url is the base API URL of the REST server.

func (*RabbitHTTPClient) Bindings

func (s *RabbitHTTPClient) Bindings(ctx context.Context) ([]RabbitBinding, error)

Bindings returns the /bindings resource of the RabbitMQ REST API

func (*RabbitHTTPClient) BrokerInfo

func (s *RabbitHTTPClient) BrokerInfo(ctx context.Context) (BrokerInfo, error)

BrokerInfo gets all resources of the broker in parallel

func (*RabbitHTTPClient) Channels

func (s *RabbitHTTPClient) Channels(ctx context.Context) ([]RabbitChannel, error)

Channels returns the /channels resource of the RabbitMQ REST API

func (*RabbitHTTPClient) CloseConnection

func (s *RabbitHTTPClient) CloseConnection(ctx context.Context, conn, reason string) error

CloseConnection closes a connection by DELETING the associated resource

func (*RabbitHTTPClient) Connections

func (s *RabbitHTTPClient) Connections(ctx context.Context) ([]RabbitConnection, error)

Connections returns the /connections resource of the RabbitMQ REST API

func (*RabbitHTTPClient) Consumers

func (s *RabbitHTTPClient) Consumers(ctx context.Context) ([]RabbitConsumer, error)

Consumers returns the /consumers resource of the RabbitMQ REST API

func (*RabbitHTTPClient) Exchanges

func (s *RabbitHTTPClient) Exchanges(ctx context.Context) ([]RabbitExchange, error)

Exchanges returns the /exchanges resource of the RabbitMQ REST API

func (*RabbitHTTPClient) Overview

func (s *RabbitHTTPClient) Overview(ctx context.Context) (RabbitOverview, error)

Overview returns the /overview resource of the RabbitMQ REST API

func (*RabbitHTTPClient) Queues

func (s *RabbitHTTPClient) Queues(ctx context.Context) ([]RabbitQueue, error)

Queues returns the /queues resource of the RabbitMQ REST API

func (*RabbitHTTPClient) Vhosts

func (s *RabbitHTTPClient) Vhosts(ctx context.Context) ([]RabbitVhost, error)

Vhosts returns the /vhosts resource of the RabbitMQ REST API

type RabbitOverview

type RabbitOverview struct {
	ManagementVersion string `json:"management_version"`
	RatesMode         string `json:"rates_mode"`
	ExchangeTypes     []struct {
		Name        string `json:"name"`
		Description string `json:"description"`
		Enabled     bool   `json:"enabled"`
	} `json:"exchange_types"`
	RabbitmqVersion   string `json:"rabbitmq_version"`
	ClusterName       string `json:"cluster_name"`
	ErlangVersion     string `json:"erlang_version"`
	ErlangFullVersion string `json:"erlang_full_version"`
	MessageStats      struct {
		DiskReads        int `json:"disk_reads"`
		DiskReadsDetails struct {
			Rate float64 `json:"rate"`
		} `json:"disk_reads_details"`
		DiskWrites        int `json:"disk_writes"`
		DiskWritesDetails struct {
			Rate float64 `json:"rate"`
		} `json:"disk_writes_details"`
	} `json:"message_stats"`
	QueueTotals struct {
		MessagesReady        int `json:"messages_ready"`
		MessagesReadyDetails struct {
			Rate float64 `json:"rate"`
		} `json:"messages_ready_details"`
		MessagesUnacknowledged        int `json:"messages_unacknowledged"`
		MessagesUnacknowledgedDetails struct {
			Rate float64 `json:"rate"`
		} `json:"messages_unacknowledged_details"`
		Messages        int `json:"messages"`
		MessagesDetails struct {
			Rate float64 `json:"rate"`
		} `json:"messages_details"`
	} `json:"queue_totals"`
	ObjectTotals struct {
		Consumers   int `json:"consumers"`
		Queues      int `json:"queues"`
		Exchanges   int `json:"exchanges"`
		Connections int `json:"connections"`
		Channels    int `json:"channels"`
	} `json:"object_totals"`
	StatisticsDbEventQueue int    `json:"statistics_db_event_queue"`
	Node                   string `json:"node"`
	Listeners              []struct {
		Node      string `json:"node"`
		Protocol  string `json:"protocol"`
		IPAddress string `json:"ip_address"`
		Port      int    `json:"port"`
	} `json:"listeners"`
	Contexts []struct {
		Node        string `json:"node"`
		Description string `json:"description"`
		Path        string `json:"path"`
		Port        string `json:"port"`
		Ssl         string `json:"ssl"`
	} `json:"contexts"`
}

RabbitOverview models the /overview resource of the rabbitmq http api

type RabbitQueue

type RabbitQueue struct {
	MessagesDetails struct {
		Rate float64 `json:"rate"`
	} `json:"messages_details"`
	Messages                      int `json:"messages"`
	MessagesUnacknowledgedDetails struct {
		Rate float64 `json:"rate"`
	} `json:"messages_unacknowledged_details"`
	MessagesUnacknowledged int `json:"messages_unacknowledged"`
	MessagesReadyDetails   struct {
		Rate float64 `json:"rate"`
	} `json:"messages_ready_details"`
	MessagesReady     int `json:"messages_ready"`
	ReductionsDetails struct {
		Rate float64 `json:"rate"`
	} `json:"reductions_details"`
	Reductions int    `json:"reductions"`
	Node       string `json:"node"`
	Arguments  struct {
	} `json:"arguments"`
	Exclusive            bool   `json:"exclusive"`
	AutoDelete           bool   `json:"auto_delete"`
	Durable              bool   `json:"durable"`
	Vhost                string `json:"vhost"`
	Name                 string `json:"name"`
	Type                 string `json:"type"`
	MessageBytesPagedOut int    `json:"message_bytes_paged_out"`
	MessagesPagedOut     int    `json:"messages_paged_out"`
	BackingQueueStatus   struct {
		Mode string `json:"mode"`
		Q1   int    `json:"q1"`
		Q2   int    `json:"q2"`
		//		Delta             []interface{} `json:"delta"`
		Q3  int `json:"q3"`
		Q4  int `json:"q4"`
		Len int `json:"len"`
		//		TargetRAMCount    int     `json:"target_ram_count"`	// string or int -> need further research here when attr is in need ("infinity")
		NextSeqID         int     `json:"next_seq_id"`
		AvgIngressRate    float64 `json:"avg_ingress_rate"`
		AvgEgressRate     float64 `json:"avg_egress_rate"`
		AvgAckIngressRate float64 `json:"avg_ack_ingress_rate"`
		AvgAckEgressRate  float64 `json:"avg_ack_egress_rate"`
	} `json:"backing_queue_status"`
	//	HeadMessageTimestamp       interface{} `json:"head_message_timestamp"`
	MessageBytesPersistent     int `json:"message_bytes_persistent"`
	MessageBytesRAM            int `json:"message_bytes_ram"`
	MessageBytesUnacknowledged int `json:"message_bytes_unacknowledged"`
	MessageBytesReady          int `json:"message_bytes_ready"`
	MessageBytes               int `json:"message_bytes"`
	MessagesPersistent         int `json:"messages_persistent"`
	MessagesUnacknowledgedRAM  int `json:"messages_unacknowledged_ram"`
	MessagesReadyRAM           int `json:"messages_ready_ram"`
	MessagesRAM                int `json:"messages_ram"`
	GarbageCollection          struct {
		MinorGcs        int `json:"minor_gcs"`
		FullsweepAfter  int `json:"fullsweep_after"`
		MinHeapSize     int `json:"min_heap_size"`
		MinBinVheapSize int `json:"min_bin_vheap_size"`
		MaxHeapSize     int `json:"max_heap_size"`
	} `json:"garbage_collection"`
	State string `json:"state"`
	//	RecoverableSlaves    interface{} `json:"recoverable_slaves"`
	Consumers int `json:"consumers"`
	//	ExclusiveConsumerTag interface{} `json:"exclusive_consumer_tag"`
	//	Policy               interface{} `json:"policy"`
	ConsumerUtilisation float64 `json:"consumer_utilisation"`
	// TODO use custom marshaller and parse into time.Time
	IdleSince string `json:"idle_since"`
	Memory    int    `json:"memory"`
}

RabbitQueue models the /queues resource of the rabbitmq http api

type RabbitVhost

type RabbitVhost struct {
	// ClusterState struct {
	//     Rabbit1A92B8526E33 string `json:"rabbit@1a92b8526e33"`
	// } `json:"cluster_state"`
	Description  string `json:"description"`
	MessageStats struct {
		Ack        int `json:"ack"`
		AckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"ack_details"`
		Confirm        int `json:"confirm"`
		ConfirmDetails struct {
			Rate float64 `json:"rate"`
		} `json:"confirm_details"`
		Deliver        int `json:"deliver"`
		DeliverDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_details"`
		DeliverGet        int `json:"deliver_get"`
		DeliverGetDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_get_details"`
		DeliverNoAck        int `json:"deliver_no_ack"`
		DeliverNoAckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"deliver_no_ack_details"`
		DropUnroutable        int `json:"drop_unroutable"`
		DropUnroutableDetails struct {
			Rate float64 `json:"rate"`
		} `json:"drop_unroutable_details"`
		Get        int `json:"get"`
		GetDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_details"`
		GetEmpty        int `json:"get_empty"`
		GetEmptyDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_empty_details"`
		GetNoAck        int `json:"get_no_ack"`
		GetNoAckDetails struct {
			Rate float64 `json:"rate"`
		} `json:"get_no_ack_details"`
		Publish        int `json:"publish"`
		PublishDetails struct {
			Rate float64 `json:"rate"`
		} `json:"publish_details"`
		Redeliver        int `json:"redeliver"`
		RedeliverDetails struct {
			Rate float64 `json:"rate"`
		} `json:"redeliver_details"`
		ReturnUnroutable        int `json:"return_unroutable"`
		ReturnUnroutableDetails struct {
			Rate float64 `json:"rate"`
		} `json:"return_unroutable_details"`
	} `json:"message_stats"`
	Messages        int `json:"messages"`
	MessagesDetails struct {
		Rate float64 `json:"rate"`
	} `json:"messages_details"`
	MessagesReady        int `json:"messages_ready"`
	MessagesReadyDetails struct {
		Rate float64 `json:"rate"`
	} `json:"messages_ready_details"`
	MessagesUnacknowledged        int `json:"messages_unacknowledged"`
	MessagesUnacknowledgedDetails struct {
		Rate float64 `json:"rate"`
	} `json:"messages_unacknowledged_details"`
	Metadata struct {
		Description string        `json:"description"`
		Tags        []interface{} `json:"tags"`
	} `json:"metadata"`
	Name           string `json:"name"`
	RecvOct        int    `json:"recv_oct"`
	RecvOctDetails struct {
		Rate float64 `json:"rate"`
	} `json:"recv_oct_details"`
	SendOct        int `json:"send_oct"`
	SendOctDetails struct {
		Rate float64 `json:"rate"`
	} `json:"send_oct_details"`
	Tags    []interface{} `json:"tags"`
	Tracing bool          `json:"tracing"`
}

RabbitVhost models the /vhosts resource of the rabbitmq http api

type ReconnectAction

type ReconnectAction int

ReconnectAction signals if connection should be reconnected or not.

type Routing

type Routing struct {
	// contains filtered or unexported fields
}

Routing describes where a message should be published

func NewRouting

func NewRouting(exchange, key string, headers amqp.Table) Routing

func (Routing) Exchange

func (s Routing) Exchange() string

func (Routing) Headers

func (s Routing) Headers() amqp.Table

func (Routing) Key

func (s Routing) Key() string

func (Routing) String

func (s Routing) String() string

type Session

type Session struct {
	*amqp.Connection
	*amqp.Channel
}

Session composes an amqp.Connection with an amqp.Channel

func (*Session) NewChannel

func (s *Session) NewChannel() error

NewChannel opens a new Channel on the connection. Call when current got closed due to errors.

type SubscribeError

type SubscribeError struct {
	Reason SubscribeErrorReason
	// Cause holds the error when a ChannelError happened
	Cause error
}

SubscribeError is sent back trough the error channel when there are problems during the subsription of messages

func (*SubscribeError) Error

func (s *SubscribeError) Error() string

type SubscribeErrorChannel

type SubscribeErrorChannel chan *SubscribeError

type SubscribeErrorReason

type SubscribeErrorReason int
const (
	SubscribeErrorChannelError SubscribeErrorReason = iota
)

type TapChannel

type TapChannel chan TapMessage

TapChannel is a channel for *TapMessage objects

type TapConfiguration

type TapConfiguration struct {
	AMQPURL   *url.URL
	Exchanges []ExchangeConfiguration
}

TapConfiguration holds the set of ExchangeCOnfigurations to tap to for a single RabbitMQ host

func NewTapConfiguration

func NewTapConfiguration(amqpURL *url.URL, exchangesAndBindings string) (*TapConfiguration, error)

NewTapConfiguration returns a TapConfiguration object for a an rabbitMQ broker specified by an URI and a list of exchanges and bindings in the form of "exchange:binding,exchange:binding). Returns configuration object or an error if parsing failed.

type TapMessage

type TapMessage struct {
	AmqpMessage       *amqp.Delivery
	ReceivedTimestamp time.Time
}

TapMessage objects are passed through a tapChannel from tap to client

func NewTapMessage

func NewTapMessage(message *amqp.Delivery, ts time.Time) TapMessage

NewTapMessage constructs a new TapMessage

Directories

Path Synopsis
a Logger implementation for tests
a Logger implementation for tests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL