client

package
v0.0.0-...-564fdec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	XVersion           = "X-RPCX-Version"           // rpcx版本
	XMessageType       = "X-RPCX-MesssageType"      // 消息类型: http/tcp
	XHeartbeat         = "X-RPCX-Heartbeat"         // 消息是否为心跳包(默认值直接忽略)
	XOneway            = "X-RPCX-Oneway"            // 是否丢弃response
	XMessageStatusType = "X-RPCX-MessageStatusType" // 消息状态类型
	XSerializeType     = "X-RPCX-SerializeType"     // 消息序列化类型:json、[]byte、pf
	XMessageID         = "X-RPCX-MessageID"         // 消息ID
	XServicePath       = "X-RPCX-ServicePath"       // 服务
	XServiceMethod     = "X-RPCX-ServiceMethod"     // 服务中具体的方法
	XMeta              = "X-RPCX-Meta"              // 元数据类似header中key-value对
	XErrorMessage      = "X-RPCX-ErrorMessage"      //
)
View Source
const (
	// ReaderBuffsize is used for bufio reader.
	// 读缓存容量,默认16KB
	ReaderBuffsize = 16 * 1024
	// WriterBuffsize is used for bufio writer.
	// 写缓存容量, 默认16KB
	WriterBuffsize = 16 * 1024
)

Variables

View Source
var (
	ErrShutdown         = errors.New("connection is shut down") // connection关闭触发的error
	ErrUnsupportedCodec = errors.New("unsupported codec")       // 目前rpcx提供格式不支持该内容完成编解码
)

ErrShutdown connection is closed.

View Source
var (
	// ErrXClientShutdown xclient is shutdown.
	ErrXClientShutdown = errors.New("xClient is shut down")
	// ErrXClientNoServer selector can't found one server.
	ErrXClientNoServer = errors.New("can not found any server")
	// ErrServerUnavailable selected server is unavailable.
	ErrServerUnavailable = errors.New("selected server is unavilable")
)
View Source
var DefaultOption = Option{
	Retries:        3,
	RPCPath:        share.DefaultRPCPath,
	ConnectTimeout: 10 * time.Second,
	SerializeType:  protocol.MsgPack,
	CompressType:   protocol.None,
	BackupLatency:  10 * time.Millisecond,
}

DefaultOption is a common option configuration for client. 默认通用的client配置

Functions

This section is empty.

Types

type Breaker

type Breaker interface {
	Call(func() error, time.Duration) error
	Fail()
	Success()
	Ready() bool
}

Breaker is a CircuitBreaker interface. 断路器接口

var CircuitBreaker Breaker = circuit.NewRateBreaker(0.95, 100)

CircuitBreaker is a default circuit breaker (RateBreaker(0.95, 100)). 默认断路器: 失败率达到0.95将service变为不可用,并在100ms之后再重试该service,成功close断路器,否则继续停止该service并在100ms再重新

type Call

type Call struct {
	ServicePath   string            // The name of the service and method to call.
	ServiceMethod string            // The name of the service and method to call.
	Metadata      map[string]string //metadata
	ResMetadata   map[string]string
	Args          interface{} // The argument to the function (*struct).  提供rpc具体服务的func的参数
	Reply         interface{} // The reply from the function (*struct).   提供rpc具体服务的func的结果
	Error         error       // After completion, the error status.      一次rpc执行完成后 error状态
	Done          chan *Call  // Strobes when call is complete.
	Raw           bool        // raw message or not   是否为原始消息
}

Call represents an active RPC. RPC

type Client

type Client struct {
	Conn net.Conn // connection

	Plugins PluginContainer // 插件容器

	ServerMessageChan chan<- *protocol.Message
	// contains filtered or unexported fields
}

Client represents a RPC client.

func NewClient

func NewClient(option Option) *Client

NewClient returns a new Client with the option.

func (*Client) Call

func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error

Call invokes the named function, waits for it to complete, and returns its error status. 同步调用,完成一次rpc 并返回当前的错误状态

func (*Client) Close

func (client *Client) Close() error

Close calls the underlying connection's Close method. If the connection is already shutting down, ErrShutdown is returned. 关闭连接

func (*Client) Connect

func (c *Client) Connect(network, address string) error

Connect connects the server via specified network.

func (*Client) Go

func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call

异步调用,当通过Go方法进行一次call请求,将返回该请求调用的Call对象(ServicePath服务地址、ServiceMethod服务方法、MetaData元数据、Args服务方法参数、Reply服务返回结果、Done完成call通道)

         当本次call完成后将channel发送信号:完成后的Call对象
注意:本次Call绑定的channel 不存在时则新建,存在需要保证同时进行的rpc请求;没有buffered存放Call 会触发panic

func (*Client) IsClosing

func (client *Client) IsClosing() bool

IsClosing client is closing or not.

func (*Client) IsShutdown

func (client *Client) IsShutdown() bool

IsShutdown client is shutdown or not.

func (*Client) RegisterServerMessageChan

func (client *Client) RegisterServerMessageChan(ch chan<- *protocol.Message)

RegisterServerMessageChan registers the channel that receives server requests.

func (*Client) SendRaw

func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)

发送原始信息 不需要关注func参数和响应结果 需要使用者自行包装请求信息Message SendRaw sends raw messages. You don't care args and replys.

func (*Client) UnregisterServerMessageChan

func (client *Client) UnregisterServerMessageChan()

UnregisterServerMessageChan removes ServerMessageChan.

type FailMode

type FailMode int

FailMode decides how clients action when clients fail to invoke services

const (
	//Failover selects another server automaticaly
	Failover FailMode = iota
	//Failfast returns error immediately
	Failfast
	//Failtry use current client again
	Failtry
	//Failbackup select another server if the first server doesn't respon in specified time and use the fast response.
	Failbackup
)

type KVPair

type KVPair struct {
	Key   string
	Value string
}

KVPair contains a key and a string.

type Option

type Option struct {
	// Group is used to select the services in the same group. Services set group info in their meta.
	// If it is empty, clients will ignore group.
	Group string // 设定services在相同的group便于service的隔离及管理;并且将group信息设置到meta data里;若未指定group 则client忽略

	// Retries retries to send
	Retries int // 重试次数

	// TLSConfig for tcp and quic
	TLSConfig *tls.Config // tls配置用于(tcp/quic协议)
	// kcp.BlockCrypt
	Block interface{} // 加密
	// RPCPath for http connection
	RPCPath string // http连接转为rpc请求 指定的path
	//ConnectTimeout sets timeout for dialing
	ConnectTimeout time.Duration // connection 有效期
	// ReadTimeout sets readdeadline for underlying net.Conns
	ReadTimeout time.Duration // 读有效期
	// WriteTimeout sets writedeadline for underlying net.Conns
	WriteTimeout time.Duration // 写有效期

	// BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time.
	BackupLatency time.Duration // Failbackup模式下,当第一个request在有效期内(latency time)未能返回结果,则rpcx会重新发送另一个request,两个request任意一个返回结果即认为请求成功

	// Breaker is used to config CircuitBreaker
	GenBreaker func() Breaker // 断路器

	SerializeType protocol.SerializeType // 编解码类型(一般client和server相同的编码格式)
	CompressType  protocol.CompressType  // 压缩类型

	Heartbeat         bool          // 是否为心跳信息
	HeartbeatInterval time.Duration // 心跳间隔
}

Option contains all options for creating clients. 关于client的配置选项

type RPCClient

type RPCClient interface {
	Connect(network, address string) error                                                                                 // 连接
	Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call // 异步调用
	Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error                // 同步调用
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)                                   // 直连 发送原始内容
	Close() error                                                                                                          // 关闭connection

	RegisterServerMessageChan(ch chan<- *protocol.Message) //
	UnregisterServerMessageChan()

	IsClosing() bool
	IsShutdown() bool
}

RPCClient is interface that defines one client to call one server. 定义client调用server的相关接口

type SelectMode

type SelectMode int

SelectMode defines the algorithm of selecting a services from candidates.

const (
	//RandomSelect is selecting randomly
	RandomSelect SelectMode = iota
	//RoundRobin is selecting by round robin
	RoundRobin
	//WeightedRoundRobin is selecting by weighted round robin
	WeightedRoundRobin
	//WeightedICMP is selecting by weighted Ping time
	WeightedICMP
	//ConsistentHash is selecting by hashing
	ConsistentHash
	//Closest is selecting the closest server
	Closest

	// SelectByUser is selecting by implementation of users
	SelectByUser = 1000
)

type Selector

type Selector interface {
	Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
	UpdateServer(servers map[string]string)
}

Selector defines selector that selects one service from candidates.

type ServiceDiscovery

type ServiceDiscovery interface {
	GetServices() []*KVPair                    // 获取注册的服务
	WatchService() chan []*KVPair              // 监听注册的服务
	RemoveWatcher(ch chan []*KVPair)           // 移除服务监听
	Clone(servicePath string) ServiceDiscovery // 复制
	Close()                                    // 关闭
}

ServiceDiscovery defines ServiceDiscovery of zookeeper, etcd and consul 服务发现:提供zookeeper、etcd、consul注册中心、还有peer2peer、mutilpleServers、mDns、Inprocess

type ServiceError

type ServiceError string // 服务端错误字符串形式

ServiceError is an error from server.

func (ServiceError) Error

func (e ServiceError) Error() string

type XClient

type XClient interface {
	SetPlugins(plugins PluginContainer)            // 设定插件
	SetSelector(s Selector)                        // 服务路由模式:随机、轮训、加权轮询、加权ICMP(ping时间)、一致性hash、基于地理位置就近选择、用户自定义路由
	ConfigGeoSelector(latitude, longitude float64) // 指定地理位置
	Auth(auth string)                              // 鉴权

	Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error) // 异步请求
	Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error                         // 同步请求
	Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error                    // 请求所有节点:请求成功只返回一个节点的结果;若是出现错误,也只会将一个节点error返回
	Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error                         //
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)                               // 自己封装protocol.Message 采用原始发送请求server
	Close() error                                                                                                      // 关闭clien
}

XClient is an interface that used by client with service discovery and service governance. One XClient is used only for one service. You should create multiple XClient for multiple services.

XClient用于具备服务发现和服务治理的client。 XClient与Service是一一关联的,定义多个service就需要对应个数的XClient XClient是在前面的client基础上增加路由、失败模式、超时机制、断路器等功能

func NewBidirectionalXClient

func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) XClient

NewBidirectionalXClient creates a new xclient that can receive notifications from servers.

正常RPC只有client向server的单向通信,而没有server向client的通信 通过该方法能够实现client与server双向通信,大体和NewXClient很类似

func NewXClient

func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient

NewXClient creates a XClient that supports service discovery and service governance.

创建XClient:需指定失败模式、路由模式、服务发现方式、client额外选项

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL