ktmt

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2020 License: MIT Imports: 12 Imported by: 0

README

ktmt

Keepalive TCP message transport.

Feature

  1. 支持断开重连
  2. 服务质量(QoS0/QoS1)最多一次/至少一次
  3. 双向的业务心跳
  4. 读写超时

整体结构

  • 应用层(业务协议)

  • 表示层(序列化)

  • 会话层(session 报文交互协议)

    服务质量保证

  • 连接层(tcp连接管理/keepalive)

应用层

Example

表示层

采用json作为数据序列化表示

会话层

协议文档

连接层

心跳检测,连接管理

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrCanceled carries the message "canceled".
	// NOTE(review): presumably returned when an operation is canceled — confirm against callers.
	ErrCanceled = errors.New("canceled")
	// ErrClosed carries the message "isClosed".
	// NOTE(review): presumably returned when operating on a closed resource — confirm against callers.
	ErrClosed   = errors.New("isClosed")
)

Functions

func AckFunc

func AckFunc(ctx context.Context, cl ConnectionLayer, packet *packets.PublishPacket) func()

func IsNetClosedErr

func IsNetClosedErr(err error) bool

func IsNetTimeout

func IsNetTimeout(err error) bool

func NewConnectMsg

func NewConnectMsg(cid string, cleanSession bool, keepalive uint16) *packets.ConnectPacket

func NewRouter

func NewRouter() *router

func SetLogger

func SetLogger(out io.Writer)

Types

type ApplicationLayer

// ApplicationLayer is the application layer of the stack; the README
// describes this layer as carrying the business protocol.
type ApplicationLayer interface {
	// Open takes a context and reports an error; Close reports an error.
	Open(ctx context.Context) error
	Close() error

	// Publish and PublishTimeout take a topic and an arbitrary data value and
	// report an error; PublishTimeout additionally takes a timeout.
	// NOTE(review): which sessions receive the data is not visible here — confirm.
	Publish(ctx context.Context, topic string, data interface{}) error
	PublishTimeout(ctx context.Context, topic string, data interface{}, timeout time.Duration) error

	// PublishWithID also takes sid and returns a Token instead of an error.
	// NOTE(review): sid is presumably a session ID — confirm.
	PublishWithID(ctx context.Context, sid string, topic string, data interface{}) Token

	// Sessions returns a slice of strings.
	// NOTE(review): presumably the IDs of the current sessions — confirm.
	Sessions() []string

	// The three routing methods have the same signatures as those of the
	// Router interface (add a route, delete a route, set the default handler).
	AddRoute(topic string, callback MessageHandler)
	DeleteRoute(topic string)
	SetDefaultHandler(handler MessageHandler)

	// Remove takes an id.
	// NOTE(review): presumably removes the session with that ID — confirm.
	Remove(id string)
}

ApplicationLayer 应用层

type ConnConfig

// ConnConfig is the configuration value accepted by Accept, Connect and
// NewMConn.
type ConnConfig struct {
	Version      byte
	// ReadTimeout and WriteTimeout: the README lists read/write timeouts as a feature.
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	// NOTE(review): CID is presumably the client ID (cf. the cid parameter of NewConnectMsg) — confirm.
	CID          string
	Keepalive    uint16 // keepalive interval; unit is seconds (originally annotated "s")
	CleanSession bool
	// CloseCB has type OnCloseCallback, i.e. func(id string).
	// NOTE(review): presumably invoked when the connection closes — confirm.
	CloseCB      OnCloseCallback
}

type ConnectToken

// ConnectToken is a token type. *ConnectToken exposes Error, Wait,
// WaitTimeout, SetError and FlowComplete (the TokenCompleter method set),
// plus getters/setters for a return code (byte) and a session-present flag
// (bool).
type ConnectToken struct {
	// contains filtered or unexported fields
}

=======

func (*ConnectToken) Error

func (b *ConnectToken) Error() error

func (*ConnectToken) FlowComplete

func (b *ConnectToken) FlowComplete()

func (*ConnectToken) ReturnCode

func (c *ConnectToken) ReturnCode() byte

func (*ConnectToken) SessionPresent

func (c *ConnectToken) SessionPresent() bool

func (*ConnectToken) SetError

func (b *ConnectToken) SetError(e error)

func (*ConnectToken) SetReturnCode

func (c *ConnectToken) SetReturnCode(code byte)

func (*ConnectToken) SetSessionPresent

func (c *ConnectToken) SetSessionPresent(sp bool)

func (*ConnectToken) Wait

func (b *ConnectToken) Wait() bool

func (*ConnectToken) WaitTimeout

func (b *ConnectToken) WaitTimeout(d time.Duration) bool

type ConnectionLayer

// ConnectionLayer is the connection layer; the README describes this layer as
// TCP connection management with keepalive (heartbeat detection).
// NewConnectionKeeper returns a value of this type.
type ConnectionLayer interface {
	// ID returns a string identifier.
	ID() string
	// Write takes a packet paired with its token (PacketAndToken).
	Write(ctx context.Context, pkt *PacketAndToken) error
	// WriteP takes a bare control packet, with no token.
	WriteP(ctx context.Context, msg packets.ControlPacket) error
	// Read returns a channel of control packets.
	// NOTE(review): presumably the stream of inbound packets — confirm.
	Read() chan packets.ControlPacket
	Close()
}

ConnectionLayer 连接层

func NewConnectionKeeper

func NewConnectionKeeper(ctx context.Context, connFactory func(context.Context) MConn, sendPing bool) ConnectionLayer

type Customer

// Customer is a single-method interface: GetAckFunc maps a publish packet to
// a func().
// NOTE(review): the returned func presumably acknowledges the packet (cf. AckFunc) — confirm.
type Customer interface {
	GetAckFunc(msg *packets.PublishPacket) func()
}

type DisconnectToken

// DisconnectToken is a token type. *DisconnectToken exposes Error, Wait,
// WaitTimeout, SetError and FlowComplete (the TokenCompleter method set).
type DisconnectToken struct {
	// contains filtered or unexported fields
}

=====

func (*DisconnectToken) Error

func (b *DisconnectToken) Error() error

func (*DisconnectToken) FlowComplete

func (b *DisconnectToken) FlowComplete()

func (*DisconnectToken) SetError

func (b *DisconnectToken) SetError(e error)

func (*DisconnectToken) Wait

func (b *DisconnectToken) Wait() bool

func (*DisconnectToken) WaitTimeout

func (b *DisconnectToken) WaitTimeout(d time.Duration) bool

type DummyToken

// DummyToken is a token type. *DummyToken exposes Error, Wait, WaitTimeout,
// SetError and FlowComplete (the TokenCompleter method set).
// NOTE(review): by its name, presumably a placeholder whose methods do no real work — confirm.
type DummyToken struct {
	// contains filtered or unexported fields
}

func (*DummyToken) Error

func (d *DummyToken) Error() error

func (*DummyToken) FlowComplete

func (d *DummyToken) FlowComplete()

func (*DummyToken) SetError

func (d *DummyToken) SetError(error)

func (*DummyToken) Wait

func (d *DummyToken) Wait() bool

func (*DummyToken) WaitTimeout

func (d *DummyToken) WaitTimeout(time.Duration) bool

type ErrorToken

// ErrorToken is a token type. *ErrorToken exposes only Error, Wait and
// WaitTimeout (the Token method set); it has no SetError or FlowComplete.
// NOTE(review): presumably the concrete type behind NewErrToken — confirm.
type ErrorToken struct {
	// contains filtered or unexported fields
}

func (*ErrorToken) Error

func (e *ErrorToken) Error() error

func (*ErrorToken) Wait

func (e *ErrorToken) Wait() bool

func (*ErrorToken) WaitTimeout

func (e *ErrorToken) WaitTimeout(time.Duration) bool

type Logger

// Logger is the logging interface used as the type of the package-level
// loggers ERROR, CRITICAL, WARN, INFO and DEBUG. It requires only Println
// and Printf.
type Logger interface {
	Println(v ...interface{})
	Printf(format string, v ...interface{})
}
// Package-level loggers, one per severity level. Each is initialized to
// NOOPLogger{}.
// NOTE(review): SetLogger(out io.Writer) presumably redirects these — confirm.
var (
	ERROR    Logger = NOOPLogger{}
	CRITICAL Logger = NOOPLogger{}
	WARN     Logger = NOOPLogger{}
	INFO     Logger = NOOPLogger{}
	DEBUG    Logger = NOOPLogger{}
)

type MConn

// MConn is a net.Conn extended with three accessors. Accept, Connect and
// NewMConn each return a value of this type.
type MConn interface {
	net.Conn
	// ID returns a string identifier.
	ID() string
	// Heartbeat returns a duration.
	// NOTE(review): presumably the heartbeat/keepalive interval — confirm.
	Heartbeat() time.Duration
	// CleanSession returns a boolean flag (cf. ConnConfig.CleanSession).
	CleanSession() bool
}

func Accept

func Accept(conn net.Conn, cf *ConnConfig) (MConn, error)

Accept 接收客户端连接

func Connect

func Connect(conn net.Conn, cf *ConnConfig) (MConn, error)

Connect 连接到服务端

func NewMConn

func NewMConn(conn net.Conn, options *ConnConfig) MConn

type MId

// MId is a 16-bit unsigned message identifier type; it is what
// Message.MessageID and MessageIds.GetID return.
type MId uint16

type Message

// Message is the value passed to a MessageHandler and delivered on
// SessionLayer.In. MessageFromPublish builds one from a publish packet, a
// client ID and an ack func.
type Message interface {
	Topic() string
	ClientID() string
	MessageID() MId
	Qos() byte
	Duplicate() bool
	Retained() bool
	Payload() []byte
	// NOTE(review): Ack presumably invokes the ack func given to MessageFromPublish — confirm.
	Ack()
}

func MessageFromPublish

func MessageFromPublish(p *packets.PublishPacket, cid string, ack func()) Message

type MessageHandler

// MessageHandler is the callback type registered through AddRoute and
// SetDefaultHandler. It receives a Message and returns a bool.
// NOTE(review): the meaning of the bool result is not visible here — confirm.
type MessageHandler func(Message) bool

type MessageIds

// MessageIds embeds sync.RWMutex. Its methods relate MId values to
// TokenCompleter values: GetID takes a token and returns an MId, ClaimID
// takes a token with a caller-chosen MId, GetToken returns the token for an
// MId, and FreeID takes an MId. Construct one with NewMessageIds.
type MessageIds struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewMessageIds

func NewMessageIds() *MessageIds

func (*MessageIds) ClaimID

func (m *MessageIds) ClaimID(token TokenCompleter, id MId)

func (*MessageIds) CleanUp

func (m *MessageIds) CleanUp()

func (*MessageIds) FreeID

func (m *MessageIds) FreeID(id MId)

func (*MessageIds) GetID

func (m *MessageIds) GetID(t TokenCompleter) MId

func (*MessageIds) GetToken

func (m *MessageIds) GetToken(id MId) TokenCompleter

type NOOPLogger

// NOOPLogger is an empty struct with Printf and Println methods, so it
// satisfies Logger. It is the initial value of every package-level logger.
type NOOPLogger struct{}

func (NOOPLogger) Printf

func (NOOPLogger) Printf(format string, v ...interface{})

func (NOOPLogger) Println

func (NOOPLogger) Println(v ...interface{})

type OnCloseCallback

// OnCloseCallback is the callback type of ConnConfig.CloseCB; it receives an
// id string.
type OnCloseCallback func(id string)

type PacketAndToken

// PacketAndToken pairs a control packet with a TokenCompleter. It is the
// argument type of ConnectionLayer.Write.
type PacketAndToken struct {
	P packets.ControlPacket // the packet
	T TokenCompleter        // the token paired with P
}

type PresentationLayer

// PresentationLayer is the presentation layer; the README describes this
// layer as serialization and states that JSON is the representation used.
// NewJsonPresentationLayer returns a value of this type.
type PresentationLayer interface {
	// Decode takes bytes in src and a destination value dst.
	Decode(src []byte, dst interface{}) (err error)
	// Encode takes a value src and returns bytes.
	Encode(src interface{}) (dst []byte, err error)
}

PresentationLayer 表示层

func NewJsonPresentationLayer

func NewJsonPresentationLayer() PresentationLayer

type PublishToken

// PublishToken is a token type. *PublishToken exposes Error, Wait,
// WaitTimeout, SetError and FlowComplete (the TokenCompleter method set),
// plus MessageID and SetMessageID for an MId.
type PublishToken struct {
	// contains filtered or unexported fields
}

==

func (*PublishToken) Error

func (b *PublishToken) Error() error

func (*PublishToken) FlowComplete

func (b *PublishToken) FlowComplete()

func (*PublishToken) MessageID

func (p *PublishToken) MessageID() MId

func (*PublishToken) SetError

func (b *PublishToken) SetError(e error)

func (*PublishToken) SetMessageID

func (p *PublishToken) SetMessageID(id MId)

func (*PublishToken) Wait

func (b *PublishToken) Wait() bool

func (*PublishToken) WaitTimeout

func (b *PublishToken) WaitTimeout(d time.Duration) bool

type Router

// Router registers per-topic message handlers and dispatches incoming
// messages to them.
type Router interface {
	// AddRoute adds a route.
	AddRoute(topic string, callback MessageHandler)
	// DeleteRoute deletes a route.
	DeleteRoute(topic string)
	// SetDefaultHandler sets the default handler.
	SetDefaultHandler(handler MessageHandler)
	// MatchAndDispatch dispatches messages and handles them. order: execute in order.
	MatchAndDispatch(messages <-chan Message, order bool)
}

type SessionLayer

// SessionLayer is the session layer; the README describes this layer as the
// session packet-exchange protocol with a quality-of-service guarantee.
// NewSession builds one on top of a ConnectionLayer.
type SessionLayer interface {
	Close()
	// Send takes a topic, a qos level and a payload, and returns a Token.
	// The README lists QoS 0 (at most once) and QoS 1 (at least once).
	Send(ctx context.Context, topic string, qos int, msg []byte) Token
	// UpdateConnectLayer takes a ConnectionLayer.
	// NOTE(review): presumably swaps in a new connection after a reconnect — confirm.
	UpdateConnectLayer(ctx context.Context, cl ConnectionLayer) error
	// In returns a receive-only channel of Message.
	In() <-chan Message
}

SessionLayer 会话层

func NewSession

func NewSession(ctx context.Context, cl ConnectionLayer) (SessionLayer, error)

type Token

// Token is the read-only view of an operation's outcome, as returned by
// SessionLayer.Send, ApplicationLayer.PublishWithID and NewErrToken.
type Token interface {
	// NOTE(review): Wait presumably blocks until the operation completes; the meaning of the bool is not visible here — confirm.
	Wait() bool
	// WaitTimeout is the variant of Wait that takes a duration.
	WaitTimeout(time.Duration) bool
	Error() error
}

func NewErrToken

func NewErrToken(err error) Token

type TokenCompleter

// TokenCompleter combines Token and TokenErrorSetter and adds FlowComplete.
// NewToken returns a value of this type.
type TokenCompleter interface {
	Token
	TokenErrorSetter
	// NOTE(review): FlowComplete presumably marks the operation as finished so waiters are released — confirm.
	FlowComplete()
}

func NewToken

func NewToken(tType byte) TokenCompleter

type TokenErrorSetter

// TokenErrorSetter is the single-method interface for setting an error on a
// token; it is embedded in TokenCompleter.
type TokenErrorSetter interface {
	SetError(error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL