cellnet

package
v0.0.0-...-29ffec6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2017 License: MIT, MIT Imports: 10 Imported by: 0

README

cellnet

Build Status Go Report Card MIT licensed GoDoc

cellnet是一个高性能,简单,方便的开源服务器网络库

自由混合编码,业务代码无需调整。

TCP和html5的应用都可以直接使用cellnet迅速搭建服务器框架。

如果你熟悉Java的Netty或Mina网络库,Handler机制将给予你强大定制功能。

特性

数据协议

基于handler处理链

  • 自定义, 组装收发流程

  • 支持专有日志调试

队列及IO

  • 支持多个队列, 实现单线程/多线程收发处理消息

  • 发送时自动合并封包(性能效果决定于实际请求和发送比例)

RPC

  • 异步/同步远程过程调用

消息日志

  • 可以方便的通过日志查看收发消息的每一个字段消息

第三方库依赖

  • github.com/davyxu/golog

  • github.com/davyxu/goobjfmt

编码包可选支持

  • github.com/golang/protobuf

  • github.com/davyxu/gosproto

websocket可选支持

  • github.com/gorilla/websocket

获取+编译

	go get -u -v github.com/davyxu/cellnet

例子主要采用了protobuf做编码,因此需要安装protobuf支持

	go get -v github.com/golang/protobuf

性能测试

命令行: go test -v github.com/davyxu/cellnet/benchmark/io

平台: Windows 7 x64/CentOS 6.5 x64

测试用例: localhost 1000连接 同时对服务器进行实时PingPong测试

配置1: i7 6700 3.4GHz 8核

IOPS: 12.5w

配置2: i5 4590 3.3GHz 4核

IOPS: 10.1w

例子

Echo



func server() {

	queue := cellnet.NewEventQueue()

	p := socket.NewAcceptor(queue).Start("127.0.0.1:7201")

	cellnet.RegisterMessage(p, "gamedef.TestEchoACK", func(ev *cellnet.Event) {
		msg := ev.Msg.(*gamedef.TestEchoACK)

		log.Debugln("server recv:", msg.Content)

		ev.Send(&gamedef.TestEchoACK{
			Content: msg.String(),
		})

	})

	queue.StartLoop()

}

func client() {

	queue := cellnet.NewEventQueue()

	p := socket.NewConnector(queue).Start("127.0.0.1:7301")

	cellnet.RegisterMessage(p, "gamedef.TestEchoACK", func(ev *cellnet.Event) {
		msg := ev.Msg.(*gamedef.TestEchoACK)

		log.Debugln("client recv:", msg.Content)
	})

	cellnet.RegisterMessage(p, "coredef.SessionConnected", func(ev *cellnet.Event) {

		log.Debugln("client connected")

		ev.Send(&gamedef.TestEchoACK{
			Content: "hello",
		})

	})

	cellnet.RegisterMessage(p, "coredef.SessionConnectFailed", func(ev *cellnet.SessionEvent) {

		msg := ev.Msg.(*coredef.SessionConnectFailed)

		log.Debugln(msg.Reason)

	})

	queue.StartLoop()
}

文件夹功能

benchmark\		    性能测试用例

proto\			    cellnet内部的proto

    binary\         内部系统消息,rpc消息协议

    pb\             使用pb例子的消息

    sproto\         使用sproto例子的消息

protoc-gen-msg\     protobuf的protoc插件, 消息id生成, 使用pb编码时使用

objprotogen\        binary格式的消息绑定工具, 使用binary编码时使用

rpc\			    异步远程过程调用封装

socket\			    套接字,连接管理等封装

example\			测试用例/例子

    classicrecv\    传统的固定消息处理函数例子
	
   	echo_pb\	    基于protobuf和json混合编码的pingpong测试,

   	echo_sproto\	基于sproto编码的pingpong测试,

   	echo_websocket\	基于websocket协议,

   	gracefulexit\	平滑退出

	rpc\		    异步远程过程调用

	sendclose\		发送消息后保证消息送达后再断开连接
	
	timer\		    异步计时器
	

timer\			计时器接口

util\			工具库

FAQ

  • 这个代码的入口在哪里? 怎么编译为exe?

    本代码是一个网络库, 需要根据需求, 整合逻辑

  • 支持WebSocket么?

    支持!

    本网络库的Websocket基于第三方整合, 包格式基于文本: 包名\n+json内容

    参见example/echo_websocket

  • 混合编码有何用途?

    在与多种语言写成的服务器进行通信时, 可以使用不同的编码, 最终在逻辑层都是统一的结构能让逻辑编写更加方便, 无需关注底层处理细节

  • 内建支持的二进制协议能与其他语言写成的网络库互通么?

    完全支持, 但内建二进制协议支持更适合网关与后台服务器. 不建议与客户端通信中使用, 二进制协议不会忽略使用默认值的字段

  • 我能通过Handler处理链进行怎样的扩展?

    封包需要加密, 统计, 预处理时, 可以使用Handler. 每个Handler建议无状态, 需要存储的数据, 可以通过Event中的Tag进行扩展

  • 如何查看Handler处理流程? 在程序启动时, 调用如下代码

    cellnet.EnableHandlerLog = true

可在日志中看到如下日志格式

    [DEBUG] cellnet 2000/00/00 01:02:03 9 Event_Connected [svc->agent] <DecodePacketHandler> SesID: 1 MsgID: 3551021301(coredef.SessionConnected) {} Tag: <nil> TransmitTag: <nil> Raw: (0)[]
9 表示一个Event处理序号, 同一序号表示1个处理流程, 例如1个接收/发送流程

Event_Connected 表示事件名

[svc->agent] 表示peer的名称

<DecodePacketHandler> 表示Handler的名称, 通过反射取得

SesID 表示 会话ID, 由SessionManager分配

MsgID 表示消息号, 后面括号中是对应的消息名, 如果未在系统中注册, 显示为空, 后续是消息内容

TransmitTag, Tag 附属上下文内容

Raw, 表示消息的原始二进制信息
  • 所有的例子都是单线程的, 能编写多线程的逻辑么?

    完全可以, cellnet并没有全局的队列, 只需在Acceptor和Connector创建时, 传入不同的队列, socket收到的消息就会被放到这个队列中 传入空队列时, 使用并发方式(io线程)调用处理回调

  • 消息日志为什么与处理函数日志顺序不统一?

    由于消息日志反应的是收到消息的日志, 因此必须放置在io线程中处理. 而单线程逻辑与io线程分别在不同的线程. 日志顺序错位是正常的 如果需要顺序日志: 可以在进程启动时, 调用runtime.GOMAXPROCS(1), 将go的线程调度默认为1CPU

  • cellnet有网关和db支持么?

    cellnet专注于服务器底层.你可以根据自己需要编写网关及db支持

  • cellnet的私有tcp封包格式是怎样的?

    功能 类型 备注
    序号 uint16 初始为1, 接收一次自增1
    消息ID uint32 包.消息名 的hash值(util.StringHash)
    包体大小 uint32 包体大小
    包体 []byte 包体内容, 长度为包体大小指定, 变长

    封包解析请参考: https://github.com/davyxu/cellnet/blob/master/socket/handler_privatepkt.go

  • 怎样定制私有tcp封包? 使用cellnet.Peer下组合接口的HandlerChainManager.SetReadWriteChain进行设置, 写法如

    self.SetReadWriteChain(func() *cellnet.HandlerChain {
    	return cellnet.NewHandlerChain(
    		cellnet.NewFixedLengthFrameReader(10),
    		NewPrivatePacketReader(),
    	)
    }, func() *cellnet.HandlerChain {
    	return cellnet.NewHandlerChain(NewPrivatePacketWriter(),
    		cellnet.NewFixedLengthFrameWriter(),
    	)
    })
    
    

    HandlerChainManager拥有读写链和收发链, 处理流程如下:

    读链->接收链->逻辑处理->发送链->写链

Handler全图

版本历史

2017.8 v3版本 详细请查看

2017.1 v2版本 详细请查看

2015.8 v1版本

贡献者

viwii(viwii@sina.cn), 提供一个可能造成死锁的bug

IronsDu(https://github.com/IronsDu), 大幅度性能优化

Chris Lonng(https://github.com/lonnng), 提供一个最大封包约束造成服务器间连接断开的bug

备注

感觉不错请star, 谢谢!

博客: http://www.cppblog.com/sunicdavy

知乎: http://www.zhihu.com/people/sunicdavy

提交bug及特性: https://github.com/davyxu/cellnet/issues

Documentation

Index

Constants

View Source
const DefaultQueueSize = 100

Variables

View Source
var EnableHandlerLog bool
View Source
var ErrCodecNotFound = errors.New("codec not found")
View Source
var (
	ErrMessageNotFound = errors.New("msg not exists")
)

Functions

func BlockMessageLog

func BlockMessageLog(msgName string) error

func DecodeMessage

func DecodeMessage(msgid uint32, data []byte) (interface{}, error)

func EncodeMessage

func EncodeMessage(msg interface{}) (data []byte, msgid uint32, err error)

func HandlerChainCall

func HandlerChainCall(hlist []EventHandler, ev *Event)

func HandlerLog

func HandlerLog(h EventHandler, ev *Event)

func HandlerName

func HandlerName(h EventHandler) string

显示handler的名称

func HandlerString

func HandlerString(h EventHandler) string

func IsBlockedMessageByID

func IsBlockedMessageByID(msgid uint32) bool

func MessageFullName

func MessageFullName(rtype reflect.Type) string

消息全名

func MessageNameByID

func MessageNameByID(id uint32) string

根据id查找消息名, 没找到返回空

func MsgLog

func MsgLog(ev *Event)

func RegisterCodec

func RegisterCodec(name string, c Codec)

func RegisterMessageMeta

func RegisterMessageMeta(codecName string, name string, msgType reflect.Type, id uint32)

注册消息元信息(代码生成专用)

func VisitMessageMeta

func VisitMessageMeta(callback func(*MessageMeta))

遍历消息元信息

Types

type CallbackHandler

type CallbackHandler struct {
	// contains filtered or unexported fields
}

func (*CallbackHandler) Call

func (self *CallbackHandler) Call(ev *Event)

type Codec

type Codec interface {
	Encode(interface{}) ([]byte, error)

	Decode([]byte, interface{}) error

	Name() string
}

func FetchCodec

func FetchCodec(name string) Codec

type DecodePacketHandler

type DecodePacketHandler struct {
}

func (*DecodePacketHandler) Call

func (self *DecodePacketHandler) Call(ev *Event)

type EncodePacketHandler

type EncodePacketHandler struct {
}

func (*EncodePacketHandler) Call

func (self *EncodePacketHandler) Call(ev *Event)

type Event

type Event struct {
	UID int64

	Type EventType // 事件类型

	MsgID uint32      // 消息ID
	Msg   interface{} // 消息对象
	Data  []byte      // 消息序列化后的数据

	Tag         interface{} // 事件的连接, 一个处理流程后被Reset
	TransmitTag interface{} // 接收过程可以传递到发送过程, 不会被清空

	Ses       Session       // 会话
	ChainSend *HandlerChain // 发送handler override
	// contains filtered or unexported fields
}

会话事件

func NewEvent

func NewEvent(t EventType, s Session) *Event

func (*Event) Clone

func (self *Event) Clone() *Event

func (*Event) FromMessage

func (self *Event) FromMessage(msg interface{}) *Event

func (*Event) MsgName

func (self *Event) MsgName() string

func (*Event) MsgSize

func (self *Event) MsgSize() int

func (*Event) MsgString

func (self *Event) MsgString() string

func (*Event) Parse

func (self *Event) Parse()

根据消息内容, 自动填充其他部分, 以方便输出日志

func (*Event) PeerName

func (self *Event) PeerName() string

func (*Event) Result

func (self *Event) Result() Result

func (*Event) Send

func (self *Event) Send(data interface{})

兼容普通消息发送和rpc消息返回, 推荐

func (*Event) SessionID

func (self *Event) SessionID() int64

func (*Event) SetResult

func (self *Event) SetResult(r Result)

type EventHandler

type EventHandler interface {
	Call(*Event)
}

func NewCallbackHandler

func NewCallbackHandler(userCallback func(*Event)) EventHandler

func NewFixedLengthFrameReader

func NewFixedLengthFrameReader(size int) EventHandler

func NewFixedLengthFrameWriter

func NewFixedLengthFrameWriter() EventHandler

func NewMatchMsgIDHandler

func NewMatchMsgIDHandler(msgid uint32) EventHandler

func NewQueuePostHandler

func NewQueuePostHandler(q EventQueue, hlist ...EventHandler) EventHandler

func StaticDecodePacketHandler

func StaticDecodePacketHandler() EventHandler

func StaticEncodePacketHandler

func StaticEncodePacketHandler() EventHandler

func StaticMsgLogHandler

func StaticMsgLogHandler() EventHandler

type EventQueue

type EventQueue interface {
	StartLoop()

	StopLoop(result int)

	// 等待退出
	Wait() int

	// 投递事件, 通过队列到达消费者端
	Post(callback func())
}

func NewEventQueue

func NewEventQueue() EventQueue

func NewEventQueueByLen

func NewEventQueueByLen(l int) EventQueue

type EventType

type EventType int32
const (
	Event_None EventType = iota
	Event_Connected
	Event_ConnectFailed
	Event_Accepted
	Event_AcceptFailed
	Event_Closed
	Event_Recv
	Event_Send
)

func (EventType) String

func (self EventType) String() string

type FixedLengthFrameReader

type FixedLengthFrameReader struct {
	// contains filtered or unexported fields
}

func (*FixedLengthFrameReader) Call

func (self *FixedLengthFrameReader) Call(ev *Event)

type FixedLengthFrameWriter

type FixedLengthFrameWriter struct {
	// contains filtered or unexported fields
}

func (*FixedLengthFrameWriter) Call

func (self *FixedLengthFrameWriter) Call(ev *Event)

type HandlerChain

type HandlerChain struct {
	// contains filtered or unexported fields
}

func NewHandlerChain

func NewHandlerChain(objlist ...interface{}) *HandlerChain

func (*HandlerChain) Add

func (self *HandlerChain) Add(h EventHandler)

添加1个

func (*HandlerChain) AddAny

func (self *HandlerChain) AddAny(objlist ...interface{})

启动匹配类型

func (*HandlerChain) AddBatch

func (self *HandlerChain) AddBatch(h ...EventHandler)

添加多个

func (*HandlerChain) Call

func (self *HandlerChain) Call(ev *Event)

func (*HandlerChain) String

func (self *HandlerChain) String() string

type HandlerChainList

type HandlerChainList []*HandlerChain

func (HandlerChainList) Call

func (self HandlerChainList) Call(ev *Event)

func (HandlerChainList) String

func (self HandlerChainList) String() string

type HandlerChainManager

type HandlerChainManager interface {

	// 添加一条接收处理链
	AddChainRecv(recv *HandlerChain) int64

	// 移除接收处理链, 根据添加时的id
	RemoveChainRecv(id int64)

	// 获取当前的处理链(乱序)
	ChainListRecv() HandlerChainList

	// 设置发送处理链
	SetChainSend(chain *HandlerChain)

	// 获取当前发送处理链
	ChainSend() *HandlerChain

	// 读写链
	CreateChainWrite() *HandlerChain
	CreateChainRead() *HandlerChain

	// 设置读写练
	SetReadWriteChain(read, write func() *HandlerChain)
}

type HandlerChainManagerImplement

type HandlerChainManagerImplement struct {
	// contains filtered or unexported fields
}

Peer间的共享数据

func NewHandlerChainManager

func NewHandlerChainManager() *HandlerChainManagerImplement

func (*HandlerChainManagerImplement) AddChainRecv

func (self *HandlerChainManagerImplement) AddChainRecv(recv *HandlerChain) (autoID int64)

func (*HandlerChainManagerImplement) ChainListRecv

func (self *HandlerChainManagerImplement) ChainListRecv() HandlerChainList

func (*HandlerChainManagerImplement) ChainSend

func (self *HandlerChainManagerImplement) ChainSend() *HandlerChain

func (*HandlerChainManagerImplement) ChainString

func (self *HandlerChainManagerImplement) ChainString() string

func (*HandlerChainManagerImplement) CreateChainRead

func (self *HandlerChainManagerImplement) CreateChainRead() *HandlerChain

func (*HandlerChainManagerImplement) CreateChainWrite

func (self *HandlerChainManagerImplement) CreateChainWrite() *HandlerChain

func (*HandlerChainManagerImplement) RemoveChainRecv

func (self *HandlerChainManagerImplement) RemoveChainRecv(id int64)

func (*HandlerChainManagerImplement) SetChainSend

func (self *HandlerChainManagerImplement) SetChainSend(chain *HandlerChain)

func (*HandlerChainManagerImplement) SetReadWriteChain

func (self *HandlerChainManagerImplement) SetReadWriteChain(read, write func() *HandlerChain)

type MatchMsgIDHandler

type MatchMsgIDHandler struct {
	// contains filtered or unexported fields
}

func (*MatchMsgIDHandler) Call

func (self *MatchMsgIDHandler) Call(ev *Event)

func (*MatchMsgIDHandler) String

func (self *MatchMsgIDHandler) String() string

type MessageMeta

type MessageMeta struct {
	Type  reflect.Type
	Name  string
	ID    uint32
	Codec Codec
}

func MessageMetaByID

func MessageMetaByID(id uint32) *MessageMeta

根据id查找消息元信息

func MessageMetaByName

func MessageMetaByName(name string) *MessageMeta

根据名字查找消息元信息

func MessageMetaByType

func MessageMetaByType(t reflect.Type) *MessageMeta

根据类型查找消息元信息

type MsgLogHandler

type MsgLogHandler struct {
}

func (*MsgLogHandler) Call

func (self *MsgLogHandler) Call(ev *Event)

type Peer

type Peer interface {

	// 开启/关闭
	Start(address string) Peer

	Stop()

	Queue() EventQueue

	// 基础信息
	PeerProfile

	// 定制处理链
	HandlerChainManager

	// 会话管理
	SessionAccessor
}

端, Connector或Acceptor

type PeerProfile

type PeerProfile interface {
	// 名字
	SetName(string)
	Name() string

	// 地址
	SetAddress(string)
	Address() string

	// Tag
	SetTag(interface{})
	Tag() interface{}
}

type PeerProfileImplement

type PeerProfileImplement struct {
	// contains filtered or unexported fields
}

Peer间的共享数据

func NewPeerProfile

func NewPeerProfile() *PeerProfileImplement

func (*PeerProfileImplement) Address

func (self *PeerProfileImplement) Address() string

func (*PeerProfileImplement) IsRunning

func (self *PeerProfileImplement) IsRunning() bool

func (*PeerProfileImplement) Name

func (self *PeerProfileImplement) Name() string

func (*PeerProfileImplement) NameOrAddress

func (self *PeerProfileImplement) NameOrAddress() string

func (*PeerProfileImplement) SetAddress

func (self *PeerProfileImplement) SetAddress(address string)

func (*PeerProfileImplement) SetName

func (self *PeerProfileImplement) SetName(name string)

func (*PeerProfileImplement) SetRunning

func (self *PeerProfileImplement) SetRunning(v bool)

func (*PeerProfileImplement) SetTag

func (self *PeerProfileImplement) SetTag(tag interface{})

func (*PeerProfileImplement) Tag

func (self *PeerProfileImplement) Tag() interface{}

type QueuePostHandler

type QueuePostHandler struct {
	// contains filtered or unexported fields
}

func (*QueuePostHandler) Call

func (self *QueuePostHandler) Call(ev *Event)

type RegisterMessageContext

type RegisterMessageContext struct {
	*MessageMeta
}

func RegisterHandler

func RegisterHandler(p Peer, msgName string, handlers ...EventHandler) *RegisterMessageContext

注册消息处理的一系列Handler, 当有队列时, 投放到队列

func RegisterMessage

func RegisterMessage(p Peer, msgName string, userCallback func(*Event)) *RegisterMessageContext

注册消息处理回调

func RegisterRawHandler

func RegisterRawHandler(p Peer, msgName string, handlers ...EventHandler) *RegisterMessageContext

直接注册回调

type Result

type Result int32
const (
	Result_OK            Result = iota
	Result_SocketError          // 网络错误
	Result_SocketTimeout        // Socket超时
	Result_PackageCrack         // 封包破损
	Result_CodecError
	Result_RequestClose // 请求关闭
	Result_NextChain
)

type Session

type Session interface {

	// 发包
	Send(interface{})

	// 直接发送封包
	RawSend(*Event)

	// 断开
	Close()

	// 标示ID
	ID() int64

	// 归属端
	FromPeer() Peer

	SetTag(tag interface{})

	Tag() interface{}
}

会话

type SessionAccessor

type SessionAccessor interface {

	// 获取一个连接
	GetSession(int64) Session

	// 遍历连接
	VisitSession(func(Session) bool)

	// 连接数量
	SessionCount() int

	// 关闭所有连接
	CloseAllSession()
}

会话访问

type SessionManager

type SessionManager interface {
	SessionAccessor

	Add(Session)
	Remove(Session)
}

完整功能的会话管理

func NewSessionManager

func NewSessionManager() SessionManager

type SessionManagerImplement

type SessionManagerImplement struct {
	// contains filtered or unexported fields
}

func (*SessionManagerImplement) Add

func (self *SessionManagerImplement) Add(ses Session)

func (*SessionManagerImplement) CloseAllSession

func (self *SessionManagerImplement) CloseAllSession()

func (*SessionManagerImplement) GetSession

func (self *SessionManagerImplement) GetSession(id int64) Session

获得一个连接

func (*SessionManagerImplement) Remove

func (self *SessionManagerImplement) Remove(ses Session)

func (*SessionManagerImplement) SessionCount

func (self *SessionManagerImplement) SessionCount() int

func (*SessionManagerImplement) VisitSession

func (self *SessionManagerImplement) VisitSession(callback func(Session) bool)

Directories

Path Synopsis
codec
pb
proto
binary/coredef
Generated by github.com/davyxu/cellnet/objprotogen DO NOT EDIT!
Generated by github.com/davyxu/cellnet/objprotogen DO NOT EDIT!
pb/gamedef
Package gamedef is a generated protocol buffer package.
Package gamedef is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL