tp

v0.5.1-0...-5006072
Published: Nov 30, 2017 License: Apache-2.0 Imports: 32 Imported by: 0

README

Teleport

Teleport is a versatile, high-performance and flexible TCP socket framework.

It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

[Figure: Teleport architecture]

Benchmark

  • Test server configuration: darwin amd64 4CPU 8GB
  • teleport-socket: [Figure: tp_socket_benchmark] (test code)
  • Comparison with rpcx: [Figure: rpcx_benchmark] (test code)
  • Torch of teleport-socket: [Figure: tp_socket_torch] (svg file)

1. Version

version  status   branch
v3       release  v3
v2       release  v2
v1       release  v1

2. Install

go get -u github.com/henrylee2cn/teleport

3. Feature

  • Server and client are peers and share the same API methods
  • Supports custom communication protocols
  • Supports setting the size of the socket I/O buffer
  • A packet consists of two parts: Header and Body
  • Supports customizing the header and body codec types separately, e.g. JSON, Protobuf, string
  • The packet Header carries metadata in the same format as an HTTP header
  • Supports push, pull, reply and other means of communication
  • Supports a plug-in mechanism for customizing authentication, heartbeat, micro service registration center, statistics, etc.
  • Both server and client peers support graceful reboot and shutdown
  • Supports reverse proxy
  • Detailed log information, with support for printing input and output details
  • Supports setting a slow operation alarm threshold
  • Uses I/O multiplexing technology
  • Supports setting a size limit for packets being read (the connection is closed if the limit is exceeded)
  • Provides a context for handlers

4. Architecture

4.1 Keywords
  • Peer: A communication instance; it may be a server or a client
  • Session: A connection session, with push, pull, reply, close and other methods of operation
  • Context: Handles the received or sent packets
  • Pull-Launch: Pull data from the peer
  • Pull-Handle: Handle and reply to the pull of the peer
  • Push-Launch: Push data to the peer
  • Push-Handle: Handle the push of the peer
  • Router: Registers handlers
  • Packet: The structure that corresponds to a data packet
  • Proto: The protocol interface for packing and unpacking packets
  • Codec: Serialization interface for Packet.Body
  • XferPipe: A series of pipelines that handle packet data before transfer
  • XferFilter: An interface that handles packet data before transfer
4.2 Execution level
Peer -> Connection -> Socket -> Session -> Context
4.3 Packet

The contents of every packet:

// in socket package
type (
    // Packet a socket data packet.
    Packet struct {
        // packet sequence
        seq uint64
        // packet type, such as PULL, PUSH, REPLY
        ptype byte
        // URI string
        uri string
        // metadata
        meta *utils.Args
        // body codec type
        bodyCodec byte
        // body object
        body interface{}
        // newBodyFunc creates a new body by packet type and URI.
        // Note:
        //  only for writing packet;
        //  should be nil when reading packet.
        newBodyFunc NewBodyFunc
        // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
        // Note: the length can not be bigger than 255!
        xferPipe *xfer.XferPipe
        // packet size
        size uint32
        next *Packet
    }

    // NewBodyFunc creates a new body by header info.
    NewBodyFunc func(seq uint64, ptype byte, uri string) interface{}
)

// in xfer package
type (
    // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
    // Note: the length can not be bigger than 255!
    XferPipe struct {
        filters []XferFilter
    }
    // XferFilter handles byte stream of packet when transfer.
    XferFilter interface {
        Id() byte
        OnPack([]byte) ([]byte, error)
        OnUnpack([]byte) ([]byte, error)
    }
)
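
The XferFilter interface above can be illustrated with a small, self-contained sketch. It redeclares the interface locally and adds two hypothetical filters (`gzipFilter`, `xorFilter`) and a hypothetical `pipe` type; none of these names come from the teleport packages. The order in which the filters run (inner-most first when packing, outer-most first when unpacking) is an assumption based on the "outer-most to inner-most" comment.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// XferFilter is a local copy of the interface shown above.
type XferFilter interface {
	Id() byte
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// gzipFilter is a hypothetical filter that compresses packet bytes.
type gzipFilter struct{}

func (gzipFilter) Id() byte { return 'g' }

func (gzipFilter) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(src); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gzipFilter) OnUnpack(src []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// xorFilter is a second hypothetical filter that XORs every byte with a key.
type xorFilter struct{ key byte }

func (f xorFilter) Id() byte                            { return 'x' }
func (f xorFilter) OnPack(src []byte) ([]byte, error)   { return f.xor(src), nil }
func (f xorFilter) OnUnpack(src []byte) ([]byte, error) { return f.xor(src), nil }

func (f xorFilter) xor(src []byte) []byte {
	dst := make([]byte, len(src))
	for i, b := range src {
		dst[i] = b ^ f.key
	}
	return dst
}

// pipe holds filters ordered from outer-most to inner-most.
type pipe struct{ filters []XferFilter }

// pack runs the inner-most filter first, so the outer-most filter wraps the result.
func (p pipe) pack(data []byte) ([]byte, error) {
	var err error
	for i := len(p.filters) - 1; i >= 0; i-- {
		if data, err = p.filters[i].OnPack(data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

// unpack undoes pack by running the outer-most filter first.
func (p pipe) unpack(data []byte) ([]byte, error) {
	var err error
	for _, f := range p.filters {
		if data, err = f.OnUnpack(data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

func main() {
	p := pipe{filters: []XferFilter{xorFilter{key: 0x5a}, gzipFilter{}}}
	packed, err := p.pack([]byte("hello teleport"))
	if err != nil {
		fmt.Println("pack error:", err)
		return
	}
	plain, err := p.unpack(packed)
	if err != nil {
		fmt.Println("unpack error:", err)
		return
	}
	fmt.Printf("%s\n", plain)
}
```

Because each filter is identified by a single byte from Id(), a packet can presumably record which filters were applied as a short list of ids, which fits the stated limit of 255 filters per pipe.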
4.4 Protocol

You can customize your own communication protocol by implementing the interface:

type (
	// Proto pack/unpack protocol scheme of socket packet.
	Proto interface {
		// Version returns the protocol's id and name.
		Version() (byte, string)
		// Pack pack socket data packet.
		// Note: Make sure to write only once or there will be package contamination!
		Pack(*Packet) error
		// Unpack unpack socket data packet.
		// Note: Concurrent unsafe!
		Unpack(*Packet) error
	}
	ProtoFunc func(io.ReadWriter) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(socket.ProtoFunc)
func (*Peer) ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
func (*Peer) DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (*Peer) Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (*Peer) Listen(protoFunc ...socket.ProtoFunc) error
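
As a rough illustration of what a custom protocol might look like, the following sketch implements the three Proto methods for a length-prefixed frame. It is a minimal sketch under stated assumptions: `Packet` here is a simplified, hypothetical stand-in (the real socket.Packet has unexported fields), and `lenProto`, `newLenProto`, `maxFrameSize` and `roundTrip` are illustrative names, not part of teleport.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// Packet is a simplified, hypothetical stand-in for socket.Packet.
type Packet struct {
	Uri  string
	Body []byte
}

// maxFrameSize is an assumed size limit for this sketch.
const maxFrameSize = 1 << 20

// lenProto frames a packet as:
// uint32 frame length | uint16 uri length | uri | body.
type lenProto struct{ rw io.ReadWriter }

func newLenProto(rw io.ReadWriter) *lenProto { return &lenProto{rw: rw} }

// Version returns the protocol's id and name.
func (p *lenProto) Version() (byte, string) { return 'l', "length-prefixed" }

// Pack builds the whole frame in memory and writes it with a single Write call.
func (p *lenProto) Pack(pkt *Packet) error {
	if len(pkt.Uri) > 0xffff {
		return errors.New("uri too long")
	}
	size := 2 + len(pkt.Uri) + len(pkt.Body)
	if size > maxFrameSize {
		return errors.New("packet too large")
	}
	frame := make([]byte, 4+size)
	binary.BigEndian.PutUint32(frame[0:4], uint32(size))
	binary.BigEndian.PutUint16(frame[4:6], uint16(len(pkt.Uri)))
	copy(frame[6:], pkt.Uri)
	copy(frame[6+len(pkt.Uri):], pkt.Body)
	_, err := p.rw.Write(frame)
	return err
}

// Unpack reads one frame and rejects frames that exceed the size limit.
func (p *lenProto) Unpack(pkt *Packet) error {
	var head [4]byte
	if _, err := io.ReadFull(p.rw, head[:]); err != nil {
		return err
	}
	size := binary.BigEndian.Uint32(head[:])
	if size < 2 || size > maxFrameSize {
		return errors.New("bad frame size")
	}
	frame := make([]byte, size)
	if _, err := io.ReadFull(p.rw, frame); err != nil {
		return err
	}
	uriLen := int(binary.BigEndian.Uint16(frame[0:2]))
	if 2+uriLen > len(frame) {
		return errors.New("bad uri length")
	}
	pkt.Uri = string(frame[2 : 2+uriLen])
	pkt.Body = frame[2+uriLen:]
	return nil
}

// roundTrip packs and unpacks one packet through an in-memory buffer,
// which stands in for a network connection.
func roundTrip(uri string, body []byte) (*Packet, error) {
	var conn bytes.Buffer
	proto := newLenProto(&conn)
	if err := proto.Pack(&Packet{Uri: uri, Body: body}); err != nil {
		return nil, err
	}
	out := new(Packet)
	if err := proto.Unpack(out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	pkt, err := roundTrip("/group/home/test", []byte(`{"a":1}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s\n", pkt.Uri, pkt.Body)
}
```

Building the frame first and issuing one Write follows the note on Pack about writing only once; a real implementation would also have to carry the sequence number, packet type, metadata and codec id.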

5. Usage

  • Create a server or client peer
var cfg = &tp.PeerConfig{
	DefaultReadTimeout:  time.Minute * 5,
	DefaultWriteTimeout: time.Millisecond * 500,
	TlsCertFile:         "",
	TlsKeyFile:          "",
	SlowCometDuration:   time.Millisecond * 500,
	DefaultBodyCodec:    "json",
	PrintBody:           true,
	CountTime:           true,
	ListenAddrs: []string{
		"0.0.0.0:9090",
	},
}


var peer = tp.NewPeer(cfg)

// It can be used as a server
peer.Listen()

// It can also be used as a client at the same time
var sess, err = peer.Dial("127.0.0.1:8080")
if err != nil {
	tp.Panicf("%v", err)
}
  • Define a controller and handler for pull request
// Home controller
type Home struct {
	tp.PullCtx
}

// Test handler
func (h *Home) Test(args *[2]int) (int, *tp.Rerror) {
	a := (*args)[0]
	b := (*args)[1]
	return a + b, nil
}
  • Define controller and handler for push request
// Msg controller
type Msg struct {
	tp.PushCtx
}

// Test handler
func (m *Msg) Test(args *map[string]interface{}) {
	tp.Infof("receive push(%s):\nargs: %#v\nquery: %#v\n", m.Ip(), args, m.Query())
}
  • Define a handler for unknown pull request
func UnknownPullHandle(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
	var v interface{}
	// Bind decodes the input body into v and returns the body codec id.
	codecId, err := ctx.Bind(&v)
	if err != nil {
		return nil, tp.NewRerror(1001, "bind error", err.Error())
	}
	tp.Infof("receive unknown pull:\n codec: %d\n content: %#v", codecId, v)
	return "this is reply string for unknown pull", nil
}

  • Define a handler for unknown push request
func UnknownPushHandle(ctx tp.UnknownPushCtx) {
	var v interface{}
	// Bind decodes the input body into v and returns the body codec id.
	codecId, err := ctx.Bind(&v)
	if err != nil {
		tp.Errorf("%v", err)
	} else {
		tp.Infof("receive unknown push:\n codec: %d\n content: %#v", codecId, v)
	}
}
  • Define a plugin
// AliasPlugin can be used to set aliases for pull or push services
type AliasPlugin struct {
	Aliases map[string]string
}

// NewAliasPlugin creates a new AliasPlugin
func NewAliasPlugin() *AliasPlugin {
	return &AliasPlugin{Aliases: make(map[string]string)}
}

// Alias sets an alias for the uri.
// For example Alias("/mul", "/arith/mul")
func (p *AliasPlugin) Alias(alias string, uri string) {
	p.Aliases[alias] = uri
}

// Name returns the name of this plugin.
func (p *AliasPlugin) Name() string {
	return "AliasPlugin"
}

// PostReadPullHeader converts the alias of this service.
func (p *AliasPlugin) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
	var u = ctx.Input().Header.Uri
	if p.Aliases != nil {
		if a := p.Aliases[u]; a != "" {
			ctx.Input().Header.Uri = a
		}
	}
	return nil
}
  • Register the above handlers and plugin
aliasesPlugin := NewAliasPlugin()
aliasesPlugin.Alias("/alias", "/origin")
{
	pullGroup := peer.PullRouter.Group("pull", aliasesPlugin)
	pullGroup.Reg(new(Home))
	peer.PullRouter.SetUnknown(UnknownPullHandle)
}
{
	pushGroup := peer.PushRouter.Group("push")
	pushGroup.Reg(new(Msg), aliasesPlugin)
	peer.PushRouter.SetUnknown(UnknownPushHandle)
}
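
Judging from the demo URI "/group/home/test" (group "group", controller Home, method Test), handler URIs appear to be derived from the group prefix plus the snake_case controller and method names. The sketch below reproduces that mapping with reflection. The naming rule is an assumption inferred from the demo, not taken from the router source, and `snake`, `handlerURIs` and the `SumAll` method are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// snake converts CamelCase to snake_case (assumed naming rule).
func snake(s string) string {
	var b strings.Builder
	for i, r := range s {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('_')
			}
			r = unicode.ToLower(r)
		}
		b.WriteRune(r)
	}
	return b.String()
}

// handlerURIs returns one URI per exported method of ctrl under the group prefix.
func handlerURIs(group string, ctrl interface{}) []string {
	t := reflect.TypeOf(ctrl)
	name := t.Name()
	if t.Kind() == reflect.Ptr {
		name = t.Elem().Name()
	}
	var uris []string
	for i := 0; i < t.NumMethod(); i++ {
		uris = append(uris, "/"+group+"/"+snake(name)+"/"+snake(t.Method(i).Name))
	}
	return uris
}

// Home is a stand-in controller; it does not embed tp.PullCtx in this sketch.
type Home struct{}

func (h *Home) Test()   {}
func (h *Home) SumAll() {}

func main() {
	for _, uri := range handlerURIs("group", new(Home)) {
		fmt.Println(uri)
	}
}
```

In the real framework the controller embeds a context interface, so the router has to skip the promoted context methods; this sketch sidesteps that by using a plain struct.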

6. Demo

server.go
package main

import (
    "encoding/json"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    go tp.GraceSignal()
    // tp.SetReadLimit(10)
    tp.SetShutdown(time.Second*20, nil, nil)
    var cfg = &tp.PeerConfig{
        DefaultReadTimeout:  time.Minute * 5,
        DefaultWriteTimeout: time.Millisecond * 500,
        TlsCertFile:         "",
        TlsKeyFile:          "",
        SlowCometDuration:   time.Millisecond * 500,
        DefaultBodyCodec:    "json",
        PrintBody:           true,
        CountTime:           true,
        ListenAddrs: []string{
            "0.0.0.0:9090",
            "0.0.0.0:9091",
        },
    }
    var peer = tp.NewPeer(cfg)
    {
        group := peer.PullRouter.Group("group")
        group.Reg(new(Home))
    }
    peer.PullRouter.SetUnknown(UnknownPullHandle)
    peer.Listen()
}

// Home controller
type Home struct {
    tp.PullCtx
}

// Test handler
func (h *Home) Test(args *map[string]interface{}) (map[string]interface{}, *tp.Rerror) {
    h.Session().Push("/push/test?tag=from home-test", map[string]interface{}{
        "your_id": h.Query().Get("peer_id"),
        "a":       1,
    })
    return map[string]interface{}{
        "your_args":   *args,
        "server_time": time.Now(),
    }, nil
}

func UnknownPullHandle(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
    time.Sleep(1)
    var v = struct {
        ConnPort   int
        RawMessage json.RawMessage
        Bytes      []byte
    }{}
    codecId, err := ctx.Bind(&v)
    if err != nil {
        return nil, tp.NewRerror(1001, "bind error", err.Error())
    }
    tp.Debugf("UnknownPullHandle: codec: %d, conn_port: %d, RawMessage: %s, bytes: %s",
        codecId, v.ConnPort, v.RawMessage, v.Bytes,
    )
    return []string{"a", "aa", "aaa"}, nil
}
client.go
package main

import (
    "encoding/json"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    go tp.GraceSignal()
    tp.SetShutdown(time.Second*20, nil, nil)
    var cfg = &tp.PeerConfig{
        DefaultReadTimeout:  time.Minute * 5,
        DefaultWriteTimeout: time.Millisecond * 500,
        TlsCertFile:         "",
        TlsKeyFile:          "",
        SlowCometDuration:   time.Millisecond * 500,
        DefaultBodyCodec:    "json",
        PrintBody:           true,
        CountTime:           true,
    }

    var peer = tp.NewPeer(cfg)
    defer peer.Close()
    peer.PushRouter.Reg(new(Push))

    {
        var sess, err = peer.Dial("127.0.0.1:9090")
        if err != nil {
            tp.Fatalf("%v", err)
        }

        var reply interface{}
        var pullcmd = sess.Pull(
            "/group/home/test?peer_id=client9090",
            map[string]interface{}{
                "conn_port": 9090,
                "bytes":     []byte("bytestest9090"),
            },
            &reply,
        )

        if pullcmd.Rerror() != nil {
            tp.Fatalf("pull error: %v", pullcmd.Rerror())
        }
        tp.Infof("9090reply: %#v", reply)
    }

    {
        var sess, err = peer.Dial("127.0.0.1:9091")
        if err != nil {
            tp.Panicf("%v", err)
        }

        var reply interface{}
        var pullcmd = sess.Pull(
            "/group/home/test_unknown?peer_id=client9091",
            struct {
                ConnPort   int
                RawMessage json.RawMessage
                Bytes      []byte
            }{
                9091,
                json.RawMessage(`{"RawMessage":"test9091"}`),
                []byte("bytes-test"),
            },
            &reply,
        )

        if pullcmd.Rerror() != nil {
            tp.Fatalf("pull error: %v", pullcmd.Rerror())
        }
        tp.Infof("9091reply test_unknown: %#v", reply)
    }
}

// Push controller
type Push struct {
    tp.PushCtx
}

// Test handler
func (p *Push) Test(args *map[string]interface{}) {
    tp.Infof("receive push(%s):\nargs: %#v\nquery: %#v\n", p.Ip(), args, p.Query())
}

7. License

Teleport is released under the Apache v2 License. See the LICENSE file for the full license text.

Documentation

Overview

Teleport is a versatile, high-performance and flexible TCP socket framework. It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

Copyright 2015-2017 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Constants

const (
	TypeUndefined byte = 0
	TypePull      byte = 1
	TypeReply     byte = 2 // reply to pull
	TypePush      byte = 3
)

Packet types

const (
	CodeDialFailed     = 105
	CodeConnClosed     = 102
	CodeWriteFailed    = 104
	CodeBadPacket      = 400
	CodeNotFound       = 404
	CodeNotImplemented = 501
)

Internal Framework Rerror code. Note: Recommended custom code is greater than 1000.

const MetaRerrorKey = "X-Reply-Error"

Variables

var DefaultProtoFunc = socket.DefaultProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

func DefaultProtoFunc() socket.ProtoFunc

var ErrConnClosed = errors.New("connection is closed")

ErrConnClosed connection is closed error.

var ErrListenClosed = errors.New("listener is closed")

ErrListenClosed listener is closed error.

var FirstSweep, BeforeExiting func() error

FirstSweep is first executed. BeforeExiting is executed before process exiting. Usage: share github.com/henrylee2cn/goutil/graceful with other project.

var GetPacket = socket.GetPacket

GetPacket gets a *Packet from the packet stack. Note:

newBodyFunc is only for reading from the connection;
settings are only for writing to the connection.

func GetPacket(settings ...socket.PacketSetting) *socket.Packet

var GetReadLimit = socket.PacketSizeLimit

GetReadLimit gets the packet size upper limit of reading.

func PacketSizeLimit() uint32

var PutPacket = socket.PutPacket

PutPacket puts a *socket.Packet to packet stack.

func PutPacket(p *socket.Packet)

var SetDefaultProtoFunc = socket.SetDefaultProtoFunc

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetDefaultProtoFunc(protoFunc socket.ProtoFunc)

SetPacketSizeLimit sets max packet size. If maxSize<=0, set it to max uint32.

func SetPacketSizeLimit(maxPacketSize uint32)

var SetTCPReadBuffer = socket.SetTCPReadBuffer

SetTCPReadBuffer sets the size of the operating system's receive buffer associated with the *net.TCPConn. Note: Uses the default value if bytes=1.

func SetTCPReadBuffer(bytes int)

var SetTCPWriteBuffer = socket.SetTCPWriteBuffer

SetTCPWriteBuffer sets the size of the operating system's transmit buffer associated with the *net.TCPConn. Note: Uses the default value if bytes=1.

func SetTCPWriteBuffer(bytes int)

Functions

func Criticalf

func Criticalf(format string, args ...interface{})

Criticalf logs a message using CRITICAL as log level.

func Debugf

func Debugf(format string, args ...interface{})

Debugf logs a message using DEBUG as log level.

func Errorf

func Errorf(format string, args ...interface{})

Errorf logs a message using ERROR as log level.

func Fatalf

func Fatalf(format string, args ...interface{})

Fatalf is equivalent to l.Criticalf followed by a call to os.Exit(1).

func Go

func Go(fn func()) bool

Go runs fn using the go pool (see SetGopool).

func GraceSignal

func GraceSignal()

GraceSignal opens the graceful shutdown or reboot signal.

func Infof

func Infof(format string, args ...interface{})

Infof logs a message using INFO as log level.

func Noticef

func Noticef(format string, args ...interface{})

Noticef logs a message using NOTICE as log level.

func Panicf

func Panicf(format string, args ...interface{})

Panicf is equivalent to l.Criticalf followed by a call to panic().

func Printf

func Printf(format string, args ...interface{})

Printf formats according to a format specifier and writes to standard output.

func Reboot

func Reboot(timeout ...time.Duration)

Reboot reboots all the frame processes gracefully. Note: Windows systems are not supported!

func SetGlobalBodyCodec

func SetGlobalBodyCodec(codecId byte)

SetGlobalBodyCodec sets the global default body codec. Note:

If the codec.Codec identified by 'codecId' is not registered, it will panic;
It is not safe to call it concurrently.

func SetGopool

func SetGopool(maxGoroutinesAmount int, maxGoroutineIdleDuration time.Duration)

SetGopool sets or resets the go pool config. Note: Make sure to call it before calling NewPeer() and Go().

func SetLogger

func SetLogger(logger Logger)

SetLogger sets the global logger. Note: It is not safe for concurrent use!

func SetRawlogLevel

func SetRawlogLevel(level string)

SetRawlogLevel sets the default logger's level. Note: It is not safe for concurrent use!

func SetShutdown

func SetShutdown(timeout time.Duration, firstSweep, beforeExiting func() error)

SetShutdown sets the function which is called after the process shutdown, and the time-out period for the process shutdown. If 0<=timeout<5s, automatically use 'MinShutdownTimeout'(5s). If timeout<0, indefinite period. 'firstSweep' is first executed. 'beforeExiting' is executed before process exiting.
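
The timeout rule described for SetShutdown can be written out as a small function. This is only a sketch of the documented rule; `normalizeShutdownTimeout` and `minShutdownTimeout` are hypothetical names, and the real package may apply the rule differently internally.

```go
package main

import (
	"fmt"
	"time"
)

// minShutdownTimeout mirrors the 5s 'MinShutdownTimeout' mentioned above.
const minShutdownTimeout = 5 * time.Second

// normalizeShutdownTimeout returns the effective timeout and whether the
// shutdown should wait indefinitely.
func normalizeShutdownTimeout(d time.Duration) (timeout time.Duration, indefinite bool) {
	switch {
	case d < 0:
		// Negative values mean an indefinite period.
		return 0, true
	case d < minShutdownTimeout:
		// Values in [0, 5s) are raised to the minimum.
		return minShutdownTimeout, false
	default:
		return d, false
	}
}

func main() {
	for _, d := range []time.Duration{-1, 0, 2 * time.Second, 20 * time.Second} {
		timeout, indefinite := normalizeShutdownTimeout(d)
		fmt.Println(d, "->", timeout, indefinite)
	}
}
```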

func Shutdown

func Shutdown(timeout ...time.Duration)

Shutdown closes all the frame processes gracefully. The timeout parameter resets the time-out period for the process shutdown.

func Tracef

func Tracef(format string, args ...interface{})

Tracef logs a message using TRACE as log level.

func TypeText

func TypeText(typ byte) string

TypeText returns the packet type text. If the type is undefined returns 'Undefined'.
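
A function with the behavior described for TypeText could look roughly like the sketch below. The constants are copied from the Constants section; the labels "PULL", "REPLY" and "PUSH" are assumptions (only the 'Undefined' fallback is documented), and `typeText` is a hypothetical local name.

```go
package main

import "fmt"

// Packet types, copied from the Constants section.
const (
	TypeUndefined byte = 0
	TypePull      byte = 1
	TypeReply     byte = 2 // reply to pull
	TypePush      byte = 3
)

// typeText maps a packet type to a label; the labels other than
// "Undefined" are assumed for this sketch.
func typeText(typ byte) string {
	switch typ {
	case TypePull:
		return "PULL"
	case TypeReply:
		return "REPLY"
	case TypePush:
		return "PUSH"
	default:
		return "Undefined"
	}
}

func main() {
	for _, typ := range []byte{TypePull, TypeReply, TypePush, 42} {
		fmt.Println(typ, typeText(typ))
	}
}
```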

func Warnf

func Warnf(format string, args ...interface{})

Warnf logs a message using WARNING as log level.

Types

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler pull or push handler type info

func (*Handler) ArgElemType

func (h *Handler) ArgElemType() reflect.Type

ArgElemType returns the handler arg elem type.

func (*Handler) IsPull

func (h *Handler) IsPull() bool

IsPull checks if it is pull handler or not.

func (*Handler) IsPush

func (h *Handler) IsPush() bool

IsPush checks if it is push handler or not.

func (*Handler) Name

func (h *Handler) Name() string

Name returns the handler name.

func (*Handler) ReplyType

func (h *Handler) ReplyType() reflect.Type

ReplyType returns the handler reply type

type HandlersMaker

type HandlersMaker func(string, interface{}, PluginContainer) ([]*Handler, error)

HandlersMaker makes []*Handler

type Logger

type Logger interface {

	// Printf formats according to a format specifier and writes to standard output.
	Printf(format string, args ...interface{})

	// Fatalf is equivalent to Criticalf followed by a call to os.Exit(1).
	Fatalf(format string, args ...interface{})

	// Panicf is equivalent to Criticalf followed by a call to panic().
	Panicf(format string, args ...interface{})

	// Criticalf logs a message using CRITICAL as log level.
	Criticalf(format string, args ...interface{})

	// Errorf logs a message using ERROR as log level.
	Errorf(format string, args ...interface{})

	// Warnf logs a message using WARNING as log level.
	Warnf(format string, args ...interface{})

	// Noticef logs a message using NOTICE as log level.
	Noticef(format string, args ...interface{})

	// Infof logs a message using INFO as log level.
	Infof(format string, args ...interface{})

	// Debugf logs a message using DEBUG as log level.
	Debugf(format string, args ...interface{})

	// Tracef logs a message using TRACE as log level.
	Tracef(format string, args ...interface{})
}
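
One way to satisfy the Logger interface is a thin adapter over the standard library's log package. The sketch below redeclares the interface locally so that it is self-contained; `stdLogger` and `render` are hypothetical names, and the "[LEVEL]" prefix format is an arbitrary choice rather than teleport's own log format.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
)

// Logger is a local copy of the interface listed above.
type Logger interface {
	Printf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Panicf(format string, args ...interface{})
	Criticalf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Noticef(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Tracef(format string, args ...interface{})
}

// stdLogger adapts a *log.Logger to the Logger interface.
type stdLogger struct{ l *log.Logger }

// Compile-time check that stdLogger implements Logger.
var _ Logger = (*stdLogger)(nil)

func (s *stdLogger) logf(level, format string, args ...interface{}) {
	s.l.Printf("["+level+"] "+format, args...)
}

func (s *stdLogger) Printf(format string, args ...interface{}) { s.l.Printf(format, args...) }
func (s *stdLogger) Fatalf(format string, args ...interface{}) {
	s.logf("CRITICAL", format, args...)
	os.Exit(1)
}
func (s *stdLogger) Panicf(format string, args ...interface{}) {
	s.logf("CRITICAL", format, args...)
	panic(fmt.Sprintf(format, args...))
}
func (s *stdLogger) Criticalf(format string, args ...interface{}) { s.logf("CRITICAL", format, args...) }
func (s *stdLogger) Errorf(format string, args ...interface{})    { s.logf("ERROR", format, args...) }
func (s *stdLogger) Warnf(format string, args ...interface{})     { s.logf("WARNING", format, args...) }
func (s *stdLogger) Noticef(format string, args ...interface{})   { s.logf("NOTICE", format, args...) }
func (s *stdLogger) Infof(format string, args ...interface{})     { s.logf("INFO", format, args...) }
func (s *stdLogger) Debugf(format string, args ...interface{})    { s.logf("DEBUG", format, args...) }
func (s *stdLogger) Tracef(format string, args ...interface{})    { s.logf("TRACE", format, args...) }

// render logs one INFO line into a buffer and returns the captured text.
func render() string {
	var buf bytes.Buffer
	var lg Logger = &stdLogger{l: log.New(&buf, "", 0)}
	lg.Infof("listening on %s", "0.0.0.0:9090")
	return buf.String()
}

func main() {
	fmt.Print(render())
}
```

A value like this would then be installed with SetLogger, before any peers are created, since SetLogger is documented as unsafe for concurrent use.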

Logger interface

type Peer

type Peer struct {
	PullRouter *Router
	PushRouter *Router
	// contains filtered or unexported fields
}

Peer peer which is server or client.

func NewPeer

func NewPeer(cfg *PeerConfig, plugin ...Plugin) *Peer

NewPeer creates a new peer.

func (*Peer) Close

func (p *Peer) Close() (err error)

Close closes peer.

func (*Peer) CountSession

func (p *Peer) CountSession() int

CountSession returns the number of sessions.

func (*Peer) Dial

func (p *Peer) Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)

Dial connects with the peer of the destination address.

func (*Peer) DialContext

func (p *Peer) DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)

DialContext connects with the peer of the destination address, using the provided context.

func (*Peer) GetSession

func (p *Peer) GetSession(sessionId string) (Session, bool)

GetSession gets the session by id.

func (*Peer) Listen

func (p *Peer) Listen(protoFunc ...socket.ProtoFunc) error

Listen turns on the listening service.

func (*Peer) RangeSession

func (p *Peer) RangeSession(fn func(sess Session) bool)

RangeSession ranges all sessions. If fn returns false, stop traversing.

func (*Peer) ServeConn

func (p *Peer) ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session

ServeConn serves the connection and returns a session.

type PeerConfig

type PeerConfig struct {
	TlsCertFile         string        `yaml:"tls_cert_file"           ini:"tls_cert_file"           comment:"TLS certificate file path"`
	TlsKeyFile          string        `yaml:"tls_key_file"            ini:"tls_key_file"            comment:"TLS key file path"`
	DefaultReadTimeout  time.Duration `yaml:"default_read_timeout"    ini:"default_read_timeout"    comment:"Default maximum duration for reading; ns,µs,ms,s,m,h"`
	DefaultWriteTimeout time.Duration `yaml:"default_write_timeout"   ini:"default_write_timeout"   comment:"Default maximum duration for writing; ns,µs,ms,s,m,h"`
	SlowCometDuration   time.Duration `yaml:"slow_comet_duration"     ini:"slow_comet_duration"     comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	DefaultBodyCodec    string        `yaml:"default_body_codec"      ini:"default_body_codec"      comment:"Default body codec type id"`
	PrintBody           bool          `yaml:"print_body"              ini:"print_body"              comment:"Is print body or not"`
	CountTime           bool          `yaml:"count_time"              ini:"count_time"              comment:"Is count cost time or not"`
	DefaultDialTimeout  time.Duration `` /* 141-byte string literal not displayed */
	ListenAddrs         []string      `yaml:"listen_addrs"            ini:"listen_addrs"            comment:"Listen addresses; for server role"`
}

PeerConfig peer config Note:

yaml tag is used for github.com/henrylee2cn/cfgo
ini tag is used for github.com/henrylee2cn/ini

func (*PeerConfig) Reload

func (p *PeerConfig) Reload(bind cfgo.BindFunc) error

type Plugin

type Plugin interface {
	Name() string
}

Interfaces about plugin.

type PluginContainer

type PluginContainer interface {
	Add(plugins ...Plugin) error
	Remove(pluginName string) error
	GetByName(pluginName string) Plugin
	GetAll() []Plugin

	PostReg(*Handler) *Rerror
	PostDial(PreSession) *Rerror
	PostAccept(PreSession) *Rerror
	PreWritePull(WriteCtx) *Rerror
	PostWritePull(WriteCtx) *Rerror
	PreWriteReply(WriteCtx) *Rerror
	PostWriteReply(WriteCtx) *Rerror
	PreWritePush(WriteCtx) *Rerror
	PostWritePush(WriteCtx) *Rerror
	PreReadHeader(ReadCtx) *Rerror

	PostReadPullHeader(ReadCtx) *Rerror
	PreReadPullBody(ReadCtx) *Rerror
	PostReadPullBody(ReadCtx) *Rerror

	PostReadPushHeader(ReadCtx) *Rerror
	PreReadPushBody(ReadCtx) *Rerror
	PostReadPushBody(ReadCtx) *Rerror

	PostReadReplyHeader(ReadCtx) *Rerror
	PreReadReplyBody(ReadCtx) *Rerror
	PostReadReplyBody(ReadCtx) *Rerror

	PostDisconnect(PostSession) *Rerror
	// contains filtered or unexported methods
}

PluginContainer plugin container that defines base methods to manage plugins.
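
The pattern of a base Plugin interface plus many optional hook interfaces is commonly dispatched with type assertions. The sketch below shows that idea for a single hook. It is a simplified illustration, not the package's implementation: the hook takes a session id string instead of PreSession and returns error instead of *Rerror, the duplicate-name check in Add is an assumption, and `container`, `namedOnly` and `dialCounter` are hypothetical names.

```go
package main

import "fmt"

// Plugin is a local copy of the base interface.
type Plugin interface {
	Name() string
}

// PostDialPlugin is a simplified version of the optional hook interface.
type PostDialPlugin interface {
	Plugin
	PostDial(sessionID string) error
}

// container keeps plugins in registration order.
type container struct{ plugins []Plugin }

// Add registers plugins and rejects duplicate names (assumed behavior).
func (c *container) Add(plugins ...Plugin) error {
	for _, p := range plugins {
		for _, old := range c.plugins {
			if old.Name() == p.Name() {
				return fmt.Errorf("plugin %q is already registered", p.Name())
			}
		}
		c.plugins = append(c.plugins, p)
	}
	return nil
}

// PostDial calls the hook on every plugin that implements it and stops
// at the first error.
func (c *container) PostDial(sessionID string) error {
	for _, p := range c.plugins {
		if hook, ok := p.(PostDialPlugin); ok {
			if err := hook.PostDial(sessionID); err != nil {
				return err
			}
		}
	}
	return nil
}

// namedOnly implements only the base interface, so no hook is called on it.
type namedOnly struct{}

func (namedOnly) Name() string { return "named-only" }

// dialCounter implements the PostDial hook and counts its invocations.
type dialCounter struct{ calls int }

func (d *dialCounter) Name() string { return "dial-counter" }

func (d *dialCounter) PostDial(string) error {
	d.calls++
	return nil
}

func main() {
	c := &container{}
	counter := &dialCounter{}
	if err := c.Add(namedOnly{}, counter); err != nil {
		fmt.Println("add error:", err)
		return
	}
	if err := c.PostDial("session-1"); err != nil {
		fmt.Println("hook error:", err)
		return
	}
	fmt.Println("PostDial calls:", counter.calls)
}
```

This is why a plugin such as AliasPlugin only needs to implement Name plus the hooks it cares about.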

type PostAcceptPlugin

type PostAcceptPlugin interface {
	Plugin
	PostAccept(PreSession) *Rerror
}

Interfaces about plugin.

type PostDialPlugin

type PostDialPlugin interface {
	Plugin
	PostDial(PreSession) *Rerror
}

Interfaces about plugin.

type PostDisconnectPlugin

type PostDisconnectPlugin interface {
	Plugin
	PostDisconnect(PostSession) *Rerror
}

Interfaces about plugin.

type PostReadPullBodyPlugin

type PostReadPullBodyPlugin interface {
	Plugin
	PostReadPullBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostReadPullHeaderPlugin

type PostReadPullHeaderPlugin interface {
	Plugin
	PostReadPullHeader(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostReadPushBodyPlugin

type PostReadPushBodyPlugin interface {
	Plugin
	PostReadPushBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostReadPushHeaderPlugin

type PostReadPushHeaderPlugin interface {
	Plugin
	PostReadPushHeader(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostReadReplyBodyPlugin

type PostReadReplyBodyPlugin interface {
	Plugin
	PostReadReplyBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostReadReplyHeaderPlugin

type PostReadReplyHeaderPlugin interface {
	Plugin
	PostReadReplyHeader(ReadCtx) *Rerror
}

Interfaces about plugin.

type PostRegPlugin

type PostRegPlugin interface {
	Plugin
	PostReg(*Handler) *Rerror
}

Interfaces about plugin.

type PostSession

type PostSession interface {
	// Id returns the session id.
	Id() string
	// Peer returns the peer.
	Peer() *Peer
	// RemoteIp returns the remote peer ip.
	RemoteIp() string
	// LocalIp returns the local peer ip.
	LocalIp() string
	// Public returns temporary public data of session(socket).
	Public() goutil.Map
	// PublicLen returns the length of public data of session(socket).
	PublicLen() int
}

Session a connection session.

type PostWritePullPlugin

type PostWritePullPlugin interface {
	Plugin
	PostWritePull(WriteCtx) *Rerror
}

Interfaces about plugin.

type PostWritePushPlugin

type PostWritePushPlugin interface {
	Plugin
	PostWritePush(WriteCtx) *Rerror
}

Interfaces about plugin.

type PostWriteReplyPlugin

type PostWriteReplyPlugin interface {
	Plugin
	PostWriteReply(WriteCtx) *Rerror
}

Interfaces about plugin.

type PreReadHeaderPlugin

type PreReadHeaderPlugin interface {
	Plugin
	PreReadHeader(ReadCtx) *Rerror
}

Interfaces about plugin.

type PreReadPullBodyPlugin

type PreReadPullBodyPlugin interface {
	Plugin
	PreReadPullBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PreReadPushBodyPlugin

type PreReadPushBodyPlugin interface {
	Plugin
	PreReadPushBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PreReadReplyBodyPlugin

type PreReadReplyBodyPlugin interface {
	Plugin
	PreReadReplyBody(ReadCtx) *Rerror
}

Interfaces about plugin.

type PreSession

type PreSession interface {
	// SetId sets the session id.
	SetId(newId string)
	// Close closes the session.
	Close() error
	// Id returns the session id.
	Id() string
	// IsOk checks if the session is ok.
	IsOk() bool
	// Peer returns the peer.
	Peer() *Peer
	// RemoteIp returns the remote peer ip.
	RemoteIp() string
	// LocalIp returns the local peer ip.
	LocalIp() string
	// SetReadTimeout sets the read deadline for the underlying net.Conn.
	SetReadTimeout(duration time.Duration)
	// SetWriteTimeout sets the write deadline for the underlying net.Conn.
	SetWriteTimeout(duration time.Duration)
	// Public returns temporary public data of session(socket).
	Public() goutil.Map
	// PublicLen returns the length of public data of session(socket).
	PublicLen() int
	// Send sends packet to peer.
	Send(packet *socket.Packet) error
	// Receive receives a packet from peer.
	Receive(packet *socket.Packet) error
}

Session a connection session.

type PreWritePullPlugin

type PreWritePullPlugin interface {
	Plugin
	PreWritePull(WriteCtx) *Rerror
}

Interfaces about plugin.

type PreWritePushPlugin

type PreWritePushPlugin interface {
	Plugin
	PreWritePush(WriteCtx) *Rerror
}

Interfaces about plugin.

type PreWriteReplyPlugin

type PreWriteReplyPlugin interface {
	Plugin
	PreWriteReply(WriteCtx) *Rerror
}

Interfaces about plugin.

type PullCmd

type PullCmd struct {
	// contains filtered or unexported fields
}

PullCmd the command of the pulling operation's response.

func (*PullCmd) CostTime

func (c *PullCmd) CostTime() time.Duration

CostTime returns the pulled cost time. If PeerConfig.CountTime=false, always returns 0.

func (*PullCmd) Ip

func (c *PullCmd) Ip() string

Ip returns the remote addr.

func (*PullCmd) Output

func (c *PullCmd) Output() *socket.Packet

Output returns the written packet.

func (*PullCmd) Peer

func (c *PullCmd) Peer() *Peer

Peer returns the peer.

func (*PullCmd) Public

func (c *PullCmd) Public() goutil.Map

Public returns temporary public data of context.

func (*PullCmd) PublicLen

func (c *PullCmd) PublicLen() int

PublicLen returns the length of public data of context.

func (*PullCmd) Rerror

func (c *PullCmd) Rerror() *Rerror

Rerror returns the pull error.

func (*PullCmd) Result

func (c *PullCmd) Result() (interface{}, *Rerror)

Result returns the pull result.

func (*PullCmd) Session

func (c *PullCmd) Session() Session

Session returns the session.

type PullCtx

type PullCtx interface {
	PushCtx
	SetBodyCodec(byte)
	SetMeta(key, value string)
	AddXferPipe(filterId ...byte)
}

PullCtx request handler context. For example:

type HomePull struct{ PullCtx }

type PushCtx

type PushCtx interface {
	Seq() uint64
	GetBodyCodec() byte
	GetMeta(key string) []byte
	Uri() string
	Path() string
	RawQuery() string
	Query() url.Values
	Public() goutil.Map
	PublicLen() int
	Ip() string
	Peer() *Peer
	Session() Session
}

PushCtx push handler context. For example:

type HomePush struct{ PushCtx }
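
The Uri, Path, RawQuery and Query methods suggest that a request URI is split the same way as a URL. The relationship between these parts can be shown with the standard net/url package; `splitURI` is a hypothetical helper, and whether teleport itself uses net/url for this is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitURI breaks a request URI into its path, raw query and parsed query values.
func splitURI(uri string) (path, rawQuery string, query url.Values, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", nil, err
	}
	return u.Path, u.RawQuery, u.Query(), nil
}

func main() {
	path, rawQuery, query, err := splitURI("/group/home/test?peer_id=client9090")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(path)
	fmt.Println(rawQuery)
	fmt.Println(query.Get("peer_id"))
}
```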

type ReadCtx

type ReadCtx interface {
	Input() *socket.Packet
	Public() goutil.Map
	PublicLen() int
	Ip() string
	Peer() *Peer
	Session() Session
}

ReadCtx for reading packet.

type Rerror

type Rerror struct {
	// Code error code
	Code int32
	// Message error message to the user (optional)
	Message string
	// Detail error's detailed reason (optional)
	Detail string
}

Rerror error only for reply packet

func NewRerror

func NewRerror(code int32, message, detail string) *Rerror

NewRerror creates a *Rerror.

func NewRerrorFromMeta

func NewRerrorFromMeta(meta *utils.Args) *Rerror

NewRerrorFromMeta creates a *Rerror from 'X-Reply-Error' metadata. Return nil if there is no 'X-Reply-Error' in metadata.

func (Rerror) Copy

func (r Rerror) Copy() *Rerror

Copy returns the copy of Rerror

func (*Rerror) MarshalJSON

func (r *Rerror) MarshalJSON() ([]byte, error)

MarshalJSON marshals Rerror into JSON, implements json.Marshaler interface.

func (*Rerror) SetToMeta

func (r *Rerror) SetToMeta(meta *utils.Args)

SetToMeta sets self to 'X-Reply-Error' metadata.

func (*Rerror) String

func (r *Rerror) String() string

String prints error info.

func (*Rerror) UnmarshalJSON

func (r *Rerror) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a JSON description of self.
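
The way a reply error travels in packet metadata can be sketched as a JSON round trip under the 'X-Reply-Error' key. This is a minimal sketch under stated assumptions: a plain map stands in for *utils.Args, the JSON field names are invented for the example, and `setToMeta` and `rerrorFromMeta` are hypothetical local counterparts of SetToMeta and NewRerrorFromMeta.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metaRerrorKey matches the MetaRerrorKey constant listed above.
const metaRerrorKey = "X-Reply-Error"

// Rerror is a local copy of the struct; the JSON tags are assumed.
type Rerror struct {
	Code    int32  `json:"code"`
	Message string `json:"message"`
	Detail  string `json:"detail"`
}

// setToMeta stores the error as JSON under the metadata key.
func (r *Rerror) setToMeta(meta map[string]string) error {
	b, err := json.Marshal(r)
	if err != nil {
		return err
	}
	meta[metaRerrorKey] = string(b)
	return nil
}

// rerrorFromMeta returns nil if the metadata carries no reply error.
func rerrorFromMeta(meta map[string]string) (*Rerror, error) {
	v, ok := meta[metaRerrorKey]
	if !ok {
		return nil, nil
	}
	r := new(Rerror)
	if err := json.Unmarshal([]byte(v), r); err != nil {
		return nil, err
	}
	return r, nil
}

func main() {
	meta := map[string]string{}
	in := &Rerror{Code: 1001, Message: "bind error", Detail: "unexpected end of input"}
	if err := in.setToMeta(meta); err != nil {
		fmt.Println("encode error:", err)
		return
	}
	out, err := rerrorFromMeta(meta)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%+v\n", *out)
}
```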

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router the router of pull or push.

func (*Router) Group

func (r *Router) Group(pathPrefix string, plugin ...Plugin) *Router

Group add handler group.

func (*Router) Reg

func (r *Router) Reg(ctrlStruct interface{}, plugin ...Plugin)

Reg registers handler.

func (*Router) SetUnknown

func (r *Router) SetUnknown(unknownHandler interface{}, plugin ...Plugin)

SetUnknown sets the default handler, which is called when no handler for pull or push is found.

type Session

type Session interface {
	// SetId sets the session id.
	SetId(newId string)
	// Close closes the session.
	Close() error
	// Id returns the session id.
	Id() string
	// IsOk checks if the session is ok.
	IsOk() bool
	// Peer returns the peer.
	Peer() *Peer
	// GoPull sends a packet and receives reply asynchronously.
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
	GoPull(uri string, args interface{}, reply interface{}, done chan *PullCmd, setting ...socket.PacketSetting)
	// Pull sends a packet and receives reply.
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
	Pull(uri string, args interface{}, reply interface{}, setting ...socket.PacketSetting) *PullCmd
	// Push sends a packet, but does not receive a reply.
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
	Push(uri string, args interface{}, setting ...socket.PacketSetting) *Rerror
	// ReadTimeout returns readdeadline for underlying net.Conn.
	ReadTimeout() time.Duration
	// RemoteIp returns the remote peer ip.
	RemoteIp() string
	// LocalIp returns the local peer ip.
	LocalIp() string
	// SetReadTimeout sets the read deadline for the underlying net.Conn.
	SetReadTimeout(duration time.Duration)
	// SetWriteTimeout sets the write deadline for the underlying net.Conn.
	SetWriteTimeout(duration time.Duration)
	// Socket returns the Socket.
	// Socket() socket.Socket
	// WriteTimeout returns writedeadline for underlying net.Conn.
	WriteTimeout() time.Duration
	// Public returns temporary public data of session(socket).
	Public() goutil.Map
	// PublicLen returns the length of public data of session(socket).
	PublicLen() int
}

Session a connection session.
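
GoPull takes a done channel, which suggests the usual pattern of starting several calls and then collecting the finished commands from the channel. The sketch below imitates that pattern with stand-in types: `fakeSession`, `pullCmd` and `sumAll` are hypothetical, the "remote" handler is just a local addition, and nothing here talks to a real peer.

```go
package main

import "fmt"

// pullCmd is a hypothetical stand-in for *tp.PullCmd.
type pullCmd struct {
	uri   string
	reply *int
	err   error
}

// fakeSession stands in for a Session; its handler adds two ints locally.
type fakeSession struct{}

// GoPull starts the call in a goroutine and delivers the finished command on done.
func (fakeSession) GoPull(uri string, args [2]int, reply *int, done chan *pullCmd) {
	go func() {
		*reply = args[0] + args[1]
		done <- &pullCmd{uri: uri, reply: reply}
	}()
}

// sumAll issues one asynchronous pull per pair and adds up all replies.
func sumAll(pairs [][2]int) int {
	var sess fakeSession
	// The channel is buffered so that no sender blocks if the caller is slow.
	done := make(chan *pullCmd, len(pairs))
	replies := make([]int, len(pairs))
	for i, p := range pairs {
		sess.GoPull("/group/home/test", p, &replies[i], done)
	}
	total := 0
	for range pairs {
		cmd := <-done
		if cmd.err != nil {
			continue
		}
		total += *cmd.reply
	}
	return total
}

func main() {
	fmt.Println(sumAll([][2]int{{1, 2}, {3, 4}}))
}
```

With the real API, each received command would be checked with its Rerror method before the reply value is used.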

type SessionHub

type SessionHub struct {
	// contains filtered or unexported fields
}

SessionHub sessions hub

func (*SessionHub) Delete

func (sh *SessionHub) Delete(id string)

Delete deletes the *session for an id.

func (*SessionHub) Get

func (sh *SessionHub) Get(id string) (*session, bool)

Get gets the *session by id. If the second returned value is false, the *session was not found.

func (*SessionHub) Len

func (sh *SessionHub) Len() int

Len returns the length of the session hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SessionHub) Random

func (sh *SessionHub) Random() (*session, bool)

Random gets a *session randomly. If the second returned value is false, no *session exists.

func (*SessionHub) Range

func (sh *SessionHub) Range(fn func(*session) bool)

Range calls fn sequentially for each id and *session present in the session hub. If fn returns false, stop traversing.

func (*SessionHub) Set

func (sh *SessionHub) Set(sess *session)

Set sets a *session.
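
The note on Len mentions sync.Map, so a hub with the methods above might be built roughly as follows. This is a minimal sketch, assuming a sync.Map keyed by session id; `sessionHub` and the one-field `session` type are local stand-ins, and counting by ranging is one possible approach rather than the package's actual one.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a minimal stand-in for the unexported session type.
type session struct{ id string }

// sessionHub stores sessions by id in a sync.Map.
type sessionHub struct{ sessions sync.Map }

// Set stores the session under its id.
func (sh *sessionHub) Set(s *session) { sh.sessions.Store(s.id, s) }

// Get returns the session for id; the second value is false if it is missing.
func (sh *sessionHub) Get(id string) (*session, bool) {
	v, ok := sh.sessions.Load(id)
	if !ok {
		return nil, false
	}
	return v.(*session), true
}

// Delete removes the session for id.
func (sh *sessionHub) Delete(id string) { sh.sessions.Delete(id) }

// Range calls fn for each session until fn returns false.
func (sh *sessionHub) Range(fn func(*session) bool) {
	sh.sessions.Range(func(_, v interface{}) bool {
		return fn(v.(*session))
	})
}

// Len counts sessions by ranging; with concurrent writers the result is
// only a snapshot and may be inaccurate.
func (sh *sessionHub) Len() int {
	n := 0
	sh.Range(func(*session) bool {
		n++
		return true
	})
	return n
}

func main() {
	hub := &sessionHub{}
	hub.Set(&session{id: "a"})
	hub.Set(&session{id: "b"})
	fmt.Println(hub.Len())
	hub.Delete("a")
	_, ok := hub.Get("a")
	fmt.Println(ok, hub.Len())
}
```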

type UnknownPullCtx

type UnknownPullCtx interface {
	UnknownPushCtx
	AddXferPipe(filterId ...byte)
}

type UnknownPushCtx

type UnknownPushCtx interface {
	PushCtx
	InputBodyBytes() []byte
	Bind(v interface{}) (bodyCodec byte, err error)
}

type WriteCtx

type WriteCtx interface {
	Output() *socket.Packet
	Public() goutil.Map
	PublicLen() int
	Ip() string
	Peer() *Peer
	Session() Session
}

WriteCtx for writing packet.

Directories

Path Synopsis
samples
ab
Socket package provides a concise, powerful and high-performance TCP socket.
example/pb
Package pb is a generated protocol buffer package.
