Documentation ¶
Index ¶
- Constants
- Variables
- func Default()
- func GetMetadata(ctx context.Context) *utils.MetaDict
- func GetPayload(unmarshal func([]byte, any) error, data []byte) (obj any)
- func GetStatus(data []byte) uint16
- func HijackerSend(data []byte, send WriterFunc) error
- func MetadataContext(ctx context.Context, m *utils.MetaDict) context.Context
- func Stop()
- type CircuitBreaker
- type Client
- func (c *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error
- func (c *Client) Close()
- func (c *Client) CloseStream(s *Stream)
- func (c *Client) NewStream(ctx context.Context, serviceMethod string) (*Stream, error)
- func (c *Client) Send(b []byte) error
- func (c *Client) Subscribe(topic string, handler WriterFunc) error
- func (c *Client) Unsubscribe(topic string) error
- type Frame
- type MethodInfo
- type Option
- func WithBreaker(allow func() error, success, failed func()) Option
- func WithCodec(m func(any, io.Writer) error, um func([]byte, any) error) Option
- func WithHijacker(h func([]byte, WriterFunc) error) Option
- func WithIntletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option
- func WithLogger(l utils.ILogger) Option
- func WithOutletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option
- func WithProtocolMagicNumber(pm uint32) Option
- type Options
- type Service
- type Stream
- type TCPServer
- type TCPSession
- type TokenBucketLimiter
- type Topic
- type WriterFunc
Constants ¶
const ( StatusBreakerClosed int32 = iota + 1 StatusBreakerOpen StatusBreakerHalfOpen )
const DefaultDeadlineDuration = 10 * time.Second
DefaultDeadlineDuration IO超时
const DefaultHeartbeatDuration time.Duration = time.Second * 5
DefaultHeartbeatDuration 心跳包
const DefaultHeartbeatPeriodDuration time.Duration = (DefaultHeartbeatDuration * 9) / 10
DefaultHeartbeatPeriodDuration 心跳包间距
const FrameMinLenght int = 18
FrameMinLenght 帧最小长度
Variables ¶
var SlotSize = 1024 * 4
SlotSize 槽大小
var SnowFlakeStartupTime int64 = time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
var TCPBufferSize = 4 * 1024
TCPBufferSize 缓存大小
Functions ¶
func GetPayload ¶
GetPayload
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker 回路,SRE过载保护算法。降低 K 值会使自适应限流算法更加激进(允许客户端在算法启动时拒绝更多本地请求);增加 K 值会使自适应限流算法不再那么激进(允许服务端在算法启动时尝试接收更多的请求,与上面相反)。
func NewCircuitBreaker ¶
func NewCircuitBreaker(request int64, k float64) *CircuitBreaker
NewCircuitBreaker 新建回路
func (*CircuitBreaker) AllowRequest ¶
func (c *CircuitBreaker) AllowRequest() error
func (*CircuitBreaker) MarkFailed ¶
func (c *CircuitBreaker) MarkFailed()
func (*CircuitBreaker) MarkSuccess ¶
func (c *CircuitBreaker) MarkSuccess()
type Client ¶
type Client struct { Options // contains filtered or unexported fields }
Client 连接
func (*Client) CloseStream ¶
type Frame ¶
type Frame struct { Status uint16 Seq int64 ServiceMethod string Metadata *utils.MetaDict Payload any }
Frame 帧
func (Frame) MarshalBinary ¶
MarshalBinary 编码
type MethodInfo ¶
type MethodInfo struct {
// contains filtered or unexported fields
}
MethodInfo 方法
type Option ¶
type Option func(*Options)
Option 选项赋值
func WithBreaker ¶
WithBreaker 熔断器
func WithHijacker ¶
func WithHijacker(h func([]byte, WriterFunc) error) Option
WithHijacker 劫持者 部分option的设置失效,需自行实现,不能阻塞,只能传递[]byte
func WithIntletHook ¶
func WithIntletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option
WithIntletHook 入口拦截器
func WithOutletHook ¶
func WithOutletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option
WithOutletHook 出口拦截器
func WithProtocolMagicNumber ¶
type Options ¶
type Options struct { ProtocolMagicNumber uint32 //编码器 Marshal func(any, io.Writer) error //解码器 Unmarshal func([]byte, any) error //劫持者 Hijacker func([]byte, WriterFunc) error //入口拦截器 IntletHook []func([]byte, WriterFunc) ([]byte, error) //出口拦截器 OutletHook []func([]byte, WriterFunc) ([]byte, error) //熔断器 AllowRequest func() error MarkSuccess func() MarkFailed func() //平衡器 //Balancer func([]int) int //Registry IRegistry //日志 Logger utils.ILogger // contains filtered or unexported fields }
Options 配置
func NewOptions ¶
NewOptions 创建并返回一个配置:接收Option函数类型的可变参数列表
type Service ¶
type Service struct { Options // contains filtered or unexported fields }
Service 服务端应答
func (*Service) RegisterRPC ¶
RegisterRPC 函数必须是导出的,即首字母为大写
func (*Service) RegisterTopic ¶
RegisterTopic 注册主题
func (*Service) SetNonblock ¶
type TCPServer ¶
type TCPServer struct { ProtocolMagicNumber uint32 Hijacker func([]byte, WriterFunc) error Handler func([]byte, WriterFunc, func()) error Logger utils.ILogger // contains filtered or unexported fields }
TCPServer TCP服务
func NewTCPServer ¶
func NewTCPServer(port string, hijacker func([]byte, WriterFunc) error, handler func([]byte, WriterFunc, func()) error, logger utils.ILogger) *TCPServer
NewTCPServer 新建
type TCPSession ¶
type TCPSession struct { Hijacker func([]byte, WriterFunc) error Handler func([]byte, WriterFunc, func()) error Logger utils.ILogger // contains filtered or unexported fields }
TCPSession 会话
func TCPDial ¶
func TCPDial(url string, protocolMagicNumber uint32, hijacker func([]byte, WriterFunc) error, handler func([]byte, WriterFunc, func()) error, logger utils.ILogger) (*TCPSession, error)
TCPDial 连接
type TokenBucketLimiter ¶
type TokenBucketLimiter struct { //限流器速率,每秒处理的令牌数 LimitRate int64 //限流器大小,存放令牌的最大值 LimitSize int64 //加入的时间间隔 Snippet time.Duration // contains filtered or unexported fields }
TokenBucketLimiter 限流器 Token Bucket(令牌桶):每隔一段时间加入一批令牌,达到上限后,不再增加。
func NewTokenBucketLimiter ¶
func NewTokenBucketLimiter(limitRate, limitSize int64, snippet time.Duration, l utils.ILogger) *TokenBucketLimiter
NewTokenBucketLimiter limitRate、limitSize、snippet数值较小时,准确度低。
func (*TokenBucketLimiter) Take ¶
func (t *TokenBucketLimiter) Take(n int64) error
Take 等待,申请n个令牌,取不到足够数量时返回错误。