server

package
v0.0.0-...-93d9d6a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2024 License: MIT Imports: 34 Imported by: 1

Documentation

Index

Constants

View Source
const (
	ConnStart = iota
	ConnStartOK
	ConnSecure
	ConnSecureOK
	ConnTune
	ConnTuneOK
	ConnOpen
	ConnOpenOK
	ConnCloseOK
	ConnClosed
)

connection status list

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is an implementation of the AMQP-channel entity Within a single socket connection, there can be multiple independent threads of control, called "channels"

func NewChannel

func NewChannel(id uint16, conn *Connection) *Channel

NewChannel returns new instance of Channel

func (*Channel) AddUnackedMessage

func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)

AddUnackedMessage add message to unacked queue

func (*Channel) GetConsumersCount

func (channel *Channel) GetConsumersCount() int

GetConsumersCount returns consumers count on channel

func (*Channel) GetMetrics

func (channel *Channel) GetMetrics() *ChannelMetricsState

GetMetrics returns metrics

func (*Channel) GetQos

func (channel *Channel) GetQos() *qos.AmqpQos

func (*Channel) NextDeliveryTag

func (channel *Channel) NextDeliveryTag() uint64

NextDeliveryTag returns next delivery tag for current channel

func (*Channel) SendContent

func (channel *Channel) SendContent(method amqp.Method, message *amqp.Message) *amqp.Error

SendContent send message to consumers or returns to publishers

func (*Channel) SendMethod

func (channel *Channel) SendMethod(method amqp.Method)

SendMethod send method to client Method will be packed into frame and send to outgoing channel

type ChannelMetricsState

type ChannelMetricsState struct {
	Publish     *metrics.TrackCounter
	Confirm     *metrics.TrackCounter
	Deliver     *metrics.TrackCounter
	Get         *metrics.TrackCounter
	Acknowledge *metrics.TrackCounter
	Unacked     *metrics.TrackCounter
}

type ConnMetricsState

type ConnMetricsState struct {
	TrafficIn  *metrics.TrackCounter
	TrafficOut *metrics.TrackCounter
}

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents AMQP-connection

func NewConnection

func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection)

NewConnection returns new instance of amqp Connection

func (*Connection) GetChannels

func (conn *Connection) GetChannels() map[uint16]*Channel

func (*Connection) GetID

func (conn *Connection) GetID() uint64

func (*Connection) GetMetrics

func (conn *Connection) GetMetrics() *ConnMetricsState

GetMetrics returns metrics

func (*Connection) GetRemoteAddr

func (conn *Connection) GetRemoteAddr() net.Addr

func (*Connection) GetUsername

func (conn *Connection) GetUsername() string

func (*Connection) GetVirtualHost

func (conn *Connection) GetVirtualHost() *VirtualHost

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements AMQP server

func NewServer

func NewServer(host string, port string, protoVersion string, config *config.Config) (server *Server)

NewServer returns new instance of AMQP Server

func (*Server) GetConnections

func (srv *Server) GetConnections() map[uint64]*Connection

func (*Server) GetMetrics

func (srv *Server) GetMetrics() *SrvMetricsState

func (*Server) GetProtoVersion

func (srv *Server) GetProtoVersion() string

func (*Server) GetVhost

func (srv *Server) GetVhost(name string) *VirtualHost

func (*Server) GetVhosts

func (srv *Server) GetVhosts() map[string]*VirtualHost

func (*Server) Start

func (srv *Server) Start()

Start starts main server loop

func (*Server) Stop

func (srv *Server) Stop()

Stop stop server and all vhosts

type ServerState

type ServerState int
const (
	Stopped ServerState = iota
	Running
	Stopping
)

server state statuses

type SrvMetricsState

type SrvMetricsState struct {
	Publish *metrics.TrackCounter
	Deliver *metrics.TrackCounter
	Confirm *metrics.TrackCounter
	Ack     *metrics.TrackCounter
	Get     *metrics.TrackCounter

	Ready   *metrics.TrackCounter
	Unacked *metrics.TrackCounter
	Total   *metrics.TrackCounter

	TrafficIn  *metrics.TrackCounter
	TrafficOut *metrics.TrackCounter
}

type UnackedMessage

type UnackedMessage struct {
	// contains filtered or unexported fields
}

UnackedMessage represents the unacknowledged message

type VirtualHost

type VirtualHost struct {
	// contains filtered or unexported fields
}

VirtualHost represents AMQP virtual host Each virtual host is "parent" for its queues and exchanges

func NewVhost

func NewVhost(name string, system bool, msgStoragePersistent *msgstorage.MsgStorage, msgStorageTransient *msgstorage.MsgStorage, srv *Server) *VirtualHost

NewVhost returns instance of VirtualHost When instantiating virtual host we 1) init system exchanges 2) load durable exchanges, queues and bindings from server storage 3) load persisted messages from message store into all initiated queues 4) run confirm loop Only after that vhost is in state running msgStoragePersistent, msgStorageTransient

func (*VirtualHost) AppendExchange

func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)

AppendExchange append new exchange and persist if it is durable

func (*VirtualHost) AppendQueue

func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error

AppendQueue append new queue and persist if it is durable and bindings into default exchange

func (*VirtualHost) DeleteQueue

func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)

DeleteQueue delete queue from virtual host and all bindings to that queue Also queue will be removed from server storage

func (*VirtualHost) GetDefaultExchange

func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange

GetDefaultExchange returns default exchange

func (*VirtualHost) GetExchange

func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange

GetExchange returns exchange by name or nil if not exists

func (*VirtualHost) GetExchanges

func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange

func (*VirtualHost) GetName

func (vhost *VirtualHost) GetName() string

func (*VirtualHost) GetQueue

func (vhost *VirtualHost) GetQueue(name string) *queue.Queue

GetQueue returns queue by name or nil if not exists

func (*VirtualHost) GetQueues

func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue

GetQueues return all vhost's queues

func (*VirtualHost) NewQueue

func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, shardSize int) *queue.Queue

NewQueue returns new instance of queue by params we can't use just queue.NewQueue, cause we need to set msgStorage to queue

func (*VirtualHost) PersistBinding

func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)

PersistBinding store binding into server storage

func (*VirtualHost) RemoveBindings

func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)

RemoveBindings remove given bindings from server storage

func (*VirtualHost) Stop

func (vhost *VirtualHost) Stop() error

Stop properly stop virtual host TODO: properly stop confirm loop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL