backend

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const CtxKey_scheme = utils.CtxKey("scheme")

Variables

Functions

This section is empty.

Types

type HttpBackend

type HttpBackend[T any] struct {
	sync.RWMutex // 注意只保护group的变化 不要保护group内的操作
	// contains filtered or unexported fields
}

T是和业务相关的客户端信息结构 透传给HttpService

func NewHttpBackendWithConsul

func NewHttpBackendWithConsul[T any](consulAddr, tag string, event HttpEvent[T]) (*HttpBackend[T], error)

创建HttpBackend,使用Redis做服务器发现

func NewHttpBackendWithGoRedis

func NewHttpBackendWithGoRedis[T any](cfg *goredis.Config, key string, serverNames []string, event HttpEvent[T]) (*HttpBackend[T], error)

func NewHttpBackendWithRedis

func NewHttpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event HttpEvent[T]) (*HttpBackend[T], error)

创建HttpBackend,使用Redis做服务器发现 key 表示服务器发现的key serverName 表示监听哪些服务器 为空表示监听全部的服务器

func (*HttpBackend[T]) GetGroup

func (hb *HttpBackend[T]) GetGroup(serviceName string) *HttpGroup[T]

func (*HttpBackend[T]) GetGroups

func (hb *HttpBackend[T]) GetGroups() map[string]*HttpGroup[T]

func (*HttpBackend[T]) GetService

func (hb *HttpBackend[T]) GetService(serviceName, serviceId string) *HttpService[T]

func (*HttpBackend[T]) GetServiceByHash

func (hb *HttpBackend[T]) GetServiceByHash(serviceName, hash string) *HttpService[T]

根据哈希环获取,哈希环行记录的都是状态测试健康的

func (*HttpBackend[T]) GetServiceByTagAndHash

func (hb *HttpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *HttpService[T]

根据哈希环获取,哈希环行记录的都是状态测试健康的

func (*HttpBackend[T]) RegHook

func (hb *HttpBackend[T]) RegHook(h HttpHook[T])

注册hook

type HttpEvent

type HttpEvent[T any] interface {
	// consul服务器配置过滤器,返回符合条件的服务器
	ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig

	// redis服务器配置过滤器,返回符合条件的服务器
	RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig

	// goredis服务器配置过滤器,返回符合条件的服务器
	GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig
}

type HttpEventHandler

type HttpEventHandler[T any] struct {
}

HttpEventHandler HttpEvent的内置实现 如果不想实现HttpEvent的所有接口,可以继承它实现部分方法

func (*HttpEventHandler[T]) ConsulFilter

func (*HttpEventHandler[T]) ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig

func (*HttpEventHandler[T]) GoRedisFilter

func (*HttpEventHandler[T]) GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig

func (*HttpEventHandler[T]) RedisFilter

func (*HttpEventHandler[T]) RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig

type HttpGroup

type HttpGroup[T any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

功能相同的一组服务器 T是和业务相关的客户端信息结构 透传给HttpService

func NewHttpGroup

func NewHttpGroup[T any](hb *HttpBackend[T]) *HttpGroup[T]

func (*HttpGroup[T]) GetService

func (g *HttpGroup[T]) GetService(serviceId string) *HttpService[T]

func (*HttpGroup[T]) GetServiceByHash

func (g *HttpGroup[T]) GetServiceByHash(hash string) *HttpService[T]

根据哈希环获取对象 hash可以用用户id或者其他稳定的数据

func (*HttpGroup[T]) GetServiceByTagAndHash

func (g *HttpGroup[T]) GetServiceByTagAndHash(tag, hash string) *HttpService[T]

根据tag和哈希环获取对象 hash可以用用户id或者其他稳定的数据

func (*HttpGroup[T]) GetServices

func (g *HttpGroup[T]) GetServices() map[string]*HttpService[T]

type HttpHook

type HttpHook[T any] interface {
	// 添加服务器
	OnAdd(ts *HttpService[T])
	// 移除一个服务器,彻底移除
	OnRemove(ts *HttpService[T])

	// 连接成功
	OnConnected(ts *HttpService[T])
	// 连接掉线
	OnDisConnect(ts *HttpService[T])
}

Hook

type HttpService

type HttpService[T any] struct {
	// contains filtered or unexported fields
}

T是和业务相关的客户端信息结构

func NewHttpService

func NewHttpService[T any](conf *ServiceConfig, g *HttpGroup[T]) (*HttpService[T], error)

func (*HttpService[T]) Address

func (hs *HttpService[T]) Address() string

func (*HttpService[T]) Conf

func (hs *HttpService[T]) Conf() *ServiceConfig

获取配置 获取后外层要求只读

func (*HttpService[T]) Info

func (hs *HttpService[T]) Info() *T

func (*HttpService[T]) ServiceId

func (hs *HttpService[T]) ServiceId() string

func (*HttpService[T]) ServiceName

func (hs *HttpService[T]) ServiceName() string

type ServiceConfig

type ServiceConfig struct {
	// 要求字符串类型的字段小写且去掉前后空格
	ServiceName string `json:"servicename,omitempty"` // 服务器类型名,用来分组【内部会转化成去空格的小写】
	ServiceId   string `json:"serviceid,omitempty"`   // 服务器唯一ID【内部会转化成去空格的小写】
	ServiceAddr string `json:"serviceaddr,omitempty"` // 服务器对外暴露的地址
	ServicePort int    `json:"serviceport,omitempty"` // 服务器对外暴露的端口
	RoutingTag  string `json:"routertag,omitempty"`   // 支持路由tag group内再次进行分组【内部会转化成去空格的小写】
}

服务配置

type ServiceIdConfMap

type ServiceIdConfMap = map[string]*ServiceConfig

type ServiceNameConfMap

type ServiceNameConfMap = map[string]ServiceIdConfMap

type TCPHook

type TCPHook[T any] interface {
	// 添加服务器
	OnAdd(ts *TcpService[T])
	// 移除一个服务器,彻底移除
	OnRemove(ts *TcpService[T])

	// 连接成功
	OnConnected(ts *TcpService[T])
	// 连接掉线
	OnDisConnect(ts *TcpService[T])

	// 发送数据
	OnSend(ts *TcpService[T], len int)
	// 接受数据
	OnRecv(ts *TcpService[T], len int)
}

Hook

type TcpBackend

type TcpBackend[T any] struct {
	sync.RWMutex // 注意只保护group的变化 不要保护group内的操作
	// contains filtered or unexported fields
}

T是和业务相关的客户端信息结构 透传给TcpService

func NewTcpBackendWithConsul

func NewTcpBackendWithConsul[T any](consulAddr, tag string, event TcpEvent[T]) (*TcpBackend[T], error)

创建TcpBackend,使用Consul做服务器发现

func NewTcpBackendWithGoRedis

func NewTcpBackendWithGoRedis[T any](cfg *goredis.Config, key string, serverNames []string, event TcpEvent[T]) (*TcpBackend[T], error)

func NewTcpBackendWithRedis

func NewTcpBackendWithRedis[T any](cfg *redis.Config, key string, serverNames []string, event TcpEvent[T]) (*TcpBackend[T], error)

创建TcpBackend,使用Redis做服务器发现 key 表示服务器发现的key serverName 表示监听哪些服务器 为空表示监听全部的服务器

func (*TcpBackend[T]) Broad

func (tb *TcpBackend[T]) Broad(ctx context.Context, buf []byte)

向所有的TcpService发消息发送消息

func (*TcpBackend[T]) BroadGroup

func (tb *TcpBackend[T]) BroadGroup(ctx context.Context, serviceName string, buf []byte)

向Group中的所有TcpService发送消息

func (*TcpBackend[T]) BroadGroupMsg

func (tb *TcpBackend[T]) BroadGroupMsg(ctx context.Context, serviceName string, msg utils.SendMsger)

func (*TcpBackend[T]) BroadMsg

func (tb *TcpBackend[T]) BroadMsg(ctx context.Context, msg utils.SendMsger)

func (*TcpBackend[T]) BroadMsgByHash

func (tb *TcpBackend[T]) BroadMsgByHash(ctx context.Context, hash string, msg utils.SendMsger)

向每个组中的其中一个TcpService发消息,使用哈希获取service

func (*TcpBackend[T]) BroadMsgByTagAndHash

func (tb *TcpBackend[T]) BroadMsgByTagAndHash(ctx context.Context, tag, hash string, msg utils.SendMsger)

向每个组中的指定的tag组中的其中一个TcpService发消息,使用哈希获取service

func (*TcpBackend[T]) GetGroup

func (tb *TcpBackend[T]) GetGroup(serviceName string) *TcpGroup[T]

func (*TcpBackend[T]) GetGroups

func (tb *TcpBackend[T]) GetGroups() map[string]*TcpGroup[T]

func (*TcpBackend[T]) GetService

func (tb *TcpBackend[T]) GetService(serviceName, serviceId string) *TcpService[T]

func (*TcpBackend[T]) GetServiceByHash

func (tb *TcpBackend[T]) GetServiceByHash(serviceName, hash string) *TcpService[T]

根据哈希环获取,哈希环行记录的都是连接成功的

func (*TcpBackend[T]) GetServiceByTagAndHash

func (tb *TcpBackend[T]) GetServiceByTagAndHash(serviceName, tag, hash string) *TcpService[T]

根据哈希环获取,哈希环行记录的都是连接成功的

func (*TcpBackend[T]) RegHook

func (tb *TcpBackend[T]) RegHook(h TCPHook[T])

注册hook

func (*TcpBackend[T]) Send

func (tb *TcpBackend[T]) Send(ctx context.Context, serviceName string, serviceId string, buf []byte) error

向TcpService发消息,指定serviceId的

func (*TcpBackend[T]) SendByHash

func (tb *TcpBackend[T]) SendByHash(ctx context.Context, serviceName, hash string, buf []byte) error

向TcpGroup发消息,使用哈希获取service

func (*TcpBackend[T]) SendByTagAndHash

func (tb *TcpBackend[T]) SendByTagAndHash(ctx context.Context, serviceName, tag, hash string, buf []byte) error

向TcpGroup发消息,使用哈希获取service

func (*TcpBackend[T]) SendMsg

func (tb *TcpBackend[T]) SendMsg(ctx context.Context, serviceName string, serviceId string, msg utils.SendMsger) error

func (*TcpBackend[T]) SendMsgByHash

func (tb *TcpBackend[T]) SendMsgByHash(ctx context.Context, serviceName string, hash string, msg utils.SendMsger) error

func (*TcpBackend[T]) SendMsgByTagAndHash

func (tb *TcpBackend[T]) SendMsgByTagAndHash(ctx context.Context, serviceName, tag, hash string, msg utils.SendMsger) error

type TcpEvent

type TcpEvent[T any] interface {
	// consul服务器配置过滤器,返回符合条件的服务器
	ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig

	// redis服务器配置过滤器,返回符合条件的服务器
	RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig

	// goredis服务器配置过滤器,返回符合条件的服务器
	GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig

	// 网络连接成功
	OnConnected(ctx context.Context, ts *TcpService[T])

	// 网络失去连接
	OnDisConnect(ctx context.Context, ts *TcpService[T])

	// DecodeMsg 解码实现
	// 返回值为 msg,len,err
	// msg     解码出的消息体
	// len     解码消息的数据长度,内部根据len来删除已解码的数据
	// err     解码错误,若发生error,服务器将重连
	DecodeMsg(ctx context.Context, data []byte, ts *TcpService[T]) (interface{}, int, error)

	// Context 生成Context, 目前OnMsg、OnTick参数使用
	// msg为nil时 表示是OnTick调用
	Context(parent context.Context, msg interface{}) context.Context

	// CheckRPCResp 判断是否RPC返回消息,如果使用SendRPCMsg需要实现此函数
	// 返回值为 rpcid
	// rpcid   对应请求SendRPC的id, 返回nil表示非rpc调用
	CheckRPCResp(msg interface{}) interface{}

	// OnRecv 收到消息,解码成功后调用 异步调用
	OnMsg(ctx context.Context, msg interface{}, ts *TcpService[T])

	// 每秒tick下 异步调用
	OnTick(ctx context.Context, ts *TcpService[T])
}

type TcpEventHandler

type TcpEventHandler[T any] struct {
}

TcpEventHandler TcpEvent的内置实现 如果不想实现TcpEvent的所有接口,可以继承它实现部分方法

func (*TcpEventHandler[T]) CheckRPCResp

func (*TcpEventHandler[T]) CheckRPCResp(msg interface{}) interface{}

func (*TcpEventHandler[T]) ConsulFilter

func (*TcpEventHandler[T]) ConsulFilter(confs []*consul.RegistryInfo) []*ServiceConfig

func (*TcpEventHandler[T]) Context

func (h *TcpEventHandler[T]) Context(parent context.Context, msg interface{}) context.Context

func (*TcpEventHandler[T]) DecodeMsg

func (*TcpEventHandler[T]) DecodeMsg(ctx context.Context, data []byte, ts *TcpService[T]) (interface{}, int, error)

func (*TcpEventHandler[T]) GoRedisFilter

func (*TcpEventHandler[T]) GoRedisFilter(confs []*goredis.RegistryInfo) []*ServiceConfig

func (*TcpEventHandler[T]) OnConnected

func (*TcpEventHandler[T]) OnConnected(ctx context.Context, ts *TcpService[T])

func (*TcpEventHandler[T]) OnDisConnect

func (*TcpEventHandler[T]) OnDisConnect(ctx context.Context, ts *TcpService[T])

func (*TcpEventHandler[T]) OnMsg

func (*TcpEventHandler[T]) OnMsg(ctx context.Context, msg interface{}, ts *TcpService[T])

func (*TcpEventHandler[T]) OnTick

func (*TcpEventHandler[T]) OnTick(ctx context.Context, ts *TcpService[T])

func (*TcpEventHandler[T]) RedisFilter

func (*TcpEventHandler[T]) RedisFilter(confs []*redis.RegistryInfo) []*ServiceConfig

type TcpGroup

type TcpGroup[T any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

功能相同的一组服务器 T是和业务相关的客户端信息结构 透传给TcpService

func NewTcpGroup

func NewTcpGroup[T any](serviceName string, tb *TcpBackend[T]) *TcpGroup[T]

func (*TcpGroup[T]) Broad

func (g *TcpGroup[T]) Broad(ctx context.Context, buf []byte)

广播消息

func (*TcpGroup[T]) BroadMsg

func (g *TcpGroup[T]) BroadMsg(ctx context.Context, msg utils.SendMsger)

func (*TcpGroup[T]) GetService

func (g *TcpGroup[T]) GetService(serviceId string) *TcpService[T]

func (*TcpGroup[T]) GetServiceByHash

func (g *TcpGroup[T]) GetServiceByHash(hash string) *TcpService[T]

根据哈希环获取对象 hash可以用用户id或者其他稳定的数据

func (*TcpGroup[T]) GetServiceByTagAndHash

func (g *TcpGroup[T]) GetServiceByTagAndHash(tag, hash string) *TcpService[T]

根据tag和哈希环获取对象 hash可以用用户id或者其他稳定的数据

func (*TcpGroup[T]) GetServices

func (g *TcpGroup[T]) GetServices() map[string]*TcpService[T]

type TcpParamConfig

type TcpParamConfig struct {
	// SendMsg接口中,输出日志等级会按照下面的配置来执行,否则按照Debug输出
	// 日志级别和zerolog.Level一致
	LogLevelMsg   int            `json:"loglevelmsg,omitempty"`   // msg消息默认的消息级别,不配置就是debug级别
	LogLevelByMsg map[string]int `json:"loglevelbymsg,omitempty"` // 根据消息ID区分的消息日志级别,消息ID:日志级别,不配置就使用LogLevelMsg级别

	MsgSeq      bool `json:"msgseq,omitempty"`      // 消息顺序执行
	Immediately bool `json:"immediately,omitempty"` // 立即模式 如果服务器发现逻辑服务器不存在了立刻删除服务对象,否则等socket失去连接后删除服务对象
}

参数配置

func (*TcpParamConfig) Create

func (c *TcpParamConfig) Create()

func (*TcpParamConfig) MsgLogLevel

func (c *TcpParamConfig) MsgLogLevel(msgid string) int

type TcpService

type TcpService[T any] struct {
	// contains filtered or unexported fields
}

后端连接对象 协程安全对象 T是和业务相关的客户端信息结构

func NewTcpService

func NewTcpService[T any](conf *ServiceConfig, g *TcpGroup[T]) (*TcpService[T], error)

func (*TcpService[T]) Conf

func (ts *TcpService[T]) Conf() *ServiceConfig

获取配置 获取后外层要求只读

func (*TcpService[T]) Conn

func (ts *TcpService[T]) Conn() *tcp.TCPConn

获取连接对象

func (*TcpService[T]) ConnName

func (ts *TcpService[T]) ConnName() string

func (*TcpService[T]) Info

func (ts *TcpService[T]) Info() *T

func (*TcpService[T]) OnClose

func (ts *TcpService[T]) OnClose(tc *tcp.TCPConn)

func (*TcpService[T]) OnDialFail

func (ts *TcpService[T]) OnDialFail(err error, t *tcp.TCPConn) error

func (*TcpService[T]) OnDialSuccess

func (ts *TcpService[T]) OnDialSuccess(t *tcp.TCPConn)

func (*TcpService[T]) OnDisConnect

func (ts *TcpService[T]) OnDisConnect(err error, t *tcp.TCPConn) error

func (*TcpService[T]) OnRecv

func (ts *TcpService[T]) OnRecv(data []byte, tc *tcp.TCPConn) (int, error)

func (*TcpService[T]) OnSend

func (ts *TcpService[T]) OnSend(data []byte, tc *tcp.TCPConn) ([]byte, error)

func (*TcpService[T]) Send

func (ts *TcpService[T]) Send(ctx context.Context, data []byte) error

func (*TcpService[T]) SendMsg

func (ts *TcpService[T]) SendMsg(ctx context.Context, msg utils.SendMsger) error

SendMsg 发送消息对象,会调用消息对象的MsgMarshal来编码消息 消息对象可实现zerolog.LogObjectMarshaler接口,更好的输出日志,通过ParamConf.LogLevelMsg配置可控制日志级别

func (*TcpService[T]) SendRPCMsg

func (ts *TcpService[T]) SendRPCMsg(ctx context.Context, rpcId interface{}, msg utils.SendMsger, timeout time.Duration) (interface{}, error)

SendRPCMsg 发送RPC消息并等待消息回复,需要依赖event.CheckRPCResp来判断是否rpc调用 成功返回解析后的消息 消息对象可实现zerolog.LogObjectMarshaler接口,更好的输出日志,通过ParamConf.LogLevelMsg配置可控制日志级别

func (*TcpService[T]) ServiceId

func (ts *TcpService[T]) ServiceId() string

func (*TcpService[T]) ServiceName

func (ts *TcpService[T]) ServiceName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL