server

package
v0.0.0-...-cdc279e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// EventMsgOffline 离线消息
	EventMsgOffline = "msg.offline"
	// EventMsgNotify 消息通知(将所有消息通知到第三方程序)
	EventMsgNotify = "msg.notify"
	// EventOnlineStatus 用户在线状态
	EventOnlineStatus = "user.onlinestatus"
)

Variables

View Source
var (
	ErrChannelNotFound = fmt.Errorf("channel not found")
	ErrParamInvalid    = fmt.Errorf("param invalid")
)
View Source
var (
	VERSION = "4.0.0" // 服务器版本
)

Functions

func GetCommunityTopicParentChannelID

func GetCommunityTopicParentChannelID(channelID string) string

GetCommunityTopicParentChannelID 获取社区话题频道的父频道ID

func GetFakeChannelIDWith

func GetFakeChannelIDWith(fromUID, toUID string) string

GetFakeChannelIDWith GetFakeChannelIDWith

func GetFromUIDAndToUIDWith

func GetFromUIDAndToUIDWith(channelID string) (string, string)

func GetHomeDir

func GetHomeDir() (string, error)

func MarshalMessage

func MarshalMessage(version uint8, m *Message) []byte

MarshalMessage MarshalMessage

func UnmarshalMessage

func UnmarshalMessage(data []byte, m *Message) error

UnmarshalMessage UnmarshalMessage

Types

type APIServer

type APIServer struct {
	oklog.Log
	// contains filtered or unexported fields
}

APIServer ApiServer

func NewAPIServer

func NewAPIServer(s *Server) *APIServer

NewAPIServer new一个api server

func (*APIServer) Start

func (s *APIServer) Start()

Start 开始

func (*APIServer) Stop

func (s *APIServer) Stop()

Stop 停止服务

type Channel

type Channel struct {
	*okstore.ChannelInfo

	oklog.Log
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(channelInfo *okstore.ChannelInfo, s *Server) *Channel

NewChannel NewChannel

func (*Channel) AddAllowlist

func (c *Channel) AddAllowlist(uids []string)

AddAllowlist 添加白名单

func (*Channel) AddDenylist

func (c *Channel) AddDenylist(uids []string)

AddDenylist 添加黑名单

func (*Channel) AddSubscriber

func (c *Channel) AddSubscriber(uid string)

AddSubscriber Add subscribers

func (*Channel) AddSubscribers

func (c *Channel) AddSubscribers(uids []string)

func (*Channel) AddTmpSubscriber

func (c *Channel) AddTmpSubscriber(uid string)

func (*Channel) AddTmpSubscribers

func (c *Channel) AddTmpSubscribers(uids []string)

func (*Channel) Allow

func (c *Channel) Allow(uid string) (bool, proto.ReasonCode)

Allow Whether to allow sending of messages If it is in the white list or not in the black list, it is allowed to send

func (*Channel) GetAllSubscribers

func (c *Channel) GetAllSubscribers() []string

GetAllSubscribers 获取所有订阅者

func (*Channel) GetAllTmpSubscribers

func (c *Channel) GetAllTmpSubscribers() []string

func (*Channel) IsDenylist

func (c *Channel) IsDenylist(uid string) bool

IsDenylist 是否在黑名单内

func (*Channel) IsSubscriber

func (c *Channel) IsSubscriber(uid string) bool

IsSubscriber 是否已订阅

func (*Channel) IsTmpSubscriber

func (c *Channel) IsTmpSubscriber(uid string) bool

IsTmpSubscriber 是否是临时订阅者

func (*Channel) LoadData

func (c *Channel) LoadData() error

LoadData load data

func (*Channel) Put

func (c *Channel) Put(messages []*Message, customSubscribers []string, fromUID string, fromDeviceFlag proto.DeviceFlag, fromDeviceID string) error

func (*Channel) RealSubscribers

func (c *Channel) RealSubscribers(customSubscribers []string) ([]string, error)

real subscribers

func (*Channel) RemoveAllSubscriber

func (c *Channel) RemoveAllSubscriber()

RemoveAllSubscriber 移除所有订阅者

func (*Channel) RemoveAllTmpSubscriber

func (c *Channel) RemoveAllTmpSubscriber()

RemoveAllTmpSubscriber 移除所有临时订阅者

func (*Channel) RemoveAllowlist

func (c *Channel) RemoveAllowlist(uids []string)

RemoveAllowlist 移除白名单

func (*Channel) RemoveDenylist

func (c *Channel) RemoveDenylist(uids []string)

RemoveDenylist 移除黑名单

func (*Channel) RemoveSubscriber

func (c *Channel) RemoveSubscriber(uid string)

RemoveSubscriber 移除订阅者

func (*Channel) RemoveSubscribers

func (c *Channel) RemoveSubscribers(uids []string)

func (*Channel) RemoveTmSubscriber

func (c *Channel) RemoveTmSubscriber(uid string)

func (*Channel) RemoveTmpSubscribers

func (c *Channel) RemoveTmpSubscribers(uids []string)

func (*Channel) SetAllowlist

func (c *Channel) SetAllowlist(uids []string)

SetAllowlist SetAllowlist

func (*Channel) SetDenylist

func (c *Channel) SetDenylist(uids []string)

SetDenylist SetDenylist

type ChannelAPI

type ChannelAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

ChannelAPI ChannelAPI

func NewChannelAPI

func NewChannelAPI(s *Server) *ChannelAPI

NewChannelAPI 创建API

func (*ChannelAPI) Route

func (ch *ChannelAPI) Route(r *okhttp.OKHttp)

Route Route

type ChannelCreateReq

type ChannelCreateReq struct {
	ChannelInfoReq
	Subscribers []string `json:"subscribers"` // 订阅者
}

ChannelCreateReq 频道创建请求

func (ChannelCreateReq) Check

func (r ChannelCreateReq) Check() error

Check 检查请求参数

type ChannelDeleteReq

type ChannelDeleteReq struct {
	ChannelID   string `json:"channel_id"`   // 频道ID
	ChannelType uint8  `json:"channel_type"` // 频道类型
}

ChannelDeleteReq 删除频道请求

type ChannelInfoReq

type ChannelInfoReq struct {
	ChannelID   string `json:"channel_id"`   // 频道ID
	ChannelType uint8  `json:"channel_type"` // 频道类型
	Large       int    `json:"large"`        // 是否是超大群
	Ban         int    `json:"ban"`          // 是否封禁频道(封禁后此频道所有人都将不能发消息,除了系统账号)
}

ChannelInfoReq ChannelInfoReq

func (ChannelInfoReq) ToChannelInfo

func (c ChannelInfoReq) ToChannelInfo() *okstore.ChannelInfo

type ChannelInfoResp

type ChannelInfoResp struct {
	Large int `json:"large"` // 是否是超大群
	Ban   int `json:"ban"`   // 是否封禁频道(封禁后此频道所有人都将不能发消息,除了系统账号)
}

func (ChannelInfoResp) ToChannelInfo

func (c ChannelInfoResp) ToChannelInfo() *okstore.ChannelInfo

type ChannelManager

type ChannelManager struct {
	oklog.Log
	// contains filtered or unexported fields
}

ChannelManager 频道管理

func NewChannelManager

func NewChannelManager(s *Server) *ChannelManager

NewChannelManager 创建一个频道管理者

func (*ChannelManager) CreateOrUpdatePersonChannel

func (cm *ChannelManager) CreateOrUpdatePersonChannel(uid string) error

CreateOrUpdatePersonChannel 创建或更新个人频道

func (*ChannelManager) CreateTmpChannel

func (cm *ChannelManager) CreateTmpChannel(channelID string, channelType uint8, subscribers []string) error

CreateTmpChannel 创建临时频道

func (*ChannelManager) DeleteChannel

func (cm *ChannelManager) DeleteChannel(channelID string, channelType uint8) error

DeleteChannel 删除频道

func (*ChannelManager) DeleteChannelFromCache

func (cm *ChannelManager) DeleteChannelFromCache(channelID string, channelType uint8)

DeleteChannelFromCache DeleteChannelFromCache

func (*ChannelManager) GetChannel

func (cm *ChannelManager) GetChannel(channelID string, channelType uint8) (*Channel, error)

GetChannel 获取频道

func (*ChannelManager) GetOrCreateDataChannel

func (cm *ChannelManager) GetOrCreateDataChannel(channelID string, channelType uint8) *Channel

func (*ChannelManager) GetPersonChannel

func (cm *ChannelManager) GetPersonChannel(channelID string, channelType uint8) (*Channel, error)

GetPersonChannel 创建临时频道

func (*ChannelManager) GetTmpChannel

func (cm *ChannelManager) GetTmpChannel(channelID string, channelType uint8) (*Channel, error)

GetTmpChannel 获取临时频道

func (*ChannelManager) RemoveDataChannel

func (cm *ChannelManager) RemoveDataChannel(channelID string, channelType uint8)

type ConnInfo

type ConnInfo struct {
	ID           int64     `json:"id"`            // 连接ID
	UID          string    `json:"uid"`           // 用户uid
	IP           string    `json:"ip"`            // 客户端IP
	Port         int       `json:"port"`          // 客户端端口
	LastActivity time.Time `json:"last_activity"` // 最后一次活动时间
	Uptime       string    `json:"uptime"`        // 启动时间
	Idle         string    `json:"idle"`          // 客户端闲置时间
	PendingBytes int       `json:"pending_bytes"` // 等待发送的字节数
	InMsgs       int64     `json:"in_msgs"`       // 流入的消息数
	OutMsgs      int64     `json:"out_msgs"`      // 流出的消息数量
	InBytes      int64     `json:"in_bytes"`      // 流入的字节数量
	OutBytes     int64     `json:"out_bytes"`     // 流出的字节数量
	Device       string    `json:"device"`        // 设备
	DeviceID     string    `json:"device_id"`     // 设备ID
	Version      uint8     `json:"version"`       // 客户端协议版本
}

type ConnManager

type ConnManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConnManager

func NewConnManager(s *Server) *ConnManager

func (*ConnManager) AddConn

func (c *ConnManager) AddConn(conn oknet.Conn)

func (*ConnManager) ExistConnsWithUID

func (c *ConnManager) ExistConnsWithUID(uid string) bool

func (*ConnManager) GetConn

func (c *ConnManager) GetConn(id int64) oknet.Conn

func (*ConnManager) GetConnCountWith

func (c *ConnManager) GetConnCountWith(uid string, deviceFlag okproto.DeviceFlag) (int, int)

GetConnCountWith 获取设备的在线数量和用户所有设备的在线数量

func (*ConnManager) GetConnWithDeviceID

func (c *ConnManager) GetConnWithDeviceID(uid string, deviceID string) oknet.Conn

func (*ConnManager) GetConnsWith

func (c *ConnManager) GetConnsWith(uid string, deviceFlag okproto.DeviceFlag) []oknet.Conn

func (*ConnManager) GetConnsWithUID

func (c *ConnManager) GetConnsWithUID(uid string) []oknet.Conn

func (*ConnManager) GetOnlineConns

func (c *ConnManager) GetOnlineConns(uids []string) []oknet.Conn

GetOnlineConns 传一批uids 返回在线的uids

func (*ConnManager) RemoveConn

func (c *ConnManager) RemoveConn(conn oknet.Conn)

func (*ConnManager) RemoveConnWithID

func (c *ConnManager) RemoveConnWithID(id int64)

type Connz

type Connz struct {
	Connections []*ConnInfo `json:"connections"` // 连接数
	Now         time.Time   `json:"now"`         // 查询时间
	Total       int         `json:"total"`       // 总连接数量
	Offset      int         `json:"offset"`      // 偏移位置
	Limit       int         `json:"limit"`       // 限制数量
}

type ConnzAPI

type ConnzAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewConnzAPI

func NewConnzAPI(s *Server) *ConnzAPI

func (*ConnzAPI) HandleConnz

func (co *ConnzAPI) HandleConnz(c *okhttp.Context)

func (*ConnzAPI) Route

func (co *ConnzAPI) Route(r *okhttp.OKHttp)

type ConversationAPI

type ConversationAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

ConversationAPI ConversationAPI

func NewConversationAPI

func NewConversationAPI(s *Server) *ConversationAPI

NewConversationAPI NewConversationAPI

func (*ConversationAPI) Route

func (s *ConversationAPI) Route(r *okhttp.OKHttp)

Route 路由

type ConversationManager

type ConversationManager struct {
	oklog.Log
	// contains filtered or unexported fields
}

ConversationManager ConversationManager

func NewConversationManager

func NewConversationManager(s *Server) *ConversationManager

NewConversationManager NewConversationManager

func (*ConversationManager) AddOrUpdateConversation

func (cm *ConversationManager) AddOrUpdateConversation(uid string, conversation *okstore.Conversation)

func (*ConversationManager) DeleteConversation

func (cm *ConversationManager) DeleteConversation(uids []string, channelID string, channelType uint8) error

DeleteConversation 删除最近会话

func (*ConversationManager) FlushConversations

func (cm *ConversationManager) FlushConversations()

FlushConversations 同步最近会话

func (*ConversationManager) GetConversation

func (cm *ConversationManager) GetConversation(uid string, channelID string, channelType uint8) *okstore.Conversation

func (*ConversationManager) GetConversations

func (cm *ConversationManager) GetConversations(uid string, version int64, larges []*okproto.Channel) []*okstore.Conversation

GetConversations GetConversations

func (*ConversationManager) PushMessage

func (cm *ConversationManager) PushMessage(message *Message, subscribers []string)

PushMessage PushMessage

func (*ConversationManager) SetConversationUnread

func (cm *ConversationManager) SetConversationUnread(uid string, channelID string, channelType uint8, unread int, messageSeq uint32) error

SetConversationUnread set unread data from conversation

func (*ConversationManager) Start

func (cm *ConversationManager) Start()

Start Start

func (*ConversationManager) Stop

func (cm *ConversationManager) Stop()

Stop Stop

type Datasource

type Datasource struct {
	// contains filtered or unexported fields
}

Datasource Datasource

func (*Datasource) GetBlacklist

func (d *Datasource) GetBlacklist(channelID string, channelType uint8) ([]string, error)

GetBlacklist 获取频道的黑名单

func (*Datasource) GetChannelInfo

func (d *Datasource) GetChannelInfo(channelID string, channelType uint8) (*okstore.ChannelInfo, error)

func (*Datasource) GetSubscribers

func (d *Datasource) GetSubscribers(channelID string, channelType uint8) ([]string, error)

GetSubscribers 获取频道的订阅者

func (*Datasource) GetSystemUIDs

func (d *Datasource) GetSystemUIDs() ([]string, error)

GetSystemUIDs 获取系统账号

func (*Datasource) GetWhitelist

func (d *Datasource) GetWhitelist(channelID string, channelType uint8) ([]string, error)

GetWhitelist 获取频道的白明单

type DeliveryManager

type DeliveryManager struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewDeliveryManager

func NewDeliveryManager(s *Server) *DeliveryManager

type DemoServer

type DemoServer struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewDemoServer

func NewDemoServer(s *Server) *DemoServer

NewDemoServer new一个demo server

func (*DemoServer) Start

func (s *DemoServer) Start()

Start 开始

func (*DemoServer) Stop

func (s *DemoServer) Stop()

Stop 停止服务

type Dispatch

type Dispatch struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewDispatch

func NewDispatch(s *Server) *Dispatch

func (*Dispatch) Start

func (d *Dispatch) Start() error

func (*Dispatch) Stop

func (d *Dispatch) Stop() error

type Event

type Event struct {
	Event string      `json:"event"` // 事件标示
	Data  interface{} `json:"data"`  // 事件数据
}

Event Event

func (*Event) String

func (e *Event) String() string

type FramePool

type FramePool struct {
	// contains filtered or unexported fields
}

func NewFramePool

func NewFramePool() *FramePool

func (*FramePool) GetRecvackPackets

func (f *FramePool) GetRecvackPackets() []*okproto.RecvackPacket

func (*FramePool) GetSendPackets

func (f *FramePool) GetSendPackets() []*okproto.SendPacket

func (*FramePool) GetSubPackets

func (f *FramePool) GetSubPackets() []*okproto.SubPacket

func (*FramePool) GetSubackPackets

func (f *FramePool) GetSubackPackets() []*okproto.SubackPacket

func (*FramePool) PutRecvackPackets

func (f *FramePool) PutRecvackPackets(recvackPackets []*okproto.RecvackPacket)

func (*FramePool) PutSendPackets

func (f *FramePool) PutSendPackets(sendPackets []*okproto.SendPacket)

func (*FramePool) PutSubPackets

func (f *FramePool) PutSubPackets(subPackets []*okproto.SubPacket)

func (*FramePool) PutSubackPackets

func (f *FramePool) PutSubackPackets(subackPackets []*okproto.SubackPacket)

type FrameWorkPool

type FrameWorkPool struct {
}

func NewFrameWorkPool

func NewFrameWorkPool() *FrameWorkPool

func (*FrameWorkPool) Submit

func (f *FrameWorkPool) Submit(task func())

type IDatasource

type IDatasource interface {
	// 获取订阅者
	GetSubscribers(channelID string, channelType uint8) ([]string, error)
	// 获取黑名单
	GetBlacklist(channelID string, channelType uint8) ([]string, error)
	// 获取白名单
	GetWhitelist(channelID string, channelType uint8) ([]string, error)
	// 获取系统账号的uid集合 系统账号可以给任何人发消息
	GetSystemUIDs() ([]string, error)
	// 获取频道信息
	GetChannelInfo(channelID string, channelType uint8) (*okstore.ChannelInfo, error)
}

IDatasource 数据源第三方应用可以提供

func NewDatasource

func NewDatasource(s *Server) IDatasource

NewDatasource 创建一个数据源

type Message

type Message struct {
	*okproto.RecvPacket
	ToUID       string   // 接受者
	Subscribers []string // 订阅者 如果此字段有值 则表示消息只发送给指定的订阅者
	// contains filtered or unexported fields
}

func (*Message) Decode

func (m *Message) Decode(msg []byte) error

func (*Message) DeepCopy

func (m *Message) DeepCopy() (*Message, error)

func (*Message) Encode

func (m *Message) Encode() []byte

func (*Message) GetMessageID

func (m *Message) GetMessageID() int64

func (*Message) GetSeq

func (m *Message) GetSeq() uint32

func (*Message) SetSeq

func (m *Message) SetSeq(seq uint32)

func (*Message) StreamIng

func (m *Message) StreamIng() bool

func (*Message) StreamStart

func (m *Message) StreamStart() bool

type MessageAPI

type MessageAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

MessageAPI MessageAPI

func NewMessageAPI

func NewMessageAPI(s *Server) *MessageAPI

NewMessageAPI NewMessageAPI

func (*MessageAPI) Route

func (m *MessageAPI) Route(r *okhttp.OKHttp)

Route route

type MessageHeader

type MessageHeader struct {
	NoPersist int `json:"no_persist"` // Is it not persistent
	RedDot    int `json:"red_dot"`    // Whether to show red dot
	SyncOnce  int `json:"sync_once"`  // This message is only synchronized or consumed once
}

MessageHeader Message header

type MessageOfflineNotify

type MessageOfflineNotify struct {
	MessageResp
	ToUIDs          []string `json:"to_uids"`
	Compress        string   `json:"compress,omitempty"`         // 压缩ToUIDs 如果为空 表示不压缩 为gzip则采用gzip压缩
	CompresssToUIDs []byte   `json:"compress_to_uids,omitempty"` // 已压缩的to_uids
	SourceID        int64    `json:"source_id,omitempty"`        // 来源节点ID
}

type MessageResp

type MessageResp struct {
	Header       MessageHeader      `json:"header"`                // 消息头
	Setting      uint8              `json:"setting"`               // 设置
	MessageID    int64              `json:"message_id"`            // 服务端的消息ID(全局唯一)
	MessageIDStr string             `json:"message_idstr"`         // 服务端的消息ID(全局唯一)
	ClientMsgNo  string             `json:"client_msg_no"`         // 客户端消息唯一编号
	StreamNo     string             `json:"stream_no,omitempty"`   // 流编号
	StreamSeq    uint32             `json:"stream_seq,omitempty"`  // 流序号
	StreamFlag   okproto.StreamFlag `json:"stream_flag,omitempty"` // 流标记
	MessageSeq   uint32             `json:"message_seq"`           // 消息序列号 (用户唯一,有序递增)
	FromUID      string             `json:"from_uid"`              // 发送者UID
	ChannelID    string             `json:"channel_id"`            // 频道ID
	ChannelType  uint8              `json:"channel_type"`          // 频道类型
	Topic        string             `json:"topic,omitempty"`       // 话题ID
	Expire       uint32             `json:"expire"`                // 消息过期时间
	Timestamp    int32              `json:"timestamp"`             // 服务器消息时间戳(10位,到秒)
	Payload      []byte             `json:"payload"`               // 消息内容
	Streams      []*StreamItemResp  `json:"streams,omitempty"`     // 消息流内容
}

MessageResp 消息返回

type MessageRespSlice

type MessageRespSlice []*MessageResp

MessageRespSlice MessageRespSlice

func (MessageRespSlice) Len

func (m MessageRespSlice) Len() int

func (MessageRespSlice) Less

func (m MessageRespSlice) Less(i, j int) bool

func (MessageRespSlice) Swap

func (m MessageRespSlice) Swap(i, j int)

type MessageSendReq

type MessageSendReq struct {
	Header      MessageHeader `json:"header"`        // 消息头
	ClientMsgNo string        `json:"client_msg_no"` // 客户端消息编号(相同编号,客户端只会显示一条)
	StreamNo    string        `json:"stream_no"`     // 消息流编号
	FromUID     string        `json:"from_uid"`      // 发送者UID
	ChannelID   string        `json:"channel_id"`    // 频道ID
	ChannelType uint8         `json:"channel_type"`  // 频道类型
	Expire      uint32        `json:"expire"`        // 消息过期时间
	Subscribers []string      `json:"subscribers"`   // 订阅者 如果此字段有值,表示消息只发给指定的订阅者
	Payload     []byte        `json:"payload"`       // 消息内容
}

MessageSendReq 消息发送请求

func (MessageSendReq) Check

func (m MessageSendReq) Check() error

Check 检查输入

type Mode

type Mode string
const (
	//debug 模式
	DebugMode Mode = "debug"
	// 正式模式
	ReleaseMode Mode = "release"
	// 压力测试模式
	BenchMode Mode = "bench"
	// TestMode indicates gin mode is test.
	TestMode = "test"
)

type MonitorAPI

type MonitorAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewMonitorAPI

func NewMonitorAPI(s *Server) *MonitorAPI

NewMonitorAPI NewMonitorAPI

func (*MonitorAPI) Route

func (m *MonitorAPI) Route(r *okhttp.OKHttp)

Route 用户相关路由配置

type MonitorServer

type MonitorServer struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewMonitorServer

func NewMonitorServer(s *Server) *MonitorServer

func (*MonitorServer) Start

func (m *MonitorServer) Start()

func (*MonitorServer) Stop

func (m *MonitorServer) Stop() error

type OnlinestatusResp

type OnlinestatusResp struct {
	UID        string `json:"uid"`         // 在线用户uid
	DeviceFlag uint8  `json:"device_flag"` // 设备标记 0. APP 1.web
	Online     int    `json:"online"`      // 是否在线
}

type Options

type Options struct {
	ID          int64  // 节点ID
	Mode        Mode   // 模式 debug 测试 release 正式 bench 压力测试
	HTTPAddr    string // http api的监听地址 默认为 0.0.0.0:5001
	Addr        string // tcp监听地址 例如:tcp://0.0.0.0:5100
	RootDir     string // 根目录
	DataDir     string // 数据目录
	GinMode     string // gin框架的模式
	WSAddr      string // websocket 监听地址 例如:ws://0.0.0.0:5200
	WSSAddr     string // wss 监听地址 例如:wss://0.0.0.0:5210
	WSTLSConfig *tls.Config
	WSSConfig   struct {
		CertFile string // 证书文件
		KeyFile  string // 私钥文件
	}

	Logger struct {
		Dir     string // 日志存储目录
		Level   zapcore.Level
		LineNum bool // 是否显示代码行数
	}
	Monitor struct {
		On   bool   // 是否开启监控
		Addr string // 监控地址 默认为 0.0.0.0:5300
	}
	Demo struct {
		On   bool   // 是否开启demo
		Addr string // demo服务地址 默认为 0.0.0.0:5172
	}
	External struct {
		IP          string // 外网IP
		TCPAddr     string // 节点的TCP地址 对外公开,APP端长连接通讯  格式: ip:port
		WSAddr      string //  节点的wsAdd地址 对外公开 WEB端长连接通讯 格式: ws://ip:port
		WSSAddr     string // 节点的wssAddr地址 对外公开 WEB端长连接通讯 格式: wss://ip:port
		MonitorAddr string // 对外访问的监控地址
		APIUrl      string // 对外访问的API基地址 格式: http://ip:port
	}
	Channel struct {
		CacheCount                int  // 频道缓存数量
		CreateIfNoExist           bool // 如果频道不存在是否创建
		SubscriberCompressOfCount int  // 订订阅者数组多大开始压缩(离线推送的时候订阅者数组太大 可以设置此参数进行压缩 默认为0 表示不压缩 )

	}
	TmpChannel struct {
		Suffix     string // 临时频道的后缀
		CacheCount int    // 临时频道缓存数量
	}
	Webhook struct {
		HTTPAddr                    string        // webhook的http地址 通过此地址通知数据给第三方 格式为 http://xxxxx
		GRPCAddr                    string        //  webhook的grpc地址 如果此地址有值 则不会再调用HttpAddr配置的地址,格式为 ip:port
		MsgNotifyEventPushInterval  time.Duration // 消息通知事件推送间隔,默认500毫秒发起一次推送
		MsgNotifyEventCountPerPush  int           // 每次webhook消息通知事件推送消息数量限制 默认一次请求最多推送100条
		MsgNotifyEventRetryMaxCount int           // 消息通知事件消息推送失败最大重试次数 默认为5次,超过将丢弃
	}
	Datasource struct {
		Addr          string // 数据源地址
		ChannelInfoOn bool   // 是否开启频道信息获取
	}
	Conversation struct {
		On           bool          // 是否开启最近会话
		CacheExpire  time.Duration // 最近会话缓存过期时间 (这个是热数据缓存时间,并非最近会话数据的缓存时间)
		SyncInterval time.Duration // 最近会话同步间隔
		SyncOnce     int           //  当多少最近会话数量发送变化就保存一次
		UserMaxCount int           // 每个用户最大最近会话数量 默认为500
	}
	ManagerToken   string // 管理者的token
	ManagerUID     string // 管理者的uid
	ManagerTokenOn bool   // 管理者的token是否开启

	Proto okproto.Protocol // IM protocol

	Version string

	UnitTest       bool // 是否开启单元测试
	HandlePoolSize int

	ConnIdleTime    time.Duration // 连接空闲时间 超过此时间没数据传输将关闭
	TimingWheelTick time.Duration // The time-round training interval must be 1ms or more
	TimingWheelSize int64         // Time wheel size

	UserMsgQueueMaxSize int // 用户消息队列最大大小,超过此大小此用户将被限速,0为不限制

	TokenAuthOn bool // 是否开启token验证 不配置将根据mode属性判断 debug模式下默认为false release模式为true

	EventPoolSize int // 事件协程池大小,此池主要处理im的一些通知事件 比如webhook,上下线等等 默认为1024

	WhitelistOffOfPerson int
	DeliveryMsgPoolSize  int // 投递消息协程池大小,此池的协程主要用来将消息投递给在线用户 默认大小为 10240

	MessageRetry struct {
		Interval     time.Duration // 消息重试间隔,如果消息发送后在此间隔内没有收到ack,将会在此间隔后重新发送
		MaxCount     int           // 消息最大重试次数
		ScanInterval time.Duration //  每隔多久扫描一次超时队列,看超时队列里是否有需要重试的消息
	}

	SlotNum int // 槽数量
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions() *Options

func NewServerOptions

func NewServerOptions() *Options

NewServerOptions NewServerOptions

func NewTestOptions

func NewTestOptions(logLevel ...zapcore.Level) *Options

func (*Options) ConfigFileUsed

func (o *Options) ConfigFileUsed() string

func (*Options) ConfigureWithViper

func (o *Options) ConfigureWithViper(vp *viper.Viper)

func (*Options) GetCustomerServiceVisitorUID

func (o *Options) GetCustomerServiceVisitorUID(channelID string) (string, bool)

获取客服频道的访客id

func (*Options) HasDatasource

func (o *Options) HasDatasource() bool

HasDatasource 是否有配置数据源

func (*Options) IsFakeChannel

func (o *Options) IsFakeChannel(channelID string) bool

IsFakeChannel 是fake频道

func (*Options) IsTmpChannel

func (o *Options) IsTmpChannel(channelID string) bool

IsTmpChannel 是否是临时频道

func (*Options) WebhookGRPCOn

func (o *Options) WebhookGRPCOn() bool

WebhookGRPCOn 是否配置了webhook grpc地址

func (*Options) WebhookOn

func (o *Options) WebhookOn() bool

WebhookOn WebhookOn

type Processor

type Processor struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(s *Server) *Processor

type PullMode

type PullMode int // 拉取模式
const (
	PullModeDown PullMode = iota // 向下拉取
	PullModeUp                   // 向上拉取
)

type Queue

type Queue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Queue Queue

func NewQueue

func NewQueue() *Queue

NewQueue 创建队列

func (*Queue) Close

func (e *Queue) Close()

Close Close

func (*Queue) Len

func (e *Queue) Len() int

Len 获取队列长度

func (*Queue) Pop

func (e *Queue) Pop() (v interface{})

Pop 取出队列,(阻塞模式)

func (*Queue) Push

func (e *Queue) Push(v interface{})

Push Push

func (*Queue) TryPop

func (e *Queue) TryPop() (v interface{}, ok bool)

TryPop 试着取出队列(非阻塞模式)返回ok == false 表示空

func (*Queue) Wait

func (e *Queue) Wait()

Wait 等待队列消费完成

type RetryQueue

type RetryQueue struct {
	// contains filtered or unexported fields
}

RetryQueue 重试队列

func NewRetryQueue

func NewRetryQueue(s *Server) *RetryQueue

NewRetryQueue NewRetryQueue

func (*RetryQueue) Start

func (r *RetryQueue) Start()

Start 开始运行重试

func (*RetryQueue) Stop

func (r *RetryQueue) Stop()

type RouteAPI

type RouteAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

这个主要为了模拟proxy模式。

func NewRouteAPI

func NewRouteAPI(s *Server) *RouteAPI

NewRouteAPI NewRouteAPI

func (*RouteAPI) Route

func (a *RouteAPI) Route(r *okhttp.OKHttp)

Route Route

type Server

type Server struct {
	oklog.Log // 日志
	// contains filtered or unexported fields
}

func New

func New(opts *Options) *Server

func NewTestServer

func NewTestServer(ots ...*Options) *Server

NewTestServer NewTestServer

func (*Server) AddIPBlacklist

func (s *Server) AddIPBlacklist(ips []string)

func (*Server) AllowIP

func (s *Server) AllowIP(ip string) bool

func (*Server) GetConnInfos

func (s *Server) GetConnInfos(sortOpt SortOpt, offset, limit int) []oknet.Conn

func (*Server) Init

func (s *Server) Init(env svc.Environment) error

func (*Server) RemoveIPBlacklist

func (s *Server) RemoveIPBlacklist(ips []string)

func (*Server) Schedule

func (s *Server) Schedule(interval time.Duration, f func()) *timingwheel.Timer

Schedule 延迟任务

func (*Server) Start

func (s *Server) Start() error

func (*Server) Stop

func (s *Server) Stop() error

type SortOpt

type SortOpt string
const (
	ByID     SortOpt = "id"     // 通过连接id排序
	ByIDDesc SortOpt = "idDesc" // 通过连接id排序

	ByInMsg            SortOpt = "inMsg"            // 通过收到消息排序
	ByInMsgDesc        SortOpt = "inMsgDesc"        // 通过收到消息排序
	ByOutMsg           SortOpt = "outMsg"           // 通过发送消息排序
	ByOutMsgDesc       SortOpt = "outMsgDesc"       // 通过发送消息排序
	ByInBytes          SortOpt = "inBytes"          // 通过收到字节数排序
	ByInBytesDesc      SortOpt = "inBytesDesc"      // 通过收到字节数排序
	ByOutBytes         SortOpt = "outBytes"         // 通过发送字节数排序
	ByOutBytesDesc     SortOpt = "outBytesDesc"     // 通过发送字节数排序
	ByPendingBytes     SortOpt = "pendingBytes"     // 通过等待发送字节数排序
	ByPendingBytesDesc SortOpt = "pendingBytesDesc" // 通过等待发送字节数排序
	ByUptime           SortOpt = "uptime"           // 通过启动时间排序
	ByUptimeDesc       SortOpt = "uptimeDesc"       // 通过启动时间排序
)

type StreamItemResp

type StreamItemResp struct {
	StreamSeq   uint32 `json:"stream_seq"`    // 流序号
	ClientMsgNo string `json:"client_msg_no"` // 客户端消息唯一编号
	Blob        []byte `json:"blob"`          // 消息内容
}

type SystemAPI

type SystemAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewSystemAPI

func NewSystemAPI(s *Server) *SystemAPI

NewSystemAPI 创建API

func (*SystemAPI) Route

func (s *SystemAPI) Route(r *okhttp.OKHttp)

Route Route

type SystemUIDManager

type SystemUIDManager struct {
	// contains filtered or unexported fields
}

SystemUIDManager System uid management

func NewSystemUIDManager

func NewSystemUIDManager(s *Server) *SystemUIDManager

NewSystemUIDManager NewSystemUIDManager

func (*SystemUIDManager) AddSystemUIDs

func (s *SystemUIDManager) AddSystemUIDs(uids []string) error

AddSystemUID AddSystemUID

func (*SystemUIDManager) LoadIfNeed

func (s *SystemUIDManager) LoadIfNeed() error

LoadIfNeed LoadIfNeed

func (*SystemUIDManager) RemoveSystemUIDs

func (s *SystemUIDManager) RemoveSystemUIDs(uids []string) error

RemoveSystemUID RemoveSystemUID

func (*SystemUIDManager) SystemUID

func (s *SystemUIDManager) SystemUID(uid string) bool

SystemUID Is it a system account?

type TestConn

type TestConn struct {
	// contains filtered or unexported fields
}

func NewTestConn

func NewTestConn() *TestConn

func (*TestConn) Authed

func (t *TestConn) Authed() bool

func (*TestConn) Close

func (t *TestConn) Close() error

func (*TestConn) GetID

func (t *TestConn) GetID() uint32

func (*TestConn) OutboundBuffered

func (t *TestConn) OutboundBuffered() int

func (*TestConn) RemoteAddr

func (t *TestConn) RemoteAddr() net.Addr

func (*TestConn) SetAuthed

func (t *TestConn) SetAuthed(v bool)

func (*TestConn) SetID

func (t *TestConn) SetID(id uint32)

func (*TestConn) SetReadDeadline

func (t *TestConn) SetReadDeadline(tm time.Time) error

func (*TestConn) SetVersion

func (t *TestConn) SetVersion(version uint8)

func (*TestConn) SetWriteDeadline

func (t *TestConn) SetWriteDeadline(tm time.Time) error

func (*TestConn) Version

func (t *TestConn) Version() uint8

func (*TestConn) Write

func (t *TestConn) Write(buf []byte) (n int, err error)

func (*TestConn) WriteChan

func (t *TestConn) WriteChan() chan []byte

type UpdateTokenReq

type UpdateTokenReq struct {
	UID         string              `json:"uid"`          // 用户唯一uid
	Token       string              `json:"token"`        // 用户的token
	DeviceFlag  okproto.DeviceFlag  `json:"device_flag"`  // 设备标识  0.app 1.web
	DeviceLevel okproto.DeviceLevel `json:"device_level"` // 设备等级 0.为从设备 1.为主设备
}

UpdateTokenReq 更新token请求

func (UpdateTokenReq) Check

func (u UpdateTokenReq) Check() error

Check 检查输入

type UserAPI

type UserAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

UserAPI 用户相关API

func NewUserAPI

func NewUserAPI(s *Server) *UserAPI

NewUserAPI NewUserAPI

func (*UserAPI) Route

func (u *UserAPI) Route(r *okhttp.OKHttp)

Route 用户相关路由配置

type Varz

type Varz struct {
	ServerID    string  `json:"server_id"`   // 服务端ID
	ServerName  string  `json:"server_name"` // 服务端名称
	Version     string  `json:"version"`     // 服务端版本
	Connections int     `json:"connections"` // 当前连接数量
	Uptime      string  `json:"uptime"`      // 上线时间
	Mem         int64   `json:"mem"`         // 内存
	CPU         float64 `json:"cpu"`         // cpu

	InMsgs      int64 `json:"in_msgs"`      // 流入消息数量
	OutMsgs     int64 `json:"out_msgs"`     // 流出消息数量
	InBytes     int64 `json:"in_bytes"`     // 流入字节数量
	OutBytes    int64 `json:"out_bytes"`    // 流出字节数量
	SlowClients int64 `json:"slow_clients"` // 慢客户端数量
	RetryQueue  int64 `json:"retry_queue"`  // 重试队列数量

	TCPAddr     string `json:"tcp_addr"`     // tcp地址
	WSAddr      string `json:"ws_addr"`      // ws地址
	WSSAddr     string `json:"wss_addr"`     // wss地址
	MonitorAddr string `json:"monitor_addr"` // 监控地址
	MonitorOn   int    `json:"monitor_on"`   // 监控是否开启
	Commit      string `json:"commit"`       // git commit id
	CommitDate  string `json:"commit_date"`  // git commit date
	TreeState   string `json:"tree_state"`   // git tree state
	APIURL      string `json:"api_url"`      // api地址

	ManagerUID     string `json:"manager_uid"`      // 管理员uid
	ManagerTokenOn int    `json:"manager_token_on"` // 管理员token是否开启

	Conns []*ConnInfo `json:"conns,omitempty"` // 连接信息

}

func CreateVarz

func CreateVarz(s *Server) *Varz

type VarzAPI

type VarzAPI struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewVarzAPI

func NewVarzAPI(s *Server) *VarzAPI

func (*VarzAPI) HandleVarz

func (v *VarzAPI) HandleVarz(c *okhttp.Context)

func (*VarzAPI) Route

func (v *VarzAPI) Route(r *okhttp.OKHttp)

type Webhook

type Webhook struct {
	oklog.Log
	// contains filtered or unexported fields
}

func NewWebhook

func NewWebhook(s *Server) *Webhook

func (*Webhook) Offline

func (w *Webhook) Offline(uid string, deviceFlag okproto.DeviceFlag, id int64, onlineCount int, totalOnlineCount int)

Offline 用户离线 id 为用户在当前系统中的socket id left 为剩余在线数量

func (*Webhook) Online

func (w *Webhook) Online(uid string, deviceFlag okproto.DeviceFlag, id int64, onlineCount int, totalOnlineCount int)

Online 用户在线

func (*Webhook) Start

func (w *Webhook) Start()

func (*Webhook) Stop

func (w *Webhook) Stop()

func (*Webhook) TriggerEvent

func (w *Webhook) TriggerEvent(event *Event)

TriggerEvent 触发事件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL