mq

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RPC_MessageType_Request         = "request"
	RPC_MessageType_Cancel          = "cancel"
	RPC_MessageType_Response        = "response"
	RPC_MessageType_Error           = "error"
	RPC_MessageType_RequestReceived = "req-recv"
)

Variables

This section is empty.

Functions

func NewPublishingMessageFromRPC

func NewPublishingMessageFromRPC(rpcId, responseTo string) *amqp.Publishing

Types

type AmqpWriter

type AmqpWriter struct {
	io.Writer
	// contains filtered or unexported fields
}

func NewAmqpWriter

func NewAmqpWriter(c *amqp.Channel, exchange, key string) *AmqpWriter

func (*AmqpWriter) Write

func (a *AmqpWriter) Write(p []byte) (n int, err error)

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(ctx context.Context, options ...BrokerConfigHandler) (*Broker, error)

func (*Broker) Close

func (b *Broker) Close()

func (*Broker) CreatePublisherFunc

func (b *Broker) CreatePublisherFunc(exchange string, routingKey string) func(msg amqp.Publishing) error

func (*Broker) CreateRPCClient

func (b *Broker) CreateRPCClient(exchange string, id string) (*RPCClient, error)

func (*Broker) CreateReader

func (b *Broker) CreateReader(p *ConsumingParam) (io.Reader, error)

func (*Broker) CreateWriter

func (b *Broker) CreateWriter(exchange, key string) io.Writer

func (*Broker) DoConfigure

func (b *Broker) DoConfigure(handlers ...BrokerConfigHandler)

func (*Broker) GetAuthBrokerConfigHandlers

func (b *Broker) GetAuthBrokerConfigHandlers() []BrokerConfigHandler

func (*Broker) GetPublisher

func (b *Broker) GetPublisher() *Publisher

func (*Broker) IsServing

func (b *Broker) IsServing() bool

func (*Broker) RunBackground

func (b *Broker) RunBackground() error

func (*Broker) Serve

func (b *Broker) Serve()

type BrokerConfigHandler

type BrokerConfigHandler func(b *Broker)

func BeforeChannelExit

func BeforeChannelExit(c ChannelHandler) BrokerConfigHandler

func BeforeConnectionExit

func BeforeConnectionExit(c ConnectionHandler) BrokerConfigHandler

func HookAfterChannelCreated

func HookAfterChannelCreated(c ChannelHandler) BrokerConfigHandler

func HookAfterConnectionCreated

func HookAfterConnectionCreated(c ConnectionHandler) BrokerConfigHandler

func HookAfterQueueAndExchangeDeclaring

func HookAfterQueueAndExchangeDeclaring(c ChannelHandler) BrokerConfigHandler

func RPCClientConfig

func RPCClientConfig(exchange, id string, cb func(requestId string, msg *amqp.Delivery)) BrokerConfigHandler

func WithAMQPUrl

func WithAMQPUrl(url string) BrokerConfigHandler

func WithConsumingParam

func WithConsumingParam(p *ConsumingParam) BrokerConfigHandler

func WithDialConfig

func WithDialConfig(c amqp.Config) BrokerConfigHandler

func WithQueueDeclarePassive

func WithQueueDeclarePassive(p *QueueDeclaringParam) BrokerConfigHandler

type ChannelHandler

type ChannelHandler func(broker *Broker, a *amqp.Channel) error

type Connection

type Connection struct {
	net.Conn
	// contains filtered or unexported fields
}

client connection

func NewConnection

func NewConnection(local, remote string, ctx context.Context, options ...BrokerConfigHandler) (*Connection, error)

func NewConnectionWithBroker

func NewConnectionWithBroker(local, remote string, broker *Broker) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) GetPublisher

func (c *Connection) GetPublisher() *Publisher

func (*Connection) LocalAddr

func (c *Connection) LocalAddr() net.Addr

func (*Connection) Read

func (c *Connection) Read(b []byte) (n int, err error)

func (*Connection) RemoteAddr

func (c *Connection) RemoteAddr() net.Addr

func (*Connection) SetDeadline

func (c *Connection) SetDeadline(t time.Time) error

func (*Connection) SetReadDeadline

func (c *Connection) SetReadDeadline(t time.Time) error

func (*Connection) SetWriteDeadline

func (c *Connection) SetWriteDeadline(t time.Time) error

func (*Connection) Write

func (c *Connection) Write(b []byte) (n int, err error)

type ConnectionAddr

type ConnectionAddr struct {
	net.Addr
	// contains filtered or unexported fields
}

初始化网络地址 API

func (*ConnectionAddr) Network

func (a *ConnectionAddr) Network() string

func (*ConnectionAddr) String

func (a *ConnectionAddr) String() string

type ConnectionFrame

type ConnectionFrame struct {
	From string `json:"from"`
	Buf  []byte `json:"buf"`

	// 服务端和客户端都应该处理,客户端收到这个,就直接取消全部上下文
	// 服务端收到这个,马上关闭文件,删除缓存
	Closed bool `json:"closed"`

	// 只有服务端处理这个,创建会话
	First bool `json:"first"`
}

Connection Models

type ConnectionHandler

type ConnectionHandler func(broker *Broker, a *amqp.Connection) error

type ConsumingParam

type ConsumingParam struct {
	Queue     string
	Consumer  string
	AutoACK   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
	Handler   func(b *Broker, conn *amqp.Connection, channel *amqp.Channel, msg amqp.Delivery)
}

type ExchangeBindingParam

type ExchangeBindingParam struct {
	Destination string
	Key         string
	Source      string
	NoWait      bool
	Args        amqp.Table
}

type ExchangeDeclaringParam

type ExchangeDeclaringParam struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internel   bool
	NoWait     bool
	Args       amqp.Table
}

type Listener

type Listener struct {
	net.Listener
	// contains filtered or unexported fields
}

func NewListener

func NewListener(b *Broker, addr string, concurrent int) (*Listener, error)

func (*Listener) Accept

func (l *Listener) Accept() (*Connection, error)

func (*Listener) Addr

func (l *Listener) Addr() net.Addr

func (*Listener) Close

func (l *Listener) Close() error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (*Publisher) Publish

func (p *Publisher) Publish(failRetry int, exchange, routingKey string, mandatory, immediately bool, msg amqp.Publishing) (err error)

func (*Publisher) PublishTo

func (p *Publisher) PublishTo(exchange, routingKey string, msg amqp.Publishing) error

func (*Publisher) PublishToQueue

func (p *Publisher) PublishToQueue(queue string, msg amqp.Publishing) error

type QueueBindingParam

type QueueBindingParam struct {
	Name     string
	Key      string
	Exchange string
	NoWait   bool
	Args     amqp.Table
}

type QueueDeclaringParam

type QueueDeclaringParam struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

type RPCClient

type RPCClient struct {
	// contains filtered or unexported fields
}

func NewRPCClient

func NewRPCClient(ctx context.Context, exchange string, options ...BrokerConfigHandler) (*RPCClient, error)

func NewRPCClientWithBroker

func NewRPCClientWithBroker(broker *Broker, exchange string, id string) (*RPCClient, error)

func (*RPCClient) Call

func (r *RPCClient) Call(ctx context.Context, f, node string, req interface{}) ([]byte, error)

func (*RPCClient) Connect

func (r *RPCClient) Connect() error

func (*RPCClient) GetPublisher

func (r *RPCClient) GetPublisher() *Publisher

func (*RPCClient) GetRequestSentTimeout

func (r *RPCClient) GetRequestSentTimeout() time.Duration

func (*RPCClient) SetRequestSentTimeout

func (r *RPCClient) SetRequestSentTimeout(duration time.Duration)

type RPCDescription

type RPCDescription struct {
	FunctionType reflect.Type
}

type RPCServer

type RPCServer struct {
	NodeId   string
	Exchange string
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(ctx context.Context, exchange, nodeId string, options ...BrokerConfigHandler) (*RPCServer, error)

func (*RPCServer) DoConfigure

func (r *RPCServer) DoConfigure(options ...BrokerConfigHandler)

func (*RPCServer) GetBroker

func (r *RPCServer) GetBroker() *Broker

func (*RPCServer) GetRPCClient

func (r *RPCServer) GetRPCClient(id string) (*RPCClient, error)

一定用在 Serve 之前

func (*RPCServer) IsServing

func (r *RPCServer) IsServing() bool

func (*RPCServer) RegisterService

func (r *RPCServer) RegisterService(funcName string,
	cb func(broker *Broker, ctx context.Context, f, node string, delivery *amqp.Delivery) (message interface{}, e error))

func (*RPCServer) RegisterServices

func (r *RPCServer) RegisterServices(funcNames []string,
	cb func(broker *Broker, ctx context.Context, f, node string, delivery *amqp.Delivery) (message interface{}, e error))

func (*RPCServer) RunBackground

func (r *RPCServer) RunBackground() error

func (*RPCServer) Serve

func (r *RPCServer) Serve()

type RPCSpec

type RPCSpec struct {
	// contains filtered or unexported fields
}

func NewRPCSpec

func NewRPCSpec() *RPCSpec

func (*RPCSpec) Register

func (r *RPCSpec) Register(f string, desc *RPCDescription)

func (*RPCSpec) ValidateFunc

func (r *RPCSpec) ValidateFunc(name string, f interface{}) (bool, string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL