rpc

package
v1.0.0-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2022 License: MPL-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultClientConnReadBufSize  = maxGoAllocSizeClass
	DefaultClientConnWriteBufSize = maxGoAllocSizeClass
)
View Source
const (
	FlagHijackTransport = uint32(0xDB000000) // Experimental, use same port for RPC and custom transport over PacketConn

	DefaultClientPongTimeout = 10 * time.Second

	DefaultConnTimeoutAccuracy = 100 * time.Millisecond
)
View Source
const (
	DefaultMaxWorkers             = 1024
	DefaultMaxConns               = 131072 // note, this number of connections will require 10+ GB of memory
	DefaultMaxInflightPackets     = 256
	DefaultRequestMemoryLimit     = 256 * 1024 * 1024
	DefaultResponseMemoryLimit    = 2048 * 1024 * 1024
	DefaultServerConnReadBufSize  = maxGoAllocSizeClass
	DefaultServerConnWriteBufSize = maxGoAllocSizeClass
	DefaultServerRequestBufSize   = 4096                // TODO: should be at least bytes.MinRead for now
	DefaultServerResponseBufSize  = maxGoAllocSizeClass // TODO: should be at least bytes.MinRead for now
	DefaultResponseMemEstimate    = 1024 * 1024         // we likely over-account unknown response length before the handler has finished

)
View Source
const (
	MissingMultiRequestID = uint64(math.MaxUint64)
)

Variables

View Source
var (
	ErrClientClosed                 = errors.New("rpc: Client closed")
	ErrClientConnClosedSideEffect   = errors.New("rpc: client connection closed after request sent")
	ErrClientConnClosedNoSideEffect = errors.New("rpc: client connection closed (or connect failed) before request sent")
)
View Source
var (
	ErrServerClosed = errors.New("rpc: Server closed")
	ErrNoHandler    = &Error{Code: tlErrorNoHandler, Description: "rpc: no handler"} // Never wrap this error

)

Functions

func HashSlice

func HashSlice(key []byte) uint64

func HashString

func HashString(key string) uint64

func IsHijackedResponse

func IsHijackedResponse(err error) bool

func KeyIDFromCryptoKey

func KeyIDFromCryptoKey(cryptoKey string) (keyID [4]byte)

first 4 bytes of cryptoKey are identifier. This is not a problem because arbitrary long keys are allowed.

func NoopLogf

func NoopLogf(string, ...interface{})

NoopLogf is a do-nothing log function

func ParseTrustedSubnets

func ParseTrustedSubnets(groups [][]string) (trustedSubnetGroups [][]*net.IPNet, errs []error)

Function returns all groups that parsed successfully and all errors

func SchemaToString

func SchemaToString(schema int32) string

Types

type Client

type Client struct {
	Logf LoggerFunc // defaults to log.Printf; set to NoopLogf to disable all logging

	TrustedSubnetGroups [][]string
	ForceEncryption     bool
	CryptoKey           string
	ConnReadBufSize     int
	ConnWriteBufSize    int
	PongTimeout         time.Duration // defaults to rpc.DefaultClientPongTimeout
	// contains filtered or unexported fields
}

func (*Client) Close

func (c *Client) Close() error

func (*Client) Do

func (c *Client) Do(ctx context.Context, network string, address string, req *Request) (*Response, error)

Do supports only "tcp4" and "unix" networks

func (*Client) DoMulti

func (c *Client) DoMulti(
	ctx context.Context,
	addresses []NetAddr,
	prepareRequest func(addr NetAddr, req *Request) error,
	processResponse func(addr NetAddr, resp *Response, err error) error,
) error

DoMulti is a convenient way of doing multiple RPCs at once. If you need more control, consider using Multi directly.

func (*Client) GetRequest

func (c *Client) GetRequest() *Request

func (*Client) Multi

func (c *Client) Multi(n int) *Multi

Multi must be followed with a call to Multi.Close to release request state resources

func (*Client) PutResponse

func (c *Client) PutResponse(resp *Response)

type ClusterClient

type ClusterClient struct {
	Client               *Client
	BigCluster           bool // set if expected cluster size is greater than 500 shards
	ShardSelectKeyModulo bool // override default Maglev sharding with simple modulo
	// contains filtered or unexported fields
}

func (*ClusterClient) DoAll

func (cc *ClusterClient) DoAll(
	ctx context.Context,
	write bool,
	prepareRequest func(addr NetAddr, req *Request) error,
	processResponse func(addr NetAddr, resp *Response, err error) error,
) error

func (*ClusterClient) DoAny

func (cc *ClusterClient) DoAny(ctx context.Context, write bool, req *Request) (*Response, error)

func (*ClusterClient) DoKey

func (cc *ClusterClient) DoKey(ctx context.Context, write bool, req *Request, key uint64) (*Response, error)

func (*ClusterClient) SelectAll

func (cc *ClusterClient) SelectAll(write bool) []NetAddr

func (*ClusterClient) SelectAny

func (cc *ClusterClient) SelectAny(write bool) NetAddr

SelectAny returns empty address when no backend is found

func (*ClusterClient) SelectKey

func (cc *ClusterClient) SelectKey(write bool, key uint64) NetAddr

SelectKey returns empty address when no backend is found

func (*ClusterClient) UpdateCluster

func (cc *ClusterClient) UpdateCluster(shards []ClusterShard) error

type ClusterShard

type ClusterShard struct {
	Name       string
	ReadNodes  []NetAddr
	WriteNodes []NetAddr
}

type DictionaryFieldString

type DictionaryFieldString struct {
	Key   string
	Value string
}

func (*DictionaryFieldString) Read

func (e *DictionaryFieldString) Read(w []byte) (_ []byte, err error)

func (*DictionaryFieldString) ReadBoxed

func (e *DictionaryFieldString) ReadBoxed(w []byte) (_ []byte, err error)

func (*DictionaryFieldString) Reset

func (e *DictionaryFieldString) Reset()

func (DictionaryFieldString) TLName

func (DictionaryFieldString) TLName() string

func (DictionaryFieldString) TLTag

func (DictionaryFieldString) TLTag() uint32

func (*DictionaryFieldString) Write

func (e *DictionaryFieldString) Write(w []byte) (_ []byte, err error)

func (*DictionaryFieldString) WriteBoxed

func (e *DictionaryFieldString) WriteBoxed(w []byte) ([]byte, error)

type Error

type Error struct {
	Code        int32
	Description string
}

func (Error) Error

func (err Error) Error() string

type HandlerContext

type HandlerContext struct {
	ActorID     []uint64
	Extra       []InvokeReqExtra
	QueryID     int64
	RequestTime time.Time

	Request []byte

	Response []byte

	// UserData allows caching common state between different requests.
	UserData interface{}
	// contains filtered or unexported fields
}

HandlerContext must not be used outside the handler

func GetHandlerContext

func GetHandlerContext(ctx context.Context) *HandlerContext

rpc.HandlerContext must never be used outside of the handler

func (*HandlerContext) AccountResponseMem

func (hctx *HandlerContext) AccountResponseMem(respBodySizeEstimate int) error

func (*HandlerContext) HijackResponse

func (hctx *HandlerContext) HijackResponse() error

HijackResponse releases Request bytes for reuse, so must be called only after Request processing is complete

func (*HandlerContext) KeyID

func (hctx *HandlerContext) KeyID() [4]byte

func (*HandlerContext) ListenAddr

func (hctx *HandlerContext) ListenAddr() net.Addr

func (*HandlerContext) LocalAddr

func (hctx *HandlerContext) LocalAddr() net.Addr

func (*HandlerContext) RemoteAddr

func (hctx *HandlerContext) RemoteAddr() net.Addr

func (*HandlerContext) SendHijackedResponse

func (hctx *HandlerContext) SendHijackedResponse(err error)

func (*HandlerContext) WithContext

func (hctx *HandlerContext) WithContext(ctx context.Context) context.Context

type HandlerFunc

type HandlerFunc func(ctx context.Context, hctx *HandlerContext) error

func ChainHandler

func ChainHandler(ff ...HandlerFunc) HandlerFunc

type InvokeReqExtra

type InvokeReqExtra struct {
	WaitBinlogPos               int64    // Conditional: {flags}.16
	StringForwardKeys           []string // Conditional: {flags}.18
	IntForwardKeys              []int64  // Conditional: {flags}.19
	StringForward               string   // Conditional: {flags}.20
	IntForward                  int64    // Conditional: {flags}.21
	CustomTimeoutMs             int32    // Conditional: {flags}.23
	SupportedCompressionVersion int32    // Conditional: {flags}.25
	RandomDelay                 float64  // Conditional: {flags}.26

	FailIfNoConnection bool // Experimental. Not serialized. Requests fail immediately when connection fails, so that switch to fallback is faster
	// contains filtered or unexported fields
}

InvokeReqExtra описывает следующий комбинатор:

rpcInvokeReqExtra {flags:#}
	return_binlog_pos:flags.0?%True
	return_binlog_time:flags.1?%True
	return_pid:flags.2?%True
	return_request_sizes:flags.3?%True
	return_failed_subqueries:flags.4?%True
	return_query_stats:flags.6?%True
	no_result:flags.7?%True
	wait_binlog_pos:flags.16?%Long
	string_forward_keys:flags.18?%(Vector %String)
	int_forward_keys:flags.19?%(Vector %Long)
	string_forward:flags.20?%String
	int_forward:flags.21?%Long
	custom_timeout_ms:flags.23?%Int
	supported_compression_version:flags.25?%Int
	random_delay:flags.26?%Double
	= RpcInvokeReqExtra flags

func (*InvokeReqExtra) IsSetCustomTimeoutMs

func (e *InvokeReqExtra) IsSetCustomTimeoutMs() bool

func (*InvokeReqExtra) IsSetIntForward

func (e *InvokeReqExtra) IsSetIntForward() bool

func (*InvokeReqExtra) IsSetIntForwardKeys

func (e *InvokeReqExtra) IsSetIntForwardKeys() bool

func (*InvokeReqExtra) IsSetNoResult

func (e *InvokeReqExtra) IsSetNoResult() bool

func (*InvokeReqExtra) IsSetRandomDelay

func (e *InvokeReqExtra) IsSetRandomDelay() bool

func (*InvokeReqExtra) IsSetReturnBinlogPos

func (e *InvokeReqExtra) IsSetReturnBinlogPos() bool

func (*InvokeReqExtra) IsSetReturnBinlogTime

func (e *InvokeReqExtra) IsSetReturnBinlogTime() bool

func (*InvokeReqExtra) IsSetReturnFailedSubqueries

func (e *InvokeReqExtra) IsSetReturnFailedSubqueries() bool

func (*InvokeReqExtra) IsSetReturnPid

func (e *InvokeReqExtra) IsSetReturnPid() bool

func (*InvokeReqExtra) IsSetReturnQueryStats

func (e *InvokeReqExtra) IsSetReturnQueryStats() bool

func (*InvokeReqExtra) IsSetReturnRequestSizes

func (e *InvokeReqExtra) IsSetReturnRequestSizes() bool

func (*InvokeReqExtra) IsSetStringForward

func (e *InvokeReqExtra) IsSetStringForward() bool

func (*InvokeReqExtra) IsSetStringForwardKeys

func (e *InvokeReqExtra) IsSetStringForwardKeys() bool

func (*InvokeReqExtra) IsSetSupportedCompressionVersion

func (e *InvokeReqExtra) IsSetSupportedCompressionVersion() bool

func (*InvokeReqExtra) IsSetWaitBinlogPos

func (e *InvokeReqExtra) IsSetWaitBinlogPos() bool

func (*InvokeReqExtra) Read

func (e *InvokeReqExtra) Read(w []byte) (_ []byte, err error)

func (*InvokeReqExtra) SetCustomTimeoutMs

func (e *InvokeReqExtra) SetCustomTimeoutMs(v int32)

func (*InvokeReqExtra) SetIntForward

func (e *InvokeReqExtra) SetIntForward(v int64)

func (*InvokeReqExtra) SetIntForwardKeys

func (e *InvokeReqExtra) SetIntForwardKeys(v []int64)

func (*InvokeReqExtra) SetNoResult

func (e *InvokeReqExtra) SetNoResult()

func (*InvokeReqExtra) SetRandomDelay

func (e *InvokeReqExtra) SetRandomDelay(v float64)

func (*InvokeReqExtra) SetReturnBinlogPos

func (e *InvokeReqExtra) SetReturnBinlogPos()

func (*InvokeReqExtra) SetReturnBinlogTime

func (e *InvokeReqExtra) SetReturnBinlogTime()

func (*InvokeReqExtra) SetReturnFailedSubqueries

func (e *InvokeReqExtra) SetReturnFailedSubqueries()

func (*InvokeReqExtra) SetReturnPid

func (e *InvokeReqExtra) SetReturnPid()

func (*InvokeReqExtra) SetReturnQueryStats

func (e *InvokeReqExtra) SetReturnQueryStats()

func (*InvokeReqExtra) SetReturnRequestSizes

func (e *InvokeReqExtra) SetReturnRequestSizes()

func (*InvokeReqExtra) SetStringForward

func (e *InvokeReqExtra) SetStringForward(v string)

func (*InvokeReqExtra) SetStringForwardKeys

func (e *InvokeReqExtra) SetStringForwardKeys(v []string)

func (*InvokeReqExtra) SetSupportedCompressionVersion

func (e *InvokeReqExtra) SetSupportedCompressionVersion(v int32)

func (*InvokeReqExtra) SetWaitBinlogPos

func (e *InvokeReqExtra) SetWaitBinlogPos(v int64)

func (*InvokeReqExtra) Write

func (e *InvokeReqExtra) Write(w []byte) (_ []byte, err error)

type LoggerFunc

type LoggerFunc func(format string, args ...interface{})

type Multi

type Multi struct {
	// contains filtered or unexported fields
}

func (*Multi) Close

func (m *Multi) Close()

func (*Multi) Start

func (m *Multi) Start(ctx context.Context, network string, address string, req *Request, id uint64) error

func (*Multi) Wait

func (m *Multi) Wait(ctx context.Context, id uint64) (*Response, error)

func (*Multi) WaitAny

func (m *Multi) WaitAny(ctx context.Context) (uint64, *Response, error)

type NetAddr

type NetAddr struct {
	Network string
	Address string
}

func (NetAddr) String

func (na NetAddr) String() string

type NetPID

type NetPID struct {
	IP   uint32
	Port uint16
	PID  uint16
	Time int32
}

type PacketConn

type PacketConn struct {
	// contains filtered or unexported fields
}

transport stream, encrypted using standard VK rpc scheme

func NewPacketConn

func NewPacketConn(c net.Conn, readBufSize int, writeBufSize int, timeoutAccuracy time.Duration) *PacketConn

func (*PacketConn) Close

func (pc *PacketConn) Close() error

func (*PacketConn) HandshakeClient

func (pc *PacketConn) HandshakeClient(cryptoKey string, trustedSubnetGroups [][]*net.IPNet, forceEncryption bool, startTime int32, flags uint32) error

func (*PacketConn) HandshakeServer

func (pc *PacketConn) HandshakeServer(cryptoKeys []string, trustedSubnetGroups [][]*net.IPNet, forceEncryption bool, startTime int32) ([]byte, uint32, error)

func (*PacketConn) LocalAddr

func (pc *PacketConn) LocalAddr() string

func (*PacketConn) ReadPacket

func (pc *PacketConn) ReadPacket(body []byte, timeout time.Duration) (tip uint32, _ []byte, err error)

ReadPacket will resize/reuse body to size of packet

func (*PacketConn) RemoteAddr

func (pc *PacketConn) RemoteAddr() string

func (*PacketConn) ShutdownWrite

func (pc *PacketConn) ShutdownWrite() error

Motivation - you call ShutdownWrite, and your blocking ReadPacket* will stop after receiveing FIN with compatible sockets if you receive error for this method, you should call Close()

func (*PacketConn) WritePacket

func (pc *PacketConn) WritePacket(packetType uint32, body []byte, timeout time.Duration) error

type ReqResultExtra

type ReqResultExtra struct {
	BinlogPos          int64             // Conditional: {flags}.0
	BinlogTime         int64             // Conditional: {flags}.1
	EnginePID          NetPID            // Conditional: {flags}.2
	RequestSize        int32             // Conditional: {flags}.3
	ResponseSize       int32             // Conditional: {flags}.3
	FailedSubqueries   int32             // Conditional: {flags}.4
	CompressionVersion int32             // Conditional: {flags}.5
	Stats              map[string]string // Conditional: {flags}.6
	// contains filtered or unexported fields
}

ReqResultExtra описывает следующий комбинатор:

rpcReqResultExtra {flags:#} binlog_pos:flags.0?%Long binlog_time:flags.1?%Long engine_pid:flags.2?%net.Pid request_size:flags.3?%Int response_size:flags.3?%Int failed_subqueries:flags.4?%Int compression_version:flags.5?%Int stats:flags.6?%(Dictionary %String) = RpcReqResultExtra flags

func (*ReqResultExtra) IsSetBinlogPos

func (e *ReqResultExtra) IsSetBinlogPos() bool

func (*ReqResultExtra) IsSetBinlogTime

func (e *ReqResultExtra) IsSetBinlogTime() bool

func (*ReqResultExtra) IsSetCompressionVersion

func (e *ReqResultExtra) IsSetCompressionVersion() bool

func (*ReqResultExtra) IsSetEnginePID

func (e *ReqResultExtra) IsSetEnginePID() bool

func (*ReqResultExtra) IsSetFailedSubqueries

func (e *ReqResultExtra) IsSetFailedSubqueries() bool

func (*ReqResultExtra) IsSetRequestSize

func (e *ReqResultExtra) IsSetRequestSize() bool

func (*ReqResultExtra) IsSetResponseSize

func (e *ReqResultExtra) IsSetResponseSize() bool

func (*ReqResultExtra) IsSetStats

func (e *ReqResultExtra) IsSetStats() bool

func (*ReqResultExtra) Read

func (e *ReqResultExtra) Read(w []byte) (_ []byte, err error)

func (*ReqResultExtra) SetBinlogPos

func (e *ReqResultExtra) SetBinlogPos(v int64)

func (*ReqResultExtra) SetBinlogTime

func (e *ReqResultExtra) SetBinlogTime(v int64)

func (*ReqResultExtra) SetCompressionVersion

func (e *ReqResultExtra) SetCompressionVersion(v int32)

func (*ReqResultExtra) SetEnginePID

func (e *ReqResultExtra) SetEnginePID(v NetPID)

func (*ReqResultExtra) SetFailedSubqueries

func (e *ReqResultExtra) SetFailedSubqueries(v int32)

func (*ReqResultExtra) SetRequestSize

func (e *ReqResultExtra) SetRequestSize(v int32)

func (*ReqResultExtra) SetResponseSize

func (e *ReqResultExtra) SetResponseSize(v int32)

func (*ReqResultExtra) SetStats

func (e *ReqResultExtra) SetStats(v map[string]string)

func (*ReqResultExtra) Write

func (e *ReqResultExtra) Write(w []byte) (_ []byte, err error)

type Request

type Request struct {
	Body    []byte
	ActorID uint64
	Extra   InvokeReqExtra
}

type Response

type Response struct {
	Body  []byte
	Extra []ReqResultExtra
	// contains filtered or unexported fields
}

type Server

type Server struct {
	Handler          HandlerFunc
	StatsHandler     StatsHandlerFunc
	VerbosityHandler VerbosityHandlerFunc
	Version          string
	Logf             LoggerFunc // defaults to log.Printf; set to NoopLogf to disable all logging

	TransportHijackHandler func(conn *PacketConn) // Experimental, server handles connection to this function if FlagHijackTransport client flag set

	TrustedSubnetGroups    [][]string
	ForceEncryption        bool
	CryptoKeys             []string
	MaxConns               int           // defaults to DefaultMaxConns
	MaxWorkers             int           // defaults to DefaultMaxWorkers; negative values disable worker pool completely
	MaxInflightPackets     int           // defaults to DefaultMaxInflightPackets
	RequestMemoryLimit     int           // defaults to DefaultRequestMemoryLimit
	ResponseMemoryLimit    int           // defaults to DefaultResponseMemoryLimit
	ConnReadBufSize        int           // defaults to DefaultServerConnReadBufSize
	ConnWriteBufSize       int           // defaults to DefaultServerConnWriteBufSize
	RequestBufSize         int           // defaults to DefaultServerRequestBufSize
	ResponseBufSize        int           // defaults to DefaultServerResponseBufSize
	ResponseMemEstimate    int           // defaults to DefaultResponseMemEstimate; must be greater than ResponseBufSize
	DefaultResponseTimeout time.Duration // defaults to no timeout
	ResponseTimeoutAdjust  time.Duration
	DisableContextTimeout  bool
	DisableTCPReuseAddr    bool
	LogCommonNetworkErrors bool
	// contains filtered or unexported fields
}

func (*Server) Close

func (s *Server) Close() error

Close stops server from accepting new requests, closes all connections and waits for all goroutines to exit.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(network string, address string) error

ListenAndServe supports only "tcp4" and "unix" networks

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

type StatsHandlerFunc

type StatsHandlerFunc func(map[string]string)

type TargetError

type TargetError struct {
	Address NetAddr
	Err     error
}

func (TargetError) Error

func (err TargetError) Error() string

func (TargetError) Unwrap

func (err TargetError) Unwrap() error

type VerbosityHandlerFunc

type VerbosityHandlerFunc func(int) error

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL