persistence

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2023 License: MIT Imports: 3 Imported by: 0

Documentation

Index

Constants

View Source
// Keys identifying each kind of data held in the persistent store.
const (
	KSubscription = "sub" // KSubscription is the key for subscription data.
	KServerInfo   = "srv" // KServerInfo is the key for server info data.
	KRetained     = "ret" // KRetained is the key for retained messages data.
	KInflight     = "ifm" // KInflight is the key for inflight messages data.
	KClient       = "cl"  // KClient is the key for client data.
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client contains client data that can be persistently stored.
// Fields tagged `json:"-" msg:"-"` (ID, ClientID, T) are excluded from the
// serialized payload.
type Client struct {
	LWT      LWT    `json:"lwt" msg:"lwt"`           // the last-will-and-testament message for the client.
	Username []byte `json:"username" msg:"username"` // the username the client authenticated with.
	ID       string `json:"-" msg:"-"`               // the storage key (this field is ignored, per its "-" tags).
	ClientID string `json:"-" msg:"-"`               // the id of the client.
	T        string `json:"-" msg:"-"`               // the type of the stored data.
	Listener string `json:"listener" msg:"listener"` // the last known listener id for the client.
}

Client contains client data that can be persistently stored.

func (*Client) MarshalMsg

func (z *Client) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Client) Msgsize

func (z *Client) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Client) UnmarshalMsg

func (z *Client) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DeleteEntity

// DeleteEntity groups the methods that delete individual records from the
// persistent store. Each method returns a non-nil error on failure.
type DeleteEntity interface {
	// DeleteSubscription deletes a subscription from the persistent store.
	DeleteSubscription(cid, filter string) error
	// DeleteClient deletes a client from the persistent store.
	DeleteClient(id string) error
	// DeleteInflight deletes an inflight message from the persistent store.
	DeleteInflight(cid string, pid uint16) error
	// DeleteInflightBatch takes several packet ids at once.
	// NOTE(review): presumably deletes every listed inflight message of client cid — confirm against implementations.
	DeleteInflightBatch(cid string, pid []uint16) error
	// DeleteRetained deletes a retained message from the persistent store.
	DeleteRetained(topic string) error
}

type FixedHeader

// FixedHeader contains the fixed header properties of a message.
type FixedHeader struct {
	Remaining int  `json:"remaining" msg:"remaining"` // the number of remaining bytes in the payload.
	Type      byte `json:"type" msg:"type"`           // the type of the packet (PUBLISH, SUBSCRIBE, etc) from bits 7 - 4 (byte 1).
	Qos       byte `json:"qos" msg:"qos"`             // indicates the quality of service expected.
	Dup       bool `json:"dup" msg:"dup"`             // indicates if the packet was already sent at an earlier time.
	Retain    bool `json:"retain" msg:"retain"`       // whether the message should be retained.
}

FixedHeader contains the fixed header properties of a message.

func (*FixedHeader) MarshalMsg

func (z *FixedHeader) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*FixedHeader) Msgsize

func (z *FixedHeader) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*FixedHeader) UnmarshalMsg

func (z *FixedHeader) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type GenEntityId

// GenEntityId groups the methods that generate string ids for stored entities.
// NOTE(review): the ids are presumably the storage keys of the matching
// records — confirm against a concrete Store implementation.
// NOTE(review): Go initialism style would spell these names with "ID"; the
// current spelling is part of the public interface.
type GenEntityId interface {
	// GenInflightId returns the id for the inflight message pid of client cid.
	GenInflightId(cid string, pid uint16) string
	// GenSubscriptionId returns the id for the subscription of client cid to filter.
	GenSubscriptionId(cid, filter string) string
	// GenRetainedId returns the id for the retained message on topic.
	GenRetainedId(topic string) string
}

type KeyValue

// KeyValue is a string key/value pair. It is the element type of
// Properties.UserProperties.
type KeyValue struct {
	Key   string `json:"key" msg:"key"`
	Value string `json:"value" msg:"value"`
}

func (KeyValue) MarshalMsg

func (z KeyValue) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (KeyValue) Msgsize

func (z KeyValue) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*KeyValue) UnmarshalMsg

func (z *KeyValue) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type LWT

// LWT contains details about a client's LWT (last-will-and-testament) payload.
type LWT struct {
	Message []byte `json:"message" msg:"message"` // the message that shall be sent when the client disconnects.
	Topic   string `json:"topic" msg:"topic"`     // the topic the will message shall be sent to.
	Qos     byte   `json:"qos" msg:"qos"`         // the quality of service desired.
	Retain  bool   `json:"retain" msg:"retain"`   // indicates whether the will message should be retained.
}

LWT contains details about a client's LWT payload.

func (*LWT) MarshalMsg

func (z *LWT) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*LWT) Msgsize

func (z *LWT) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*LWT) UnmarshalMsg

func (z *LWT) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Message

// Message contains the details of a retained or inflight message.
// Which fields are meaningful depends on the kind of message, as noted per
// field. T, ID and Client carry "-" tags and are excluded from the serialized
// payload.
type Message struct {
	Payload     []byte      `json:"payload" msg:"payload"`                 // the message payload (if retained).
	FixedHeader FixedHeader `json:"fixed_header" msg:"fixed_header"`       // the header properties of the message.
	T           string      `json:"-" msg:"-"`                             // the type of the stored data.
	ID          string      `json:"-" msg:"-"`                             // the storage key.
	Client      string      `json:"-" msg:"-"`                             // the id of the client who sent the message (if inflight).
	TopicName   string      `json:"topic_name" msg:"topic_name"`           // the topic the message was sent to (if retained).
	Sent        int64       `json:"sent" msg:"sent"`                       // the last time the message was sent (for retries) in unixtime (if inflight).
	Resends     int         `json:"resends" msg:"resends"`                 // the number of times the message was attempted to be sent (if inflight).
	PacketID    uint16      `json:"packet_id" msg:"packet_id"`             // the unique id of the packet (if inflight).
	Properties  Properties  `json:"props,omitempty" msg:"props,omitempty"` // the message properties (for v5).
}

Message contains the details of a retained or inflight message.

func (*Message) MarshalMsg

func (z *Message) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Message) Msgsize

func (z *Message) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Message) UnmarshalMsg

func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type MockStore

// MockStore is a mock storage backend for testing.
type MockStore struct {
	Fail     map[string]bool // issue errors for different methods.
	FailOpen bool            // error on open.
	Closed   bool            // indicate mock store is closed.
	Opened   bool            // indicate mock store is open.
}

MockStore is a mock storage backend for testing.

func (*MockStore) Close

func (s *MockStore) Close()

Close closes the storage instance.

func (*MockStore) DeleteClient

func (s *MockStore) DeleteClient(id string) error

DeleteClient deletes a client from the persistent store.

func (*MockStore) DeleteInflight

func (s *MockStore) DeleteInflight(cid string, pid uint16) error

DeleteInflight deletes an inflight message from the persistent store.

func (*MockStore) DeleteInflightBatch

func (s *MockStore) DeleteInflightBatch(cid string, pid []uint16) error

DeleteInflightBatch deletes a batch of inflight messages from the persistent store.

func (*MockStore) DeleteRetained

func (s *MockStore) DeleteRetained(topic string) error

DeleteRetained deletes a retained message from the persistent store.

func (*MockStore) DeleteSubscription

func (s *MockStore) DeleteSubscription(cid, filter string) error

DeleteSubscription deletes a subscription from the persistent store.

func (*MockStore) GenInflightId

func (s *MockStore) GenInflightId(cid string, pid uint16) string

func (*MockStore) GenRetainedId

func (s *MockStore) GenRetainedId(topic string) string

func (*MockStore) GenSubscriptionId

func (s *MockStore) GenSubscriptionId(cid, filter string) string

func (*MockStore) MarshalMsg

func (z *MockStore) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*MockStore) Msgsize

func (z *MockStore) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*MockStore) Open

func (s *MockStore) Open() error

Open opens the storage instance.

func (*MockStore) ReadClientByCid

func (s *MockStore) ReadClientByCid(cid string) (v Client, err error)

func (*MockStore) ReadClients

func (s *MockStore) ReadClients() (v []Client, err error)

ReadClients loads the clients from the storage instance.

func (*MockStore) ReadInflight

func (s *MockStore) ReadInflight() (v []Message, err error)

ReadInflight loads the inflight messages from the storage instance.

func (*MockStore) ReadInflightByCid

func (s *MockStore) ReadInflightByCid(cid string) (v []Message, err error)

func (*MockStore) ReadRetained

func (s *MockStore) ReadRetained() (v []Message, err error)

ReadRetained loads the retained messages from the storage instance.

func (*MockStore) ReadRetainedByTopic

func (s *MockStore) ReadRetainedByTopic(topic string) (v Message, err error)

func (*MockStore) ReadServerInfo

func (s *MockStore) ReadServerInfo() (v ServerInfo, err error)

ReadServerInfo loads the server info from the storage instance.

func (*MockStore) ReadSubscriptions

func (s *MockStore) ReadSubscriptions() (v []Subscription, err error)

ReadSubscriptions loads the subscriptions from the storage instance.

func (*MockStore) ReadSubscriptionsByCid

func (s *MockStore) ReadSubscriptionsByCid(cid string) (v []Subscription, err error)

func (*MockStore) UnmarshalMsg

func (z *MockStore) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*MockStore) WriteClient

func (s *MockStore) WriteClient(v Client) error

WriteClient writes a single client to the storage instance.

func (*MockStore) WriteInflight

func (s *MockStore) WriteInflight(v Message) error

WriteInflight writes a single inflight message to the storage instance.

func (*MockStore) WriteRetained

func (s *MockStore) WriteRetained(v Message) error

WriteRetained writes a single retained message to the storage instance.

func (*MockStore) WriteServerInfo

func (s *MockStore) WriteServerInfo(v ServerInfo) error

WriteServerInfo writes server info to the storage instance.

func (*MockStore) WriteSubscription

func (s *MockStore) WriteSubscription(v Subscription) error

WriteSubscription writes a single subscription to the storage instance.

type Properties

// Properties holds the optional properties stored alongside a Message (the
// Message.Properties field is marked "for v5"). Every field except Expiry is
// tagged omitempty; PayloadFormat is a pointer, so it can also be nil.
// UserProperties is a list of KeyValue pairs.
type Properties struct {
	Expiry          int64      `json:"expiry" msg:"expiry"` // the message expiration time in unixtime.
	PayloadFormat   *byte      `json:"payload_format,omitempty" msg:"payload_format,omitempty"`
	ContentType     string     `json:"content_type,omitempty" msg:"content_type,omitempty"`
	ResponseTopic   string     `json:"resp_topic,omitempty" msg:"resp_topic,omitempty"`
	CorrelationData []byte     `json:"corr_data,omitempty" msg:"corr_data,omitempty"`
	UserProperties  []KeyValue `json:"user_props,omitempty" msg:"user_props,omitempty"`
}

func (*Properties) MarshalMsg

func (z *Properties) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Properties) Msgsize

func (z *Properties) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Properties) UnmarshalMsg

func (z *Properties) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ReadAll

// ReadAll groups the methods that load every stored record of a given kind
// from the storage instance.
type ReadAll interface {
	// ReadSubscriptions loads the subscriptions from the storage instance.
	ReadSubscriptions() (v []Subscription, err error)
	// ReadInflight loads the inflight messages from the storage instance.
	ReadInflight() (v []Message, err error)
	// ReadRetained loads the retained messages from the storage instance.
	ReadRetained() (v []Message, err error)
	// ReadClients loads the clients from the storage instance.
	ReadClients() (v []Client, err error)
	// ReadServerInfo loads the server info from the storage instance.
	ReadServerInfo() (v ServerInfo, err error)
}

type ReadClient

// ReadClient groups the lookup methods that take a client id or a topic
// rather than loading everything.
// NOTE(review): none of these methods is described on this page; the
// per-method notes are inferred from names and signatures — confirm against a
// concrete Store implementation.
type ReadClient interface {
	// ReadSubscriptionsByCid presumably loads the subscriptions of client cid.
	ReadSubscriptionsByCid(cid string) (v []Subscription, err error)
	// ReadInflightByCid presumably loads the inflight messages of client cid.
	ReadInflightByCid(cid string) (v []Message, err error)
	// ReadRetainedByTopic presumably loads the retained message stored for topic.
	ReadRetainedByTopic(topic string) (v Message, err error)
	// ReadClientByCid presumably loads the stored data of client cid.
	ReadClientByCid(cid string) (v Client, err error)
}

type ServerInfo

// ServerInfo contains information and statistics about the server.
type ServerInfo struct {
	system.Info `json:"info" msg:"info"` // embed the system info struct.
	ID          string                   `json:"id" msg:"id"` // the storage key.
}

ServerInfo contains information and statistics about the server.

type Store

// Store is an interface which details a persistent storage connector.
// It combines Open and Close with the id-generation, write, delete and read
// interfaces embedded below.
type Store interface {
	// Open opens the storage instance.
	Open() error
	// Close closes the storage instance.
	Close()

	GenEntityId
	WriteEntity
	DeleteEntity
	ReadClient
	ReadAll
}

Store is an interface which details a persistent storage connector.

type Subscription

// Subscription contains the details of a topic filter subscription.
// ID, T and Client carry "-" tags and are excluded from the serialized payload.
type Subscription struct {
	ID     string `json:"-" msg:"-"`           // the storage key.
	T      string `json:"-" msg:"-"`           // the type of the stored data.
	Client string `json:"-" msg:"-"`           // the id of the client who the subscription belongs to.
	Filter string `json:"filter" msg:"filter"` // the topic filter being subscribed to.
	QoS    byte   `json:"qos" msg:"qos"`       // the desired QoS byte.

	// v5.0 fields; each is omitted from the serialized form when zero (omitempty).
	NoLocal           bool `json:"nl,omitempty" msg:"nl,omitempty"`
	RetainHandling    byte `json:"rh,omitempty" msg:"rh,omitempty"`
	RetainAsPublished bool `json:"rap,omitempty" msg:"rap,omitempty"`
}

Subscription contains the details of a topic filter subscription.

func (*Subscription) MarshalMsg

func (z *Subscription) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Subscription) Msgsize

func (z *Subscription) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Subscription) UnmarshalMsg

func (z *Subscription) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type WriteEntity

// WriteEntity groups the methods that write individual records to the storage
// instance. Each method returns a non-nil error on failure.
type WriteEntity interface {
	// WriteSubscription writes a single subscription to the storage instance.
	WriteSubscription(v Subscription) error
	// WriteClient writes a single client to the storage instance.
	WriteClient(v Client) error
	// WriteInflight writes a single inflight message to the storage instance.
	WriteInflight(v Message) error
	// WriteServerInfo writes server info to the storage instance.
	WriteServerInfo(v ServerInfo) error
	// WriteRetained writes a single retained message to the storage instance.
	WriteRetained(v Message) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL