exchange

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExchangeTypeLocal   = "local"
	ExchangeTypeRabitMQ = "rabbitmq"
	ExchangeTypeReids   = "redis"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event int
const (
	EventSystem   Event = 0 // 系统消息
	EventSingle   Event = 1 // 单聊消息
	EventChatRoom Event = 2 // 群聊消息
)

type Exchange

type Exchange interface {
	Push(message Message)           // 将消息推送到交换器
	Receive(consume MessageConsume) // 从交换器接收消息
	Start(ctx context.Context)      // 启动
	Stop()                          // 停止消息接收
}

Exchange 消息交换器

type Fields

type Fields map[string]interface{} // 维护动态字段

type LocalExchange

type LocalExchange struct {
	// contains filtered or unexported fields
}

func NewLocalExchange

func NewLocalExchange(log common.Logger) (*LocalExchange, error)

func (*LocalExchange) Push

func (l *LocalExchange) Push(message Message)

Push 将消息推送到交换器

func (*LocalExchange) Receive

func (l *LocalExchange) Receive(consume MessageConsume)

Receive 消费消息(从交换器接收消息)

func (*LocalExchange) Start

func (l *LocalExchange) Start(ctx context.Context)

func (*LocalExchange) Stop

func (l *LocalExchange) Stop()

type Message

type Message struct {
	AppID          string `json:"app_id,required"`          // 应用ID
	From           string `json:"from,required"`            //发送者ID
	To             string `json:"to,required"`              //接收者ID
	Event          Event  `json:"event,required"`           //事件类型
	Data           Fields `json:"data,required"`            //消息内容
	MsgID          string `json:"msg_id,required"`          //消息ID
	SendAt         int64  `json:"send_at,required"`         //发送时间
	ConversationID string `json:"conversation_id,required"` //会话ID
}

type MessageConsume

type MessageConsume func(message Message)

type RabbitMQExchange

type RabbitMQExchange struct {
	// contains filtered or unexported fields
}

func NewRabbitMQExchange

func NewRabbitMQExchange(uri, exchangeName, queueName string, log common.Logger) (ex *RabbitMQExchange, err error)

func (*RabbitMQExchange) Dial

func (ex *RabbitMQExchange) Dial() (err error)

func (*RabbitMQExchange) Push

func (ex *RabbitMQExchange) Push(message Message)

func (*RabbitMQExchange) Receive

func (ex *RabbitMQExchange) Receive(consume MessageConsume)

func (*RabbitMQExchange) Start

func (ex *RabbitMQExchange) Start(ctx context.Context)

func (*RabbitMQExchange) Stop

func (ex *RabbitMQExchange) Stop()

type RedisConn

type RedisConn struct {
	Uri       string // redis地址
	QueueName string
	Conn      *redis.Client
	Ch        *redis.PubSub
}

func NewRedisConn

func NewRedisConn(uri, queueName string) *RedisConn

func (*RedisConn) Close

func (c *RedisConn) Close() (err error)

func (*RedisConn) Dial

func (c *RedisConn) Dial(ctx context.Context) (err error)

func (*RedisConn) Receive

func (c *RedisConn) Receive(ctx context.Context, mq chan Message) (err error)

type RedisExchange

type RedisExchange struct {
	// contains filtered or unexported fields
}

func NewRedisExchange

func NewRedisExchange(ctx context.Context, uri, queueName string, log common.Logger) (ex *RedisExchange, err error)

func (*RedisExchange) Dial

func (ex *RedisExchange) Dial(ctx context.Context) (err error)

func (*RedisExchange) Push

func (ex *RedisExchange) Push(message Message)

func (*RedisExchange) Receive

func (ex *RedisExchange) Receive(consume MessageConsume)

func (*RedisExchange) Start

func (ex *RedisExchange) Start(ctx context.Context)

func (*RedisExchange) Stop

func (ex *RedisExchange) Stop()

type Session

type Session struct {
	ExchangeName string
	QueueName    string
	ExchangeType string
	Conn         *amqp.Connection
	Ch           *amqp.Channel

	Uri string
	// contains filtered or unexported fields
}

func NewSession

func NewSession(uri, exchangeName, queueName string) *Session

func (*Session) Close

func (s *Session) Close()

Close 关闭连接

func (*Session) Dial

func (s *Session) Dial() (err error)

func (*Session) Receive

func (s *Session) Receive(mq chan Message) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL