wrpc

package module
v0.0.0-...-b5156d6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusBreakerClosed int32 = iota + 1
	StatusBreakerOpen
	StatusBreakerHalfOpen
)
View Source
const DefaultDeadlineDuration = 10 * time.Second

DefaultDeadlineDuration IO超时

View Source
const DefaultHeartbeatDuration time.Duration = time.Second * 5

DefaultHeartbeatDuration 心跳包

View Source
const DefaultHeartbeatPeriodDuration time.Duration = (DefaultHeartbeatDuration * 9) / 10

DefaultHeartbeatPeriodDuration 心跳包间距

View Source
const FrameMinLenght int = 18

FrameMinLenght 长度

Variables

View Source
var SlotSize = 1024 * 4

SlotSize 槽大小

View Source
var SnowFlakeStartupTime int64 = time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
View Source
var TCPBufferSize = 4 * 1024

TCPBufferSize 缓存大小

Functions

func Default

func Default()

Default

func GetMetadata

func GetMetadata(ctx context.Context) *utils.MetaDict

func GetPayload

func GetPayload(unmarshal func([]byte, any) error, data []byte) (obj any)

GetPayload

func GetStatus

func GetStatus(data []byte) uint16

GetStatus

func HijackerSend

func HijackerSend(data []byte, send WriterFunc) error

HijackerSend 发送的[]byte前部需预留 6 字节的空间

func MetadataContext

func MetadataContext(ctx context.Context, m *utils.MetaDict) context.Context

func Stop

func Stop()

Stop

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker 回路:SRE过载保护算法。降低 K 值会使自适应限流算法更加激进(允许客户端在算法启动时拒绝更多本地请求);增加 K 值会使自适应限流算法不再那么激进(允许服务端在算法启动时尝试接收更多的请求,与上面相反)。

func NewCircuitBreaker

func NewCircuitBreaker(request int64, k float64) *CircuitBreaker

NewCircuitBreaker 新加回路

func (*CircuitBreaker) AllowRequest

func (c *CircuitBreaker) AllowRequest() error

func (*CircuitBreaker) MarkFailed

func (c *CircuitBreaker) MarkFailed()

func (*CircuitBreaker) MarkSuccess

func (c *CircuitBreaker) MarkSuccess()

type Client

type Client struct {
	Options
	// contains filtered or unexported fields
}

Client 连接

func NewTCPClient

func NewTCPClient(url string, o *Options) (*Client, error)

NewTCPClient 新建

func (*Client) Call

func (c *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error

Call 调用指定的服务,方法,等待调用返回,将结果写入reply,然后返回执行的错误状态 request and response/请求-响应

func (*Client) Close

func (c *Client) Close()

Close 关闭

func (*Client) CloseStream

func (c *Client) CloseStream(s *Stream)

func (*Client) NewStream

func (c *Client) NewStream(ctx context.Context, serviceMethod string) (*Stream, error)

NewStream

func (*Client) Send

func (c *Client) Send(b []byte) error

Send 发送

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, handler WriterFunc) error

Subscribe 订阅主题

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string) error

Unsubscribe 退订主题

type Frame

type Frame struct {
	Status        uint16
	Seq           int64
	ServiceMethod string
	Metadata      *utils.MetaDict
	Payload       any
}

Frame 帧

func (Frame) MarshalBinary

func (f Frame) MarshalBinary(marshal func(any, io.Writer) error, buf *buffer) error

MarshalBinary 编码

func (*Frame) UnmarshalHeader

func (f *Frame) UnmarshalHeader(data []byte) (int, error)

UnmarshalHeader 解码头部,Payload不解析,返回头长度及错误

type MethodInfo

type MethodInfo struct {
	// contains filtered or unexported fields
}

MethodInfo 方法

type Option

type Option func(*Options)

Option 选项赋值

func WithBreaker

func WithBreaker(allow func() error, success, failed func()) Option

WithBreaker 熔断器

func WithCodec

func WithCodec(m func(any, io.Writer) error, um func([]byte, any) error) Option

WithCodec 编码

func WithHijacker

func WithHijacker(h func([]byte, WriterFunc) error) Option

WithHijacker 劫持者 部分option的设置失效,需自行实现,不能阻塞,只能传递[]byte

func WithIntletHook

func WithIntletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option

WithIntletHook 入口拦截器

func WithLogger

func WithLogger(l utils.ILogger) Option

WithLogger 日志

func WithOutletHook

func WithOutletHook(chain ...func([]byte, WriterFunc) ([]byte, error)) Option

WithOutletHook 出口拦截器

func WithProtocolMagicNumber

func WithProtocolMagicNumber(pm uint32) Option

type Options

type Options struct {
	ProtocolMagicNumber uint32

	//编码器
	Marshal func(any, io.Writer) error
	//解码器
	Unmarshal func([]byte, any) error
	//劫持者
	Hijacker func([]byte, WriterFunc) error
	//入口拦截器
	IntletHook []func([]byte, WriterFunc) ([]byte, error)
	//出口拦截器
	OutletHook []func([]byte, WriterFunc) ([]byte, error)
	//熔断器
	AllowRequest func() error
	MarkSuccess  func()
	MarkFailed   func()
	//平衡器
	//Balancer func([]int) int
	//Registry  IRegistry
	//日志
	Logger utils.ILogger
	// contains filtered or unexported fields
}

Options 配置

func NewOptions

func NewOptions(opts ...Option) *Options

NewOptions 创建并返回一个配置:接收Option函数类型的不定长参数列表

type Service

type Service struct {
	Options
	// contains filtered or unexported fields
}

Service 服务端应答

func NewService

func NewService(o *Options) *Service

NewService 新建

func (*Service) RegisterRPC

func (sh *Service) RegisterRPC(target string, rcvr any) error

RegisterRPC 函数必须是导出的,即首字母为大写

func (*Service) RegisterTopic

func (sh *Service) RegisterTopic(name string) *Topic

RegisterTopic 注册主题

func (*Service) RemoveTopic

func (sh *Service) RemoveTopic(name string)

RemoveTopic 移除主题

func (*Service) SetNonblock

func (sh *Service) SetNonblock(key string)

func (*Service) Stop

func (sh *Service) Stop()

Stop

func (*Service) TCPServer

func (sh *Service) TCPServer(port string) error

TCPServer tcp服务

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream 流

func (*Stream) Recv

func (s *Stream) Recv() (any, error)

Recv 非顺序接收数据

func (*Stream) Send

func (s *Stream) Send(data any) error

type TCPServer

type TCPServer struct {
	ProtocolMagicNumber uint32

	Hijacker func([]byte, WriterFunc) error
	Handler  func([]byte, WriterFunc, func()) error

	Logger utils.ILogger
	// contains filtered or unexported fields
}

TCPServer TCP服务

func NewTCPServer

func NewTCPServer(port string, hijacker func([]byte, WriterFunc) error, handler func([]byte, WriterFunc, func()) error, logger utils.ILogger) *TCPServer

NewTCPServer 新建

func (*TCPServer) Run

func (s *TCPServer) Run()

Run 运行

func (*TCPServer) Stop

func (s *TCPServer) Stop()

Stop 关闭

type TCPSession

type TCPSession struct {
	Hijacker func([]byte, WriterFunc) error
	Handler  func([]byte, WriterFunc, func()) error

	Logger utils.ILogger
	// contains filtered or unexported fields
}

TCPSession 会话

func TCPDial

func TCPDial(url string, protocolMagicNumber uint32, hijacker func([]byte, WriterFunc) error, handler func([]byte, WriterFunc, func()) error, logger utils.ILogger) (*TCPSession, error)

TCPDial 连接

func (*TCPSession) Send

func (ts *TCPSession) Send(message []byte) error

Send 发送

type TokenBucketLimiter

type TokenBucketLimiter struct {
	//限流器速率,每秒处理的令牌数
	LimitRate int64
	//限流器大小,存放令牌的最大值
	LimitSize int64
	//加入的时间间隔
	Snippet time.Duration
	// contains filtered or unexported fields
}

TokenBucketLimiter 限流器,Token Bucket(令牌桶):每隔一段时间加入一批令牌,达到上限后,不再增加。

func NewTokenBucketLimiter

func NewTokenBucketLimiter(limitRate, limitSize int64, snippet time.Duration, l utils.ILogger) *TokenBucketLimiter

NewTokenBucketLimiter limitRate, limitSize,snippet数值较小时,准确度低。

func (*TokenBucketLimiter) Close

func (t *TokenBucketLimiter) Close()

Close 关闭。

func (*TokenBucketLimiter) Take

func (t *TokenBucketLimiter) Take(n int64) error

Take 等待,申请n个令牌,取不到足够数量时返回错误。

type Topic

type Topic struct {
	*Service
	Name string
	// contains filtered or unexported fields
}

Topic 主题

func (*Topic) Broadcast

func (t *Topic) Broadcast(data []byte) error

Broadcast 广播

type WriterFunc

type WriterFunc func([]byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL