Documentation ¶
Index ¶
- Constants
- Variables
- type HttpBackend
- func NewHttpBackendWithConsul[T any](consulAddr, tag string, event HttpEvent[T]) (*HttpBackend[T], error)
- func NewHttpBackendWithGoRedis[T any](cfg *goredis.Config, key string, serverNames []string, event HttpEvent[T]) (*HttpBackend[T], error)
- func NewHttpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event HttpEvent[T]) (*HttpBackend[T], error)
- func (hb *HttpBackend[T]) GetGroup(serviceName string) *HttpGroup[T]
- func (hb *HttpBackend[T]) GetGroups() map[string]*HttpGroup[T]
- func (hb *HttpBackend[T]) GetService(serviceName, serviceId string) *HttpService[T]
- func (hb *HttpBackend[T]) GetServiceByHash(serviceName, hash string) *HttpService[T]
- func (hb *HttpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *HttpService[T]
- func (hb *HttpBackend[T]) RegHook(h HttpHook[T])
- type HttpEvent
- type HttpEventHandler
- type HttpGroup
- type HttpHook
- type HttpService
- type ServiceConfig
- type ServiceIdConfMap
- type ServiceNameConfMap
- type TCPHook
- type TcpBackend
- func NewTcpBackendWithConsul[T any](consulAddr, tag string, event TcpEvent[T]) (*TcpBackend[T], error)
- func NewTcpBackendWithGoRedis[T any](cfg *goredis.Config, key string, serverNames []string, event TcpEvent[T]) (*TcpBackend[T], error)
- func NewTcpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event TcpEvent[T]) (*TcpBackend[T], error)
- func (tb *TcpBackend[T]) Broad(ctx context.Context, buf []byte)
- func (tb *TcpBackend[T]) BroadGroup(ctx context.Context, serviceName string, buf []byte)
- func (tb *TcpBackend[T]) BroadGroupMsg(ctx context.Context, serviceName string, msg utils.SendMsger)
- func (tb *TcpBackend[T]) BroadMsg(ctx context.Context, msg utils.SendMsger)
- func (tb *TcpBackend[T]) BroadMsgByHash(ctx context.Context, hash string, msg utils.SendMsger)
- func (tb *TcpBackend[T]) BroadMsgByTagAndHash(ctx context.Context, tag, hash string, msg utils.SendMsger)
- func (tb *TcpBackend[T]) GetGroup(serviceName string) *TcpGroup[T]
- func (tb *TcpBackend[T]) GetGroups() map[string]*TcpGroup[T]
- func (tb *TcpBackend[T]) GetService(serviceName, serviceId string) *TcpService[T]
- func (tb *TcpBackend[T]) GetServiceByHash(serviceName, hash string) *TcpService[T]
- func (tb *TcpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *TcpService[T]
- func (tb *TcpBackend[T]) RegHook(h TCPHook[T])
- func (tb *TcpBackend[T]) Send(ctx context.Context, serviceName string, serviceId string, buf []byte) error
- func (tb *TcpBackend[T]) SendByHash(ctx context.Context, serviceName, hash string, buf []byte) error
- func (tb *TcpBackend[T]) SendByTagAndHash(ctx context.Context, serviceName, tag, hash string, buf []byte) error
- func (tb *TcpBackend[T]) SendMsg(ctx context.Context, serviceName string, serviceId string, msg utils.SendMsger) error
- func (tb *TcpBackend[T]) SendMsgByHash(ctx context.Context, serviceName string, hash string, msg utils.SendMsger) error
- func (tb *TcpBackend[T]) SendMsgByTagAndHash(ctx context.Context, serviceName, tag, hash string, msg utils.SendMsger) error
- type TcpEvent
- type TcpEventHandler
- func (*TcpEventHandler[T]) CheckRPCResp(msg interface{}) interface{}
- func (*TcpEventHandler[T]) ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig
- func (h *TcpEventHandler[T]) Context(parent context.Context, msg interface{}) context.Context
- func (*TcpEventHandler[T]) DecodeMsg(ctx context.Context, data []byte, ts *TcpService[T]) (interface{}, int, error)
- func (*TcpEventHandler[T]) GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig
- func (*TcpEventHandler[T]) OnConnected(ctx context.Context, ts *TcpService[T])
- func (*TcpEventHandler[T]) OnDisConnect(ctx context.Context, ts *TcpService[T])
- func (*TcpEventHandler[T]) OnMsg(ctx context.Context, msg interface{}, ts *TcpService[T])
- func (*TcpEventHandler[T]) OnTick(ctx context.Context, ts *TcpService[T])
- func (*TcpEventHandler[T]) RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig
- type TcpGroup
- func (g *TcpGroup[T]) Broad(ctx context.Context, buf []byte)
- func (g *TcpGroup[T]) BroadMsg(ctx context.Context, msg utils.SendMsger)
- func (g *TcpGroup[T]) GetService(serviceId string) *TcpService[T]
- func (g *TcpGroup[T]) GetServiceByHash(hash string) *TcpService[T]
- func (g *TcpGroup[T]) GetServiceByTagAndHash(tag, hash string) *TcpService[T]
- func (g *TcpGroup[T]) GetServices() map[string]*TcpService[T]
- type TcpParamConfig
- type TcpService
- func (ts *TcpService[T]) Conf() *ServiceConfig
- func (ts *TcpService[T]) Conn() *tcp.TCPConn
- func (ts *TcpService[T]) ConnName() string
- func (ts *TcpService[T]) Info() *T
- func (ts *TcpService[T]) OnClose(tc *tcp.TCPConn)
- func (ts *TcpService[T]) OnDialFail(err error, t *tcp.TCPConn) error
- func (ts *TcpService[T]) OnDialSuccess(t *tcp.TCPConn)
- func (ts *TcpService[T]) OnDisConnect(err error, t *tcp.TCPConn) error
- func (ts *TcpService[T]) OnRecv(data []byte, tc *tcp.TCPConn) (int, error)
- func (ts *TcpService[T]) OnSend(data []byte, tc *tcp.TCPConn) ([]byte, error)
- func (ts *TcpService[T]) Send(ctx context.Context, data []byte) error
- func (ts *TcpService[T]) SendMsg(ctx context.Context, msg utils.SendMsger) error
- func (ts *TcpService[T]) SendRPCMsg(ctx context.Context, rpcId interface{}, msg utils.SendMsger, ...) (interface{}, error)
- func (ts *TcpService[T]) ServiceId() string
- func (ts *TcpService[T]) ServiceName() string
Constants ¶
const CtxKey_scheme = utils.CtxKey("scheme")
Variables ¶
var TcpParamConf loader.JsonLoader[TcpParamConfig]
Functions ¶
This section is empty.
Types ¶
type HttpBackend ¶
type HttpBackend[T any] struct { sync.RWMutex // 注意只保护group的变化 不要保护group内的操作 // contains filtered or unexported fields }
T是和业务相关的客户端信息结构 透传给HttpService
func NewHttpBackendWithConsul ¶
func NewHttpBackendWithConsul[T any](consulAddr, tag string, event HttpEvent[T]) (*HttpBackend[T], error)
创建HttpBackend,使用Consul做服务器发现
func NewHttpBackendWithRedis ¶
func NewHttpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event HttpEvent[T]) (*HttpBackend[T], error)
创建HttpBackend,使用Redis做服务器发现。key 表示服务器发现的key;serverNames 表示监听哪些服务器,为空表示监听全部的服务器
func (*HttpBackend[T]) GetGroup ¶
func (hb *HttpBackend[T]) GetGroup(serviceName string) *HttpGroup[T]
func (*HttpBackend[T]) GetGroups ¶
func (hb *HttpBackend[T]) GetGroups() map[string]*HttpGroup[T]
func (*HttpBackend[T]) GetService ¶
func (hb *HttpBackend[T]) GetService(serviceName, serviceId string) *HttpService[T]
func (*HttpBackend[T]) GetServiceByHash ¶
func (hb *HttpBackend[T]) GetServiceByHash(serviceName, hash string) *HttpService[T]
根据哈希环获取,哈希环中记录的都是状态检测健康的服务
func (*HttpBackend[T]) GetServiceByTagAndHash ¶
func (hb *HttpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *HttpService[T]
根据tag和哈希环获取,哈希环中记录的都是状态检测健康的服务
type HttpEvent ¶
type HttpEvent[T any] interface { // consul服务器配置过滤器,返回符合条件的服务器 ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig // redis服务器配置过滤器,返回符合条件的服务器 RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig // goredis服务器配置过滤器,返回符合条件的服务器 GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig }
type HttpEventHandler ¶
type HttpEventHandler[T any] struct { }
HttpEventHandler HttpEvent的内置实现 如果不想实现HttpEvent的所有接口,可以继承它实现部分方法
func (*HttpEventHandler[T]) ConsulFilter ¶
func (*HttpEventHandler[T]) ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig
func (*HttpEventHandler[T]) GoRedisFilter ¶
func (*HttpEventHandler[T]) GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig
func (*HttpEventHandler[T]) RedisFilter ¶
func (*HttpEventHandler[T]) RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig
type HttpGroup ¶
功能相同的一组服务器 T是和业务相关的客户端信息结构 透传给HttpService
func NewHttpGroup ¶
func NewHttpGroup[T any](hb *HttpBackend[T]) *HttpGroup[T]
func (*HttpGroup[T]) GetService ¶
func (g *HttpGroup[T]) GetService(serviceId string) *HttpService[T]
func (*HttpGroup[T]) GetServiceByHash ¶
func (g *HttpGroup[T]) GetServiceByHash(hash string) *HttpService[T]
根据哈希环获取对象 hash可以用用户id或者其他稳定的数据
func (*HttpGroup[T]) GetServiceByTagAndHash ¶
func (g *HttpGroup[T]) GetServiceByTagAndHash(tag, hash string) *HttpService[T]
根据tag和哈希环获取对象 hash可以用用户id或者其他稳定的数据
func (*HttpGroup[T]) GetServices ¶
func (g *HttpGroup[T]) GetServices() map[string]*HttpService[T]
type HttpHook ¶
type HttpHook[T any] interface { // 添加服务器 OnAdd(ts *HttpService[T]) // 移除一个服务器,彻底移除 OnRemove(ts *HttpService[T]) // 连接成功 OnConnected(ts *HttpService[T]) // 连接掉线 OnDisConnect(ts *HttpService[T]) }
Hook
type HttpService ¶
type HttpService[T any] struct { // contains filtered or unexported fields }
T是和业务相关的客户端信息结构
func NewHttpService ¶
func NewHttpService[T any](conf *ServiceConfig, g *HttpGroup[T]) (*HttpService[T], error)
func (*HttpService[T]) Address ¶
func (hs *HttpService[T]) Address() string
func (*HttpService[T]) Info ¶
func (hs *HttpService[T]) Info() *T
func (*HttpService[T]) ServiceId ¶
func (hs *HttpService[T]) ServiceId() string
func (*HttpService[T]) ServiceName ¶
func (hs *HttpService[T]) ServiceName() string
type ServiceConfig ¶
type ServiceConfig struct { // 要求字符串类型的字段小写且去掉前后空格 ServiceName string `json:"servicename,omitempty"` // 服务器类型名,用来分组【内部会转化成去空格的小写】 ServiceId string `json:"serviceid,omitempty"` // 服务器唯一ID【内部会转化成去空格的小写】 ServiceAddr string `json:"serviceaddr,omitempty"` // 服务器对外暴露的地址 ServicePort int `json:"serviceport,omitempty"` // 服务器对外暴露的端口 RoutingTag string `json:"routertag,omitempty"` // 支持路由tag group内再次进行分组【内部会转化成去空格的小写】 }
服务配置
type ServiceIdConfMap ¶
type ServiceIdConfMap = map[string]*ServiceConfig
type ServiceNameConfMap ¶
type ServiceNameConfMap = map[string]ServiceIdConfMap
type TCPHook ¶
type TCPHook[T any] interface { // 添加服务器 OnAdd(ts *TcpService[T]) // 移除一个服务器,彻底移除 OnRemove(ts *TcpService[T]) // 连接成功 OnConnected(ts *TcpService[T]) // 连接掉线 OnDisConnect(ts *TcpService[T]) // 发送数据 OnSend(ts *TcpService[T], len int) // 接受数据 OnRecv(ts *TcpService[T], len int) }
Hook
type TcpBackend ¶
type TcpBackend[T any] struct { sync.RWMutex // 注意只保护group的变化 不要保护group内的操作 // contains filtered or unexported fields }
T是和业务相关的客户端信息结构 透传给TcpService
func NewTcpBackendWithConsul ¶
func NewTcpBackendWithConsul[T any](consulAddr, tag string, event TcpEvent[T]) (*TcpBackend[T], error)
创建TcpBackend,使用Consul做服务器发现
func NewTcpBackendWithRedis ¶
func NewTcpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event TcpEvent[T]) (*TcpBackend[T], error)
创建TcpBackend,使用Redis做服务器发现。key 表示服务器发现的key;serverNames 表示监听哪些服务器,为空表示监听全部的服务器
func (*TcpBackend[T]) Broad ¶
func (tb *TcpBackend[T]) Broad(ctx context.Context, buf []byte)
向所有的TcpService发送消息
func (*TcpBackend[T]) BroadGroup ¶
func (tb *TcpBackend[T]) BroadGroup(ctx context.Context, serviceName string, buf []byte)
向Group中的所有TcpService发送消息
func (*TcpBackend[T]) BroadGroupMsg ¶
func (*TcpBackend[T]) BroadMsg ¶
func (tb *TcpBackend[T]) BroadMsg(ctx context.Context, msg utils.SendMsger)
func (*TcpBackend[T]) BroadMsgByHash ¶
向每个组中的其中一个TcpService发消息,使用哈希获取service
func (*TcpBackend[T]) BroadMsgByTagAndHash ¶
func (tb *TcpBackend[T]) BroadMsgByTagAndHash(ctx context.Context, tag, hash string, msg utils.SendMsger)
向每个组中的指定的tag组中的其中一个TcpService发消息,使用哈希获取service
func (*TcpBackend[T]) GetGroup ¶
func (tb *TcpBackend[T]) GetGroup(serviceName string) *TcpGroup[T]
func (*TcpBackend[T]) GetGroups ¶
func (tb *TcpBackend[T]) GetGroups() map[string]*TcpGroup[T]
func (*TcpBackend[T]) GetService ¶
func (tb *TcpBackend[T]) GetService(serviceName, serviceId string) *TcpService[T]
func (*TcpBackend[T]) GetServiceByHash ¶
func (tb *TcpBackend[T]) GetServiceByHash(serviceName, hash string) *TcpService[T]
根据哈希环获取,哈希环中记录的都是连接成功的服务
func (*TcpBackend[T]) GetServiceByTagAndHash ¶
func (tb *TcpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *TcpService[T]
根据tag和哈希环获取,哈希环中记录的都是连接成功的服务
func (*TcpBackend[T]) Send ¶
func (tb *TcpBackend[T]) Send(ctx context.Context, serviceName string, serviceId string, buf []byte) error
向TcpService发消息,指定serviceId的
func (*TcpBackend[T]) SendByHash ¶
func (tb *TcpBackend[T]) SendByHash(ctx context.Context, serviceName, hash string, buf []byte) error
向TcpGroup发消息,使用哈希获取service
func (*TcpBackend[T]) SendByTagAndHash ¶
func (tb *TcpBackend[T]) SendByTagAndHash(ctx context.Context, serviceName, tag, hash string, buf []byte) error
向TcpGroup中指定的tag组发消息,使用哈希获取service
func (*TcpBackend[T]) SendMsgByHash ¶
func (*TcpBackend[T]) SendMsgByTagAndHash ¶
type TcpEvent ¶
type TcpEvent[T any] interface { // consul服务器配置过滤器,返回符合条件的服务器 ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig // redis服务器配置过滤器,返回符合条件的服务器 RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig // goredis服务器配置过滤器,返回符合条件的服务器 GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig // 网络连接成功 OnConnected(ctx context.Context, ts *TcpService[T]) // 网络失去连接 OnDisConnect(ctx context.Context, ts *TcpService[T]) // DecodeMsg 解码实现 // 返回值为 msg,len,err // msg 解码出的消息体 // len 解码消息的数据长度,内部根据len来删除已解码的数据 // err 解码错误,若发生error,服务器将重连 DecodeMsg(ctx context.Context, data []byte, ts *TcpService[T]) (interface{}, int, error) // Context 生成Context, 目前OnMsg、OnTick参数使用 // msg为nil时 表示是OnTick调用 Context(parent context.Context, msg interface{}) context.Context // CheckRPCResp 判断是否RPC返回消息,如果使用SendRPCMsg需要实现此函数 // 返回值为 rpcid // rpcid 对应请求SendRPC的id, 返回nil表示非rpc调用 CheckRPCResp(msg interface{}) interface{} // OnRecv 收到消息,解码成功后调用 异步调用 OnMsg(ctx context.Context, msg interface{}, ts *TcpService[T]) // 每秒tick下 异步调用 OnTick(ctx context.Context, ts *TcpService[T]) }
type TcpEventHandler ¶
type TcpEventHandler[T any] struct { }
TcpEventHandler TcpEvent的内置实现 如果不想实现TcpEvent的所有接口,可以继承它实现部分方法
func (*TcpEventHandler[T]) CheckRPCResp ¶
func (*TcpEventHandler[T]) CheckRPCResp(msg interface{}) interface{}
func (*TcpEventHandler[T]) ConsulFilter ¶
func (*TcpEventHandler[T]) ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig
func (*TcpEventHandler[T]) Context ¶
func (h *TcpEventHandler[T]) Context(parent context.Context, msg interface{}) context.Context
func (*TcpEventHandler[T]) DecodeMsg ¶
func (*TcpEventHandler[T]) DecodeMsg(ctx context.Context, data []byte, ts *TcpService[T]) (interface{}, int, error)
func (*TcpEventHandler[T]) GoRedisFilter ¶
func (*TcpEventHandler[T]) GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig
func (*TcpEventHandler[T]) OnConnected ¶
func (*TcpEventHandler[T]) OnConnected(ctx context.Context, ts *TcpService[T])
func (*TcpEventHandler[T]) OnDisConnect ¶
func (*TcpEventHandler[T]) OnDisConnect(ctx context.Context, ts *TcpService[T])
func (*TcpEventHandler[T]) OnMsg ¶
func (*TcpEventHandler[T]) OnMsg(ctx context.Context, msg interface{}, ts *TcpService[T])
func (*TcpEventHandler[T]) OnTick ¶
func (*TcpEventHandler[T]) OnTick(ctx context.Context, ts *TcpService[T])
func (*TcpEventHandler[T]) RedisFilter ¶
func (*TcpEventHandler[T]) RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig
type TcpGroup ¶
功能相同的一组服务器 T是和业务相关的客户端信息结构 透传给TcpService
func NewTcpGroup ¶
func NewTcpGroup[T any](serviceName string, tb *TcpBackend[T]) *TcpGroup[T]
func (*TcpGroup[T]) GetService ¶
func (g *TcpGroup[T]) GetService(serviceId string) *TcpService[T]
func (*TcpGroup[T]) GetServiceByHash ¶
func (g *TcpGroup[T]) GetServiceByHash(hash string) *TcpService[T]
根据哈希环获取对象 hash可以用用户id或者其他稳定的数据
func (*TcpGroup[T]) GetServiceByTagAndHash ¶
func (g *TcpGroup[T]) GetServiceByTagAndHash(tag, hash string) *TcpService[T]
根据tag和哈希环获取对象 hash可以用用户id或者其他稳定的数据
func (*TcpGroup[T]) GetServices ¶
func (g *TcpGroup[T]) GetServices() map[string]*TcpService[T]
type TcpParamConfig ¶
type TcpParamConfig struct { // SendMsg接口中,输出日志等级会按照下面的配置来执行,否则按照Debug输出 // 日志级别和zerolog.Level一致 LogLevelMsg int `json:"loglevelmsg,omitempty"` // msg消息默认的消息级别,不配置就是debug级别 LogLevelByMsg map[string]int `json:"loglevelbymsg,omitempty"` // 根据消息ID区分的消息日志级别,消息ID:日志级别,不配置就使用LogLevelMsg级别 MsgSeq bool `json:"msgseq,omitempty"` // 消息顺序执行 Immediately bool `json:"immediately,omitempty"` // 立即模式 如果服务器发现逻辑服务器不存在了立刻删除服务对象,否则等socket失去连接后删除服务对象 }
参数配置
func (*TcpParamConfig) Create ¶
func (c *TcpParamConfig) Create()
func (*TcpParamConfig) MsgLogLevel ¶
func (c *TcpParamConfig) MsgLogLevel(msgid string) int
type TcpService ¶
type TcpService[T any] struct { // contains filtered or unexported fields }
后端连接对象 协程安全对象 T是和业务相关的客户端信息结构
func NewTcpService ¶
func NewTcpService[T any](conf *ServiceConfig, g *TcpGroup[T]) (*TcpService[T], error)
func (*TcpService[T]) ConnName ¶
func (ts *TcpService[T]) ConnName() string
func (*TcpService[T]) Info ¶
func (ts *TcpService[T]) Info() *T
func (*TcpService[T]) OnClose ¶
func (ts *TcpService[T]) OnClose(tc *tcp.TCPConn)
func (*TcpService[T]) OnDialFail ¶
func (ts *TcpService[T]) OnDialFail(err error, t *tcp.TCPConn) error
func (*TcpService[T]) OnDialSuccess ¶
func (ts *TcpService[T]) OnDialSuccess(t *tcp.TCPConn)
func (*TcpService[T]) OnDisConnect ¶
func (ts *TcpService[T]) OnDisConnect(err error, t *tcp.TCPConn) error
func (*TcpService[T]) SendMsg ¶
SendMsg 发送消息对象,会调用消息对象的MsgMarshal来编码消息。消息对象可实现zerolog.LogObjectMarshaler接口,更好的输出日志,通过TcpParamConf.LogLevelMsg配置可控制日志级别
func (*TcpService[T]) SendRPCMsg ¶
func (ts *TcpService[T]) SendRPCMsg(ctx context.Context, rpcId interface{}, msg utils.SendMsger, timeout time.Duration) (interface{}, error)
SendRPCMsg 发送RPC消息并等待消息回复,需要依赖event.CheckRPCResp来判断是否rpc调用,成功返回解析后的消息。消息对象可实现zerolog.LogObjectMarshaler接口,更好的输出日志,通过TcpParamConf.LogLevelMsg配置可控制日志级别
func (*TcpService[T]) ServiceId ¶
func (ts *TcpService[T]) ServiceId() string
func (*TcpService[T]) ServiceName ¶
func (ts *TcpService[T]) ServiceName() string