Documentation ¶
Index ¶
- Variables
- func AddFields(ctx context.Context, fields ...zap.Field) context.Context
- func L(ctx context.Context) *zap.Logger
- func NewWriter(peerID uint64, subscriptions distributed.SubscriptionsState, local LocalState, ...) *writer
- func SchedulePublishes(id uint64, writer Writer, messageLog messageLog) func(ctx context.Context)
- func StoreLogger(ctx context.Context, l *zap.Logger) context.Context
- type AuthenticationHandler
- type LocalState
- type Manager
- type NodeMemberManager
- type PacketProcessor
- type PublishDistributor
- type PublishHandler
- type RPCServer
- type RoutedMessage
- type Scheduler
- type Writer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( Clock = time.Now ErrConnectNotDone = errors.New("CONNECT not done") ErrSessionLost = errors.New("Session lost") ErrSessionDisconnected = errors.New("Session disconnected") ErrUnknownPacketReceived = errors.New("Received unknown packet type") ErrAuthenticationFailed = errors.New("Authentication failed") ErrProtocolViolation = errors.New("Protocol violation") )
Functions ¶
func NewWriter ¶
func NewWriter(peerID uint64, subscriptions distributed.SubscriptionsState, local LocalState, ackQueue ack.Queue) *writer
func SchedulePublishes ¶
Types ¶
type AuthenticationHandler ¶
type AuthenticationHandler interface {
Authenticate(ctx context.Context, mqtt auth.ApplicationContext, transport auth.TransportContext) (principal auth.Principal, err error)
}
type LocalState ¶
type LocalState interface { Get(id string) *sessions.Session ListSessions() []*sessions.Session Create(id string, session *sessions.Session) *sessions.Session Delete(id string) *sessions.Session }
func NewState ¶
func NewState(id uint64) LocalState
type Manager ¶
type Manager interface { Run(ctx context.Context) Setup(ctx context.Context, c transport.Metadata) DisconnectClients(ctx context.Context) }
func NewConnectionManager ¶
func NewConnectionManager(authHandler AuthenticationHandler, local LocalState, state distributed.State, writer Writer, packetProcesor PacketProcessor, ackQueue ack.Queue) Manager
type NodeMemberManager ¶
func NewNodeMemberManager ¶
func NewNodeMemberManager(id uint64, log messageLog, state distributed.State) NodeMemberManager
type PacketProcessor ¶
type PacketProcessor interface {
Process(ctx context.Context, session *sessions.Session, c io.Writer, pkt packet.Packet) error
}
PacketProcessor processes MQTT packets and updates state accordingly, or distributes messages.
func NewPacketProcessor ¶
func NewPacketProcessor(local LocalState, state distributed.State, writer Writer, publishHandler PublishHandler, ackQueue ack.Queue) PacketProcessor
NewPacketProcessor returns a new packet processor
type PublishDistributor ¶
type PublishDistributor struct { ID uint64 Transport publishDistributorTransport State distributed.SubscriptionsState Storage messageLog Logger *zap.Logger }
PublishDistributor stores publish messages in local or remote message logs.
func (*PublishDistributor) Distribute ¶
Distribute resolves message destinations and uses them to write the message to disk.
type RPCServer ¶
type RPCServer interface { api.MQTTServer Serve(grpcServer *grpc.Server) }
func NewMQTTServer ¶
func NewMQTTServer(state distributed.State, local LocalState, storage messageLog, distributor *PublishDistributor, node cluster.MultiNode) RPCServer
type RoutedMessage ¶
type RoutedMessage struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct { ID uint64 // contains filtered or unexported fields }
Scheduler schedules messages to local recipients.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.