Documentation ¶
Index ¶
- Constants
- type Channel
- func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)
- func (channel *Channel) GetConsumersCount() int
- func (channel *Channel) GetMetrics() *ChannelMetricsState
- func (channel *Channel) GetQos() *qos.AmqpQos
- func (channel *Channel) NextDeliveryTag() uint64
- func (channel *Channel) SendContent(method amqp.Method, message *amqp.Message) *amqp.Error
- func (channel *Channel) SendMethod(method amqp.Method)
- type ChannelMetricsState
- type ConnMetricsState
- type Connection
- type Server
- func (srv *Server) GetConnections() map[uint64]*Connection
- func (srv *Server) GetMetrics() *SrvMetricsState
- func (srv *Server) GetProtoVersion() string
- func (srv *Server) GetVhost(name string) *VirtualHost
- func (srv *Server) GetVhosts() map[string]*VirtualHost
- func (srv *Server) Start()
- func (srv *Server) Stop()
- type ServerState
- type SrvMetricsState
- type UnackedMessage
- type VirtualHost
- func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)
- func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error
- func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)
- func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange
- func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange
- func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange
- func (vhost *VirtualHost) GetName() string
- func (vhost *VirtualHost) GetQueue(name string) *queue.Queue
- func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue
- func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, ...) *queue.Queue
- func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)
- func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)
- func (vhost *VirtualHost) Stop() error
Constants ¶
const ( ConnStart = iota ConnStartOK ConnSecure ConnSecureOK ConnTune ConnTuneOK ConnOpen ConnOpenOK ConnCloseOK ConnClosed )
connection status list
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is an implementation of the AMQP-channel entity Within a single socket connection, there can be multiple independent threads of control, called "channels"
func NewChannel ¶
func NewChannel(id uint16, conn *Connection) *Channel
NewChannel returns new instance of Channel
func (*Channel) AddUnackedMessage ¶
func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)
AddUnackedMessage add message to unacked queue
func (*Channel) GetConsumersCount ¶
GetConsumersCount returns consumers count on channel
func (*Channel) GetMetrics ¶
func (channel *Channel) GetMetrics() *ChannelMetricsState
GetMetrics returns metrics
func (*Channel) NextDeliveryTag ¶
NextDeliveryTag returns next delivery tag for current channel
func (*Channel) SendContent ¶
SendContent send message to consumers or returns to publishers
func (*Channel) SendMethod ¶
SendMethod send method to client Method will be packed into frame and send to outgoing channel
type ChannelMetricsState ¶
type ChannelMetricsState struct { Publish *metrics.TrackCounter Confirm *metrics.TrackCounter Deliver *metrics.TrackCounter Get *metrics.TrackCounter Acknowledge *metrics.TrackCounter Unacked *metrics.TrackCounter }
type ConnMetricsState ¶
type ConnMetricsState struct { TrafficIn *metrics.TrackCounter TrafficOut *metrics.TrackCounter }
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents AMQP-connection
func NewConnection ¶
func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection)
NewConnection returns new instance of amqp Connection
func (*Connection) GetChannels ¶
func (conn *Connection) GetChannels() map[uint16]*Channel
func (*Connection) GetID ¶
func (conn *Connection) GetID() uint64
func (*Connection) GetMetrics ¶
func (conn *Connection) GetMetrics() *ConnMetricsState
GetMetrics returns metrics
func (*Connection) GetRemoteAddr ¶
func (conn *Connection) GetRemoteAddr() net.Addr
func (*Connection) GetUsername ¶
func (conn *Connection) GetUsername() string
func (*Connection) GetVirtualHost ¶
func (conn *Connection) GetVirtualHost() *VirtualHost
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server implements AMQP server
func NewServer ¶
func NewServer(host string, port string, protoVersion string, config *config.Config) (server *Server)
NewServer returns new instance of AMQP Server
func (*Server) GetConnections ¶
func (srv *Server) GetConnections() map[uint64]*Connection
func (*Server) GetMetrics ¶
func (srv *Server) GetMetrics() *SrvMetricsState
func (*Server) GetProtoVersion ¶
func (*Server) GetVhost ¶
func (srv *Server) GetVhost(name string) *VirtualHost
func (*Server) GetVhosts ¶
func (srv *Server) GetVhosts() map[string]*VirtualHost
type ServerState ¶
type ServerState int
const ( Stopped ServerState = iota Running Stopping )
server state statuses
type SrvMetricsState ¶
type SrvMetricsState struct { Publish *metrics.TrackCounter Deliver *metrics.TrackCounter Confirm *metrics.TrackCounter Ack *metrics.TrackCounter Get *metrics.TrackCounter Ready *metrics.TrackCounter Unacked *metrics.TrackCounter Total *metrics.TrackCounter TrafficIn *metrics.TrackCounter TrafficOut *metrics.TrackCounter }
type UnackedMessage ¶
type UnackedMessage struct {
// contains filtered or unexported fields
}
UnackedMessage represents the unacknowledged message
type VirtualHost ¶
type VirtualHost struct {
// contains filtered or unexported fields
}
VirtualHost represents AMQP virtual host Each virtual host is "parent" for its queues and exchanges
func NewVhost ¶
func NewVhost(name string, system bool, msgStoragePersistent *msgstorage.MsgStorage, msgStorageTransient *msgstorage.MsgStorage, srv *Server) *VirtualHost
NewVhost returns instance of VirtualHost When instantiating virtual host we 1) init system exchanges 2) load durable exchanges, queues and bindings from server storage 3) load persisted messages from message store into all initiated queues 4) run confirm loop Only after that vhost is in state running msgStoragePersistent, msgStorageTransient
func (*VirtualHost) AppendExchange ¶
func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)
AppendExchange append new exchange and persist if it is durable
func (*VirtualHost) AppendQueue ¶
func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error
AppendQueue append new queue and persist if it is durable and bindings into default exchange
func (*VirtualHost) DeleteQueue ¶
func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)
DeleteQueue delete queue from virtual host and all bindings to that queue Also queue will be removed from server storage
func (*VirtualHost) GetDefaultExchange ¶
func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange
GetDefaultExchange returns default exchange
func (*VirtualHost) GetExchange ¶
func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange
GetExchange returns exchange by name or nil if not exists
func (*VirtualHost) GetExchanges ¶
func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange
func (*VirtualHost) GetName ¶
func (vhost *VirtualHost) GetName() string
func (*VirtualHost) GetQueue ¶
func (vhost *VirtualHost) GetQueue(name string) *queue.Queue
GetQueue returns queue by name or nil if not exists
func (*VirtualHost) GetQueues ¶
func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue
GetQueues return all vhost's queues
func (*VirtualHost) NewQueue ¶
func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, shardSize int) *queue.Queue
NewQueue returns new instance of queue by params we can't use just queue.NewQueue, cause we need to set msgStorage to queue
func (*VirtualHost) PersistBinding ¶
func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)
PersistBinding store binding into server storage
func (*VirtualHost) RemoveBindings ¶
func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)
RemoveBindings remove given bindings from server storage
func (*VirtualHost) Stop ¶
func (vhost *VirtualHost) Stop() error
Stop properly stop virtual host TODO: properly stop confirm loop