transport

package
v0.0.0-...-5e9c659 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

README

传输层 包括TCP HTTP GRPC

传输层 包含SOCK 传输编码定义 最后提供server调用

Documentation

Overview

Package socket provides a pseudo socket

Index

Constants

View Source
const (
	MagicNumber byte = 0x08

	// Request is message type of request
	MT_ERROR MessageType = iota
	MT_REQUEST
	MT_RESPONSE // Response is message type of response
)

Variables

View Source
var (
	// Default is 0 that means does not limit length of messages.
	// It is used to validate when read messages from io.Reader.
	MaxMessageLength = 0
	// ErrMetaKVMissing some keys or values are mssing.
	ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing")
	// ErrMessageToLong message is too long
	ErrMessageToLong = errors.New("message is too long")
)
View Source
var (
	// RemoteConnContextKey is a context key. It can be used in
	// services with context.WithValue to access the connection arrived on.
	// The associated value will be of type net.Conn.
	RemoteConnContextKey = &contextKey{"remote-conn"}
	// StartRequestContextKey records the start time
	StartRequestContextKey = &contextKey{"start-parse-request"}
	// StartSendRequestContextKey records the start time
	StartSendRequestContextKey = &contextKey{"start-send-request"}
)
View Source
var (
	DefaultTimeout = time.Second * 5
)

Functions

func Logger

func Logger() logger.ILogger

func NewProxyDialer

func NewProxyDialer(proxyURLStr string, UserAgent string) (proxy.Dialer, error)

newConnectDialer creates a dialer to issue CONNECT requests and tunnel traffic via HTTP/S proxy. proxyUrlStr must provide Scheme and Host, may provide credentials and port. Example: https://username:password@golang.org:443

func NewTcpTransportSocket

func NewTcpTransportSocket(conn net.Conn, readTimeout, writeTimeout time.Duration) *tcpTransportSocket

func ParseDateString

func ParseDateString(dt string) (time.Time, error)

ParseDateString takes a string and passes it through Approxidate Parses into a time.Time

func PutMessageToPool

func PutMessageToPool(msg *Message)

func Unzip

func Unzip(data []byte) ([]byte, error)

Unzip unzips data.

func Zip

func Zip(data []byte) ([]byte, error)

Zip zips data.

Types

type Bom

type Bom [12]byte

byte-order mark is the first part of Message and has fixed size. Format:

func (Bom) CheckMagicNumber

func (self Bom) CheckMagicNumber() bool

CheckMagicNumber checks whether header starts rpc magic number.

func (Bom) CompressType

func (self Bom) CompressType() CompressType

CompressType returns compression type of messages.

func (Bom) IsHeartbeat

func (self Bom) IsHeartbeat() bool

IsHeartbeat returns whether the message is heartbeat message.

func (Bom) IsOneway

func (self Bom) IsOneway() bool

IsOneway returns whether the message is one-way message. If true, server won't send responses.

func (Bom) MessageStatusType

func (self Bom) MessageStatusType() MessageStatusType

MessageStatusType returns the message status type.

func (Bom) MessageType

func (self Bom) MessageType() MessageType

MessageType returns the message type.

func (Bom) Seq

func (self Bom) Seq() uint64

Seq returns sequence number of messages.

func (Bom) SerializeType

func (self Bom) SerializeType() codec.SerializeType

SerializeType returns serialization type of payload.

func (*Bom) SetCompressType

func (self *Bom) SetCompressType(ct CompressType)

SetCompressType sets the compression type.

func (*Bom) SetHeartbeat

func (self *Bom) SetHeartbeat(hb bool)

SetHeartbeat sets the heartbeat flag.

func (*Bom) SetMessageStatusType

func (self *Bom) SetMessageStatusType(mt MessageStatusType)

SetMessageStatusType sets message status type.

func (*Bom) SetMessageType

func (self *Bom) SetMessageType(mt MessageType)

SetMessageType sets message type.

func (*Bom) SetOneway

func (self *Bom) SetOneway(oneway bool)

SetOneway sets the oneway flag.

func (*Bom) SetSeq

func (self *Bom) SetSeq(seq uint64)

SetSeq sets sequence number.

func (*Bom) SetSerializeType

func (self *Bom) SetSerializeType(st codec.SerializeType)

SetSerializeType sets the serialization type.

func (*Bom) SetVersion

func (self *Bom) SetVersion(v byte)

SetVersion sets version for this Bom.

func (Bom) Version

func (self Bom) Version() byte

Version returns version of rpc protocol.

type CompressType

type CompressType byte

CompressType defines decompression type.

const (
	// None does not compress.
	None CompressType = iota
	// Gzip uses gzip compression.
	Gzip
)

type Config

type Config struct {
	config.Config
	Name       string `field:"-"` // config name/path in config file
	PrefixName string `field:"-"` // config prefix name
	Listener   IListener
	// Addrs is the list of intermediary addresses to connect to
	Addrs []string

	// 证书
	EnableACME   bool          `field:"enable_acme"`
	ACMEHosts    []string      `field:"acme_hosts"`
	ACMEProvider acme.Provider `field:"-"`
	// Secure tells the transport to secure the connection.
	// In the case TLSConfig is not specified best effort self-signed
	// certs should be used
	Secure bool
	// TLSConfig to secure the connection. The assumption is that this
	// is mTLS keypair
	TlsConfig *tls.Config

	//DialTimeout sets timeout for dialing
	DialTimeout time.Duration
	// ReadTimeout sets readdeadline for underlying net.Conns
	ReadTimeout time.Duration
	// WriteTimeout sets writedeadline for underlying net.Conns
	WriteTimeout time.Duration

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func (*Config) Init

func (self *Config) Init(opts ...Option)

func (*Config) Load

func (self *Config) Load() error

func (*Config) Save

func (self *Config) Save(immed ...bool) error

func (*Config) String

func (self *Config) String() string

type ContextKeyHeader

type ContextKeyHeader struct{}

ContextKeyHeader Users of context.WithValue should define their own types for keys

type Cookie struct {
	Name  string `json:"name"`
	Value string `json:"value"`

	Path        string `json:"path"`   // optional
	Domain      string `json:"domain"` // optional
	Expires     time.Time
	JSONExpires Time   `json:"expires"`    // optional
	RawExpires  string `json:"rawExpires"` // for reading cookies only

	// MaxAge=0 means no 'Max-Age' attribute specified.
	// MaxAge<0 means delete cookie now, equivalently 'Max-Age: 0'
	// MaxAge>0 means Max-Age attribute present and given in seconds
	MaxAge   int           `json:"maxAge"`
	Secure   bool          `json:"secure"`
	HTTPOnly bool          `json:"httpOnly"`
	SameSite http.SameSite `json:"sameSite"`
	Raw      string
	Unparsed []string `json:"unparsed"` // Raw text of unparsed attribute-value pairs
}

A Cookie represents an HTTP cookie as sent in the Set-Cookie header of an HTTP response or the Cookie header of an HTTP request.

See https://tools.ietf.org/html/rfc6265 for details. Stolen from Net/http/cookies

type DialConfig

type DialConfig struct {
	// Tells the transport this is a streaming connection with
	// multiple calls to send/recv and that send may not even be called
	Stream bool
	// Other options for implementations of the interface
	// can be stored in a context
	Secure      bool
	DialTimeout time.Duration
	// ReadTimeout sets readdeadline for underlying net.Conns
	ReadTimeout time.Duration
	// WriteTimeout sets writedeadline for underlying net.Conns
	WriteTimeout time.Duration

	// TODO: add tls options when dialling
	// Currently set in global options
	Ja3      Ja3 // TODO 添加加缓存
	ProxyURL string

	Network string

	Context context.Context
	// contains filtered or unexported fields
}

func (*DialConfig) Init

func (self *DialConfig) Init(opts ...DialOption)

type DialOption

type DialOption func(*DialConfig)

func WithContext

func WithContext(ctx context.Context) DialOption

func WithDialTimeout

func WithDialTimeout(timeout time.Duration) DialOption

func WithDialer

func WithDialer(dialer proxy.Dialer) DialOption

func WithJa3

func WithJa3(ja3, userAgent string) DialOption

func WithNetwork

func WithNetwork(network string) DialOption

func WithProxyURL

func WithProxyURL(proxyURL string) DialOption

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) DialOption

func WithStream

func WithStream() DialOption

Indicates whether this is a streaming connection

func WithTLS

func WithTLS() DialOption

func WithTimeout

func WithTimeout(dial, read, write time.Duration) DialOption

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) DialOption

type ErrExtensionNotExist

type ErrExtensionNotExist string

ErrExtensionNotExist is returned when an extension is not supported by the library

func (ErrExtensionNotExist) Error

func (e ErrExtensionNotExist) Error() string

Error is the error value which contains the extension that does not exist

type Handler

type Handler interface {
	String() string
	Handler() interface{}
}

the handler interface

type HttpConn

type HttpConn struct {
	// contains filtered or unexported fields
}

func (*HttpConn) Close

func (h *HttpConn) Close() error

func (*HttpConn) Conn

func (t *HttpConn) Conn() net.Conn

func (*HttpConn) Local

func (h *HttpConn) Local() string

func (*HttpConn) Recv

func (h *HttpConn) Recv(m *Message) error

func (*HttpConn) Remote

func (h *HttpConn) Remote() string

func (*HttpConn) Request

func (self *HttpConn) Request() *http.Request

func (*HttpConn) Response

func (self *HttpConn) Response() http.ResponseWriter

func (*HttpConn) Send

func (h *HttpConn) Send(m *Message) error

type HttpTransport

type HttpTransport struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewHTTPTransport

func NewHTTPTransport(opts ...Option) *HttpTransport

func (*HttpTransport) Config

func (self *HttpTransport) Config() *Config
func (h *httpTransport) Request(msg Message, sock *Socket, cde codec.ICodec) IRequest {
	return nil
}
func (h *httpTransport) Response(sock *Socket, cde codec.ICodec) IResponse {
	return nil
}

func (*HttpTransport) Dial

func (self *HttpTransport) Dial(addr string, opts ...DialOption) (IClient, error)

to make a Dial with server

func (*HttpTransport) Init

func (self *HttpTransport) Init(opts ...Option) error

func (*HttpTransport) Listen

func (self *HttpTransport) Listen(addr string, opts ...ListenOption) (IListener, error)

func (*HttpTransport) Protocol

func (self *HttpTransport) Protocol() string

func (*HttpTransport) String

func (self *HttpTransport) String() string

type IBody

type IBody interface {
	Read(interface{}) error
	Write(interface{}) error
}

type IClient

type IClient interface {
	ISocket
	Transport() ITransport
}

type IHeader

type IHeader interface {
	Add(key, value string)
	Set(key, value string)
	Get(key string) string
	//has(key string) bool
	Del(key string)
}

type IListener

type IListener interface {
	Addr() net.Addr
	Close() error
	Accept() (net.Conn, error)
	//Serve(func(ISocket)) error // 阻塞监听
	Serve(Handler) error // 阻塞监听

}

type IRequest

type IRequest interface {
	// The service to call
	Service() string
	// The action to take
	Method() string
	// The content type
	ContentType() string
	// write a response directly to the client
	Body() IBody // *body.TBody

	Codec() codec.ICodec
}

type IResponse

type IResponse interface {
	// write a response directly to the client
	Write([]byte) (int, error)
	WriteStream(interface{}) error
	Body() *body.TBody
}

提供给服务器客户端最基本接口

type IResponseWriter

type IResponseWriter interface {
	http.ResponseWriter
	// Status returns the status code of the response or 0 if the response has not been written.
	Status() int
	// Written returns whether or not the ResponseWriter has been written.
	Written() bool
	// Size returns the size of the response body.
	Size() int
}

参考Tango

type ISocket

type ISocket interface {
	Recv(*Message) error
	Send(*Message) error
	Close() error
	Local() string  // Local IP
	Remote() string // Remote IP
	Conn() net.Conn // 接口提供更灵活扩展
}

type ITransport

type ITransport interface {
	Init(...Option) error
	Config() *Config
	Dial(addr string, opts ...DialOption) (IClient, error)       // for client 详细查看pool.NewPool
	Listen(addr string, opts ...ListenOption) (IListener, error) // for server
	String() string
	Protocol() string
}

func Default

func Default(set ...ITransport) ITransport

func NewTCPTransport

func NewTCPTransport(opts ...Option) ITransport

type Ja3

type Ja3 struct {
	Ja3       string
	UserAgent string
	Hash      string
}

type ListenConfig

type ListenConfig struct {

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

type ListenOption

type ListenOption func(*ListenConfig)

type Message

type Message struct {
	*Bom // 字节码
	//Service  string            // service path
	//Endpoint string            // method path
	Path    string            // the path
	Header  map[string]string // 消息头
	Body    []byte            // 消息主体
	Payload []byte            // 消息主体中的内容

}

func GetMessageFromPool

func GetMessageFromPool() *Message

func ReadMessage

func ReadMessage(r io.Reader) (*Message, error)

Read reads a message from r.

func (Message) CloneTo

func (m Message) CloneTo(msg *Message) *Message

Clone clones from an message.

func (*Message) Decode

func (m *Message) Decode(r io.Reader) error

Decode decodes a message from reader.

func (Message) Encode

func (m Message) Encode() []byte

Encode encodes messages.

func (*Message) Reset

func (m *Message) Reset()

Reset clean data of this message but keep allocated data

type MessageStatusType

type MessageStatusType byte

MessageStatusType is status of messages.

const (
	StatusOK MessageStatusType = iota // Normal Not an error; returned on success.
	// Error indicates some errors occur.
	StatusError
	//Internal errors. This means that some invariants expected by the underlying system have been broken. This error code is reserved for serious errors.
	StatusInternalError
	// The caller does not have permission to execute the specified operation. StatusForbidden must not be used for rejections caused by exhausting some resource (use RESOURCE_EXHAUSTED instead for those errors). StatusForbidden must not be used if the caller can not be identified (use UNAUTHENTICATED instead for those errors). This error code does not imply the request is valid or the requested entity exists or satisfies other pre-conditions.
	StatusForbidden
	StatusNotFound
	StatusUnknown
	StatusAborted
	// The operation is not implemented or is not supported/enabled in this service.
	StatusNotImplemented
	// The service is currently unavailable. This is most likely a transient condition, which can be corrected by retrying with a backoff. Note that it is not always safe to retry non-idempotent operations.
	StatusServiceUnavailable
	// The request does not have valid authentication credentials for the operation.
	StatusUnauthorized
)

type MessageType

type MessageType byte

*

  • Bom Protocol
  • +-------MagicNumber(1Byte)-------|------MessageType(Byte)--------+
  • +----------------------------------------------------------------+
  • | 0| LENGTH |
  • +----------------------------------------------------------------+
  • | 0| HEADER MAGIC | FLAGS |
  • +----------------------------------------------------------------+
  • | SEQUENCE NUMBER |
  • +----------------------------------------------------------------+
  • | 0| Header Size(/32) | ...
  • +--------------------------------- *
  • Message is of variable size:
  • (and starts at offset 14) *
  • +----------------------------------------------------------------+
  • | Bom 12
  • +----------------------------------------------------------------+
  • | Path
  • +----------------------------------------------------------------+
  • | Header |
  • +----------------------------------------------------------------+
  • | INFO 0 ID (uint8) | INFO 0 DATA ...
  • +----------------------------------------------------------------+
  • | ... ... |
  • +----------------------------------------------------------------+
  • | |
  • | PAYLOAD |
  • | |
  • +----------------------------------------------------------------+

MessageType is message type of requests and resposnes.

type Option

type Option func(*Config)

func ACMEHosts

func ACMEHosts(hosts ...string) Option

func ACMEProvider

func ACMEProvider(p acme.Provider) Option

func Addrs

func Addrs(addrs ...string) Option

Addrs to use for transport

func Debug

func Debug() Option

func DialTimeout

func DialTimeout(t time.Duration) Option

Timeout sets the timeout for Send/Recv execution

func EnableACME

func EnableACME(b bool) Option

func ReadTimeout

func ReadTimeout(t time.Duration) Option

Timeout sets the timeout for Send/Recv execution

func Secure

func Secure(b bool) Option

Use secure communication. If TLSConfig is not specified we use InsecureSkipVerify and generate a self signed cert

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig to be used for the transport.

func Timeout

func Timeout(t time.Duration) Option

Timeout sets the timeout for Send/Recv execution

func WithConfigPrefixName

func WithConfigPrefixName(prefixName string) Option

修改Config.json的路径

func WriteTimeout

func WriteTimeout(t time.Duration) Option

Timeout sets the timeout for Send/Recv execution

type Pool

type Pool struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewPool

func NewPool() *Pool

NewPool returns a new socket pool

func (*Pool) Close

func (p *Pool) Close()

Close the pool and delete all the sockets

func (*Pool) Get

func (p *Pool) Get(id string) (*Socket, bool)

func (*Pool) Release

func (p *Pool) Release(s *Socket)

type RpcRequest

type RpcRequest struct {
	Message    *Message
	RemoteAddr string

	// Context is either the client or server context. It should only
	// be modified via copying the whole Request using WithContext.
	// It is unexported to prevent people from using Context wrong
	// and mutating the contexts held by callers of the same request.
	Context context.Context
	// contains filtered or unexported fields
}

func NewRpcRequest

func NewRpcRequest(ctx context.Context, message *Message, socket ISocket) *RpcRequest

提供给Router的context使用

func (*RpcRequest) Body

func (self *RpcRequest) Body() *body.TBody

func (*RpcRequest) Codec

func (self *RpcRequest) Codec() codec.ICodec

func (*RpcRequest) ContentType

func (self *RpcRequest) ContentType() string

func (*RpcRequest) Endpoint

func (self *RpcRequest) Endpoint() string

func (*RpcRequest) Header

func (self *RpcRequest) Header() header.Header

header 这是通讯协议包中附带的数据,有区别于body内

func (*RpcRequest) Method

func (self *RpcRequest) Method() string

func (*RpcRequest) Service

func (self *RpcRequest) Service() string

func (*RpcRequest) Stream

func (self *RpcRequest) Stream() bool

type RpcResponse

type RpcResponse struct {
	Request *RpcRequest // request for this response
	// contains filtered or unexported fields
}

func NewRpcResponse

func NewRpcResponse(ctx context.Context, req *RpcRequest, socket ISocket) *RpcResponse

提供给Router的context使用

func (*RpcResponse) Body

func (self *RpcResponse) Body() *body.TBody

func (*RpcResponse) Write

func (self *RpcResponse) Write(b []byte) (int, error)

func (*RpcResponse) WriteHeader

func (self *RpcResponse) WriteHeader(code MessageStatusType)

TODO 写状态

func (*RpcResponse) WriteStream

func (self *RpcResponse) WriteStream(data interface{}) error

write data as stream

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

Socket is our pseudo socket for transport.Socket

func New

func New(id string) *Socket

New returns a new pseudo socket which can be used in the place of a transport socket. Messages are sent to the socket via Accept and receives from the socket via Process. SetLocal/SetRemote should be called before using the socket.

func (*Socket) Accept

func (s *Socket) Accept(m *Message) error

Accept passes a message to the socket which will be processed by the call to Recv

func (*Socket) Close

func (s *Socket) Close() error

Close closes the socket

func (*Socket) Local

func (s *Socket) Local() string

func (*Socket) Process

func (s *Socket) Process(m *Message) error

Process takes the next message off the send queue created by a call to Send

func (*Socket) Recv

func (s *Socket) Recv(m *Message) error

func (*Socket) Remote

func (s *Socket) Remote() string

func (*Socket) Send

func (s *Socket) Send(m *Message) error

func (*Socket) SetLocal

func (s *Socket) SetLocal(l string)

func (*Socket) SetRemote

func (s *Socket) SetRemote(r string)

type THttpRequest

type THttpRequest struct {
	*http.Request
	Transport ITransport
	// contains filtered or unexported fields
}

func NewHttpRequest

func NewHttpRequest(req *http.Request) *THttpRequest

提供给Router的context使用

func (*THttpRequest) Body

func (self *THttpRequest) Body() *body.TBody

func (*THttpRequest) Codec

func (self *THttpRequest) Codec() codec.ICodec

func (*THttpRequest) Header

func (self *THttpRequest) Header() http.Header

Header of the request

func (*THttpRequest) Interface

func (self *THttpRequest) Interface() interface{}

func (*THttpRequest) Read

func (self *THttpRequest) Read() ([]byte, error)

Body is the initial decoded value Body() interface{} Read the undecoded request body

func (*THttpRequest) Stream

func (self *THttpRequest) Stream() bool

The encoded message stream Codec() codec.Reader Indicates whether its a stream

type THttpResponse

type THttpResponse struct {
	http.ResponseWriter

	Val reflect.Value
	// contains filtered or unexported fields
}

参考Tango

func NewHttpResponse

func NewHttpResponse(ctx context.Context, req *THttpRequest) *THttpResponse

func (*THttpResponse) Body

func (self *THttpResponse) Body() *body.TBody

func (*THttpResponse) Close

func (self *THttpResponse) Close()

func (*THttpResponse) Connect

func (self *THttpResponse) Connect(w http.ResponseWriter)

Inite and Connect a new ResponseWriter when a new request is coming

func (*THttpResponse) Flush

func (self *THttpResponse) Flush()

func (*THttpResponse) Hijack

func (self *THttpResponse) Hijack() (net.Conn, *bufio.ReadWriter, error)

Hijack让调用者接管连接,在调用Hijack()后,http server库将不再对该连接进行处理,对于该连接的管理和关闭责任将由调用者接管.

func (*THttpResponse) Size

func (self *THttpResponse) Size() int

func (*THttpResponse) Status

func (self *THttpResponse) Status() int

func (*THttpResponse) Value

func (self *THttpResponse) Value() reflect.Value

func (*THttpResponse) Write

func (self *THttpResponse) Write(b []byte) (int, error)

func (*THttpResponse) WriteHeader

func (self *THttpResponse) WriteHeader(s int)

func (*THttpResponse) WriteStream

func (self *THttpResponse) WriteStream(data interface{}) error

write data as stream

func (*THttpResponse) Written

func (self *THttpResponse) Written() bool

writable

type Time

type Time struct {
	time.Time
}

Time wraps time.Time overriddin the json marshal/unmarshal to pass timestamp as integer

func (*Time) UnmarshalJSON

func (t *Time) UnmarshalJSON(buf []byte) error

UnmarshalJSON implements json.Unmarshaler inferface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL