ikio

package
v0.0.0-...-7429660 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PacketTypeRequest  = 0
	PacketTypeResponse = 1
	PacketTypeHint     = 2
)
View Source
const (
	RPCDefaultMaxHeaderLen = 10 * 1024 * 1024
	RPCDefaultMaxBodyLen   = 10 * 1024 * 1024
	RPCDefaultMaxPeerIDLen = 1024
)

Default maximum sizes for the RPC header, body, and peer ID.

View Source
const (
	HintTypeKeepalive = 0
)
View Source
const (
	RPCNegoMessageType = 0xFFFF
)

RPC Nego msg type

Variables

View Source
var (
	ErrRPCHeaderSizeLimit = errors.New("rpc header size max error")
	ErrRPCBodySizeLimit   = errors.New("rpc body size max error")
	ErrRPCPeerIDSizeLimit = errors.New("rpc peerid size max error")
)

rpc codec errors

View Source
var (
	ErrWouldBlock   = errors.New("would block")
	ErrServerClosed = errors.New("server has been closed")
)

Error codes returned by failures dealing with server or connection.

View Source
var Buffer bufferPool

Functions

func ConnIDFromContext

func ConnIDFromContext(ctx context.Context) int64

func NewContextWithConnID

func NewContextWithConnID(ctx context.Context, connID int64) context.Context

func NewContextWithMessage

func NewContextWithMessage(ctx context.Context, msg Packet) context.Context

NewContextWithMessage returns a new Context that carries message.

func NewContextWithServer

func NewContextWithServer(ctx context.Context, s *Server) context.Context

NewContextWithServer returns a new Context that carries server.

func PutRPCPacket

func PutRPCPacket(p *RPCPacket)

func SetTimestamp

func SetTimestamp(ctx context.Context, t int64, tags ...interface{}) context.Context

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel represents a server connection to a TCP server; it implements Conn.

func NewChannel

func NewChannel(ctx context.Context, conn net.Conn, opts channelOption) *Channel

NewChannel returns a new Channel

func (*Channel) AddPendingTimer

func (c *Channel) AddPendingTimer(ctx *timer.Context)

AddPendingTimer adds a new timer ID to client connection.

func (*Channel) CancelTimer

func (c *Channel) CancelTimer(ctx *timer.Context)

CancelTimer cancels a timer with the specified ID.

func (*Channel) Close

func (c *Channel) Close() error

Close gracefully closes the server connection. It blocks until all of the connection's goroutines have completed and returned.

func (*Channel) ConnID

func (c *Channel) ConnID() int64

ConnID returns net ID of server connection.

func (*Channel) ContextValue

func (c *Channel) ContextValue(k interface{}) interface{}

ContextValue gets extra data from server connection.

func (*Channel) LastActive

func (c *Channel) LastActive() int64

LastActive returns the heart beats of server connection.

func (*Channel) LocalAddr

func (c *Channel) LocalAddr() net.Addr

LocalAddr returns the local address of server connection.

func (*Channel) Name

func (c *Channel) Name() string

Name returns the name of server connection.

func (*Channel) RemoteAddr

func (c *Channel) RemoteAddr() net.Addr

RemoteAddr returns the remote address of server connection.

func (*Channel) RunAfter

func (c *Channel) RunAfter(delay time.Duration, interval time.Duration, cb func(time.Time, WriteCloser), askForWorks ...bool) *timer.Context

func (*Channel) SetContextValue

func (c *Channel) SetContextValue(k, v interface{})

SetContextValue sets extra data to server connection.

func (*Channel) SetLastActive

func (c *Channel) SetLastActive(heart int64)

SetLastActive sets the heart beats of server connection.

func (*Channel) SetName

func (c *Channel) SetName(name string)

SetName sets name of server connection.

func (*Channel) Start

func (c *Channel) Start()

Start starts the server connection, creating goroutines for reading, writing, and handling.

func (*Channel) Write

func (c *Channel) Write(ctx context.Context, packet Packet) (int, error)

Write writes a message to the client.

type ClientChannel

type ClientChannel struct {
	*Channel
	// contains filtered or unexported fields
}

func NewClientChannel

func NewClientChannel(c net.Conn, opt ...Option) *ClientChannel

func (*ClientChannel) Close

func (c *ClientChannel) Close() error

Close closes the client channel asynchronously.

type Codec

type Codec interface {
	Decode(*bufio.Reader) (Packet, error)
	Encode(Packet, io.Writer) (int, error)
}

Codec reads packets from the network (original comment: 网络读包, "network packet reading").

type ErrUndefined

type ErrUndefined int32

ErrUndefined for undefined message type.

func (ErrUndefined) Error

func (e ErrUndefined) Error() string

type HandlePoolType

type HandlePoolType int8

HandlePoolType handle pool type

const (
	HandleNoPooled HandlePoolType = iota
	HandlePooledRandom
	HandlePooledStick
	HandlePoolNewRoutine
)

handler type

type Handler

type Handler interface {
	Handle(context.Context, WriteCloser)
}

Handler takes the responsibility to handle incoming messages.

type HandlerFunc

type HandlerFunc func(context.Context, WriteCloser)

HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser)

Handle calls f(ctx, c)

type Hashable

type Hashable interface {
	HashCode() int32
}

Hashable is an interface for hashable objects.

type LineCodec

type LineCodec struct{}

func (*LineCodec) Decode

func (lc *LineCodec) Decode(reader *bufio.Reader) (Packet, error)

func (*LineCodec) Encode

func (lc *LineCodec) Encode(p Packet, writer io.Writer) (int, error)

type LinePacket

type LinePacket struct {
	Payload []byte
}

func (*LinePacket) Serialize

func (lp *LinePacket) Serialize() ([]byte, error)

func (*LinePacket) Timestamp

func (lp *LinePacket) Timestamp() int64

func (*LinePacket) Type

func (lp *LinePacket) Type() int32

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

MessageHandler is a combination of message and its handler function.

type OnTimeOut

type OnTimeOut struct {
	Callback      func(time.Time, WriteCloser)
	Ctx           context.Context
	AskForWorkers bool
}

OnTimeOut represents a timed task.

func NewOnTimeOut

func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut

NewOnTimeOut returns OnTimeOut.

type Option

type Option func(*options)

Option sets server options.

func CustomCodecOption

func CustomCodecOption(codec func() Codec) Option

CustomCodecOption returns an Option that will apply a custom Codec.

func HandlerBufferSizeOption

func HandlerBufferSizeOption(indicator int) Option

HandlerBufferSizeOption returns an Option that sets the size of the handler buffered channel.

func MetricsTags

func MetricsTags(kvs ...interface{}) Option

MetricsTags returns an Option that sets the metrics tags.

func OnCloseOption

func OnCloseOption(cb func(WriteCloser)) Option

OnCloseOption returns an Option that sets the callback to call when a client is closed.

func OnConnectOption

func OnConnectOption(cb func(WriteCloser) bool) Option

OnConnectOption returns an Option that sets the callback to call when a new client connects.

func OnMessageOption

func OnMessageOption(cb func(Packet, WriteCloser), poolType ...HandlePoolType) Option

OnMessageOption returns an Option that sets the callback to call when a new message arrives.

func ReadTimeoutOption

func ReadTimeoutOption(t time.Duration) Option

ReadTimeoutOption returns an Option that sets the socket read timeout.

func TimerBufferSizeOption

func TimerBufferSizeOption(indicator int) Option

TimerBufferSizeOption returns an Option that sets the size of the timer buffered channel.

func WithLogger

func WithLogger(l *log.Logger) Option

WithLogger returns an Option that sets the logger.

func WorkerSizeOption

func WorkerSizeOption(workerSz int) Option

WorkerSizeOption returns an Option that sets the number of goroutines in the WorkerPool.

func WriteTimeoutOption

func WriteTimeoutOption(t time.Duration) Option

WriteTimeoutOption returns an Option that sets the socket write timeout.

func WriterBufferSizeOption

func WriterBufferSizeOption(indicator int) Option

WriterBufferSizeOption returns an Option that sets the size of the writer buffered channel.

type Packet

type Packet interface {
	Type() int32
	Serialize() ([]byte, error)
}

Packet is a network protocol packet (original comment: 网络协议包).

func MessageFromContext

func MessageFromContext(ctx context.Context) Packet

type RPCCodec

type RPCCodec struct {
	MaxHeaderLen int32
	MaxBodyLen   int32
	MaxPeerIDLen int32
	// contains filtered or unexported fields
}

func (*RPCCodec) Decode

func (rc *RPCCodec) Decode(reader *bufio.Reader) (Packet, error)

func (*RPCCodec) Encode

func (rc *RPCCodec) Encode(p Packet, writer io.Writer) (int, error)

type RPCNegoPacket

type RPCNegoPacket struct {
	Magic uint32
	ID    []byte
	Flag  uint32
}

func (*RPCNegoPacket) Serialize

func (rnp *RPCNegoPacket) Serialize() ([]byte, error)

func (*RPCNegoPacket) Timestamp

func (rnp *RPCNegoPacket) Timestamp() int64

func (*RPCNegoPacket) Type

func (rnp *RPCNegoPacket) Type() int32

type RPCPacket

type RPCPacket struct {
	ID      int64
	Code    int32
	Header  []RPCPacketHeader
	Payload []byte
	Tp      int32
	Flags   int32

	// timestamp
	T int64
}

func GetRPCPacket

func GetRPCPacket() *RPCPacket

func (*RPCPacket) AddHeader

func (rp *RPCPacket) AddHeader(key []byte, value []byte)

func (*RPCPacket) ForeachHeader

func (rp *RPCPacket) ForeachHeader(cb func(key, value []byte) error) error

func (*RPCPacket) GetHeader

func (rp *RPCPacket) GetHeader(key []byte) ([]byte, bool)

func (*RPCPacket) GetHeaderUint32

func (rp *RPCPacket) GetHeaderUint32(key []byte) (uint32, bool)

func (*RPCPacket) GetHeaderUint64

func (rp *RPCPacket) GetHeaderUint64(key []byte) (uint64, bool)

func (*RPCPacket) Serialize

func (rp *RPCPacket) Serialize() ([]byte, error)

func (*RPCPacket) Timestamp

func (rp *RPCPacket) Timestamp() int64

func (*RPCPacket) Type

func (rp *RPCPacket) Type() int32

type RPCPacketHeader

type RPCPacketHeader struct {
	Key   []byte
	Value []byte
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opt ...Option) *Server

func ServerFromContext

func ServerFromContext(ctx context.Context) *Server

func (*Server) Broadcast

func (s *Server) Broadcast(ctx context.Context, msg Packet, except func(connid int64) bool)

Broadcast broadcasts message to all server connections managed.

func (*Server) Conn

func (s *Server) Conn(id int64) (*ServerChannel, bool)

Conn returns a server connection with specified ID.

func (*Server) Register

func (s *Server) Register(msgType int32, handler HandlerFunc, handleType HandlePoolType)

func (*Server) Start

func (s *Server) Start(l net.Listener) error

func (*Server) Stop

func (s *Server) Stop()

func (*Server) Unicast

func (s *Server) Unicast(ctx context.Context, id int64, msg Packet) (int, error)

Unicast unicasts message to a specified conn.

type ServerChannel

type ServerChannel struct {
	*Channel
}

func NewServerChannel

func NewServerChannel(id int64, s *Server, c net.Conn) *ServerChannel

func (*ServerChannel) Close

func (c *ServerChannel) Close() error

Close closes the server channel asynchronously.

type Stats

type Stats struct {
}

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of go-routines running functions.

func WorkerPoolInstance

func WorkerPoolInstance() *WorkerPool

WorkerPoolInstance returns the global pool.

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

Close closes the pool, stopping it from executing functions.

func (*WorkerPool) Put

func (wp *WorkerPool) Put(code uint32, cb func()) error

Put appends a function to some worker's channel.

func (*WorkerPool) Size

func (wp *WorkerPool) Size() int

Size returns the size of pool.

type WriteCloser

type WriteCloser interface {
	Write(context.Context, Packet) (int, error)
	Close() error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL