Documentation ¶
Index ¶
- Constants
- Variables
- type Applyable
- type BasePacketBuilder
- func (b *BasePacketBuilder) AsString() (str string)
- func (b *BasePacketBuilder) Build(ctx *StateContext, pkg *Pkg) (err error)
- func (b *BasePacketBuilder) Bytes() (bytes []byte)
- func (b *BasePacketBuilder) IsResending() bool
- func (b *BasePacketBuilder) NextPktId(ctx *StateContext) uint16
- func (b *BasePacketBuilder) ReportType() ReportType
- func (b *BasePacketBuilder) RequestResend()
- func (p *BasePacketBuilder) WithCtx(ctx *StateContext) *logger.Base
- func (b *BasePacketBuilder) WriteTo(w io.Writer) (n int, err error)
- type ClientIdReadable
- type ClientSubsHolder
- type Config
- type ConnackBuilderOpt
- func WithConnack50AssignedClientId(s string) ConnackBuilderOpt
- func WithConnack50Authentication(method string, data []byte) ConnackBuilderOpt
- func WithConnack50ConnackReason(reason ConnackRetcode, reasonString string) ConnackBuilderOpt
- func WithConnack50MaximumPacketSize(i int32) ConnackBuilderOpt
- func WithConnack50MaximumQoS(qos QoSType) ConnackBuilderOpt
- func WithConnack50ReceiveMaximum(i uint16) ConnackBuilderOpt
- func WithConnack50ResponseInfo(info string) ConnackBuilderOpt
- func WithConnack50RetainAvailable(i bool) ConnackBuilderOpt
- func WithConnack50ServerKeepAlive(interval uint16) ConnackBuilderOpt
- func WithConnack50SessionExpiryInterval(interval int32) ConnackBuilderOpt
- func WithConnack50SharedSubsAvailable(i bool) ConnackBuilderOpt
- func WithConnack50SubsIdAvailable(i bool) ConnackBuilderOpt
- func WithConnack50TopicAliasMaximum(i uint16) ConnackBuilderOpt
- func WithConnack50UserProperties(props map[string]string) ConnackBuilderOpt
- func WithConnack50WildcardSubsAvailable(i bool) ConnackBuilderOpt
- type ConnackRetcode
- type ConnackVH
- type ConnectBuilderOpt
- func WithConnectCleanSession(cleanSession bool) ConnectBuilderOpt
- func WithConnectCleanSessionIfNotSet(cleanSession bool) ConnectBuilderOpt
- func WithConnectClientId(id string) ConnectBuilderOpt
- func WithConnectKeepAliveSeconds(seconds uint16) ConnectBuilderOpt
- func WithConnectKeepAliveSecondsIfNotSet(seconds uint16) ConnectBuilderOpt
- func WithConnectPassword(password string) ConnectBuilderOpt
- func WithConnectQoS(DUP bool, qos QoSType, RETAIN bool) ConnectBuilderOpt
- func WithConnectUsername(username string) ConnectBuilderOpt
- func WithConnectWillMsg(willRetain bool, willQoS QoSType, willTopic string, willMessage []byte) ConnectBuilderOpt
- type ConnectParam
- type ConnectPayload
- type ConnectVH
- type Contextual
- type ControlMessage
- type DataMessage
- type DefaultSessionStoreOpt
- func WithDefaultSessionStoreNoSysStats(b bool) DefaultSessionStoreOpt
- func WithDefaultSessionStoreNoSysStatsLog(b bool) DefaultSessionStoreOpt
- func WithDefaultSessionStorePathRW(rw PathRW) DefaultSessionStoreOpt
- func WithDefaultSessionStoreResetStorage(b bool) DefaultSessionStoreOpt
- func WithDefaultSessionStoreSavingEnabled(persist bool) DefaultSessionStoreOpt
- func WithDefaultSessionStoreSavingPeriod(d time.Duration) DefaultSessionStoreOpt
- type Disconnect50BuilderOpt
- func WithDisconnect50Reason(reason errors.CloseReason, reasonString string) Disconnect50BuilderOpt
- func WithDisconnect50ReasonServerMoved(serverReference string) Disconnect50BuilderOpt
- func WithDisconnect50ReasonUseOtherServer(serverReference string) Disconnect50BuilderOpt
- func WithDisconnect50SessionExpiryInterval(seconds int) Disconnect50BuilderOpt
- func WithDisconnect50UserProperties(props map[string]string) Disconnect50BuilderOpt
- type Hooker
- type IdStore
- type Message
- type Mode
- type PacketBuilder
- func NewConnectBuilder(opts ...ConnectBuilderOpt) PacketBuilder
- func NewDisconnectBuilder(opts ...Disconnect50BuilderOpt) PacketBuilder
- func NewPingreqBuilder() PacketBuilder
- func NewPingrespBuilder() PacketBuilder
- func NewPub10ackBuilderFromPkg(pkg *Pkg) PacketBuilder
- func NewPub21recBuilderFromPkg(pkg *Pkg) PacketBuilder
- func NewPub22relBuilderFromPkg(packetIdentifier uint16) PacketBuilder
- func NewPub23compBuilderFromPkg(packetIdentifier uint16) PacketBuilder
- func NewPublishBuilder(topicName string, msg []byte, opts ...PublishBuilderOpt) PacketBuilder
- func NewSubackBuilder(vh *subscribeVH, payload *subscribePayload, id uint16) PacketBuilder
- func NewSubscribeBuilder(verbose bool, opts ...SubscribeBuilderOpt) PacketBuilder
- func NewUnsubackBuilder(opts ...UnsubackBuilderOpt) PacketBuilder
- func NewUnsubscribeBuilder(opts ...UnsubscribeBuilderOpt) PacketBuilder
- type PacketParser
- func NewConnackParser(to State) PacketParser
- func NewConnectParser(to State, strictClientId bool) PacketParser
- func NewDisconnectParser(to State) PacketParser
- func NewPingreqParser(to State) PacketParser
- func NewPingrespParser(to State) PacketParser
- func NewPubackParser(to State) PacketParser
- func NewPubcompParser(to State) PacketParser
- func NewPublishParser(to State) PacketParser
- func NewPubrecParser(to State) PacketParser
- func NewPubrelParser(to State) PacketParser
- func NewSubackParser(to State) PacketParser
- func NewSubscribeParser(to State) PacketParser
- func NewUnsubackParser(to State) PacketParser
- func NewUnsubscribeParser(to State) PacketParser
- func NewWrongPacketParser() PacketParser
- type PacketParserMap
- type PathRW
- type PathRWOpt
- type Payload
- type Pkg
- type ProtocolLevel
- type PubBlock
- type PubCompPayload
- type PubCompVH
- type PubSubTask
- type PubackVH
- type Pubinfo
- type PublishBuilderOpt
- func WithPub50UserProperties(props map[string]string) PublishBuilderOpt
- func WithPub50contentType(i string) PublishBuilderOpt
- func WithPub50correlationData(i []byte) PublishBuilderOpt
- func WithPub50messageExpiryInterval(i int32) PublishBuilderOpt
- func WithPub50payloadFormatIndicator(i byte) PublishBuilderOpt
- func WithPub50responseTopic(i string) PublishBuilderOpt
- func WithPub50subscriptionIdentifier(i string) PublishBuilderOpt
- func WithPub50topicAlias(i uint16) PublishBuilderOpt
- func WithPubPacketIdentifier(id uint16) PublishBuilderOpt
- func WithPubProtocolLevel(level ProtocolLevel) PublishBuilderOpt
- func WithPubQoS(DUP bool, qos QoSType, RETAIN bool) PublishBuilderOpt
- type PublishPayload
- type PublishVH
- type QoSType
- type ReportType
- type RiskControl
- type SendPayload
- type SendPayloadVx
- type SendingTask
- type Session
- type SessionStore
- type State
- type StateContext
- func NewMqttClientContext(clientId string, conn net.Conn, opts ...StateContextOpt) *StateContext
- func NewMqttServerContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext
- func NewMqttTestContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext
- func (s *StateContext) Advance(pkg *Pkg) (next State, err error)
- func (s *StateContext) Borrow() (pktId uint16)
- func (s *StateContext) CanWrite() bool
- func (s *StateContext) CloseWithReason(reason errors.CloseReason) (err error)
- func (s *StateContext) CreateSessionForThisClient(pkg *Pkg) (sess Session, err error)
- func (s *StateContext) IsClientMode() bool
- func (s *StateContext) IsClose() bool
- func (s *StateContext) IsNormalClosed() bool
- func (s *StateContext) IsNotClientMode() bool
- func (s *StateContext) IsNotServer() bool
- func (s *StateContext) IsServer() bool
- func (s *StateContext) MaxProtocolLevel() ProtocolLevel
- func (s *StateContext) MaxQoS() QoSType
- func (s *StateContext) MaxVersion() string
- func (ctx *StateContext) Parse(pkg *Pkg) (to State, err error)
- func (s *StateContext) Return(pktId uint16) (err error)
- func (p *StateContext) WithCtx(ctx *StateContext) *logger.Base
- func (s *StateContext) Write(p []byte) (nn int, err error)
- type StateContextOpt
- func WithAuthEnabled(b bool) StateContextOpt
- func WithAuthenticator(ss auth.Authenticator) StateContextOpt
- func WithConnectTimeout(timeout time.Duration) StateContextOpt
- func WithConnectedTimestamp(t time.Time) StateContextOpt
- func WithDefaultRiskControl(b bool) StateContextOpt
- func WithMaxProtocolLevel(level ProtocolLevel) StateContextOpt
- func WithMaxQoS(maxQoS QoSType) StateContextOpt
- func WithMaxVersion(v string) StateContextOpt
- func WithNoWildMatchFilter(b bool) StateContextOpt
- func WithRiskControl(ss RiskControl) StateContextOpt
- func WithSessionStore(ss SessionStore) StateContextOpt
- func WithStateRegistry(ss StateRegistry) StateContextOpt
- func WithStrictClientId(b bool) StateContextOpt
- func WithStrictProtocolName(b bool) StateContextOpt
- func WithTimeOfConnected(tm time.Time) StateContextOpt
- type StateRegistry
- type Store
- type SubBlock
- type SubHolder
- type Subinfo
- type SubscribeBuilderOpt
- func WithSub50UserProperties(props map[string]string) SubscribeBuilderOpt
- func WithSubHolder(holder SubHolder) SubscribeBuilderOpt
- func WithSubPacketIdentifier(id uint16) SubscribeBuilderOpt
- func WithSubTopicFilter(topicFilter string, qos QoSType) SubscribeBuilderOpt
- func WithSubTopicFilters(topicFilters []string, qos QoSType) SubscribeBuilderOpt
- type SysPublisher
- type TopicFilter
- type UnsubackBuilderOpt
- type UnsubackRetcode
- type UnsubscribeBuilderOpt
- type VariantHeader
- type WillMessage
- type WrongPacketParser
- func (p *WrongPacketParser) CreatePayload() Payload
- func (p *WrongPacketParser) CreateVH() VariantHeader
- func (p *WrongPacketParser) GetToState() State
- func (p *WrongPacketParser) OnAction(ctx *StateContext, pkg *Pkg) (err error)
- func (p *WrongPacketParser) OnError(ctx *StateContext, pkg *Pkg, reason error) (err error)
Constants ¶
const DefaultConnectTimeout = 60 * time.Second
const INVALID_UNIQUE_ID int64 = -1
const InfiniteDuration time.Duration = 1<<63 - 1
const InfiniteDuration = time.Duration(math.MaxUint16) * time.Second InfiniteDuration is equal maxDuration
Variables ¶
var ( QoSToString = map[QoSType]string{ QoSDefault: "QoSDefault", QoS1: "QoS1", QoS2: "QoS2", QoSError: "QoSError", QoSFailure: "QoSFailure", } QoSFromString = map[string]QoSType{ "QoSDefault": QoSDefault, "QoS1": QoS1, "QoS2": QoS2, "QoSError": QoSError, "QoSFailure": QoSFailure, } )
var ( ReportTypeToString = map[ReportType]string{ Reserved0: "Nothing", CONNECT: "CONNECT", CONNACK: "CONNACK", PUBLISH: "PUBLISH", PUBACK: "PUBACK", PUBREC: "PUBREC", PUBREL: "PUBREL", PUBCOMP: "PUBCOMP", SUBSCRIBE: "SUBSCRIBE", SUBACK: "SUBACK", UNSUBSCRIBE: "UNSUBSCRIBE", UNSUBACK: "UNSUBACK", PINGREQ: "PINGREQ", PINGRESP: "PINGRESP", DISCONNECT: "DISCONNECT", AUTH: "AUTH", ReportType(IDLE): "IDLE", ReportType(CONNECTING): "CONNECTING", ReportType(CONNECTED): "CONNECTED", ReportType(PUBLISHING): "PUBLISHING", ReportType(PUBLISHED): "PUBLISHED", ReportType(SUBSCRIBING): "SUBSCRIBING", ReportType(SUBSCRIBED): "SUBSCRIBED", ReportType(UNSUBSCRIBING): "UNSUBSCRIBING", ReportType(UNSUBSCRIBED): "UNSUBSCRIBED", ReportType(PINGING): "PINGING", ReportType(CLOSED): "CLOSED", ReportType(PUB1_ACK): "PUB1_ACK", ReportType(PUB2_REC): "PUB2_REC", ReportType(PUB2_REL): "PUB2_REL", ReportType(PUB2_COMP): "PUB2_COMP", ReportType(WRONG_STATE): "WRONG_STATE", ReportType(NEED_CLOSE_STATE): "NEED_CLOSE_STATE", ReportType(MAX_STATE): "MAX_STATE", } ReportTypeFromString = map[string]ReportType{ "Nothing": Reserved0, "CONNECT": CONNECT, "CONNACK": CONNACK, "PUBLISH": PUBLISH, "PUBACK": PUBACK, "PUBREC": PUBREC, "PUBREL": PUBREL, "PUBCOMP": PUBCOMP, "SUBSCRIBE": SUBSCRIBE, "SUBACK": SUBACK, "UNSUBSCRIBE": UNSUBSCRIBE, "UNSUBACK": UNSUBACK, "PINGREQ": PINGREQ, "PINGRESP": PINGRESP, "DISCONNECT": DISCONNECT, "AUTH": AUTH, } )
Functions ¶
This section is empty.
Types ¶
type BasePacketBuilder ¶
type BasePacketBuilder struct { PacketIdentifer uint16 // contains filtered or unexported fields }
func (*BasePacketBuilder) AsString ¶
func (b *BasePacketBuilder) AsString() (str string)
func (*BasePacketBuilder) Build ¶
func (b *BasePacketBuilder) Build(ctx *StateContext, pkg *Pkg) (err error)
func (*BasePacketBuilder) Bytes ¶
func (b *BasePacketBuilder) Bytes() (bytes []byte)
func (*BasePacketBuilder) IsResending ¶
func (b *BasePacketBuilder) IsResending() bool
func (*BasePacketBuilder) NextPktId ¶
func (b *BasePacketBuilder) NextPktId(ctx *StateContext) uint16
func (*BasePacketBuilder) ReportType ¶
func (b *BasePacketBuilder) ReportType() ReportType
func (*BasePacketBuilder) RequestResend ¶
func (b *BasePacketBuilder) RequestResend()
func (*BasePacketBuilder) WithCtx ¶
func (p *BasePacketBuilder) WithCtx(ctx *StateContext) *logger.Base
WithCtx could cause race exception. for example: when two or more clients incoming with CONNECT packets, now connectParser parse them ok and print logging info via base.WithCtx(ctx).Debug(...), here is the race point. Another scene is the multiple clients send pingreq packets.
type ClientIdReadable ¶
type ClientIdReadable interface {
GetClientId() string
}
type ClientSubsHolder ¶
type ClientSubsHolder struct { // filters []SubBlock // PacketIdentifier uint16 Subs map[uint16][]SubBlock }
func (*ClientSubsHolder) Add ¶
func (s *ClientSubsHolder) Add(packetIdentifier uint16, filters []SubBlock) (err error)
func (*ClientSubsHolder) ApplyRetCodes ¶
func (s *ClientSubsHolder) ApplyRetCodes(ctx *StateContext, packetIdentifier uint16, data []byte, pos int) (newPos int)
type Config ¶
type Config struct { Peers string `yaml:"peers"` Username string `yaml:"user"` Password string `yaml:"pass"` Db int `yaml:"db"` ReadonlyRoute bool `yaml:"readonly-route"` DialTimeout time.Duration `yaml:"dial-timeout"` ReadTimeout time.Duration `yaml:"read-timeout"` WriteTimeout time.Duration `yaml:"write-timeout"` EnableCluster bool `yaml:"enable-cluster"` PoolSize int `yaml:"pool-size"` SubsChannels []string `yaml:"subs-channels"` }
type ConnackBuilderOpt ¶
type ConnackBuilderOpt func(*connackBuilder)
func WithConnack50AssignedClientId ¶
func WithConnack50AssignedClientId(s string) ConnackBuilderOpt
func WithConnack50Authentication ¶
func WithConnack50Authentication(method string, data []byte) ConnackBuilderOpt
func WithConnack50ConnackReason ¶
func WithConnack50ConnackReason(reason ConnackRetcode, reasonString string) ConnackBuilderOpt
func WithConnack50MaximumPacketSize ¶
func WithConnack50MaximumPacketSize(i int32) ConnackBuilderOpt
func WithConnack50MaximumQoS ¶
func WithConnack50MaximumQoS(qos QoSType) ConnackBuilderOpt
func WithConnack50ReceiveMaximum ¶
func WithConnack50ReceiveMaximum(i uint16) ConnackBuilderOpt
func WithConnack50ResponseInfo ¶
func WithConnack50ResponseInfo(info string) ConnackBuilderOpt
func WithConnack50RetainAvailable ¶
func WithConnack50RetainAvailable(i bool) ConnackBuilderOpt
func WithConnack50ServerKeepAlive ¶
func WithConnack50ServerKeepAlive(interval uint16) ConnackBuilderOpt
WithConnack50ServerKeepAlive sets a seconds. optional, default value will be initialized in newvh50ext()
func WithConnack50SessionExpiryInterval ¶
func WithConnack50SessionExpiryInterval(interval int32) ConnackBuilderOpt
func WithConnack50SharedSubsAvailable ¶
func WithConnack50SharedSubsAvailable(i bool) ConnackBuilderOpt
optional, default value will be initialized in newvh50ext()
func WithConnack50SubsIdAvailable ¶
func WithConnack50SubsIdAvailable(i bool) ConnackBuilderOpt
func WithConnack50TopicAliasMaximum ¶
func WithConnack50TopicAliasMaximum(i uint16) ConnackBuilderOpt
func WithConnack50UserProperties ¶
func WithConnack50UserProperties(props map[string]string) ConnackBuilderOpt
func WithConnack50WildcardSubsAvailable ¶
func WithConnack50WildcardSubsAvailable(i bool) ConnackBuilderOpt
optional, default value will be initialized in newvh50ext()
type ConnackRetcode ¶
type ConnackRetcode int
const ( ConnackOK ConnackRetcode = iota ConnackUnspecError ConnackRetcode = 0x80 ConnackPacketInvalid ConnackRetcode = 0x81 ConnackProtocolError ConnackRetcode = 0x82 ConnackServerUnacceptable ConnackRetcode = 0x83 ConnackServerBusy ConnackRetcode = 0x89 ConnackForbidden ConnackRetcode = 0x8a ConnackUnknownAuthScheme ConnackRetcode = 0x8c ConnackInvalidTopicName ConnackRetcode = 0x90 ConnackPacketTooLarge ConnackRetcode = 0x95 ConnackQuoteFull ConnackRetcode = 0x97 ConnackInvalidPayloadFormat ConnackRetcode = 0x99 ConnackUnsupportedRetain ConnackRetcode = 0x9a ConnackUnsupportedQoS ConnackRetcode = 0x9b ConnackServerUseAnother ConnackRetcode = 0x9c ConnackServerMovedPermanant ConnackRetcode = 0x9d ConnackRateOverflow ConnackRetcode = 0x9f )
type ConnackVH ¶
type ConnackVH struct { SessionPresent bool ReasonCode ConnackRetcode // contains filtered or unexported fields }
type ConnectBuilderOpt ¶
type ConnectBuilderOpt func(*connectBuilder)
func WithConnectCleanSession ¶
func WithConnectCleanSession(cleanSession bool) ConnectBuilderOpt
func WithConnectCleanSessionIfNotSet ¶
func WithConnectCleanSessionIfNotSet(cleanSession bool) ConnectBuilderOpt
func WithConnectClientId ¶
func WithConnectClientId(id string) ConnectBuilderOpt
func WithConnectKeepAliveSeconds ¶
func WithConnectKeepAliveSeconds(seconds uint16) ConnectBuilderOpt
func WithConnectKeepAliveSecondsIfNotSet ¶
func WithConnectKeepAliveSecondsIfNotSet(seconds uint16) ConnectBuilderOpt
func WithConnectPassword ¶
func WithConnectPassword(password string) ConnectBuilderOpt
func WithConnectQoS ¶
func WithConnectQoS(DUP bool, qos QoSType, RETAIN bool) ConnectBuilderOpt
func WithConnectUsername ¶
func WithConnectUsername(username string) ConnectBuilderOpt
func WithConnectWillMsg ¶
func WithConnectWillMsg(willRetain bool, willQoS QoSType, willTopic string, willMessage []byte) ConnectBuilderOpt
type ConnectParam ¶
type ConnectParam struct { ProtocolLevel ProtocolLevel KeepAlivedSeconds uint16 // in seconds CleanSessionFlag bool // WillFlag bool WillQoS QoSType WillRetainFlag bool // ClientId may be in or out property // For client, it's initialized by NewClient and NewMqttClientContext ClientId string ClientIdCreated bool WillTopic string WillMessage []byte Username string Password string RetCode int // in this field, the connect packet errors will be kept and sent to client later }
func (*ConnectParam) MigrateFrom ¶
func (s *ConnectParam) MigrateFrom(vh1 VariantHeader, payload1 Payload)
type ConnectPayload ¶
type ConnectPayload struct { ClientId string ClientIdCreated bool WillTopic string WillMessage []byte Username string Password string RetCode int // in this field, the connect packet errors will be kept and sent to client later StrictClientId bool // should we verify the user's client id with strict rules? }
func (*ConnectPayload) Apply ¶
func (s *ConnectPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)
Apply to parse incoming 'data' from 'pos' postion. 3.1.3-1 Apply return false if payload or vh read error occured, 3.1.4-1, so that caller will fail the process flow and close client connection later.
type ConnectVH ¶
type ConnectVH struct { ProtocolLevel ProtocolLevel ConnectFlag byte KeepAlivedSeconds uint16 // in seconds CleanSessionFlag bool // Clean Start Flag for v5.0+ WillFlag bool WillQoS QoSType WillRetainFlag bool PasswordFlag bool UsernameFlag bool // contains filtered or unexported fields }
type Contextual ¶
type Contextual interface {
GetContext() *StateContext
}
type ControlMessage ¶
type ControlMessage struct {
Message
}
type DataMessage ¶
func NewDataMessage ¶
type DefaultSessionStoreOpt ¶
type DefaultSessionStoreOpt func(store *defaultSessionStore)
func WithDefaultSessionStoreNoSysStats ¶
func WithDefaultSessionStoreNoSysStats(b bool) DefaultSessionStoreOpt
func WithDefaultSessionStoreNoSysStatsLog ¶
func WithDefaultSessionStoreNoSysStatsLog(b bool) DefaultSessionStoreOpt
func WithDefaultSessionStorePathRW ¶
func WithDefaultSessionStorePathRW(rw PathRW) DefaultSessionStoreOpt
func WithDefaultSessionStoreResetStorage ¶
func WithDefaultSessionStoreResetStorage(b bool) DefaultSessionStoreOpt
func WithDefaultSessionStoreSavingEnabled ¶
func WithDefaultSessionStoreSavingEnabled(persist bool) DefaultSessionStoreOpt
func WithDefaultSessionStoreSavingPeriod ¶
func WithDefaultSessionStoreSavingPeriod(d time.Duration) DefaultSessionStoreOpt
type Disconnect50BuilderOpt ¶
type Disconnect50BuilderOpt func(builder *disconnectBuilder)
func WithDisconnect50Reason ¶
func WithDisconnect50Reason(reason errors.CloseReason, reasonString string) Disconnect50BuilderOpt
func WithDisconnect50ReasonServerMoved ¶
func WithDisconnect50ReasonServerMoved(serverReference string) Disconnect50BuilderOpt
func WithDisconnect50ReasonUseOtherServer ¶
func WithDisconnect50ReasonUseOtherServer(serverReference string) Disconnect50BuilderOpt
func WithDisconnect50SessionExpiryInterval ¶
func WithDisconnect50SessionExpiryInterval(seconds int) Disconnect50BuilderOpt
func WithDisconnect50UserProperties ¶
func WithDisconnect50UserProperties(props map[string]string) Disconnect50BuilderOpt
WithDisconnect50UserProperties sets the user's properties todo, in the use properties, duplicated keys are allowed, see also 3.14.2.2.5
type PacketBuilder ¶
type PacketBuilder interface { ReportType() ReportType Build(ctx *StateContext, pkg *Pkg) (err error) Bytes() (bytes []byte) AsString() (str string) WriteTo(w io.Writer) (n int, err error) // NextPktId generate the packet identifier and store it, and return it too NextPktId(ctx *StateContext) uint16 RequestResend() IsResending() bool }
func NewConnectBuilder ¶
func NewConnectBuilder(opts ...ConnectBuilderOpt) PacketBuilder
func NewDisconnectBuilder ¶
func NewDisconnectBuilder(opts ...Disconnect50BuilderOpt) PacketBuilder
func NewPingreqBuilder ¶
func NewPingreqBuilder() PacketBuilder
func NewPingrespBuilder ¶
func NewPingrespBuilder() PacketBuilder
func NewPub10ackBuilderFromPkg ¶
func NewPub10ackBuilderFromPkg(pkg *Pkg) PacketBuilder
func NewPub21recBuilderFromPkg ¶
func NewPub21recBuilderFromPkg(pkg *Pkg) PacketBuilder
func NewPub22relBuilderFromPkg ¶
func NewPub22relBuilderFromPkg(packetIdentifier uint16) PacketBuilder
func NewPub23compBuilderFromPkg ¶
func NewPub23compBuilderFromPkg(packetIdentifier uint16) PacketBuilder
func NewPublishBuilder ¶
func NewPublishBuilder(topicName string, msg []byte, opts ...PublishBuilderOpt) PacketBuilder
func NewSubackBuilder ¶
func NewSubackBuilder(vh *subscribeVH, payload *subscribePayload, id uint16) PacketBuilder
func NewSubscribeBuilder ¶
func NewSubscribeBuilder(verbose bool, opts ...SubscribeBuilderOpt) PacketBuilder
func NewUnsubackBuilder ¶
func NewUnsubackBuilder(opts ...UnsubackBuilderOpt) PacketBuilder
func NewUnsubscribeBuilder ¶
func NewUnsubscribeBuilder(opts ...UnsubscribeBuilderOpt) PacketBuilder
type PacketParser ¶
type PacketParser interface { GetToState() State CreateVH() VariantHeader CreatePayload() Payload OnAction(ctx *StateContext, pkg *Pkg) (err error) OnError(ctx *StateContext, pkg *Pkg, reason error) (err error) }
func NewConnackParser ¶
func NewConnackParser(to State) PacketParser
func NewConnectParser ¶
func NewConnectParser(to State, strictClientId bool) PacketParser
func NewDisconnectParser ¶
func NewDisconnectParser(to State) PacketParser
func NewPingreqParser ¶
func NewPingreqParser(to State) PacketParser
func NewPingrespParser ¶
func NewPingrespParser(to State) PacketParser
func NewPublishParser ¶
func NewPublishParser(to State) PacketParser
func NewSubackParser ¶
func NewSubackParser(to State) PacketParser
func NewSubscribeParser ¶
func NewSubscribeParser(to State) PacketParser
func NewUnsubackParser ¶
func NewUnsubackParser(to State) PacketParser
func NewUnsubscribeParser ¶
func NewUnsubscribeParser(to State) PacketParser
func NewWrongPacketParser ¶
func NewWrongPacketParser() PacketParser
type PacketParserMap ¶
type PacketParserMap struct { ToList map[ReportType]PacketParser ToStateUnconditional State }
type PathRW ¶
type PathRW interface { Start(opts ...PathRWOpt) Stop() Load(dst *defaultSessionStore) (err error) Save(src *defaultSessionStore, tick time.Time) (err error) Get(pathKey string, defaultValue ...interface{}) (ret interface{}, err error) Set(pathKey string, value interface{}) (err error) }
PathRW presents a PathReadWriter
func NewPathRWGob ¶
NewPathRWGob creates a new gob PathReadWriter PathRW instance
func NewPathRWRedis ¶
NewPathRWGob creates a new gob PathReadWriter PathRW instance
type PathRWOpt ¶
type PathRWOpt func(rw PathRW)
func WithPathRWGobDataDir ¶
func WithPathRWRedisOpts ¶
func WithPathRWRedisOpts(opts ...redisop.RedisOpOpt) PathRWOpt
func WithPathRWRedisPrefix ¶
type ProtocolLevel ¶
type ProtocolLevel byte
const ( ProtocolLevel0 ProtocolLevel = iota // Level0 ProtocolLevel1 // Level1 ProtocolLevel2 // Level2 ProtocolLevel3 // Level3 ProtocolLevelForV311 // Level4(3.1.1) ProtocolLevelForV50 // Level5(5.0) )
func (ProtocolLevel) Parse ¶
func (i ProtocolLevel) Parse(s string) ProtocolLevel
func (ProtocolLevel) String ¶ added in v1.0.5
func (i ProtocolLevel) String() string
type PubCompPayload ¶
type PubCompPayload struct { }
func (*PubCompPayload) Apply ¶
func (s *PubCompPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)
type PubSubTask ¶
type PubSubTask struct { SendingTask PacketIdentifier uint16 SentTime time.Time OnSent func(task *PubSubTask, data []byte) }
func (*PubSubTask) IsResending ¶
func (pt *PubSubTask) IsResending() bool
func (*PubSubTask) RequestResend ¶
func (pt *PubSubTask) RequestResend() *PubSubTask
type Pubinfo ¶
type Pubinfo struct {
// contains filtered or unexported fields
}
func (Pubinfo) MarshalBinary ¶
func (*Pubinfo) UnmarshalBinary ¶
type PublishBuilderOpt ¶
type PublishBuilderOpt func(*publishBuilder)
func WithPub50UserProperties ¶
func WithPub50UserProperties(props map[string]string) PublishBuilderOpt
func WithPub50contentType ¶
func WithPub50contentType(i string) PublishBuilderOpt
func WithPub50correlationData ¶
func WithPub50correlationData(i []byte) PublishBuilderOpt
func WithPub50messageExpiryInterval ¶
func WithPub50messageExpiryInterval(i int32) PublishBuilderOpt
func WithPub50payloadFormatIndicator ¶
func WithPub50payloadFormatIndicator(i byte) PublishBuilderOpt
func WithPub50responseTopic ¶
func WithPub50responseTopic(i string) PublishBuilderOpt
func WithPub50subscriptionIdentifier ¶
func WithPub50subscriptionIdentifier(i string) PublishBuilderOpt
func WithPub50topicAlias ¶
func WithPub50topicAlias(i uint16) PublishBuilderOpt
func WithPubPacketIdentifier ¶
func WithPubPacketIdentifier(id uint16) PublishBuilderOpt
func WithPubProtocolLevel ¶
func WithPubProtocolLevel(level ProtocolLevel) PublishBuilderOpt
func WithPubQoS ¶
func WithPubQoS(DUP bool, qos QoSType, RETAIN bool) PublishBuilderOpt
WithPubQoS attach these flags to a publish packet builder: DUP, qos, RETAIN.
For a recent publish packet, DUP must be true (3.3.1-1). For a QoS 0 packet, DUP must be false (3.3.1-2).
For server publishing, DUP must be false except that recent it after the first sent failed (3.3.1-3).
For server publishing, see also:
- on publish packet received from a client, server will send back with a publish ack packet if qos 1. In this case, refer to pkt.puback.go and pubackBuilder.OnAction.
- with a qos 2 publish packet received, refer to pkt.pubrec, pkt.pubrel, and pub.comp.go for the further implementations.
- for a qos 0 publish packet received, publishBuilder act as a publishParser, so check the codes at publishBuilder.OnAction and ctx.bizPublish()
type PublishPayload ¶
type PublishPayload struct {
Data []byte // by app
}
func (*PublishPayload) Apply ¶
func (s *PublishPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)
type PublishVH ¶
type PublishVH struct { // DUP, QoS, RETAIN Topic string PacketIdentifier uint16 // unless QoS in 1 or 2 // contains filtered or unexported fields }
type ReportType ¶
type ReportType uint
const ( Reserved0 ReportType = iota // ReportType CONNECT // ReportType = 1 CONNACK // ReportType = 2 PUBLISH // ReportType = 3 PUBACK // ReportType = 4 PUBREC // ReportType = 5 PUBREL // ReportType = 6 PUBCOMP // ReportType = 7 SUBSCRIBE // ReportType = 8 SUBACK // ReportType = 9 UNSUBSCRIBE // ReportType = 10 UNSUBACK // ReportType = 11 PINGREQ // ReportType = 12 PINGRESP // ReportType = 13 DISCONNECT // ReportType = 14 AUTH // ReportType = 15, since mqtt-v5.0 )
func (ReportType) FromString ¶
func (n ReportType) FromString(s string) ReportType
func (ReportType) String ¶
func (n ReportType) String() string
type RiskControl ¶
func NewDefaultRiskControl ¶
func NewDefaultRiskControl(enabled bool) RiskControl
type SendPayload ¶
type SendPayloadVx ¶
type SendPayloadVx func(ctx *StateContext, clientId string, builder PacketBuilder, qp *qp) (err error)
type SendingTask ¶
type SendingTask struct {
Builder PacketBuilder
}
type Session ¶
type Session interface { Close() (err error) SetCleanSessionFlag(b bool) GetCleanSessionFlag() bool // Write write data to the connection associated with this session, in the context Write(data []byte) (n int, err error) // LinkTo relink the new context 'ctx' to this session LinkTo(ctx *StateContext) // KeepAlive reset the internal keep-alive timer so that refresh it KeepAlive() }
type SessionStore ¶
type SessionStore interface { // Close the session store if server stopping, it will flush all sessions to persistent storage. Close() BeforeServerListening() // Has tests the existence of a session via its clientId key Has(key string) bool Get(key string, defaultValue ...Session) Session Set(key string, value Session) (err error) Unset(key string) (err error) AddWillMessage(clientId string, willQoS QoSType, willRetainFlag bool, willTopic string, willMessage []byte) HasWillMessage(clientId string) bool // SendWillMessage do publish the will msg to all clients on will topic, and // erase the will msg if not retained // sendPaylaodVx should be nil so that a default sender can dispatch the // payload to all subscribers, see also // defaultSessionStore.sendPayloadToClient SendWillMessage(ctx *StateContext, clientId string, sendPayloadVx SendPayloadVx) (err error) StoreQos1or2Message(ctx *StateContext, pkg *Pkg) (err error) PublishByPktId(ctx *StateContext, pktId uint16) (err error) PublishByBuilder(ctx *StateContext, builder PacketBuilder) (err error) Publish(ctx *StateContext, pkg *Pkg) (err error) Subscribe(ctx *StateContext, clientId string, topicFilter TopicFilter, packetIdentifier uint16) (invoked QoSType, err error) Unsubscribe(ctx *StateContext, clientId, topicFilter string) (err error) EnableSysStats(enabled bool) SysStatsEnabled() bool EnableSysStatsLog(enabled bool) SysStatsLogEnabled() bool EnableResetStorage(enabled bool) ResetStorageEnabled() bool }
func NewDefaultSessionStore ¶
func NewDefaultSessionStore(opts ...DefaultSessionStoreOpt) SessionStore
NewDefaultSessionStore create or return the unique session store instance
type StateContext ¶
type StateContext struct { TimeOfConnected time.Time ConnectTimeout time.Duration State State // ClientId string ConnectParam *ConnectParam // contains filtered or unexported fields }
func NewMqttClientContext ¶
func NewMqttClientContext(clientId string, conn net.Conn, opts ...StateContextOpt) *StateContext
NewMqttClientContext create StateContext for Client
func NewMqttServerContext ¶
func NewMqttServerContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext
func NewMqttTestContext ¶
func NewMqttTestContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext
func (*StateContext) Borrow ¶
func (s *StateContext) Borrow() (pktId uint16)
func (*StateContext) CanWrite ¶
func (s *StateContext) CanWrite() bool
func (*StateContext) CloseWithReason ¶
func (s *StateContext) CloseWithReason(reason errors.CloseReason) (err error)
func (*StateContext) CreateSessionForThisClient ¶
func (s *StateContext) CreateSessionForThisClient(pkg *Pkg) (sess Session, err error)
func (*StateContext) IsClientMode ¶
func (s *StateContext) IsClientMode() bool
func (*StateContext) IsClose ¶
func (s *StateContext) IsClose() bool
func (*StateContext) IsNormalClosed ¶
func (s *StateContext) IsNormalClosed() bool
func (*StateContext) IsNotClientMode ¶
func (s *StateContext) IsNotClientMode() bool
func (*StateContext) IsNotServer ¶
func (s *StateContext) IsNotServer() bool
func (*StateContext) IsServer ¶
func (s *StateContext) IsServer() bool
func (*StateContext) MaxProtocolLevel ¶
func (s *StateContext) MaxProtocolLevel() ProtocolLevel
func (*StateContext) MaxQoS ¶
func (s *StateContext) MaxQoS() QoSType
func (*StateContext) MaxVersion ¶
func (s *StateContext) MaxVersion() string
func (*StateContext) Return ¶
func (s *StateContext) Return(pktId uint16) (err error)
func (*StateContext) WithCtx ¶
func (p *StateContext) WithCtx(ctx *StateContext) *logger.Base
WithCtx could cause race exception. for example: when two or more clients incoming with CONNECT packets, now connectParser parse them ok and print logging info via base.WithCtx(ctx).Debug(...), here is the race point. Another scene is the multiple clients send pingreq packets.
type StateContextOpt ¶
type StateContextOpt func(ctx *StateContext)
func WithAuthEnabled ¶
func WithAuthEnabled(b bool) StateContextOpt
func WithAuthenticator ¶
func WithAuthenticator(ss auth.Authenticator) StateContextOpt
func WithConnectTimeout ¶
func WithConnectTimeout(timeout time.Duration) StateContextOpt
func WithConnectedTimestamp ¶
func WithConnectedTimestamp(t time.Time) StateContextOpt
func WithDefaultRiskControl ¶
func WithDefaultRiskControl(b bool) StateContextOpt
func WithMaxProtocolLevel ¶
func WithMaxProtocolLevel(level ProtocolLevel) StateContextOpt
func WithMaxQoS ¶
func WithMaxQoS(maxQoS QoSType) StateContextOpt
func WithMaxVersion ¶
func WithMaxVersion(v string) StateContextOpt
func WithNoWildMatchFilter ¶
func WithNoWildMatchFilter(b bool) StateContextOpt
func WithRiskControl ¶
func WithRiskControl(ss RiskControl) StateContextOpt
func WithSessionStore ¶
func WithSessionStore(ss SessionStore) StateContextOpt
func WithStateRegistry ¶
func WithStateRegistry(ss StateRegistry) StateContextOpt
func WithStrictClientId ¶
func WithStrictClientId(b bool) StateContextOpt
func WithStrictProtocolName ¶
func WithStrictProtocolName(b bool) StateContextOpt
func WithTimeOfConnected ¶
func WithTimeOfConnected(tm time.Time) StateContextOpt
type StateRegistry ¶
type StateRegistry interface { AddSynonyms(from State, synonymy ...State) AddState(from State, parsers *PacketParserMap) // in testing mode, noOnAction should be true to ignore invoking PacketParser.OnAction Advance(out io.Writer, ctx *StateContext, pkg *Pkg, noOnAction bool) (to State, err error) Parsers() map[ReportType]PacketParser // in testing mode, noOnAction should be true to ignore invoking PacketParser.OnAction Parse(ctx *StateContext, pkg *Pkg, noOnAction bool) (to State, err error) }
func NewDefaultClientStateRegistry ¶
func NewDefaultClientStateRegistry(strictClientId bool) StateRegistry
func NewDefaultStateRegistry ¶
func NewDefaultStateRegistry(strictClientId bool) StateRegistry
type Subinfo ¶
type Subinfo struct {
// contains filtered or unexported fields
}
func (Subinfo) MarshalBinary ¶
func (*Subinfo) UnmarshalBinary ¶
type SubscribeBuilderOpt ¶
type SubscribeBuilderOpt func(*subBuilder)
func WithSub50UserProperties ¶
func WithSub50UserProperties(props map[string]string) SubscribeBuilderOpt
func WithSubHolder ¶
func WithSubHolder(holder SubHolder) SubscribeBuilderOpt
func WithSubPacketIdentifier ¶
func WithSubPacketIdentifier(id uint16) SubscribeBuilderOpt
func WithSubTopicFilter ¶
func WithSubTopicFilter(topicFilter string, qos QoSType) SubscribeBuilderOpt
func WithSubTopicFilters ¶
func WithSubTopicFilters(topicFilters []string, qos QoSType) SubscribeBuilderOpt
type SysPublisher ¶
type SysPublisher interface { SysPublish(topicParts string, payload []byte, opts ...PublishBuilderOpt) SysPublishString(topicParts string, payload string, opts ...PublishBuilderOpt) SysPublishInt64(topicParts string, payload int64, opts ...PublishBuilderOpt) SysPublishUint64(topicParts string, payload uint64, opts ...PublishBuilderOpt) SysPublishFloat64(topicParts string, payload float64, opts ...PublishBuilderOpt) // IncMessages updates statistics data under $SYS/broker/messages/... // The known suffixes must be: // "received", "sent", "publish/dropped", "publish/received", "publish/sent", "retained/count", IncMessages(suffix string, delta int64) // IncLoad updates statistics data under $%SYS/broker/load/... // The known suffixes could be: // "bytes/received", "bytes/sent", ... IncLoad(suffix string, delta int) }
type TopicFilter ¶
type UnsubackBuilderOpt ¶
type UnsubackBuilderOpt func(*unsubackBuilder)
func WithUnsuback50UserProperties ¶
func WithUnsuback50UserProperties(props map[string]string) UnsubackBuilderOpt
func WithUnsubackIdentifier ¶
func WithUnsubackIdentifier(id uint16) UnsubackBuilderOpt
func WithUnsubackRetCodes ¶
func WithUnsubackRetCodes(retCodes []UnsubackRetcode) UnsubackBuilderOpt
type UnsubscribeBuilderOpt ¶
type UnsubscribeBuilderOpt func(*unsubBuilder)
func WithUnsub50UserProperties ¶
func WithUnsub50UserProperties(props map[string]string) UnsubscribeBuilderOpt
func WithUnsubPacketIdentifier ¶
func WithUnsubPacketIdentifier(id uint16) UnsubscribeBuilderOpt
func WithUnsubTopicFilter ¶
func WithUnsubTopicFilter(topicFilter string) UnsubscribeBuilderOpt
type VariantHeader ¶
type VariantHeader interface { Applyable }
type WillMessage ¶
type WillMessage struct {
Message
}
func NewWillMessage ¶
func NewWillMessage(willQoS QoSType, willRetainFlag bool, willTopic string, willMessage []byte) *WillMessage
type WrongPacketParser ¶
type WrongPacketParser struct{}
func (*WrongPacketParser) CreatePayload ¶
func (p *WrongPacketParser) CreatePayload() Payload
func (*WrongPacketParser) CreateVH ¶
func (p *WrongPacketParser) CreateVH() VariantHeader
func (*WrongPacketParser) GetToState ¶
func (p *WrongPacketParser) GetToState() State
func (*WrongPacketParser) OnAction ¶
func (p *WrongPacketParser) OnAction(ctx *StateContext, pkg *Pkg) (err error)
func (*WrongPacketParser) OnError ¶
func (p *WrongPacketParser) OnError(ctx *StateContext, pkg *Pkg, reason error) (err error)
Source Files ¶
- base.go
- base.protocollevel.go
- base.protocollevel_string.go
- base.qostype.go
- base.reporttype.go
- ctx.go
- ctx.with.opts.go
- hooker.go
- lib.go
- msg.go
- pkt.all.go
- pkt.base.go
- pkt.connack.go
- pkt.connect.go
- pkt.connect.with.opts.go
- pkt.disconnect.go
- pkt.pingreq.go
- pkt.pingresp.go
- pkt.puback.go
- pkt.pubcomp.go
- pkt.publish.go
- pkt.pubrec.go
- pkt.pubrel.go
- pkt.suback.go
- pkt.subscribe.go
- pkt.unsuback.go
- pkt.unsubscribe.go
- pkt.wrong.go
- psr.connack.go
- psr.connect.go
- psr.connect.vh.payload.go
- psr.disconnect.go
- psr.ping.go
- psr.puback.go
- psr.pubcomp.go
- psr.publish.go
- psr.publish.vh.payload.go
- psr.pubrec.go
- psr.pubrel.go
- psr.sub.go
- psr.suback.go
- psr.unsub.go
- psr.unsuback.go
- psr.vh50ext.go
- pubsub.go
- pubsub.node.go
- pubsub.pubinfo.go
- pubsub.subinfo.go
- risk.go
- session.go
- session.store.bin.go
- session.store.go
- session.store.gob.go
- session.store.pathrw.go
- session.store.redis.go
- session.store.with.opts.go
- state.registry.go
- states.go
- store.default.go
- sub.holder.go
- vhpi.go