mqtt

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2020 License: GPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultConnectTimeout = 60 * time.Second
View Source
const INVALID_UNIQUE_ID int64 = -1
View Source
const InfiniteDuration time.Duration = 1<<63 - 1

const InfiniteDuration = time.Duration(math.MaxUint16) * time.Second InfiniteDuration is equal maxDuration

Variables

View Source
var (
	QoSToString = map[QoSType]string{
		QoSDefault: "QoSDefault",
		QoS1:       "QoS1",
		QoS2:       "QoS2",
		QoSError:   "QoSError",
		QoSFailure: "QoSFailure",
	}
	QoSFromString = map[string]QoSType{
		"QoSDefault": QoSDefault,
		"QoS1":       QoS1,
		"QoS2":       QoS2,
		"QoSError":   QoSError,
		"QoSFailure": QoSFailure,
	}
)
View Source
var (
	ReportTypeToString = map[ReportType]string{
		Reserved0:   "Nothing",
		CONNECT:     "CONNECT",
		CONNACK:     "CONNACK",
		PUBLISH:     "PUBLISH",
		PUBACK:      "PUBACK",
		PUBREC:      "PUBREC",
		PUBREL:      "PUBREL",
		PUBCOMP:     "PUBCOMP",
		SUBSCRIBE:   "SUBSCRIBE",
		SUBACK:      "SUBACK",
		UNSUBSCRIBE: "UNSUBSCRIBE",
		UNSUBACK:    "UNSUBACK",
		PINGREQ:     "PINGREQ",
		PINGRESP:    "PINGRESP",
		DISCONNECT:  "DISCONNECT",
		AUTH:        "AUTH",

		ReportType(IDLE):             "IDLE",
		ReportType(CONNECTING):       "CONNECTING",
		ReportType(CONNECTED):        "CONNECTED",
		ReportType(PUBLISHING):       "PUBLISHING",
		ReportType(PUBLISHED):        "PUBLISHED",
		ReportType(SUBSCRIBING):      "SUBSCRIBING",
		ReportType(SUBSCRIBED):       "SUBSCRIBED",
		ReportType(UNSUBSCRIBING):    "UNSUBSCRIBING",
		ReportType(UNSUBSCRIBED):     "UNSUBSCRIBED",
		ReportType(PINGING):          "PINGING",
		ReportType(CLOSED):           "CLOSED",
		ReportType(PUB1_ACK):         "PUB1_ACK",
		ReportType(PUB2_REC):         "PUB2_REC",
		ReportType(PUB2_REL):         "PUB2_REL",
		ReportType(PUB2_COMP):        "PUB2_COMP",
		ReportType(WRONG_STATE):      "WRONG_STATE",
		ReportType(NEED_CLOSE_STATE): "NEED_CLOSE_STATE",
		ReportType(MAX_STATE):        "MAX_STATE",
	}
	ReportTypeFromString = map[string]ReportType{
		"Nothing":     Reserved0,
		"CONNECT":     CONNECT,
		"CONNACK":     CONNACK,
		"PUBLISH":     PUBLISH,
		"PUBACK":      PUBACK,
		"PUBREC":      PUBREC,
		"PUBREL":      PUBREL,
		"PUBCOMP":     PUBCOMP,
		"SUBSCRIBE":   SUBSCRIBE,
		"SUBACK":      SUBACK,
		"UNSUBSCRIBE": UNSUBSCRIBE,
		"UNSUBACK":    UNSUBACK,
		"PINGREQ":     PINGREQ,
		"PINGRESP":    PINGRESP,
		"DISCONNECT":  DISCONNECT,
		"AUTH":        AUTH,
	}
)

Functions

This section is empty.

Types

type Applyable

type Applyable interface {
	Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)
}

type BasePacketBuilder

type BasePacketBuilder struct {
	PacketIdentifer uint16
	// contains filtered or unexported fields
}

func (*BasePacketBuilder) AsString

func (b *BasePacketBuilder) AsString() (str string)

func (*BasePacketBuilder) Build

func (b *BasePacketBuilder) Build(ctx *StateContext, pkg *Pkg) (err error)

func (*BasePacketBuilder) Bytes

func (b *BasePacketBuilder) Bytes() (bytes []byte)

func (*BasePacketBuilder) IsResending

func (b *BasePacketBuilder) IsResending() bool

func (*BasePacketBuilder) NextPktId

func (b *BasePacketBuilder) NextPktId(ctx *StateContext) uint16

func (*BasePacketBuilder) ReportType

func (b *BasePacketBuilder) ReportType() ReportType

func (*BasePacketBuilder) RequestResend

func (b *BasePacketBuilder) RequestResend()

func (*BasePacketBuilder) WithCtx

func (p *BasePacketBuilder) WithCtx(ctx *StateContext) *logger.Base

WithCtx could cause race exception. for example: when two or more clients incoming with CONNECT packets, now connectParser parse them ok and print logging info via base.WithCtx(ctx).Debug(...), here is the race point. Another scene is the multiple clients send pingreq packets.

func (*BasePacketBuilder) WriteTo

func (b *BasePacketBuilder) WriteTo(w io.Writer) (n int, err error)

type ClientIdReadable

type ClientIdReadable interface {
	GetClientId() string
}

type ClientSubsHolder

type ClientSubsHolder struct {
	// filters          []SubBlock
	// PacketIdentifier uint16
	Subs map[uint16][]SubBlock
}

func (*ClientSubsHolder) Add

func (s *ClientSubsHolder) Add(packetIdentifier uint16, filters []SubBlock) (err error)

func (*ClientSubsHolder) ApplyRetCodes

func (s *ClientSubsHolder) ApplyRetCodes(ctx *StateContext, packetIdentifier uint16, data []byte, pos int) (newPos int)

type Config

type Config struct {
	Peers         string        `yaml:"peers"`
	Username      string        `yaml:"user"`
	Password      string        `yaml:"pass"`
	Db            int           `yaml:"db"`
	ReadonlyRoute bool          `yaml:"readonly-route"`
	DialTimeout   time.Duration `yaml:"dial-timeout"`
	ReadTimeout   time.Duration `yaml:"read-timeout"`
	WriteTimeout  time.Duration `yaml:"write-timeout"`
	EnableCluster bool          `yaml:"enable-cluster"`
	PoolSize      int           `yaml:"pool-size"`
	SubsChannels  []string      `yaml:"subs-channels"`
}

type ConnackBuilderOpt

type ConnackBuilderOpt func(*connackBuilder)

func WithConnack50AssignedClientId

func WithConnack50AssignedClientId(s string) ConnackBuilderOpt

func WithConnack50Authentication

func WithConnack50Authentication(method string, data []byte) ConnackBuilderOpt

func WithConnack50ConnackReason

func WithConnack50ConnackReason(reason ConnackRetcode, reasonString string) ConnackBuilderOpt

func WithConnack50MaximumPacketSize

func WithConnack50MaximumPacketSize(i int32) ConnackBuilderOpt

func WithConnack50MaximumQoS

func WithConnack50MaximumQoS(qos QoSType) ConnackBuilderOpt

func WithConnack50ReceiveMaximum

func WithConnack50ReceiveMaximum(i uint16) ConnackBuilderOpt

func WithConnack50ResponseInfo

func WithConnack50ResponseInfo(info string) ConnackBuilderOpt

func WithConnack50RetainAvailable

func WithConnack50RetainAvailable(i bool) ConnackBuilderOpt

func WithConnack50ServerKeepAlive

func WithConnack50ServerKeepAlive(interval uint16) ConnackBuilderOpt

WithConnack50ServerKeepAlive sets a seconds. optional, default value will be initialized in newvh50ext()

func WithConnack50SessionExpiryInterval

func WithConnack50SessionExpiryInterval(interval int32) ConnackBuilderOpt

func WithConnack50SharedSubsAvailable

func WithConnack50SharedSubsAvailable(i bool) ConnackBuilderOpt

optional, default value will be initialized in newvh50ext()

func WithConnack50SubsIdAvailable

func WithConnack50SubsIdAvailable(i bool) ConnackBuilderOpt

func WithConnack50TopicAliasMaximum

func WithConnack50TopicAliasMaximum(i uint16) ConnackBuilderOpt

func WithConnack50UserProperties

func WithConnack50UserProperties(props map[string]string) ConnackBuilderOpt

func WithConnack50WildcardSubsAvailable

func WithConnack50WildcardSubsAvailable(i bool) ConnackBuilderOpt

optional, default value will be initialized in newvh50ext()

type ConnackRetcode

type ConnackRetcode int
const (
	ConnackOK                 ConnackRetcode = iota
	ConnackUnspecError        ConnackRetcode = 0x80
	ConnackPacketInvalid      ConnackRetcode = 0x81
	ConnackProtocolError      ConnackRetcode = 0x82
	ConnackServerUnacceptable ConnackRetcode = 0x83

	ConnackServerBusy           ConnackRetcode = 0x89
	ConnackForbidden            ConnackRetcode = 0x8a
	ConnackUnknownAuthScheme    ConnackRetcode = 0x8c
	ConnackInvalidTopicName     ConnackRetcode = 0x90
	ConnackPacketTooLarge       ConnackRetcode = 0x95
	ConnackQuoteFull            ConnackRetcode = 0x97
	ConnackInvalidPayloadFormat ConnackRetcode = 0x99
	ConnackUnsupportedRetain    ConnackRetcode = 0x9a
	ConnackUnsupportedQoS       ConnackRetcode = 0x9b
	ConnackServerUseAnother     ConnackRetcode = 0x9c
	ConnackServerMovedPermanant ConnackRetcode = 0x9d
	ConnackRateOverflow         ConnackRetcode = 0x9f
)

type ConnackVH

type ConnackVH struct {
	SessionPresent bool
	ReasonCode     ConnackRetcode
	// contains filtered or unexported fields
}

func (*ConnackVH) Apply

func (s *ConnackVH) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

func (*ConnackVH) WriteAs

func (s *ConnackVH) WriteAs() (propertiesBuf bytes.Buffer, err error)

func (*ConnackVH) WriteWithOrder

func (s *ConnackVH) WriteWithOrder(ids ...byte) (propertiesBuf bytes.Buffer, err error)

type ConnectBuilderOpt

type ConnectBuilderOpt func(*connectBuilder)

func WithConnectCleanSession

func WithConnectCleanSession(cleanSession bool) ConnectBuilderOpt

func WithConnectCleanSessionIfNotSet

func WithConnectCleanSessionIfNotSet(cleanSession bool) ConnectBuilderOpt

func WithConnectClientId

func WithConnectClientId(id string) ConnectBuilderOpt

func WithConnectKeepAliveSeconds

func WithConnectKeepAliveSeconds(seconds uint16) ConnectBuilderOpt

func WithConnectKeepAliveSecondsIfNotSet

func WithConnectKeepAliveSecondsIfNotSet(seconds uint16) ConnectBuilderOpt

func WithConnectPassword

func WithConnectPassword(password string) ConnectBuilderOpt

func WithConnectQoS

func WithConnectQoS(DUP bool, qos QoSType, RETAIN bool) ConnectBuilderOpt

func WithConnectUsername

func WithConnectUsername(username string) ConnectBuilderOpt

func WithConnectWillMsg

func WithConnectWillMsg(willRetain bool, willQoS QoSType, willTopic string, willMessage []byte) ConnectBuilderOpt

type ConnectParam

type ConnectParam struct {
	ProtocolLevel     ProtocolLevel
	KeepAlivedSeconds uint16 // in seconds
	CleanSessionFlag  bool

	// WillFlag          bool
	WillQoS        QoSType
	WillRetainFlag bool

	// ClientId may be in or out property
	// For client, it's initialized by NewClient and NewMqttClientContext
	ClientId        string
	ClientIdCreated bool
	WillTopic       string
	WillMessage     []byte

	Username string
	Password string

	RetCode int // in this field, the connect packet errors will be kept and sent to client later
}

func (*ConnectParam) MigrateFrom

func (s *ConnectParam) MigrateFrom(vh1 VariantHeader, payload1 Payload)

type ConnectPayload

type ConnectPayload struct {
	ClientId        string
	ClientIdCreated bool
	WillTopic       string
	WillMessage     []byte
	Username        string
	Password        string

	RetCode int // in this field, the connect packet errors will be kept and sent to client later

	StrictClientId bool // should we verify the user's client id with strict rules?
}

func (*ConnectPayload) Apply

func (s *ConnectPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

Apply to parse incoming 'data' from 'pos' postion. 3.1.3-1 Apply return false if payload or vh read error occured, 3.1.4-1, so that caller will fail the process flow and close client connection later.

type ConnectVH

type ConnectVH struct {
	ProtocolLevel ProtocolLevel
	ConnectFlag   byte

	KeepAlivedSeconds uint16 // in seconds
	CleanSessionFlag  bool   // Clean Start Flag for v5.0+
	WillFlag          bool
	WillQoS           QoSType
	WillRetainFlag    bool
	PasswordFlag      bool
	UsernameFlag      bool
	// contains filtered or unexported fields
}

func (*ConnectVH) Apply

func (s *ConnectVH) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

func (*ConnectVH) WriteAs

func (s *ConnectVH) WriteAs() (propertiesBuf bytes.Buffer, err error)

func (*ConnectVH) WriteWithOrder

func (s *ConnectVH) WriteWithOrder(ids ...byte) (propertiesBuf bytes.Buffer, err error)

type Contextual

type Contextual interface {
	GetContext() *StateContext
}

type ControlMessage

type ControlMessage struct {
	Message
}

type DataMessage

type DataMessage struct {
	UniqueId          int64
	PackageIdentifier uint16
	Message
}

func NewDataMessage

func NewDataMessage(willQoS QoSType, willRetainFlag bool, packageIdentifier uint16, willTopic string, willMessage []byte) *DataMessage

type DefaultSessionStoreOpt

type DefaultSessionStoreOpt func(store *defaultSessionStore)

func WithDefaultSessionStoreNoSysStats

func WithDefaultSessionStoreNoSysStats(b bool) DefaultSessionStoreOpt

func WithDefaultSessionStoreNoSysStatsLog

func WithDefaultSessionStoreNoSysStatsLog(b bool) DefaultSessionStoreOpt

func WithDefaultSessionStorePathRW

func WithDefaultSessionStorePathRW(rw PathRW) DefaultSessionStoreOpt

func WithDefaultSessionStoreResetStorage

func WithDefaultSessionStoreResetStorage(b bool) DefaultSessionStoreOpt

func WithDefaultSessionStoreSavingEnabled

func WithDefaultSessionStoreSavingEnabled(persist bool) DefaultSessionStoreOpt

func WithDefaultSessionStoreSavingPeriod

func WithDefaultSessionStoreSavingPeriod(d time.Duration) DefaultSessionStoreOpt

type Disconnect50BuilderOpt

type Disconnect50BuilderOpt func(builder *disconnectBuilder)

func WithDisconnect50Reason

func WithDisconnect50Reason(reason errors.CloseReason, reasonString string) Disconnect50BuilderOpt

func WithDisconnect50ReasonServerMoved

func WithDisconnect50ReasonServerMoved(serverReference string) Disconnect50BuilderOpt

func WithDisconnect50ReasonUseOtherServer

func WithDisconnect50ReasonUseOtherServer(serverReference string) Disconnect50BuilderOpt

func WithDisconnect50SessionExpiryInterval

func WithDisconnect50SessionExpiryInterval(seconds int) Disconnect50BuilderOpt

func WithDisconnect50UserProperties

func WithDisconnect50UserProperties(props map[string]string) Disconnect50BuilderOpt

WithDisconnect50UserProperties sets the user's properties todo, in the use properties, duplicated keys are allowed, see also 3.14.2.2.5

type Hooker

type Hooker interface {
}

type IdStore

type IdStore interface {
	Has(key uint16) bool
	Get(key uint16, defaultValue ...interface{}) interface{}
	Set(key uint16, value interface{}) (err error)
}

type Message

type Message struct {
	QoS    QoSType
	Retain bool
	Topic  string
	Body   []byte
}

Message is the base structure of MQTT communicating packet data block

type Mode

type Mode int
const (
	ModeClient Mode = iota
	ModeBroker
)

type PacketBuilder

type PacketBuilder interface {
	ReportType() ReportType
	Build(ctx *StateContext, pkg *Pkg) (err error)
	Bytes() (bytes []byte)
	AsString() (str string)
	WriteTo(w io.Writer) (n int, err error)
	// NextPktId generate the packet identifier and store it, and return it too
	NextPktId(ctx *StateContext) uint16
	RequestResend()
	IsResending() bool
}

func NewConnectBuilder

func NewConnectBuilder(opts ...ConnectBuilderOpt) PacketBuilder

func NewDisconnectBuilder

func NewDisconnectBuilder(opts ...Disconnect50BuilderOpt) PacketBuilder

func NewPingreqBuilder

func NewPingreqBuilder() PacketBuilder

func NewPingrespBuilder

func NewPingrespBuilder() PacketBuilder

func NewPub10ackBuilderFromPkg

func NewPub10ackBuilderFromPkg(pkg *Pkg) PacketBuilder

func NewPub21recBuilderFromPkg

func NewPub21recBuilderFromPkg(pkg *Pkg) PacketBuilder

func NewPub22relBuilderFromPkg

func NewPub22relBuilderFromPkg(packetIdentifier uint16) PacketBuilder

func NewPub23compBuilderFromPkg

func NewPub23compBuilderFromPkg(packetIdentifier uint16) PacketBuilder

func NewPublishBuilder

func NewPublishBuilder(topicName string, msg []byte, opts ...PublishBuilderOpt) PacketBuilder

func NewSubackBuilder

func NewSubackBuilder(vh *subscribeVH, payload *subscribePayload, id uint16) PacketBuilder

func NewSubscribeBuilder

func NewSubscribeBuilder(verbose bool, opts ...SubscribeBuilderOpt) PacketBuilder

func NewUnsubackBuilder

func NewUnsubackBuilder(opts ...UnsubackBuilderOpt) PacketBuilder

func NewUnsubscribeBuilder

func NewUnsubscribeBuilder(opts ...UnsubscribeBuilderOpt) PacketBuilder

type PacketParser

type PacketParser interface {
	GetToState() State
	CreateVH() VariantHeader
	CreatePayload() Payload
	OnAction(ctx *StateContext, pkg *Pkg) (err error)
	OnError(ctx *StateContext, pkg *Pkg, reason error) (err error)
}

func NewConnackParser

func NewConnackParser(to State) PacketParser

func NewConnectParser

func NewConnectParser(to State, strictClientId bool) PacketParser

func NewDisconnectParser

func NewDisconnectParser(to State) PacketParser

func NewPingreqParser

func NewPingreqParser(to State) PacketParser

func NewPingrespParser

func NewPingrespParser(to State) PacketParser

func NewPubackParser

func NewPubackParser(to State) PacketParser

NewPubackParser for QoS 1

func NewPubcompParser

func NewPubcompParser(to State) PacketParser

NewPubcompParser for QoS 2

func NewPublishParser

func NewPublishParser(to State) PacketParser

func NewPubrecParser

func NewPubrecParser(to State) PacketParser

NewPubrecParser for QoS 2

func NewPubrelParser

func NewPubrelParser(to State) PacketParser

NewPubrelParser for QoS 2

func NewSubackParser

func NewSubackParser(to State) PacketParser

func NewSubscribeParser

func NewSubscribeParser(to State) PacketParser

func NewUnsubackParser

func NewUnsubackParser(to State) PacketParser

func NewUnsubscribeParser

func NewUnsubscribeParser(to State) PacketParser

func NewWrongPacketParser

func NewWrongPacketParser() PacketParser

type PacketParserMap

type PacketParserMap struct {
	ToList               map[ReportType]PacketParser
	ToStateUnconditional State
}

type PathRW

type PathRW interface {
	Start(opts ...PathRWOpt)
	Stop()

	Load(dst *defaultSessionStore) (err error)
	Save(src *defaultSessionStore, tick time.Time) (err error)

	Get(pathKey string, defaultValue ...interface{}) (ret interface{}, err error)
	Set(pathKey string, value interface{}) (err error)
}

PathRW presents a PathReadWriter

func NewPathRWGob

func NewPathRWGob(opts ...PathRWOpt) PathRW

NewPathRWGob creates a new gob PathReadWriter PathRW instance

func NewPathRWRedis

func NewPathRWRedis(opts ...PathRWOpt) PathRW

NewPathRWGob creates a new gob PathReadWriter PathRW instance

type PathRWOpt

type PathRWOpt func(rw PathRW)

func WithPathRWGobDataDir

func WithPathRWGobDataDir(dataDir string) PathRWOpt

func WithPathRWRedisOpts

func WithPathRWRedisOpts(opts ...redisop.RedisOpOpt) PathRWOpt

func WithPathRWRedisPrefix

func WithPathRWRedisPrefix(prefix string) PathRWOpt

type Payload

type Payload interface {
	Applyable
}

type Pkg

type Pkg struct {
	ReceivedTime time.Time

	HdrLen     int
	Length     int
	ReportType ReportType
	DUP        bool
	RETAIN     bool
	QoS        QoSType
	Flags      byte

	PacketIdentifier uint16
	VH               VariantHeader
	Payload          Payload

	// Data includes mqtt.VariantHeader + mqtt.Data.
	Data []byte
}

type ProtocolLevel

type ProtocolLevel byte
const (
	ProtocolLevel0       ProtocolLevel = iota // Level0
	ProtocolLevel1                            // Level1
	ProtocolLevel2                            // Level2
	ProtocolLevel3                            // Level3
	ProtocolLevelForV311                      // Level4(3.1.1)
	ProtocolLevelForV50                       // Level5(5.0)
)

func (ProtocolLevel) Parse

func (i ProtocolLevel) Parse(s string) ProtocolLevel

func (ProtocolLevel) String added in v1.0.5

func (i ProtocolLevel) String() string

type PubBlock

type PubBlock struct {
	TopicName string
	Payload   []byte
}

type PubCompPayload

type PubCompPayload struct {
}

func (*PubCompPayload) Apply

func (s *PubCompPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

type PubCompVH

type PubCompVH struct{ Id uint16 }

func (*PubCompVH) Apply

func (s *PubCompVH) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

type PubSubTask

type PubSubTask struct {
	SendingTask
	PacketIdentifier uint16
	SentTime         time.Time
	OnSent           func(task *PubSubTask, data []byte)
}

func (*PubSubTask) IsResending

func (pt *PubSubTask) IsResending() bool

func (*PubSubTask) RequestResend

func (pt *PubSubTask) RequestResend() *PubSubTask

type PubackVH

type PubackVH struct{ PacketIdentifier uint16 }

func (*PubackVH) Apply

func (s *PubackVH) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

type Pubinfo

type Pubinfo struct {
	// contains filtered or unexported fields
}

func (Pubinfo) MarshalBinary

func (s Pubinfo) MarshalBinary() (ret []byte, err error)

func (*Pubinfo) UnmarshalBinary

func (s *Pubinfo) UnmarshalBinary(data []byte) (err error)

type PublishBuilderOpt

type PublishBuilderOpt func(*publishBuilder)

func WithPub50UserProperties

func WithPub50UserProperties(props map[string]string) PublishBuilderOpt

func WithPub50contentType

func WithPub50contentType(i string) PublishBuilderOpt

func WithPub50correlationData

func WithPub50correlationData(i []byte) PublishBuilderOpt

func WithPub50messageExpiryInterval

func WithPub50messageExpiryInterval(i int32) PublishBuilderOpt

func WithPub50payloadFormatIndicator

func WithPub50payloadFormatIndicator(i byte) PublishBuilderOpt

func WithPub50responseTopic

func WithPub50responseTopic(i string) PublishBuilderOpt

func WithPub50subscriptionIdentifier

func WithPub50subscriptionIdentifier(i string) PublishBuilderOpt

func WithPub50topicAlias

func WithPub50topicAlias(i uint16) PublishBuilderOpt

func WithPubPacketIdentifier

func WithPubPacketIdentifier(id uint16) PublishBuilderOpt

func WithPubProtocolLevel

func WithPubProtocolLevel(level ProtocolLevel) PublishBuilderOpt

func WithPubQoS

func WithPubQoS(DUP bool, qos QoSType, RETAIN bool) PublishBuilderOpt

WithPubQoS attach these flags to a publish packet builder: DUP, qos, RETAIN.

For a recent publish packet, DUP must be true (3.3.1-1). For a QoS 0 packet, DUP must be false (3.3.1-2).

For server publishing, DUP must be false except that recent it after the first sent failed (3.3.1-3).

For server publishing, see also:

  1. on publish packet received from a client, server will send back with a publish ack packet if qos 1. In this case, refer to pkt.puback.go and pubackBuilder.OnAction.
  2. with a qos 2 publish packet received, refer to pkt.pubrec, pkt.pubrel, and pub.comp.go for the further implementations.
  3. for a qos 0 publish packet received, publishBuilder act as a publishParser, so check the codes at publishBuilder.OnAction and ctx.bizPublish()

type PublishPayload

type PublishPayload struct {
	Data []byte // by app
}

func (*PublishPayload) Apply

func (s *PublishPayload) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

type PublishVH

type PublishVH struct {

	// DUP, QoS, RETAIN
	Topic            string
	PacketIdentifier uint16 // unless QoS in 1 or 2
	// contains filtered or unexported fields
}

func (*PublishVH) Apply

func (s *PublishVH) Apply(ctx *StateContext, pkg *Pkg, data []byte, pos int) (newPos int, err error)

func (*PublishVH) WriteAs

func (s *PublishVH) WriteAs() (propertiesBuf bytes.Buffer, err error)

func (*PublishVH) WriteWithOrder

func (s *PublishVH) WriteWithOrder(ids ...byte) (propertiesBuf bytes.Buffer, err error)

type QoSType

type QoSType int
const (
	QoSDefault QoSType = iota
	QoS1
	QoS2
	QoSError
	QoSFailure QoSType = 0x80 // for suback
)

func (QoSType) FromString

func (n QoSType) FromString(s string) QoSType

func (QoSType) String

func (n QoSType) String() string

type ReportType

type ReportType uint
const (
	Reserved0   ReportType = iota // ReportType
	CONNECT                       // ReportType = 1
	CONNACK                       // ReportType = 2
	PUBLISH                       // ReportType = 3
	PUBACK                        // ReportType = 4
	PUBREC                        // ReportType = 5
	PUBREL                        // ReportType = 6
	PUBCOMP                       // ReportType = 7
	SUBSCRIBE                     // ReportType = 8
	SUBACK                        // ReportType = 9
	UNSUBSCRIBE                   // ReportType = 10
	UNSUBACK                      // ReportType = 11
	PINGREQ                       // ReportType = 12
	PINGRESP                      // ReportType = 13
	DISCONNECT                    // ReportType = 14
	AUTH                          // ReportType = 15, since mqtt-v5.0
)

func (ReportType) FromString

func (n ReportType) FromString(s string) ReportType

func (ReportType) String

func (n ReportType) String() string

type RiskControl

type RiskControl interface {
	Disable(disable bool)
	IsEnabled() bool
}

func NewDefaultRiskControl

func NewDefaultRiskControl(enabled bool) RiskControl

type SendPayload

type SendPayload func(ctx *StateContext, clientId string, qos QoSType, path string, payload []byte, packetIdentifier uint16) (err error)

type SendPayloadVx

type SendPayloadVx func(ctx *StateContext, clientId string, builder PacketBuilder, qp *qp) (err error)

type SendingTask

type SendingTask struct {
	Builder PacketBuilder
}

type Session

type Session interface {
	Close() (err error)

	SetCleanSessionFlag(b bool)
	GetCleanSessionFlag() bool

	// Write write data to the connection associated with this session, in the context
	Write(data []byte) (n int, err error)

	// LinkTo relink the new context 'ctx' to this session
	LinkTo(ctx *StateContext)

	// KeepAlive reset the internal keep-alive timer so that refresh it
	KeepAlive()
}

type SessionStore

type SessionStore interface {
	// Close the session store if server stopping, it will flush all sessions to persistent storage.
	Close()

	BeforeServerListening()

	// Has tests the existence of a session via its clientId key
	Has(key string) bool
	Get(key string, defaultValue ...Session) Session
	Set(key string, value Session) (err error)
	Unset(key string) (err error)

	AddWillMessage(clientId string, willQoS QoSType, willRetainFlag bool, willTopic string, willMessage []byte)
	HasWillMessage(clientId string) bool
	// SendWillMessage do publish the will msg to all clients on will topic, and
	// erase the will msg if not retained
	// sendPaylaodVx should be nil so that a default sender can dispatch the
	// payload to all subscribers, see also
	// defaultSessionStore.sendPayloadToClient
	SendWillMessage(ctx *StateContext, clientId string, sendPayloadVx SendPayloadVx) (err error)

	StoreQos1or2Message(ctx *StateContext, pkg *Pkg) (err error)
	PublishByPktId(ctx *StateContext, pktId uint16) (err error)

	PublishByBuilder(ctx *StateContext, builder PacketBuilder) (err error)

	Publish(ctx *StateContext, pkg *Pkg) (err error)
	Subscribe(ctx *StateContext, clientId string, topicFilter TopicFilter, packetIdentifier uint16) (invoked QoSType, err error)
	Unsubscribe(ctx *StateContext, clientId, topicFilter string) (err error)

	EnableSysStats(enabled bool)
	SysStatsEnabled() bool
	EnableSysStatsLog(enabled bool)
	SysStatsLogEnabled() bool
	EnableResetStorage(enabled bool)
	ResetStorageEnabled() bool
}

func NewDefaultSessionStore

func NewDefaultSessionStore(opts ...DefaultSessionStoreOpt) SessionStore

NewDefaultSessionStore create or return the unique session store instance

type State

type State ReportType
const (
	IDLE State = 1000 + iota
	CONNECTING
	CONNECTED
	PUBLISHING
	PUBLISHED
	SUBSCRIBING
	SUBSCRIBED
	UNSUBSCRIBING
	UNSUBSCRIBED
	PINGING
	CLOSED
	PUB1_ACK
	PUB2_REC
	PUB2_REL
	PUB2_COMP
	WRONG_STATE
	NEED_CLOSE_STATE
	MAX_STATE
)

func (State) String

func (s State) String() string

type StateContext

type StateContext struct {
	TimeOfConnected time.Time
	ConnectTimeout  time.Duration

	State State

	// ClientId     string
	ConnectParam *ConnectParam
	// contains filtered or unexported fields
}

func NewMqttClientContext

func NewMqttClientContext(clientId string, conn net.Conn, opts ...StateContextOpt) *StateContext

NewMqttClientContext create StateContext for Client

func NewMqttServerContext

func NewMqttServerContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext

func NewMqttTestContext

func NewMqttTestContext(clientId string, wr io.Writer, closer io.Closer, opts ...StateContextOpt) *StateContext

func (*StateContext) Advance

func (s *StateContext) Advance(pkg *Pkg) (next State, err error)

func (*StateContext) Borrow

func (s *StateContext) Borrow() (pktId uint16)

func (*StateContext) CanWrite

func (s *StateContext) CanWrite() bool

func (*StateContext) CloseWithReason

func (s *StateContext) CloseWithReason(reason errors.CloseReason) (err error)

func (*StateContext) CreateSessionForThisClient

func (s *StateContext) CreateSessionForThisClient(pkg *Pkg) (sess Session, err error)

func (*StateContext) IsClientMode

func (s *StateContext) IsClientMode() bool

func (*StateContext) IsClose

func (s *StateContext) IsClose() bool

func (*StateContext) IsNormalClosed

func (s *StateContext) IsNormalClosed() bool

func (*StateContext) IsNotClientMode

func (s *StateContext) IsNotClientMode() bool

func (*StateContext) IsNotServer

func (s *StateContext) IsNotServer() bool

func (*StateContext) IsServer

func (s *StateContext) IsServer() bool

func (*StateContext) MaxProtocolLevel

func (s *StateContext) MaxProtocolLevel() ProtocolLevel

func (*StateContext) MaxQoS

func (s *StateContext) MaxQoS() QoSType

func (*StateContext) MaxVersion

func (s *StateContext) MaxVersion() string

func (*StateContext) Parse

func (ctx *StateContext) Parse(pkg *Pkg) (to State, err error)

func (*StateContext) Return

func (s *StateContext) Return(pktId uint16) (err error)

func (*StateContext) WithCtx

func (p *StateContext) WithCtx(ctx *StateContext) *logger.Base

WithCtx could cause race exception. for example: when two or more clients incoming with CONNECT packets, now connectParser parse them ok and print logging info via base.WithCtx(ctx).Debug(...), here is the race point. Another scene is the multiple clients send pingreq packets.

func (*StateContext) Write

func (s *StateContext) Write(p []byte) (nn int, err error)

Write to write bytes back to remote client. mqttReader implements Writer interface too.

type StateContextOpt

type StateContextOpt func(ctx *StateContext)

func WithAuthEnabled

func WithAuthEnabled(b bool) StateContextOpt

func WithAuthenticator

func WithAuthenticator(ss auth.Authenticator) StateContextOpt

func WithConnectTimeout

func WithConnectTimeout(timeout time.Duration) StateContextOpt

func WithConnectedTimestamp

func WithConnectedTimestamp(t time.Time) StateContextOpt

func WithDefaultRiskControl

func WithDefaultRiskControl(b bool) StateContextOpt

func WithMaxProtocolLevel

func WithMaxProtocolLevel(level ProtocolLevel) StateContextOpt

func WithMaxQoS

func WithMaxQoS(maxQoS QoSType) StateContextOpt

func WithMaxVersion

func WithMaxVersion(v string) StateContextOpt

func WithNoWildMatchFilter

func WithNoWildMatchFilter(b bool) StateContextOpt

func WithRiskControl

func WithRiskControl(ss RiskControl) StateContextOpt

func WithSessionStore

func WithSessionStore(ss SessionStore) StateContextOpt

func WithStateRegistry

func WithStateRegistry(ss StateRegistry) StateContextOpt

func WithStrictClientId

func WithStrictClientId(b bool) StateContextOpt

func WithStrictProtocolName

func WithStrictProtocolName(b bool) StateContextOpt

func WithTimeOfConnected

func WithTimeOfConnected(tm time.Time) StateContextOpt

type StateRegistry

type StateRegistry interface {
	AddSynonyms(from State, synonymy ...State)
	AddState(from State, parsers *PacketParserMap)
	// in testing mode, noOnAction should be true to ignore invoking PacketParser.OnAction
	Advance(out io.Writer, ctx *StateContext, pkg *Pkg, noOnAction bool) (to State, err error)
	Parsers() map[ReportType]PacketParser
	// in testing mode, noOnAction should be true to ignore invoking PacketParser.OnAction
	Parse(ctx *StateContext, pkg *Pkg, noOnAction bool) (to State, err error)
}

func NewDefaultClientStateRegistry

func NewDefaultClientStateRegistry(strictClientId bool) StateRegistry

func NewDefaultStateRegistry

func NewDefaultStateRegistry(strictClientId bool) StateRegistry

type Store

type Store interface {
	Has(key string) bool
	Get(key string, defaultValue ...interface{}) interface{}
	Set(key string, value interface{}) (err error)
}

type SubBlock

type SubBlock struct {
	TopicFilter string
	RequestQoS  QoSType
	Verbose     bool
	RetCode     QoSType
}

type SubHolder

type SubHolder interface {
	Add(packetIdentifier uint16, filters []SubBlock) (err error)
	ApplyRetCodes(ctx *StateContext, packetIdentifier uint16, data []byte, pos int) (newPos int)
}

type Subinfo

type Subinfo struct {
	// contains filtered or unexported fields
}

func (Subinfo) MarshalBinary

func (s Subinfo) MarshalBinary() (ret []byte, err error)

func (Subinfo) String

func (s Subinfo) String() string

func (*Subinfo) UnmarshalBinary

func (s *Subinfo) UnmarshalBinary(data []byte) (err error)

type SubscribeBuilderOpt

type SubscribeBuilderOpt func(*subBuilder)

func WithSub50UserProperties

func WithSub50UserProperties(props map[string]string) SubscribeBuilderOpt

func WithSubHolder

func WithSubHolder(holder SubHolder) SubscribeBuilderOpt

func WithSubPacketIdentifier

func WithSubPacketIdentifier(id uint16) SubscribeBuilderOpt

func WithSubTopicFilter

func WithSubTopicFilter(topicFilter string, qos QoSType) SubscribeBuilderOpt

func WithSubTopicFilters

func WithSubTopicFilters(topicFilters []string, qos QoSType) SubscribeBuilderOpt

type SysPublisher

type SysPublisher interface {
	SysPublish(topicParts string, payload []byte, opts ...PublishBuilderOpt)
	SysPublishString(topicParts string, payload string, opts ...PublishBuilderOpt)
	SysPublishInt64(topicParts string, payload int64, opts ...PublishBuilderOpt)
	SysPublishUint64(topicParts string, payload uint64, opts ...PublishBuilderOpt)
	SysPublishFloat64(topicParts string, payload float64, opts ...PublishBuilderOpt)

	// IncMessages updates statistics data under $SYS/broker/messages/...
	// The known suffixes must be:
	// "received", "sent", "publish/dropped", "publish/received", "publish/sent", "retained/count",
	IncMessages(suffix string, delta int64)

	// IncLoad updates statistics data under $%SYS/broker/load/...
	// The known suffixes could be:
	// "bytes/received", "bytes/sent", ...
	IncLoad(suffix string, delta int)
}

type TopicFilter

type TopicFilter struct {
	Filter            string
	Shared            bool
	SharedName        string
	RequestedQoS      QoSType
	NoLocal           bool // v5.0+
	RetainAsPublished bool // v5.0+
	RetainHandling    int  // v5.0+, see also ch. 3.8.3.1
	RetCode           errors.CloseReason
}

type UnsubackBuilderOpt

type UnsubackBuilderOpt func(*unsubackBuilder)

func WithUnsuback50UserProperties

func WithUnsuback50UserProperties(props map[string]string) UnsubackBuilderOpt

func WithUnsubackIdentifier

func WithUnsubackIdentifier(id uint16) UnsubackBuilderOpt

func WithUnsubackRetCodes

func WithUnsubackRetCodes(retCodes []UnsubackRetcode) UnsubackBuilderOpt

type UnsubackRetcode

type UnsubackRetcode int
const (
	UnsubackOK UnsubackRetcode = iota
)

type UnsubscribeBuilderOpt

type UnsubscribeBuilderOpt func(*unsubBuilder)

func WithUnsub50UserProperties

func WithUnsub50UserProperties(props map[string]string) UnsubscribeBuilderOpt

func WithUnsubPacketIdentifier

func WithUnsubPacketIdentifier(id uint16) UnsubscribeBuilderOpt

func WithUnsubTopicFilter

func WithUnsubTopicFilter(topicFilter string) UnsubscribeBuilderOpt

type VariantHeader

type VariantHeader interface {
	Applyable
}

type WillMessage

type WillMessage struct {
	Message
}

func NewWillMessage

func NewWillMessage(willQoS QoSType, willRetainFlag bool, willTopic string, willMessage []byte) *WillMessage

type WrongPacketParser

type WrongPacketParser struct{}

func (*WrongPacketParser) CreatePayload

func (p *WrongPacketParser) CreatePayload() Payload

func (*WrongPacketParser) CreateVH

func (p *WrongPacketParser) CreateVH() VariantHeader

func (*WrongPacketParser) GetToState

func (p *WrongPacketParser) GetToState() State

func (*WrongPacketParser) OnAction

func (p *WrongPacketParser) OnAction(ctx *StateContext, pkg *Pkg) (err error)

func (*WrongPacketParser) OnError

func (p *WrongPacketParser) OnError(ctx *StateContext, pkg *Pkg, reason error) (err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL