tcpx

package module
v0.0.0-...-0c5332c
Published: Mar 20, 2021 License: MIT Imports: 28 Imported by: 96

README

A very convenient tcp framework in golang.

Start

go get github.com/fwhezfwhez/tcpx

Dependency

If you want to run the programs in this repo, you need the protoc and protoc-gen-go tools installed. You can compile them yourself from the repos below, but prebuilt releases are also available; refer to their docs. Make sure protoc --version runs successfully.

protoc: https://github.com/golang/protobuf

protoc-gen-go: https://github.com/golang/protobuf/tree/master/protoc-gen-go

Benchmark

https://github.com/fwhezfwhez/tcpx/blob/master/benchmark_test.go

| case | executions | time per op | memory per op | allocations per op |
|---|---|---|---|---|
| OnMessage | 2000000 | 643 ns/op | 1368 B/op | 5 allocs/op |
| Mux without middleware | 2000000 | 761 ns/op | 1368 B/op | 5 allocs/op |
| Mux with middleware | 2000000 | 768 ns/op | 1368 B/op | 5 allocs/op |

Pack

tcpx defines its own pack (message block) format. For details, see: https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/pack-detail

[4]byte -- length             fixed_size,binary big endian encode
[4]byte -- messageID          fixed_size,binary big endian encode
[4]byte -- headerLength       fixed_size,binary big endian encode
[4]byte -- bodyLength         fixed_size,binary big endian encode
[]byte -- header              marshal by json
[]byte -- body                marshal by marshaller

Based on this pack rule, tcpx supports two routing modes, each with its own header structure:

messageID type pack

header:
{
    "Router-Type": "MESSAGE_ID"
}

urlPattern pack

header:
{
    "Router-Type": "URL_PATTERN",
    "Router-Pattern-Value": "/login/"
}
}
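
A receiver can tell the two header shapes apart by reading the Router-Type key. The following is a minimal sketch under stated assumptions: routeOf is a hypothetical helper, not a tcpx function, and treating a missing or unknown Router-Type as an error is a choice made for this sketch only.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// routeOf (hypothetical) decodes a JSON header and returns the router type
// and, for URL_PATTERN headers, the pattern value.
func routeOf(headerJSON []byte) (string, string, error) {
	var header map[string]interface{}
	if err := json.Unmarshal(headerJSON, &header); err != nil {
		return "", "", err
	}
	routerType, _ := header["Router-Type"].(string)
	switch routerType {
	case "MESSAGE_ID":
		return routerType, "", nil
	case "URL_PATTERN":
		pattern, ok := header["Router-Pattern-Value"].(string)
		if !ok {
			return "", "", errors.New("URL_PATTERN header without Router-Pattern-Value")
		}
		return routerType, pattern, nil
	default:
		// Assumption of this sketch: anything else is rejected.
		return "", "", fmt.Errorf("unknown Router-Type %q", routerType)
	}
}

func main() {
	headers := []string{
		`{"Router-Type": "MESSAGE_ID"}`,
		`{"Router-Type": "URL_PATTERN", "Router-Pattern-Value": "/login/"}`,
	}
	for _, h := range headers {
		routerType, pattern, err := routeOf([]byte(h))
		fmt.Println(routerType, pattern, err)
	}
}
```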
Chat

https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/chat

An example chat application built with tcpx.

Raw

https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/raw

An example of sending a stream without any pack rule, independent of the messageID/urlPattern system. You can send any stream you want. Global middleware and anchor middleware still work, as the example shows.

IM

Here is an example of an IM system built with tcpx.

https://github.com/q1n9-jair/tcpx-demo

Product practice

tcpx cloud architecture

Documentation

Overview

Package tcpx supports three protocols: udp, tcp, and kcp.

Index

Constants

const (
	// debug mode, logger of tcpx will print
	DEBUG = 1 + iota
	// release mode, logger of tcpx will not print
	RELEASE
)
const (
	SERVER_ERROR = 500
	CLIENT_ERROR = 400
	OK           = 200
	NOT_AUTH     = 403
)
const (
	MESSAGEID  = "MESSAGE_ID"
	URLPATTERN = "URL_PATTERN"
)

router type marked name

const (
	DEFAULT_HEARTBEAT_MESSAGEID = 1392
	DEFAULT_AUTH_MESSAGEID      = 1393

	STATE_RUNNING = 1
	STATE_STOP    = 2

	PIPED = "[tcpx-buffer-in-serial]"
)
const ABORT = 2019
const CONTEXT_OFFLINE = 2
const CONTEXT_ONLINE = 1
const (
	// context's anchor middleware will expire when call UnUse(),
	// middleware added by Use() will be set 2019 anchor index by default
	NOT_EXPIRE = 2019
)

Variables

var (
	HEADER_ROUTER_KEY   = "Router-Type"          // value ranged [MESSAGE_ID, URL_PATTERN]
	HEADER_ROUTER_VALUE = "Router-Pattern-Value" // value ranged [MESSAGE_ID, URL_PATTERN]

	HEADER_PACK_TYPE = "Pack-Content-Type" // value ranged [JSON, PROTOBUF, TOML, YAML, NONE]
)

header const key

var (
	B  = 1
	KB = 1024
	MB = 1024 * 1024
	GB = 1024 * 1024 * 1024
)
var Logger = Log{
	Logger: log.New(os.Stderr, "[tcpx] ", log.LstdFlags|log.Llongfile),
	Mode:   DEBUG,
}

Global instance of logger

var PackJSON = NewPackx(JsonMarshaller{})
var PackProtobuf = NewPackx(ProtobufMarshaller{})
var PackTOML = NewPackx(TomlMarshaller{})
var PackYAML = NewPackx(YamlMarshaller{})

Functions

func BindJSON

func BindJSON(bodyBuf []byte, dest interface{}) error

func BodyBytesOf

func BodyBytesOf(stream []byte) ([]byte, error)

body bytes of a block

func BodyLengthOf

func BodyLengthOf(stream []byte) (int32, error)

Body length of a stream received

func CloseChanel

func CloseChanel(f func())

CloseChanel(func(){close(chan)})

func Debug

func Debug(src interface{}) string

func Defer

func Defer(f func(), handlePanicError ...func(interface{}))

Defer recovers from any panic raised by f and passes the panic value to handlePanicError.

func FirstBlockOf

func FirstBlockOf(r io.Reader) ([]byte, error)

FirstBlockOf does not depend on a packx instance, so it is provided as a standalone function. The older packx.FirstBlockOf remains available so that existing code keeps working.

func FirstBlockOfBytes

func FirstBlockOfBytes(buffer []byte) ([]byte, error)

func FirstBlockOfLimitMaxByte

func FirstBlockOfLimitMaxByte(r io.Reader, maxByte int32) ([]byte, error)

func HeaderBytesOf

func HeaderBytesOf(stream []byte) ([]byte, error)

Header bytes of a block

func HeaderLengthOf

func HeaderLengthOf(stream []byte) (int32, error)

Header length of a stream received

func HeaderOf

func HeaderOf(stream []byte) (map[string]interface{}, error)

header of a block

func In

func In(s string, arr []string) bool

Reports whether s is in arr. Supports %%.

func LengthOf

func LengthOf(stream []byte) (int32, error)

Length of the first valid block in the stream. It does not include the length field itself; it counts the message bytes that follow it.

func MD5

func MD5(rawMsg string) string

func MarshalTOML

func MarshalTOML(src interface{}) ([]byte, error)

func MessageIDOf

func MessageIDOf(stream []byte) (int32, error)

messageID of a stream. Use this to choose which struct for unpacking.

func Pack

func Pack(messageID int32, header map[string]interface{}, src interface{}, marshaller Marshaller) ([]byte, error)

pack detail

func PackHeartbeat

func PackHeartbeat() []byte

func PackStuff

func PackStuff(messageID int32) []byte

pack short signal which only contains messageID

func PackWithMarshaller

func PackWithMarshaller(message Message, marshaller Marshaller) ([]byte, error)

PackWithMarshaller encodes a message into a block of length, messageID, headerLength, bodyLength, header, body. Users don't need to know how pack serializes itself if they use UnpackWithMarshaller.

If users want to use this protocol across languages, here are the protocol details, in order. For the example block

[0 0 0 24 0 0 0 1 0 0 0 6 0 0 0 6 2 1 19 18 13 11 11 3 1 23 12 132]

the fields are:

length:       [0 0 0 24]
messageID:    [0 0 0 1]
headerLength: [0 0 0 6]
bodyLength:   [0 0 0 6]
header:       [2 1 19 18 13 11]
body:         [11 3 1 23 12 132]

[4]byte -- length             fixed_size,binary big endian encode
[4]byte -- messageID          fixed_size,binary big endian encode
[4]byte -- headerLength       fixed_size,binary big endian encode
[4]byte -- bodyLength         fixed_size,binary big endian encode
[]byte -- header              marshal by json
[]byte -- body                marshal by marshaller
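
For readers implementing the protocol elsewhere, the example bytes can be taken apart with a few lines of standard-library Go. This is a sketch, not the library's decoder: the block type and splitBlock function are hypothetical, and the consistency check between length, headerLength and bodyLength is an extra safeguard added here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// block (hypothetical) holds the decoded pieces of one frame.
type block struct {
	MessageID int32
	Header    []byte // still JSON-encoded
	Body      []byte // still encoded by the sender's marshaller
}

// splitBlock (hypothetical) splits one complete frame into its fields.
func splitBlock(stream []byte) (block, error) {
	if len(stream) < 16 {
		return block{}, errors.New("stream shorter than the 16-byte fixed prefix")
	}
	length := binary.BigEndian.Uint32(stream[0:4])
	messageID := int32(binary.BigEndian.Uint32(stream[4:8]))
	headerLen := binary.BigEndian.Uint32(stream[8:12])
	bodyLen := binary.BigEndian.Uint32(stream[12:16])
	// Sanity check added by this sketch: length covers the 12 remaining
	// prefix bytes plus header and body.
	if uint64(length) != 12+uint64(headerLen)+uint64(bodyLen) {
		return block{}, errors.New("length does not match headerLength and bodyLength")
	}
	if uint64(len(stream)) < 4+uint64(length) {
		return block{}, errors.New("incomplete block")
	}
	headerEnd := 16 + int(headerLen)
	return block{
		MessageID: messageID,
		Header:    stream[16:headerEnd],
		Body:      stream[headerEnd : headerEnd+int(bodyLen)],
	}, nil
}

func main() {
	// The example bytes from the protocol description.
	stream := []byte{0, 0, 0, 24, 0, 0, 0, 1, 0, 0, 0, 6, 0, 0, 0, 6,
		2, 1, 19, 18, 13, 11, 11, 3, 1, 23, 12, 132}
	b, err := splitBlock(stream)
	if err != nil {
		panic(err)
	}
	fmt.Println(b.MessageID, b.Header, b.Body)
}
```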

func PackWithMarshallerAndBody

func PackWithMarshallerAndBody(message Message, body []byte) ([]byte, error)

Packs a message whose body has already been marshalled.

func PackWithMarshallerName

func PackWithMarshallerName(message Message, marshallerName string) ([]byte, error)

Same as PackWithMarshaller, but the marshaller is selected by name.

func PipeJSON

func PipeJSON(conn net.Conn, args ...interface{}) error

func ReadAllUDP

func ReadAllUDP(conn net.PacketConn, maxBufferSize ...int) ([]byte, net.Addr, error)

func RouteTypeOf

func RouteTypeOf(stream []byte) (string, error)

func SetLogFlags

func SetLogFlags(flags int)

Set global instance logger flags

func SetLogMode

func SetLogMode(mode int)

Set global instance logger mode

func TCPCallOnceJSON

func TCPCallOnceJSON(network string, url string, messageID int, data interface{}) error

func TCPConnect

func TCPConnect(network string, url string) (net.Conn, error)

TCPConnect will establish a tcp connection and return it

func URLPatternOf

func URLPatternOf(stream []byte) (string, error)

func UnPackFromReader

func UnPackFromReader(r io.Reader) (int32, map[string]interface{}, []byte, error)

Returns the first block's messageID, header, marshalled body bytes, and an error.

func UnmarshalTOML

func UnmarshalTOML(buf []byte, dest interface{}) error

func UnpackToBlockFromReader

func UnpackToBlockFromReader(reader io.Reader) ([]byte, error)

Unpacks the first block from the reader. The protocol is the one used by PackWithMarshaller():

[4]byte -- length             fixed_size,binary big endian encode
[4]byte -- messageID          fixed_size,binary big endian encode
[4]byte -- headerLength       fixed_size,binary big endian encode
[4]byte -- bodyLength         fixed_size,binary big endian encode
[]byte -- header              marshal by json
[]byte -- body                marshal by marshaller

Usage:

for {
    blockBuf, e := UnpackToBlockFromReader(reader)
    if e != nil {
        // stop on end of stream or a broken block
        break
    }
    go func(buf []byte) {
        // handle a message block apart
    }(blockBuf)
}

func UnpackToBlockFromReaderLimitMaxLengthOfByte

func UnpackToBlockFromReaderLimitMaxLengthOfByte(reader io.Reader, maxByTe int) ([]byte, error)

func WriteConn

func WriteConn(buf []byte, conn net.Conn) error

Writes the full buf, for the case where buf is too big for conn to write in a single call.

if len(buf) > 65535 {
    connLock.Lock()
    WriteConn(buf, conn)
    connLock.Unlock()
} else {
    conn.Write(buf)
}
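
The general idea of writing until the whole buffer has been accepted can be sketched against a plain io.Writer. This is not WriteConn's source: writeFull and chunkWriter are hypothetical names, and chunkWriter exists only to simulate a writer that accepts a few bytes per call.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// writeFull (hypothetical) keeps calling Write until buf is fully written.
func writeFull(w io.Writer, buf []byte) error {
	for len(buf) > 0 {
		n, err := w.Write(buf)
		buf = buf[n:]
		if err != nil && !errors.Is(err, io.ErrShortWrite) {
			return err
		}
		if n == 0 {
			return io.ErrShortWrite // no progress: give up rather than spin
		}
	}
	return nil
}

// chunkWriter (hypothetical) accepts at most max bytes per call.
type chunkWriter struct {
	max int
	got []byte
}

func (c *chunkWriter) Write(p []byte) (int, error) {
	if len(p) > c.max {
		c.got = append(c.got, p[:c.max]...)
		return c.max, io.ErrShortWrite
	}
	c.got = append(c.got, p...)
	return len(p), nil
}

func main() {
	w := &chunkWriter{max: 3}
	if err := writeFull(w, []byte("hello tcpx")); err != nil {
		panic(err)
	}
	fmt.Println(string(w.got))
}
```

If several goroutines write to the same connection, a lock around the whole loop (as connLock does in the snippet above) keeps blocks from interleaving.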

func WriteJSON

func WriteJSON(conn net.Conn, messageID int32, src interface{}) error

WriteJSON writes a message to conn, marshalled by tcpx.JsonMarshaller.

Types

type AnchorMiddlewareInfo

type AnchorMiddlewareInfo struct {
	WhereUse   []string
	WhereUnUse []string
}

type ClientPool

type ClientPool struct {
	Clients map[string]*Context
	// contains filtered or unexported fields
}
var GlobalClientPool *ClientPool

func NewClientPool

func NewClientPool() *ClientPool

func (*ClientPool) DeleteFromClientPool

func (cp *ClientPool) DeleteFromClientPool(username string)

func (*ClientPool) GetClientPool

func (cp *ClientPool) GetClientPool(username string) *Context

func (*ClientPool) Offline

func (cp *ClientPool) Offline(username string)

func (*ClientPool) Online

func (cp *ClientPool) Online(username string, ctx *Context)

func (*ClientPool) SetClientPool

func (cp *ClientPool) SetClientPool(username string, ctx *Context)

type Context

type Context struct {
	// for tcp conn
	Conn net.Conn
	// context scope lock
	L *sync.RWMutex

	// for udp conn
	PacketConn net.PacketConn
	Addr       net.Addr

	// for k-v pair shared in connection/request scope
	PerConnectionContext *sync.Map
	PerRequestContext    *sync.Map

	// for pack and unpack
	Packx *Packx

	// for raw message
	ConnReader io.Reader
	ConnWriter io.Writer

	Stream []byte
	// contains filtered or unexported fields
}

Context has two concurrency-safe scopes:

PerConnectionContext is connection-scoped: it lives as long as the connection does.

PerRequestContext is request-scoped: once a connection is established, many requests can travel between client and server, and each request has its own independently scoped context.

Packx holds the marshaller used to marshal and unmarshal streams. Stream is the data read from net.Conn for the current request.
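
The difference between the two scopes can be illustrated with two sync.Map values, the same type the Context fields use. The scopes type and nextRequest method are hypothetical; how tcpx itself resets request-scoped data is not shown in this documentation, so replacing the map is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// scopes (hypothetical) mimics the two key-value stores on Context.
type scopes struct {
	perConn    *sync.Map // lives as long as the connection
	perRequest *sync.Map // replaced for every request
}

func newScopes() *scopes {
	return &scopes{perConn: &sync.Map{}, perRequest: &sync.Map{}}
}

// nextRequest drops request-scoped values (assumed reset strategy).
func (s *scopes) nextRequest() {
	s.perRequest = &sync.Map{}
}

func main() {
	s := newScopes()
	s.perConn.Store("username", "alice")
	s.perRequest.Store("trace-id", 1)

	s.nextRequest() // a new request arrives on the same connection

	_, okConn := s.perConn.Load("username")
	_, okReq := s.perRequest.Load("trace-id")
	fmt.Println(okConn, okReq)
}
```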

func NewContext

func NewContext(conn net.Conn, marshaller Marshaller) *Context

NewContext creates a context for a tcp server.

func NewTCPContext

func NewTCPContext(conn net.Conn, marshaller Marshaller) *Context

NewTCPContext creates a context for a tcp server.

func NewUDPContext

func NewUDPContext(conn net.PacketConn, addr net.Addr, marshaller Marshaller) *Context

NewUDPContext creates a context for a udp server.

func (*Context) Abort

func (ctx *Context) Abort()

stop middleware chain

func (*Context) AuthChan

func (ctx *Context) AuthChan() <-chan int

func (*Context) Bind

func (ctx *Context) Bind(dest interface{}) (Message, error)

func (*Context) BindWithMarshaller

func (ctx *Context) BindWithMarshaller(dest interface{}, marshaller Marshaller) (Message, error)

BindWithMarshaller binds using the given marshaller. In contrast, c.Bind() uses the marshaller of the context's inner packx object.

func (Context) ClientIP

func (ctx Context) ClientIP() string

client ip

func (*Context) CloseConn

func (ctx *Context) CloseConn() error

Close its connection

func (*Context) ConnectionProtocolType

func (ctx *Context) ConnectionProtocolType() string

ConnectionProtocolType returns the server protocol: tcp, udp, or kcp.

func (*Context) GetCtxPerConn

func (ctx *Context) GetCtxPerConn(k interface{}) (interface{}, bool)

When context serves for tcp, get context k-v pair of PerConnectionContext. When context serves for udp, get context k-v pair of PerRequestContext.

func (*Context) GetCtxPerRequest

func (ctx *Context) GetCtxPerRequest(k interface{}) (interface{}, bool)

func (*Context) GetPoolRef

func (ctx *Context) GetPoolRef() *ClientPool

func (*Context) GetURLPattern

func (ctx *Context) GetURLPattern() (string, error)

func (*Context) GetUsername

func (ctx *Context) GetUsername() string

The context's connection scope stores a unique key into the connection pool. ctx.SetUsername should be called before using this.

func (*Context) HeartBeatChan

func (ctx *Context) HeartBeatChan() chan int

HeartBeatChan returns a prepared chan int that carries heartbeat signals. It is never nil: if the channel does not exist yet, it is created automatically.

func (*Context) InitReaderAndWriter

func (ctx *Context) InitReaderAndWriter() error

func (*Context) IsOffline

func (ctx *Context) IsOffline() bool

func (*Context) IsOnline

func (ctx *Context) IsOnline() bool

func (*Context) JSON

func (ctx *Context) JSON(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the json marshaller. Whether or not ctx.Packx.Marshaller.MarshalName is 'json', the message block's header and body are marshalled by the json marshaller.

func (*Context) JSONURLPattern

func (ctx *Context) JSONURLPattern(src interface{}) error

Replies to the client with a message carrying a specific url-pattern. Used when messages are routed by url-pattern.

func (Context) Network

func (ctx Context) Network() string

func (*Context) Next

func (ctx *Context) Next()

Middlewares are divided into 3 kinds (global, messageIDSelfRelated, anchorType), so a single offset cannot drive them directly as middlewares[offset](). Thus, c.Next() actually does nothing.

func (*Context) Offline

func (ctx *Context) Offline() error

Only used when the tcpX instance's builtInPool is true. Otherwise you should design your own client pool (github.com/fwhezfwhez/tcpx/clientPool/client-pool.go) and manage it yourself, like:

var myPool = clientPool.NewClientPool()
func main() {
    srv := tcpx.NewTcpX(nil)
    srv.AddHandler(1, func(c *tcpx.Context){
        type Login struct{
           Username string
        }
        var userLogin Login
        c.Bind(&userLogin)
        c.SetUsername(userLogin.Username) // tag the connection for later lookup
        myPool.Online(userLogin.Username, c)
    })
    srv.AddHandler(2, func(c *tcpx.Context){
        username, ok := c.Username()
        if !ok {
            return // anonymous user, nothing to take offline
        }
        myPool.Offline(username)
    })
}

func (*Context) Online

func (ctx *Context) Online(username string) error

There is no strategy for detecting a repeated username: if the username already exists, the old connection context in the pool is replaced. Only used when the tcpX instance's builtInPool is true. Otherwise you should design your own client pool (github.com/fwhezfwhez/tcpx/clientPool/client-pool.go) and manage it yourself, like:

var myPool = clientPool.NewClientPool()
func main() {
    srv := tcpx.NewTcpX(nil)
    srv.AddHandler(1, func(c *tcpx.Context){
        type Login struct{
           Username string
        }
        var userLogin Login
        c.Bind(&userLogin)
        c.SetUsername(userLogin.Username) // tag the connection for later lookup
        myPool.Online(userLogin.Username, c)
    })
    srv.AddHandler(2, func(c *tcpx.Context){
        username, ok := c.Username()
        if !ok {
            fmt.Println("anonymous user no need to offline")
            return
        }
        myPool.Offline(username)
    })
}

func (*Context) ProtoBuf

func (ctx *Context) ProtoBuf(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the protobuf marshaller. Whether or not ctx.Packx.Marshaller.MarshalName is 'protobuf', the message block's header and body are marshalled by the protobuf marshaller.

func (*Context) ProtobufURLPattern

func (ctx *Context) ProtobufURLPattern(src interface{}) error

Replies to the client with a message carrying a specific url-pattern. Its payload is marshalled by protobuf and requires that src implements proto.Message. Used when messages are routed by url-pattern.

func (*Context) RawStream

func (ctx *Context) RawStream() ([]byte, error)

ctx.Stream holds data already packed by the pack tool. ctx.RawStream gives access to the raw stream.

func (*Context) RecvAuthDeny

func (ctx *Context) RecvAuthDeny()

func (*Context) RecvAuthPass

func (ctx *Context) RecvAuthPass()

func (*Context) RecvHeartBeat

func (ctx *Context) RecvHeartBeat()

RecvHeartBeat

func (*Context) Reply

func (ctx *Context) Reply(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the marshaller configured in ctx.Packx.

func (*Context) ReplyWithMarshaller

func (ctx *Context) ReplyWithMarshaller(marshaller Marshaller, messageID int32, src interface{}, headers ...map[string]interface{}) error

func (*Context) Reset

func (ctx *Context) Reset()

func (*Context) ResetOffset

func (ctx *Context) ResetOffset()

func (Context) RouterType

func (ctx Context) RouterType() string

Decode ctx.Stream.Header["Router-Type"], expected 'MESSAGE_ID', 'URL_PATTERN'

func (*Context) SendToConn

func (ctx *Context) SendToConn(anotherCtx *Context, messageID int32, src interface{}, headers ...map[string]interface{}) error

Sends to another connection via its Context. Make sure `srv.WithBuiltInPool(true)` has been called.

func (*Context) SendToUsername

func (ctx *Context) SendToUsername(username string, messageID int32, src interface{}, headers ...map[string]interface{}) error

Sends to another connection, looked up by username. Make sure `srv.WithBuiltInPool(true)` has been called.

func (*Context) SetCtxPerConn

func (ctx *Context) SetCtxPerConn(k, v interface{})

When the context serves tcp, sets a k-v pair in PerConnectionContext. When the context serves udp, sets a k-v pair in PerRequestContext. The key should not start with 'tcpx-', or it will panic.

func (*Context) SetCtxPerRequest

func (ctx *Context) SetCtxPerRequest(k, v interface{})

func (*Context) SetDeadline

func (ctx *Context) SetDeadline(t time.Time) error

set deadline

func (*Context) SetReadDeadline

func (ctx *Context) SetReadDeadline(t time.Time) error

set read deadline

func (*Context) SetUsername

func (ctx *Context) SetUsername(username string)

Use this to tag the context with a username; otherwise the connection is regarded as an anonymous user.

func (*Context) SetWriteDeadline

func (ctx *Context) SetWriteDeadline(t time.Time) error

set write deadline

func (*Context) TOML

func (ctx *Context) TOML(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the toml marshaller. Whether or not ctx.Packx.Marshaller.MarshalName is 'toml', the message block's header and body are marshalled by the toml marshaller.

func (*Context) Username

func (ctx *Context) Username() (string, bool)

The context's connection scope stores a unique key into the connection pool. ctx.SetUsername should be called before using this.

func (*Context) XML

func (ctx *Context) XML(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the xml marshaller. Whether or not ctx.Packx.Marshaller.MarshalName is 'xml', the message block's header and body are marshalled by the xml marshaller.

func (*Context) YAML

func (ctx *Context) YAML(messageID int32, src interface{}, headers ...map[string]interface{}) error

Replies to the client using the yaml marshaller. Whether or not ctx.Packx.Marshaller.MarshalName is 'yaml', the message block's header and body are marshalled by the yaml marshaller.

type H

type H map[string]interface{}

type JsonMarshaller

type JsonMarshaller struct{}

func (JsonMarshaller) Marshal

func (js JsonMarshaller) Marshal(v interface{}) ([]byte, error)

func (JsonMarshaller) MarshalName

func (js JsonMarshaller) MarshalName() string

func (JsonMarshaller) Unmarshal

func (js JsonMarshaller) Unmarshal(data []byte, dest interface{}) error

type Log

type Log struct {
	Logger *log.Logger
	Mode   int
}

tcpx logger

func (Log) Println

func (l Log) Println(info ...interface{})

Println info in debug mode, do nothing in release mode

func (*Log) SetLogFlags

func (l *Log) SetLogFlags(flags int)

Set logger flags, value of flags are the same as the official log

func (*Log) SetLogMode

func (l *Log) SetLogMode(mode int)

Set mode of logger, value is tcpx.DEBUG, tcpx.RELEASE

type Marshaller

type Marshaller interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	MarshalName() string
}

func GetMarshallerByMarshalName

func GetMarshallerByMarshalName(marshalName string) (Marshaller, error)

type Message

type Message struct {
	MessageID int32                  `json:"message_id"`
	Header    map[string]interface{} `json:"header"`
	Body      interface{}            `json:"body"`
}

Message contains the necessary parts of the tcpx protocol. MessageID is the routing flag of a message. Header is an attachment to the message. Body is the message itself; it should be the raw value, not yet serialized, like "hello", not []byte("hello").

func NewMessage

func NewMessage(messageID int32, src interface{}) Message

func NewURLPatternMessage

func NewURLPatternMessage(urlPattern string, src interface{}) Message

func UnpackWithMarshaller

func UnpackWithMarshaller(stream []byte, dest interface{}, marshaller Marshaller) (Message, error)

Unpacks a stream produced by PackWithMarshaller. If users want to use this protocol across languages, here are the protocol details, in order:

[4]byte -- length             fixed_size,binary big endian encode
[4]byte -- messageID          fixed_size,binary big endian encode
[4]byte -- headerLength       fixed_size,binary big endian encode
[4]byte -- bodyLength         fixed_size,binary big endian encode
[]byte -- header              marshal by json
[]byte -- body                marshal by marshaller

func UnpackWithMarshallerName

func UnpackWithMarshallerName(stream []byte, dest interface{}, marshallerName string) (Message, error)

Same as UnpackWithMarshaller, but the marshaller is selected by name.

func (Message) Get

func (msg Message) Get(key string) interface{}

Get returns the value of the message's header for the given key. Get and Set take no lock, so they are not safe for concurrent use: never operate on the header from multiple goroutines. It's better to design your own per-request context than to use message.Header directly.

func (Message) Pack

func (m Message) Pack(marshaller Marshaller) ([]byte, error)

func (*Message) Set

func (msg *Message) Set(k string, v interface{})

Get and Set take no lock, so they are not safe for concurrent use: never operate on the header from multiple goroutines. It's better to design your own per-request context than to use message.Header directly.

type MessageIDAnchor

type MessageIDAnchor struct {
	URLPattern  string
	MessageID   int32
	AnchorIndex int
}

func NewMessageIDAnchor

func NewMessageIDAnchor(messageID int32, anchorIndex int) MessageIDAnchor

func NewUrlPatternAnchor

func NewUrlPatternAnchor(urlPattern string, anchorIndex int) MessageIDAnchor

type MiddlewareAnchor

type MiddlewareAnchor struct {
	MiddlewareKey string
	Middleware    func(c *Context)

	// anchorStartIndexRange.len should >= AnchorEndIndexRange.len, and should not bigger than 1.
	AnchorStartIndexRange []int
	AnchorEndIndexRange   []int
	// contains filtered or unexported fields
}

func (*MiddlewareAnchor) Contains

func (ma *MiddlewareAnchor) Contains(handlerIndex int) bool

func (*MiddlewareAnchor) FormatPath

func (ma *MiddlewareAnchor) FormatPath() string

type Mux

type Mux struct {

	// mux instance lock
	Mutex *sync.RWMutex

	// handlers of messageID routers
	Handlers map[int32]func(ctx *Context)

	AllowAdd bool

	// global-middlewares
	GlobalMiddlewares []func(ctx *Context)
	// messageID middlewares
	MessageIDSelfMiddleware map[int32][]func(ctx *Context)

	// all middleware anchors, expired anchors will not remove from it
	MiddlewareAnchors []MiddlewareAnchor
	// all middleware anchors
	MiddlewareAnchorMap map[string]MiddlewareAnchor
	// messageID handlers anchors
	MessageIDAnchorMap map[int32]MessageIDAnchor
	// contains filtered or unexported fields
}

Mux registers handlers for different requests by messageID. Middlewares are divided into 3 kinds: 1. global --> GlobalTypeMiddlewares; 2. messageIDSelfRelated --> SelfRelatedTypeMiddleware; 3. DynamicUsed --> AnchorTypeMiddleware.

ATTENTION: Middlewares are executed in the order 1 -> 3 -> 2. If OnMessage is not nil, GlobalTypeMiddlewares and AnchorTypeMiddleware will all be executed, whether or not they have been unused.

func NewMux

func NewMux() *Mux

Creates a new mux instance and allocates memory for its mutex, handler slice, and so on.

func (*Mux) AddGlobalMiddleware

func (mux *Mux) AddGlobalMiddleware(handlers ...func(c *Context))

Add Global middlewares

func (*Mux) AddHandleFunc

func (mux *Mux) AddHandleFunc(messageID int32, handler func(ctx *Context))

AddHandleFunc add routing handlers by messageID.

func (*Mux) AddMessageIDAnchor

func (mux *Mux) AddMessageIDAnchor(anchor MessageIDAnchor)

add messageID anchor

func (*Mux) AddMessageIDSelfMiddleware

func (mux *Mux) AddMessageIDSelfMiddleware(messageID int32, handlers ...func(c *Context))

add middleware by srv.Add(1, middleware1, middleware2, handler)

func (*Mux) AddMiddlewareAnchor

func (mux *Mux) AddMiddlewareAnchor(anchor MiddlewareAnchor)

add anchor index binding to middlewares

func (*Mux) AddURLAnchor

func (mux *Mux) AddURLAnchor(anchor MessageIDAnchor)

add url-pattern anchor

func (*Mux) AnchorIndexOfMessageID

func (mux *Mux) AnchorIndexOfMessageID(messageID int32) int

get anchor index of a messageID

func (*Mux) AnchorIndexOfURLPattern

func (mux *Mux) AnchorIndexOfURLPattern(urlPattern string) int

func (*Mux) Any

func (mux *Mux) Any(urlPattern string, handlers ...func(c *Context)) error

Any routes messages by url-pattern.

func (*Mux) CurrentAnchorIndex

func (mux *Mux) CurrentAnchorIndex() int

anchorIndex of current handlers

func (*Mux) ReplaceMiddlewareAnchor

func (mux *Mux) ReplaceMiddlewareAnchor(anchor MiddlewareAnchor)

Resets an anchor's ExpiredAnchorIndex, so callers avoid operating on the map directly.

type PackType

type PackType []byte

PackType requires buffer message marshalled by tcpx.Pack

func Recv

func Recv(conn net.Conn) (PackType, error)

Recv receives one message block from the connection. It requires that the sender packed the message with tcpx.Pack().

func (*PackType) BindJSON

func (pt *PackType) BindJSON(dest interface{}) error

func (*PackType) BindProtobuf

func (pt *PackType) BindProtobuf(dest proto.Message) error

func (*PackType) BindTOML

func (pt *PackType) BindTOML(dest interface{}) error

func (*PackType) BindXML

func (pt *PackType) BindXML(dest interface{}) error

func (*PackType) BindYAML

func (pt *PackType) BindYAML(dest interface{}) error

func (*PackType) MessageID

func (pt *PackType) MessageID() (int32, error)

func (*PackType) URLPattern

func (pt *PackType) URLPattern() (string, error)

type Packx

type Packx struct {
	Marshaller Marshaller
}

tcpx's tool to help build expected stream for communicating

func NewPackx

func NewPackx(marshaller Marshaller) *Packx

Creates a new packx instance with the given marshaller for communication. If marshaller is nil, the built-in JsonMarshaller is used.

func (Packx) BodyBytesOf

func (packx Packx) BodyBytesOf(stream []byte) ([]byte, error)

body bytes of a block

func (Packx) BodyLengthOf

func (packx Packx) BodyLengthOf(stream []byte) (int32, error)

Body length of a stream received

func (Packx) FirstBlockOf

func (packx Packx) FirstBlockOf(r io.Reader) ([]byte, error)

A stream from a reader can be split into blocks by the protocol. FirstBlockOf extracts the first block as []byte from the reader.

func (Packx) FirstBlockOfBytes

func (packx Packx) FirstBlockOfBytes(buffer []byte) ([]byte, error)

A stream held in a buffer can be split into blocks by the protocol. FirstBlockOfBytes extracts the first block as []byte from a []byte buffer.

func (Packx) FirstBlockOfLimitMaxByte

func (packx Packx) FirstBlockOfLimitMaxByte(r io.Reader, maxByte int32) ([]byte, error)

func (Packx) HeaderBytesOf

func (packx Packx) HeaderBytesOf(stream []byte) ([]byte, error)

Header bytes of a block

func (Packx) HeaderLengthOf

func (packx Packx) HeaderLengthOf(stream []byte) (int32, error)

Header length of a stream received

func (Packx) HeaderOf

func (packx Packx) HeaderOf(stream []byte) (map[string]interface{}, error)

header of a block

func (Packx) LengthOf

func (packx Packx) LengthOf(stream []byte) (int32, error)

Length of the first valid block in the stream. It does not include the length field itself; it counts the message bytes that follow it.

func (Packx) MessageIDOf

func (packx Packx) MessageIDOf(stream []byte) (int32, error)

messageID of a stream. Use this to choose which struct for unpacking.

func (Packx) Pack

func (packx Packx) Pack(messageID int32, src interface{}, headers ...map[string]interface{}) ([]byte, error)

Packs src with the given messageID and optional headers. Src must not be marshalled yet: whatever you pass as src is marshalled by packx.Marshaller.

func (Packx) PackWithBody

func (packx Packx) PackWithBody(messageID int32, body []byte, headers ...map[string]interface{}) ([]byte, error)

PackWithBody is used for self-designed protocols.

func (Packx) Unpack

func (packx Packx) Unpack(stream []byte, dest interface{}) (Message, error)

Unpack decodes stream, which is a block of length, messageID, headerLength, bodyLength, header, body. Dest refers to the body; its type can vary by messageID.

Before using this, users should know which struct to use as `dest`. You can use the stream's messageID to decide, like:

messageID, _ := packx.MessageIDOf(stream)
switch messageID {
case 1:
    packx.Unpack(stream, &struct1)
case 2:
    packx.Unpack(stream, &struct2)
...
}

type PropertyCache

type PropertyCache struct {
	Network string
	Port    string

	// only when network is 'tcp','kcp', Listener can assert to net.Listener.
	// when network is 'udp', it can assert to net.PackConn
	Listener interface{}
}

type ProtobufMarshaller

type ProtobufMarshaller struct{}

func (ProtobufMarshaller) Marshal

func (pm ProtobufMarshaller) Marshal(v interface{}) ([]byte, error)

v should implement proto.Message

func (ProtobufMarshaller) MarshalName

func (pm ProtobufMarshaller) MarshalName() string

func (ProtobufMarshaller) Unmarshal

func (pm ProtobufMarshaller) Unmarshal(data []byte, dest interface{}) error

dest should implement proto.Message

type Request

type Request struct {
	Body   io.ReadCloser
	URL    string
	Header map[string]interface{}
}

func NewRequest

func NewRequest(url string, reader io.Reader) *Request

func (*Request) Set

func (r *Request) Set(key string, value interface{})

type Route

type Route struct {
	URLPattern string
	MessageId  int
	Whereis    []string
}

func (Route) Location

func (r Route) Location() string

func (*Route) Merge

func (r *Route) Merge(r2 Route) Route

type TcpX

type TcpX struct {
	OnConnect func(ctx *Context)
	OnMessage func(ctx *Context)
	OnClose   func(ctx *Context)
	Mux       *Mux
	Packx     *Packx

	// heartbeat setting
	HeartBeatOn        bool          // whether start a goroutine to spy on each connection
	HeatBeatInterval   time.Duration // heartbeat should receive in the interval
	HeartBeatMessageID int32         // which messageID to listen to heartbeat
	ThroughMiddleware  bool          // whether heartbeat go through middleware

	OnHeartbeatLoss func(c *Context) // when recv no heartbeat more than max configured times(default 3), will trigger this function

	AuthMessageID         int32
	AuthThroughMiddleware bool // whether auth handler go through middleware

	// external for handle any stream
	// only support tcp/kcp
	HandleRaw func(c *Context)

	// tls
	// If you want your tcp server using certs, using this field
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

OnMessage and Mux are mutually exclusive. When OnMessage is not nil, users should deal with ctx.Stream themselves. When OnMessage is nil, the program handles ctx.Stream via Mux, routing by messageID.
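The rule above can be sketched as a small dispatch function. The `context` and `server` types below are hypothetical, trimmed-down stand-ins for tcpx.Context and tcpx.TcpX, and a plain map plays the role of Mux; this illustrates the described behavior and is not the library's code.

```go
package main

import "fmt"

// context is a hypothetical, minimal stand-in for tcpx.Context.
type context struct {
	MessageID int32
	Stream    []byte
}

// server is a hypothetical stand-in for tcpx.TcpX.
type server struct {
	OnMessage func(ctx *context)
	handlers  map[int32]func(ctx *context) // plays the role of Mux
}

// dispatch reports which path handled the message. A non-nil OnMessage takes
// over completely; otherwise the handler is looked up by messageID.
func (s *server) dispatch(ctx *context) string {
	if s.OnMessage != nil {
		s.OnMessage(ctx)
		return "OnMessage"
	}
	if h, ok := s.handlers[ctx.MessageID]; ok {
		h(ctx)
		return "mux"
	}
	return "no route"
}

func main() {
	s := &server{handlers: map[int32]func(*context){
		1: func(ctx *context) { fmt.Println("handler for messageID 1") },
	}}
	fmt.Println(s.dispatch(&context{MessageID: 1}))

	// Once OnMessage is set, the routing table is bypassed.
	s.OnMessage = func(ctx *context) { fmt.Println("raw stream of", len(ctx.Stream), "bytes") }
	fmt.Println(s.dispatch(&context{MessageID: 1, Stream: []byte{0, 1}}))
}
```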

func NewTcpX

func NewTcpX(marshaller Marshaller) *TcpX

Creates a new tcpx server instance.

func (*TcpX) AddHandler

func (tcpx *TcpX) AddHandler(messageID int32, handlers ...func(ctx *Context))

Middleware typed 'SelfRelatedTypedMiddleware'. Adds handlers routed by messageID.

func (*TcpX) Any

func (tcpx *TcpX) Any(urlPattern string, handlers ...func(ctx *Context))

func (*TcpX) BeforeExit

func (tcpx *TcpX) BeforeExit(f ...func())

Registers cleanup jobs to run before exit.

func (*TcpX) HeartBeatMode

func (tcpx *TcpX) HeartBeatMode(on bool, duration time.Duration) *TcpX

Turns the built-in heartbeat on. The default heartbeat handler is registered under messageID tcpx.DEFAULT_HEARTBEAT_MESSAGEID (-1392), and it does not go through any kind of middleware.

	...
	srv := tcpx.NewTcpX(nil)
	srv.HeartBeatMode(true, 10 * time.Second)
	...

* If you want to configure the official heartbeat handler in detail: srv.HeartBeatModeDetail(true, 10 * time.Second, true, 1)

* If you want to rewrite the heartbeat handler: srv.RewriteHeartBeatHandler(messageID, func(c *tcpx.Context){})

* If the built-in heartbeat does not suit you, leave it off and register your own:

```
srv.AddHandler(1111, func(c *tcpx.Context){
   // does nothing by default; define your own heartbeat handling here
})
```
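To illustrate how a heartbeat interval can relate to OnHeartbeatLoss (described on the TcpX struct as firing after more than a configured number of missed heartbeats, default 3), here is a small deterministic sketch. `missedBeats` and `heartbeatLost` are hypothetical helpers, and counting whole elapsed intervals is an assumption about how misses are measured, not a description of tcpx internals.

```go
package main

import (
	"fmt"
	"time"
)

// missedBeats (hypothetical) counts how many whole heartbeat intervals have
// elapsed since the last heartbeat was received.
func missedBeats(sinceLast, interval time.Duration) int {
	if interval <= 0 || sinceLast <= 0 {
		return 0
	}
	return int(sinceLast / interval)
}

// heartbeatLost (hypothetical) reports whether more than maxMisses intervals
// have passed without a heartbeat, the point at which a callback such as
// OnHeartbeatLoss would be expected to fire.
func heartbeatLost(sinceLast, interval time.Duration, maxMisses int) bool {
	return missedBeats(sinceLast, interval) > maxMisses
}

func main() {
	interval := 10 * time.Second
	for _, silence := range []time.Duration{5 * time.Second, 35 * time.Second, 45 * time.Second} {
		fmt.Println(silence, missedBeats(silence, interval), heartbeatLost(silence, interval, 3))
	}
}
```

Keeping the check a pure function of elapsed time makes it easy to test without real timers; a server would call it from whatever goroutine watches each connection.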

func (*TcpX) HeartBeatModeDetail

func (tcpx *TcpX) HeartBeatModeDetail(on bool, duration time.Duration, throughMiddleware bool, messageID int32) *TcpX

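Sets the heartbeat arguments explicitly: on/off, interval, whether the heartbeat goes through middleware, and the heartbeat messageID.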

func (*TcpX) ListenAndServe

func (tcpx *TcpX) ListenAndServe(network, addr string) error

Starts listening. The server can decode streams generated by packx. Supports tcp and udp.

func (*TcpX) ListenAndServeGRPC deprecated

func (tcpx *TcpX) ListenAndServeGRPC(network, addr string) error

gRPC support is under development; do not use. The marshaller must be protobuf, and clients should send messages whose body is protobuf bytes.

Deprecated: under development.

func (*TcpX) ListenAndServeHTTP deprecated

func (tcpx *TcpX) ListenAndServeHTTP(network, addr string) error

HTTP support is under development; do not use.

Deprecated: under development.

func (*TcpX) ListenAndServeRaw

func (tcpx *TcpX) ListenAndServeRaw(network, addr string) error

Listens and serves raw streams; see HandleRaw.

func (*TcpX) ListenAndServeTCP

func (tcpx *TcpX) ListenAndServeTCP(network, addr string) error

Listens and serves over tcp.

func (*TcpX) ListenAndServeUDP

func (tcpx *TcpX) ListenAndServeUDP(network, addr string, maxBufferSize ...int) error

Listens and serves over udp. maxBufferSize sets the buffer length. If a received message is longer than it, ...

func (*TcpX) LoadTLSFile

func (tcpx *TcpX) LoadTLSFile(certPath string, keyPath string) error

certPath and keyPath are the paths where cert.pem and key.pem are stored.

func (*TcpX) Restart

func (tcpx *TcpX) Restart(closeAllConnection bool, beforeStart ...func()) error

Graceful Restart = Stop and Start. Besides, you can

func (*TcpX) RewriteHeartBeatHandler

func (tcpx *TcpX) RewriteHeartBeatHandler(messageID int32, f func(c *Context)) *TcpX

Rewrites the heartbeat handler. It inherits these properties of the previous heartbeat handler:

  • heartbeatInterval
  • throughMiddleware

func (*TcpX) SetDeadline

func (tcpx *TcpX) SetDeadline(t time.Time)

Sets the deadline. This should be set before the server starts. If you want to change the deadline while it's running, use ctx.SetDeadline(t time.Time) instead.

func (*TcpX) SetEventOnHeartbeatLoss

func (tcpx *TcpX) SetEventOnHeartbeatLoss(f func(c *Context))

func (*TcpX) SetMaxBytePerMessage

func (tcpx *TcpX) SetMaxBytePerMessage(maxByte int32)

func (*TcpX) SetReadDeadline

func (tcpx *TcpX) SetReadDeadline(t time.Time)

Sets the read deadline. This should be set before the server starts. If you want to change the deadline while it's running, use ctx.SetDeadline(t time.Time) instead.

func (*TcpX) SetWriteDeadline

func (tcpx *TcpX) SetWriteDeadline(t time.Time)

Sets the write deadline. This should be set before the server starts. If you want to change the deadline while it's running, use ctx.SetDeadline(t time.Time) instead.
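For readers unfamiliar with connection deadlines, the following standard-library sketch shows what a read deadline does on a net.Conn: once it passes, a blocked Read returns a timeout error. It uses an in-memory net.Pipe, and the helper names are hypothetical. That tcpx applies its configured deadlines to each connection in this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readWithDeadline (hypothetical helper) sets a read deadline on conn, then
// attempts one read. It reports whether the read failed because the deadline passed.
func readWithDeadline(conn net.Conn, d time.Duration) (bool, error) {
	if err := conn.SetReadDeadline(time.Now().Add(d)); err != nil {
		return false, err
	}
	buf := make([]byte, 1)
	_, err := conn.Read(buf)
	return errors.Is(err, os.ErrDeadlineExceeded), err
}

// pipeTimesOut builds an in-memory connection whose peer never writes, so the
// read can only end when the deadline expires.
func pipeTimesOut(d time.Duration) bool {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	timedOut, _ := readWithDeadline(server, d)
	return timedOut
}

func main() {
	fmt.Println("read timed out:", pipeTimesOut(20*time.Millisecond))
}
```

A deadline is an absolute point in time, not a per-read timeout, which is why a long-running connection has to keep extending it while it is in use.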

func (*TcpX) Start

func (tcpx *TcpX) Start() error

Gracefully starts an existing tcpx server that was previously stopped by tcpX.Stop().

func (*TcpX) State

func (tcpx *TcpX) State() int

func (*TcpX) Stop

func (tcpx *TcpX) Stop(closeAllConnection bool) error

Gracefully stops the server parts created by `srv.ListenAndServe()`. This does not stop the process. If param 'closeAllConnection' is false, only the server listener is stopped; existing connections remain safe and are kept in the pool. If param 'closeAllConnection' is true, it stops the listener and also kills all connections (closes their net.Conn, stops all sub-routines, and clears the pool).
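The two modes can be sketched with a hypothetical, much-simplified server that holds a listener and a connection pool. The type and function names below are invented for illustration and do not reflect how tcpx stores its listener or pool.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// server is a hypothetical stand-in holding only what Stop needs.
type server struct {
	mu       sync.Mutex
	listener io.Closer
	pool     map[string]net.Conn
}

// stop always closes the listener. Pooled connections are closed and removed
// only when closeAllConnection is true.
func (s *server) stop(closeAllConnection bool) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if err := s.listener.Close(); err != nil {
		return err
	}
	if !closeAllConnection {
		return nil
	}
	for key, conn := range s.pool {
		conn.Close()
		delete(s.pool, key)
	}
	return nil
}

// fakeListener stands in for a real net.Listener so the sketch needs no sockets.
type fakeListener struct{ closed bool }

func (f *fakeListener) Close() error { f.closed = true; return nil }

// newDemoServer returns a server with one pooled in-memory connection.
func newDemoServer() (*server, *fakeListener) {
	ln := &fakeListener{}
	conn, _ := net.Pipe() // the peer end is not needed for this illustration
	return &server{listener: ln, pool: map[string]net.Conn{"client-1": conn}}, ln
}

func main() {
	s, ln := newDemoServer()

	_ = s.stop(false)
	fmt.Println("listener closed:", ln.closed, "pooled connections:", len(s.pool))

	_ = s.stop(true)
	fmt.Println("listener closed:", ln.closed, "pooled connections:", len(s.pool))
}
```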

func (*TcpX) UnUse

func (tcpx *TcpX) UnUse(middlewareKeys ...string)

Removes a middleware. An unused middleware no longer applies to handlers added after the UnUse call. For example:

	srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
	srv.Use("middleware1", Middleware1, "middleware2", Middleware2)
	srv.AddHandler(1, SayHello)
	srv.UnUse("middleware2")
	srv.AddHandler(3, SayGoodBye)

middleware1 and middleware2 both apply to handler 'SayHello'. middleware1 applies to handler 'SayGoodBye', but middleware2 does not.

func (*TcpX) Use

func (tcpx *TcpX) Use(mids ...interface{})

Middleware typed 'AnchorTypedMiddleware'. Adds middlewares given as alternating key/function pairs: (string, func(c *Context), string, func(c *Context), string, func(c *Context)...). Each middleware is stored under its key, which is later used to UnUse it. Each added middleware is assigned an anchor index; when it is UnUsed, its expire_anchor_index is set as well.
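One way to picture the anchor idea is a single counter that advances on every Use, UnUse and AddHandler call, so a handler sees exactly the middlewares whose anchor precedes it and whose expiry (if any) comes after it. The sketch below is a hypothetical model of that bookkeeping with invented names; it is not the tcpx data structure.

```go
package main

import "fmt"

const notExpired = -1

// middleware records when a key was added and when (if ever) it was removed.
type middleware struct {
	key          string
	anchor       int
	expireAnchor int
}

// registry is a hypothetical model; next is the shared anchor counter.
type registry struct {
	next        int
	middlewares []*middleware
	handlers    map[int32]int // messageID -> anchor index of the handler
}

func newRegistry() *registry {
	return &registry{handlers: map[int32]int{}}
}

func (r *registry) use(keys ...string) {
	for _, k := range keys {
		r.middlewares = append(r.middlewares, &middleware{key: k, anchor: r.next, expireAnchor: notExpired})
		r.next++
	}
}

func (r *registry) unUse(key string) {
	for _, m := range r.middlewares {
		if m.key == key && m.expireAnchor == notExpired {
			m.expireAnchor = r.next
			r.next++
		}
	}
}

func (r *registry) addHandler(messageID int32) {
	r.handlers[messageID] = r.next
	r.next++
}

// activeFor lists the middleware keys that apply to the handler for messageID.
func (r *registry) activeFor(messageID int32) []string {
	h, ok := r.handlers[messageID]
	if !ok {
		return nil
	}
	var keys []string
	for _, m := range r.middlewares {
		if m.anchor < h && (m.expireAnchor == notExpired || m.expireAnchor > h) {
			keys = append(keys, m.key)
		}
	}
	return keys
}

func main() {
	r := newRegistry()
	r.use("middleware1", "middleware2")
	r.addHandler(1) // SayHello
	r.unUse("middleware2")
	r.addHandler(3) // SayGoodBye

	fmt.Println(r.activeFor(1)) // [middleware1 middleware2]
	fmt.Println(r.activeFor(3)) // [middleware1]
}
```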

func (*TcpX) UseGlobal

func (tcpx *TcpX) UseGlobal(mids ...func(c *Context))

Middleware typed 'GlobalTypedMiddleware'. Global middleware applies to all handlers.

func (*TcpX) WithAuthDetail

func (tcpx *TcpX) WithAuthDetail(yes bool, duration time.Duration, throughMiddleware bool, messageID int32, f func(c *Context)) *TcpX

func (*TcpX) WithBroadCastSignal

func (tcpx *TcpX) WithBroadCastSignal(yes bool) *TcpX

Sets whether to use signal broadcast. It is used in situations such as: closeAllSignal - close all connections and remove them from the built-in pool.

func (*TcpX) WithBuiltInPool

func (tcpx *TcpX) WithBuiltInPool(yes bool) *TcpX

Sets whether to use the built-in pool.

type TomlMarshaller

type TomlMarshaller struct{}

func (TomlMarshaller) Marshal

func (tm TomlMarshaller) Marshal(v interface{}) ([]byte, error)

func (TomlMarshaller) MarshalName

func (tm TomlMarshaller) MarshalName() string

func (TomlMarshaller) Unmarshal

func (tm TomlMarshaller) Unmarshal(data []byte, dest interface{}) error

type URLMux

type URLMux struct {
	URLAnchorMap map[string]MessageIDAnchor
	// contains filtered or unexported fields
}

func NewURLMux

func NewURLMux() *URLMux

func (*URLMux) AddURLPatternHandler

func (m *URLMux) AddURLPatternHandler(urlPattern string, handlers ...func(c *Context)) error

Adds a route based on url-pattern.

func (*URLMux) LockWrite

func (m *URLMux) LockWrite()

After locking, no more routes can be added.

func (*URLMux) PanicOnExistRouter

func (m *URLMux) PanicOnExistRouter() error

When a MessageID or URL route is added and it already exists, it panics.
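The three behaviors described for URLMux (adding by pattern, locking writes, panicking on a duplicate route) can be modeled with a small hypothetical mux. All names below are invented for the sketch, and returning an error when the mux is locked is an assumption; the documentation above only says that routes can no longer be added.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// handler is a simplified handler type used only in this sketch.
type handler func(stream []byte)

// urlMux is a hypothetical model of a pattern-keyed router.
type urlMux struct {
	mu       sync.RWMutex
	locked   bool
	handlers map[string][]handler
}

func newURLMux() *urlMux {
	return &urlMux{handlers: map[string][]handler{}}
}

// add registers handlers for a pattern. It returns an error once the mux is
// locked and panics if the pattern is already registered.
func (m *urlMux) add(pattern string, hs ...handler) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.locked {
		return errors.New("mux is locked: no more routes can be added")
	}
	if _, exists := m.handlers[pattern]; exists {
		panic(fmt.Sprintf("route %q already exists", pattern))
	}
	m.handlers[pattern] = hs
	return nil
}

// lockWrite forbids any further route registration.
func (m *urlMux) lockWrite() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.locked = true
}

// panics reports whether f panicked; used to observe the duplicate-route case.
func panics(f func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	f()
	return false
}

func main() {
	m := newURLMux()
	fmt.Println(m.add("/login/", func([]byte) {}))
	fmt.Println(panics(func() { m.add("/login/") }))
	m.lockWrite()
	fmt.Println(m.add("/logout/"))
}
```

Failing loudly on a duplicate pattern surfaces routing mistakes at startup, and locking after setup guards the table against accidental changes while the server is handling traffic.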

type XmlMarshaller

type XmlMarshaller struct{}

func (XmlMarshaller) Marshal

func (xm XmlMarshaller) Marshal(v interface{}) ([]byte, error)

func (XmlMarshaller) MarshalName

func (xm XmlMarshaller) MarshalName() string

func (XmlMarshaller) Unmarshal

func (xm XmlMarshaller) Unmarshal(data []byte, dest interface{}) error

type YamlMarshaller

type YamlMarshaller struct{}

func (YamlMarshaller) Marshal

func (ym YamlMarshaller) Marshal(v interface{}) ([]byte, error)

func (YamlMarshaller) MarshalName

func (ym YamlMarshaller) MarshalName() string

func (YamlMarshaller) Unmarshal

func (ym YamlMarshaller) Unmarshal(data []byte, dest interface{}) error
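The marshaller types above all expose the same three methods, so a custom one can be written against that method set. The sketch below declares a local `marshaller` interface copied from those signatures and implements it with encoding/json. `jsonMarshaller`, `loginRequest`, `roundTrip` and the name string "json" are hypothetical choices for this illustration, not values taken from tcpx.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshaller mirrors the method set shared by the marshaller types listed above.
type marshaller interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, dest interface{}) error
	MarshalName() string
}

// jsonMarshaller is a hypothetical implementation backed by encoding/json.
type jsonMarshaller struct{}

func (jsonMarshaller) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonMarshaller) Unmarshal(data []byte, dest interface{}) error {
	return json.Unmarshal(data, dest)
}

func (jsonMarshaller) MarshalName() string { return "json" }

// loginRequest is an example payload type invented for this sketch.
type loginRequest struct {
	Username string `json:"username"`
}

// roundTrip marshals in and unmarshals the bytes into out using m.
func roundTrip(m marshaller, in, out interface{}) error {
	data, err := m.Marshal(in)
	if err != nil {
		return err
	}
	return m.Unmarshal(data, out)
}

func main() {
	var got loginRequest
	if err := roundTrip(jsonMarshaller{}, loginRequest{Username: "tcpx"}, &got); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(jsonMarshaller{}.MarshalName(), got.Username)
}
```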

Directories

Path Synopsis
export http api to validate stream from all language clients
go
Package go provides go client example
examples
sayHello/client
Package client executable file
sayHello/server
Package server executable file
