xserver

package
v1.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2023 License: GPL-2.0, GPL-2.0 Imports: 41 Imported by: 0

Documentation

Overview

提供了服务注册发现、服务互联互通、线路负载均衡、业务逻辑承载等功能.

Index

Constants

View Source
const (
	CONSUL_RESP_OK    = "ok"     // Consul响应200
	CONSUL_CHECK_PATH = "/check" // Consul心跳检测
)
View Source
const (
	LAN_CIN_MAX_FRAME  = 50000 // 最大输入网络帧数
	LAN_COUT_MAX_FRAME = 50000 // 最大输出网络帧数
)
View Source
const (
	EVT_SERVER_STARTED = -1 // 服务就绪(配置就绪 & 日志就绪 & DB就绪 & Redis就绪 & Lan就绪)
	EVT_SERVER_CHANGED = -2 // 服务变更(参数类型:[]interface{}{added map[string][]string, removed map[string][]string})
	EVT_SERVER_PREQUIT = -3 // 服务即将退出
)
View Source
const (
	SERVER_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)
View Source
const (
	UPDATE_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)

Variables

View Source
var (
	CslClt *consulapi.Client // Consul连接
	CslCfg *consulapi.Config // Consul配置
)
View Source
var (
	GMsg = xevt.NewEvtMgr(true)  // Msg消息中心
	GRpc = xevt.NewEvtMgr(false) // Rpc消息中心
	GCgi = xevt.NewEvtMgr(false) // Cgi消息中心
	GEvt = xevt.NewEvtMgr(true)  // Evt消息中心
)
View Source
var (
	ERR_SEND_CHAN_FULL  = errors.New("send chan is full")
	ERR_NO_ROUTE_FOUND  = errors.New("no route found")
	ERR_RPC_TIMEOUT     = errors.New("rpc call timeout")
	ERR_CGI_TIMEOUT     = errors.New("cgi call timeout")
	ERR_RPC_INTERRUPTED = errors.New("rpc call has been interrupted, see log context for more details.")
	ERR_CGI_INTERRUPTED = errors.New("cgi call has been interrupted, see log context for more details.")
)
View Source
var (
	GWrap   *Wrap   // 服务封装器
	GServer IServer // 全局服务
)
View Source
var CGIROUTEMAP map[int]*CgiRoute // cgi路由
View Source
var MSGROUTEMAP map[int]*MsgRoute // msg路由
View Source
var (
	MainTID int64 = -1 // 主线程ID
)
View Source
var RPCROUTEMAP map[int]*RpcRoute // rpc路由

Functions

func BackupLan

func BackupLan() int

线路备份

func ClearInterval

func ClearInterval(id int, tid ...int64)

取消间歇调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func ClearTimeout

func ClearTimeout(id int, tid ...int64)

取消超时调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func CloseLan

func CloseLan()

关闭线路

func MonitorLan

func MonitorLan()

监控线路

func NotifyCgi

func NotifyCgi(id int, creq *xproto.CgiReq, cresp *xproto.CgiResp) bool

广播Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
creq: 消息请求
cresp: 消息响应

func NotifyEvt

func NotifyEvt(id int, param interface{}) bool

广播Evt消息(用于服务器内部)(全局)

id: 消息ID
param: 透传参数

func NotifyMsg

func NotifyMsg(id int, mreq *xproto.MsgReq) bool

广播Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
mreq: 消息对象

func NotifyRpc

func NotifyRpc(id int, rreq *xproto.RpcReq, rresp *xproto.RpcResp) bool

广播Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
rreq: 消息请求
rresp: 消息响应

func PauseLan

func PauseLan()

暂停线路

func PostKV

func PostKV(key string, value string, version string, block ...bool) bool

推送KV(键值对)至Consul Storage

key: 键
value: 值
version: 版本号,以'VERSION_'为前缀
block-是否阻塞

func PullKV

func PullKV(key string) []byte

从Consul Storage中拉取KV(键值对)(阻塞)

key: 键

func RecvLan

func RecvLan()

线路接收

func RegCgi

func RegCgi(id int, fun func(creq *xproto.CgiReq, cresp *xproto.CgiResp)) int

注册Cgi消息(用于客户端和服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegCgiRoute

func RegCgiRoute(_map map[int]*CgiRoute)

注册Cgi路由

func RegEvt

func RegEvt(id int, fun func(interface{})) int

注册Evt消息(用于服务器内部)(全局)

id:	消息ID
fun: 消息回调

func RegMsg

func RegMsg(id int, fun func(*xproto.MsgReq)) int

注册Msg消息(用于客户端和服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegMsgRoute

func RegMsgRoute(_map map[int]*MsgRoute)

注册Msg路由

func RegRpc

func RegRpc(id int, fun func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)) int

注册Rpc消息(用于服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegRpcRoute

func RegRpcRoute(_map map[int]*RpcRoute)

注册Rpc路由

func RestoreLan

func RestoreLan()

线路恢复

func ResumeLan

func ResumeLan()

恢复线路

func RunIn

func RunIn(tid int64, fun func()) chan bool

在指定逻辑线程中调用(返回的chan可用于阻塞当前线程)

tid: 线程ID
fun: 回调函数

func RunInMain

func RunInMain(fun func()) chan bool

在逻辑主线程中调用(返回的chan可用于阻塞当前线程)

fun: 回调函数

func SendAsync

func SendAsync(id int, uid int, req proto.Message, addr string, callback func(frame *xproto.RpcResp, err error), offsetAndTimeout ...int)

发送Rpc消息(异步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
callback: 回调函数
offset: 目标协程ID偏移(基于protocol中定义)
timeout: 超时时长

func SendCgi

func SendCgi(id int, uid int, req *http.Request, addr string, timeout ...int) (cresp *xproto.CgiResp, err error)

发送Cgi消息(同步,否则ResponseWriter无法输出)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
timeout: 超时时长

func SendFrame

func SendFrame(frame xproto.IFrame) bool

发送网络帧(根据UID负载均衡)

frame: 网络帧

func SendMsg

func SendMsg(id int, msg proto.Message, mreq *xproto.MsgReq) bool

发送Msg消息

id: 消息ID
msg: 结构体
mreq: msg帧

func SendSync

func SendSync(id int, uid int, req proto.Message, resp proto.Message, addr string, offsetAndTimeout ...int) error

发送Rpc消息(同步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
resp: 返回结构体
addr: 目标服务器
offset: 目标协程偏移(基于protocol中定义)
timeout: 超时时长

func Start

func Start(server IServer)

启动

server: 服务对象

func StartLan

func StartLan(lanCfg *LanCfg, handleMsg func(*xproto.MsgReq),
	handleRpc func(*xproto.RpcReq, *xproto.RpcResp),
	handleCgi func(*xproto.CgiReq, *xproto.CgiResp))

启动线路

lancfg: 线路配置
handleMsg: 消息处理函数

func Stop

func Stop()

停止

func SubKV

func SubKV(key string, interval int, onUpdate func(data []byte))

订阅Consul Storage中的KV(键值对)(阻塞)

key: 键(注意订阅的Key需要设置版本,以'VERSION_'为前缀)
interval: 间歇时间

func UnregCgi

func UnregCgi(id int, hid int) bool

注销Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregEvt

func UnregEvt(id int, hid int) bool

注销Evt消息(用于服务器内部)(全局)

id: 消息ID
hid: 句柄ID

func UnregMsg

func UnregMsg(id int, hid int) bool

注销Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregRpc

func UnregRpc(id int, hid int) bool

注销Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func WatchSignal

func WatchSignal() <-chan string

Types

type CgiFunc

type CgiFunc func(req *xproto.CgiReq, resp *xproto.CgiResp)

Cgi函数类型

func (CgiFunc) Handle

func (this CgiFunc) Handle(reply *xevt.EvtReply, req interface{}, resp interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type CgiRoute

type CgiRoute struct {
	Route
	Method  []string // 请求方式
	Timeout int      // 超时时间
}

Cgi路由

type EvtFunc

type EvtFunc func(interface{})

Evt函数类型

func (EvtFunc) Handle

func (this EvtFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type IServer

type IServer interface {
	Init() bool                                         // 初始化
	Start()                                             // 服务启动
	Update(delta float32)                               // 服务循环
	Destroy()                                           // 服务结束
	PreQuit()                                           // 服务即将退出
	Name() string                                       // 服务名称
	InitConfig() bool                                   // 读取配置
	GetConfig() *SvrCfg                                 // 获取配置
	GetFPS() int                                        // 获取帧率
	UpdateTitle() string                                // 更新标题
	GetTitle() string                                   // 获取标题
	RecvMsg(mreq *xproto.MsgReq)                        // 接收Msg消息
	RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp) // 接收Rpc消息
	RecvCgi(creq *xproto.CgiReq, cresp *xproto.CgiResp) // 接收Cgi消息
}

服务接口

type LanCfg

type LanCfg struct {
	Name     string // 名称
	Addr     string // tcp://$ip:$port
	Raw      string // $ip:$port
	IP       string // IP
	Port     int    // 端口
	GO       int    // 逻辑线程数
	MaxRx    int    // 最大接收字节数(KB)
	MsgProto string // msg消息协议类型,可选pb/json,默认pb
	CgiProto string // cgi消息协议类型,可选pb/json,默认json
}

线路配置

func NewLanCfg

func NewLanCfg(name, addr string) *LanCfg

创建线路配置

name: 线路名称
addr: 线路地址

func (*LanCfg) ServerID

func (this *LanCfg) ServerID() string

服务器ID($name@tcp://$ip:$port)

type LanClt

type LanClt struct {
	*LanCfg
	Sockets []mangos.Socket // Socket连接
}

线路连接

func NewLanClt

func NewLanClt(cfg *LanCfg) *LanClt

新建线路连接

func (*LanClt) Close

func (this *LanClt) Close()

关闭连接

func (*LanClt) Send

func (this *LanClt) Send(bytes []byte, idx int) error

发送数据

bytes: 数据
idx: 连接索引

type LanSvr

type LanSvr struct {
	*LanCfg
	mangos.Socket
	Clients  sync.Map // 连接池(map[string][]*LanClt)
	ClientID sync.Map // 连接映射(map[string]*LanClt)
	SClosed  bool     // 是否关闭
}

线路服务

var (
	GLan  *LanSvr // 全局线路服务
	GProc []*Proc // 全局业务处理器
)

func NewLanSvr

func NewLanSvr(cfg *LanCfg) *LanSvr

新建线路服务

cfg: 线路配置

func (*LanSvr) Close

func (this *LanSvr) Close()

线路关闭

func (*LanSvr) Recv

func (this *LanSvr) Recv() ([]byte, error)

线路接收

func (*LanSvr) SelectAll

func (this *LanSvr) SelectAll(svr string) []*LanClt

选择所有指定类型的线路

svr: 服务类型

func (*LanSvr) SelectRand

func (this *LanSvr) SelectRand(svr string) *LanClt

随机选择指定类型的线路

svr: 服务类型

func (*LanSvr) SendData

func (this *LanSvr) SendData(svr string, bytes []byte, idx int) error

发送数据

svr: 服务类型
bytes: 数据
idx: 连接索引

func (*LanSvr) Update

func (this *LanSvr) Update(smap map[string][]string)

路由更新

smap: 路由表

type MsgFunc

type MsgFunc func(*xproto.MsgReq)

Msg函数类型

func (MsgFunc) Handle

func (this MsgFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type MsgRoute

type MsgRoute struct {
	Route
}

Msg路由

type Proc

type Proc struct {
	TID   int64              // 线路的GoID
	Num   int                // 线路线程总数
	CIN   chan xproto.IFrame // 输入队列
	COUT  chan xproto.IFrame // 输出队列
	Loop  bool               // 循环标识
	Pause bool               // 暂停标识
	Resp  sync.Map           // map[int64]chan *xproto.RpcReq/*xproto.CgiFrame

}

业务处理器

func NewProc

func NewProc() *Proc

新建业务处理器

func (*Proc) MaxID

func (this *Proc) MaxID() int64

自增ID

func (*Proc) PopCIN

func (this *Proc) PopCIN() (xproto.IFrame, bool)

弹出第一个输入网络帧

func (*Proc) PushCIN

func (this *Proc) PushCIN(frame xproto.IFrame) bool

压入一个输入网络帧

frame: 网络帧

type Route

type Route struct {
	ID   int      // 路由ID
	Name string   // 路由名称
	GoL  int      // 协程ID(左)
	GoR  int      //协程ID(右)
	RW   bool     // 可读可写(默认true)
	Log  int      // 日志层级(参考xlog的LogLevel)
	Dst  []string // 目标
}

路由信息

func (*Route) GetLog

func (this *Route) GetLog() int

获取日志层级,若未指定则使用全局日志层级

type RpcFunc

type RpcFunc func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

Rpc函数类型

func (RpcFunc) Handle

func (this RpcFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type RpcRoute

type RpcRoute struct {
	Route
}

Rpc路由

type Server

type Server struct {
	xobj.OBJECT
	REAL   IServer
	Config *SvrCfg // 配置信息
	FPS    int     // 应用帧率
	Title  string  // 应用标题
}

服务对象

func (*Server) CTOR

func (this *Server) CTOR(CHILD interface{})

构造函数

func (*Server) Destroy

func (this *Server) Destroy()

服务结束

func (*Server) GetConfig

func (this *Server) GetConfig() *SvrCfg

获取配置

func (*Server) GetFPS added in v1.2.0

func (this *Server) GetFPS() int

获取帧率

func (*Server) GetTitle added in v1.2.0

func (this *Server) GetTitle() string

获取标题

func (*Server) Init

func (_this *Server) Init() bool

初始化

func (*Server) InitConfig

func (this *Server) InitConfig() bool

读取配置

func (*Server) Name

func (this *Server) Name() string

服务名称

func (*Server) PreQuit

func (_this *Server) PreQuit()

func (*Server) RecvCgi

func (this *Server) RecvCgi(rreq *xproto.CgiReq, rresp *xproto.CgiResp)

接收Cgi消息

func (*Server) RecvMsg

func (this *Server) RecvMsg(mreq *xproto.MsgReq)

func (*Server) RecvRpc

func (this *Server) RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

接收Rpc消息

func (*Server) Start

func (this *Server) Start()

服务启动

func (*Server) Update

func (this *Server) Update(delta float32)

func (*Server) UpdateTitle

func (this *Server) UpdateTitle() string

type SvrCfg

type SvrCfg struct {
	Raw              xconfig.Configer
	Env              string  // 环境标识: 测试,内测,生产
	LanCfg           *LanCfg // 线路配置
	LinkServer       string  // 需要连接的内部服务器
	ConsulAddr       string  // Consul中心地址
	ConsulHttp       string  // Consul检测地址
	ConsulTimeout    string  // Consul超时时间
	ConsulInterval   string  // Consul访问间隔
	ConsulDeregister string  // Consul延迟注销
}

服务配置

func (*SvrCfg) Init

func (this *SvrCfg) Init(config string) bool

初始化

config: 配置内容

func (*SvrCfg) IsDebug added in v1.2.2

func (this *SvrCfg) IsDebug() bool

是否调试环境

func (*SvrCfg) SvrID

func (this *SvrCfg) SvrID() string

服务ID

func (*SvrCfg) SvrName

func (this *SvrCfg) SvrName() string

服务名称

type TimerEntity

type TimerEntity struct {
	ID      int         // 定时器ID
	Func    func()      // 定时器回调
	Time    int         // 定时时间
	RawTime int         // 初始时间
	Repeat  bool        // 循环调用
	Crash   bool        // 是否崩溃
	RW      bool        // 是否读写
	Tag     interface{} // 日志标签
	Log     int         // 日志层级
}

定时器对象

func RunInNext

func RunInNext(fun func()) *TimerEntity

在当前逻辑线程中的下一帧调用

fun: 回调函数

func SetInterval

func SetInterval(fun func(), interval float32, tid ...int64) *TimerEntity

设置间歇调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
interval: 间歇时间(秒)
tid: 线程ID

func SetTimeout

func SetTimeout(fun func(), timeout float32, tid ...int64) *TimerEntity

设置超时调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
timeout: 超时时间(秒)
tid: 线程ID

func (*TimerEntity) SetLog

func (this *TimerEntity) SetLog(log int) *TimerEntity

设置会话的日志层级

func (*TimerEntity) SetRW

func (this *TimerEntity) SetRW(sig bool) *TimerEntity

设置会话的可读性(默认为可读可写)

func (*TimerEntity) SetTag

func (this *TimerEntity) SetTag(tag interface{}) *TimerEntity

设置会话的标签

type TimerRecord

type TimerRecord struct {
	Timers   sync.Map // 定时器映射
	TimerID  int64    // 定时器ID
	LastTime int      // 上次时间
}

定时器句柄

func (*TimerRecord) MaxID

func (this *TimerRecord) MaxID() int64

自增ID

type Wrap

type Wrap struct {
	Svr    IServer
	ChQuit chan bool // 阻塞chan
}

服务封装器

func NewWrap

func NewWrap(server IServer) *Wrap

新建服务封装器

server: 服务对象

func (*Wrap) Destroy

func (this *Wrap) Destroy()

销毁

func (*Wrap) Init

func (this *Wrap) Init() bool

初始化

func (*Wrap) Run

func (this *Wrap) Run()

运行

func (*Wrap) Stop

func (this *Wrap) Stop()

停止

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL