Documentation ¶
Index ¶
- Variables
- func AckFunc(ctx context.Context, cl ConnectionLayer, packet *packets.PublishPacket) func()
- func IsNetClosedErr(err error) bool
- func IsNetTimeout(err error) bool
- func NewConnectMsg(cid string, cleanSession bool, keepalive uint16) *packets.ConnectPacket
- func NewRouter() *router
- func SetLogger(out io.Writer)
- type ApplicationLayer
- type ConnConfig
- type ConnectToken
- func (b *ConnectToken) Error() error
- func (b *ConnectToken) FlowComplete()
- func (c *ConnectToken) ReturnCode() byte
- func (c *ConnectToken) SessionPresent() bool
- func (b *ConnectToken) SetError(e error)
- func (c *ConnectToken) SetReturnCode(code byte)
- func (c *ConnectToken) SetSessionPresent(sp bool)
- func (b *ConnectToken) Wait() bool
- func (b *ConnectToken) WaitTimeout(d time.Duration) bool
- type ConnectionLayer
- type Customer
- type DisconnectToken
- type DummyToken
- type ErrorToken
- type Logger
- type MConn
- type MId
- type Message
- type MessageHandler
- type MessageIds
- type NOOPLogger
- type OnCloseCallback
- type PacketAndToken
- type PresentationLayer
- type PublishToken
- type Router
- type SessionLayer
- type Token
- type TokenCompleter
- type TokenErrorSetter
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrCanceled = errors.New("canceled") ErrClosed = errors.New("isClosed") )
Functions ¶
func AckFunc ¶
func AckFunc(ctx context.Context, cl ConnectionLayer, packet *packets.PublishPacket) func()
func IsNetClosedErr ¶
func IsNetTimeout ¶
func NewConnectMsg ¶
func NewConnectMsg(cid string, cleanSession bool, keepalive uint16) *packets.ConnectPacket
Types ¶
type ApplicationLayer ¶
type ApplicationLayer interface { Open(ctx context.Context) error Close() error Publish(ctx context.Context, topic string, data interface{}) error PublishTimeout(ctx context.Context, topic string, data interface{}, timeout time.Duration) error PublishWithID(ctx context.Context, sid string, topic string, data interface{}) Token Sessions() []string AddRoute(topic string, callback MessageHandler) DeleteRoute(topic string) SetDefaultHandler(handler MessageHandler) Remove(id string) }
ApplicationLayer is the application layer (应用层).
type ConnConfig ¶
type ConnectToken ¶
type ConnectToken struct {
// contains filtered or unexported fields
}
=======
func (*ConnectToken) FlowComplete ¶
func (b *ConnectToken) FlowComplete()
func (*ConnectToken) ReturnCode ¶
func (c *ConnectToken) ReturnCode() byte
func (*ConnectToken) SessionPresent ¶
func (c *ConnectToken) SessionPresent() bool
func (*ConnectToken) SetReturnCode ¶
func (c *ConnectToken) SetReturnCode(code byte)
func (*ConnectToken) SetSessionPresent ¶
func (c *ConnectToken) SetSessionPresent(sp bool)
func (*ConnectToken) WaitTimeout ¶
type ConnectionLayer ¶
type ConnectionLayer interface { ID() string Write(ctx context.Context, pkt *PacketAndToken) error WriteP(ctx context.Context, msg packets.ControlPacket) error Read() chan packets.ControlPacket Close() }
ConnectionLayer is the connection layer (连接层).
func NewConnectionKeeper ¶
type Customer ¶
type Customer interface {
GetAckFunc(msg *packets.PublishPacket) func()
}
type DisconnectToken ¶
type DisconnectToken struct {
// contains filtered or unexported fields
}
=====
func (*DisconnectToken) FlowComplete ¶
func (b *DisconnectToken) FlowComplete()
func (*DisconnectToken) WaitTimeout ¶
type DummyToken ¶
type DummyToken struct {
// contains filtered or unexported fields
}
func (*DummyToken) Error ¶
func (d *DummyToken) Error() error
func (*DummyToken) FlowComplete ¶
func (d *DummyToken) FlowComplete()
func (*DummyToken) SetError ¶
func (d *DummyToken) SetError(error)
func (*DummyToken) Wait ¶
func (d *DummyToken) Wait() bool
func (*DummyToken) WaitTimeout ¶
func (d *DummyToken) WaitTimeout(time.Duration) bool
type ErrorToken ¶
type ErrorToken struct {
// contains filtered or unexported fields
}
func (*ErrorToken) Error ¶
func (e *ErrorToken) Error() error
func (*ErrorToken) Wait ¶
func (e *ErrorToken) Wait() bool
func (*ErrorToken) WaitTimeout ¶
func (e *ErrorToken) WaitTimeout(time.Duration) bool
type Logger ¶
type Logger interface { Println(v ...interface{}) Printf(format string, v ...interface{}) }
var ( ERROR Logger = NOOPLogger{} CRITICAL Logger = NOOPLogger{} WARN Logger = NOOPLogger{} INFO Logger = NOOPLogger{} DEBUG Logger = NOOPLogger{} )
type Message ¶
type Message interface { Topic() string ClientID() string MessageID() MId Qos() byte Duplicate() bool Retained() bool Payload() []byte Ack() }
func MessageFromPublish ¶
func MessageFromPublish(p *packets.PublishPacket, cid string, ack func()) Message
type MessageHandler ¶
type MessageIds ¶
func NewMessageIds ¶
func NewMessageIds() *MessageIds
func (*MessageIds) ClaimID ¶
func (m *MessageIds) ClaimID(token TokenCompleter, id MId)
func (*MessageIds) CleanUp ¶
func (m *MessageIds) CleanUp()
func (*MessageIds) FreeID ¶
func (m *MessageIds) FreeID(id MId)
func (*MessageIds) GetID ¶
func (m *MessageIds) GetID(t TokenCompleter) MId
func (*MessageIds) GetToken ¶
func (m *MessageIds) GetToken(id MId) TokenCompleter
type NOOPLogger ¶
type NOOPLogger struct{}
func (NOOPLogger) Printf ¶
func (NOOPLogger) Printf(format string, v ...interface{})
func (NOOPLogger) Println ¶
func (NOOPLogger) Println(v ...interface{})
type OnCloseCallback ¶
type OnCloseCallback func(id string)
type PacketAndToken ¶
type PacketAndToken struct { P packets.ControlPacket T TokenCompleter }
type PresentationLayer ¶
type PresentationLayer interface { Decode(src []byte, dst interface{}) (err error) Encode(src interface{}) (dst []byte, err error) }
PresentationLayer is the presentation layer (表示层).
func NewJsonPresentationLayer ¶
func NewJsonPresentationLayer() PresentationLayer
type PublishToken ¶
type PublishToken struct {
// contains filtered or unexported fields
}
==
func (*PublishToken) FlowComplete ¶
func (b *PublishToken) FlowComplete()
func (*PublishToken) MessageID ¶
func (p *PublishToken) MessageID() MId
func (*PublishToken) SetMessageID ¶
func (p *PublishToken) SetMessageID(id MId)
func (*PublishToken) WaitTimeout ¶
type Router ¶
type Router interface {
	// AddRoute adds a route.
	AddRoute(topic string, callback MessageHandler)
	// DeleteRoute deletes a route.
	DeleteRoute(topic string)
	// SetDefaultHandler sets the default handler.
	SetDefaultHandler(handler MessageHandler)
	// MatchAndDispatch dispatches messages and handles them. order: execute sequentially.
	MatchAndDispatch(messages <-chan Message, order bool)
}
type SessionLayer ¶
type SessionLayer interface { Close() Send(ctx context.Context, topic string, qos int, msg []byte) Token UpdateConnectLayer(ctx context.Context, cl ConnectionLayer) error In() <-chan Message }
SessionLayer is the session layer (会话层).
func NewSession ¶
func NewSession(ctx context.Context, cl ConnectionLayer) (SessionLayer, error)
type Token ¶
func NewErrToken ¶
type TokenCompleter ¶
type TokenCompleter interface { Token TokenErrorSetter FlowComplete() }
func NewToken ¶
func NewToken(tType byte) TokenCompleter
type TokenErrorSetter ¶
type TokenErrorSetter interface {
SetError(error)
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.