wasp

package
v4.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2020 License: MPL-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Clock                    = time.Now
	ErrConnectNotDone        = errors.New("CONNECT not done")
	ErrSessionLost           = errors.New("Session lost")
	ErrSessionDisconnected   = errors.New("Session disconnected")
	ErrUnknownPacketReceived = errors.New("Received unknown packet type")
	ErrAuthenticationFailed  = errors.New("Authentication failed")
	ErrProtocolViolation     = errors.New("Protocol violation")
)

Functions

func AddFields

func AddFields(ctx context.Context, fields ...zap.Field) context.Context

func L

func L(ctx context.Context) *zap.Logger

func NewWriter

func NewWriter(peerID uint64, subscriptions distributed.SubscriptionsState, local LocalState, ackQueue ack.Queue) *writer

func SchedulePublishes

func SchedulePublishes(id uint64, writer Writer, messageLog messageLog) func(ctx context.Context)

func StoreLogger

func StoreLogger(ctx context.Context, l *zap.Logger) context.Context

Types

type AuthenticationHandler

type AuthenticationHandler interface {
	Authenticate(ctx context.Context, mqtt auth.ApplicationContext, transport auth.TransportContext) (principal auth.Principal, err error)
}

type LocalState

type LocalState interface {
	Get(id string) *sessions.Session
	ListSessions() []*sessions.Session
	Create(id string, session *sessions.Session) *sessions.Session
	Delete(id string) *sessions.Session
}

func NewState

func NewState(id uint64) LocalState

type Manager

type Manager interface {
	Run(ctx context.Context)
	Setup(ctx context.Context, c transport.Metadata)
	DisconnectClients(ctx context.Context)
}

func NewConnectionManager

func NewConnectionManager(authHandler AuthenticationHandler, local LocalState, state distributed.State, writer Writer, packetProcesor PacketProcessor, ackQueue ack.Queue) Manager

type NodeMemberManager

type NodeMemberManager interface {
	NotifyGossipJoin(id uint64)
	NotifyGossipLeave(id uint64)
}

func NewNodeMemberManager

func NewNodeMemberManager(id uint64, log messageLog, state distributed.State) NodeMemberManager

type PacketProcessor

type PacketProcessor interface {
	Process(ctx context.Context, session *sessions.Session, c io.Writer, pkt packet.Packet) error
}

PacketProcessor processes MQTT packets and updates state accordingly, or distributes messages.

func NewPacketProcessor

func NewPacketProcessor(local LocalState, state distributed.State, writer Writer, publishHandler PublishHandler, ackQueue ack.Queue) PacketProcessor

NewPacketProcessor returns a new packet processor

type PublishDistributor

type PublishDistributor struct {
	ID        uint64
	Transport publishDistributorTransport
	State     distributed.SubscriptionsState
	Storage   messageLog
	Logger    *zap.Logger
}

PublishDistributor stores publish messages in local or remote message logs.

func (*PublishDistributor) Distribute

func (storer *PublishDistributor) Distribute(ctx context.Context, publish *packet.Publish) error

Distribute resolves message destinations and uses them to write the message to disk.

type PublishHandler

type PublishHandler func(sender string, publish *packet.Publish) error

type RPCServer

type RPCServer interface {
	api.MQTTServer
	Serve(grpcServer *grpc.Server)
}

func NewMQTTServer

func NewMQTTServer(state distributed.State, local LocalState, storage messageLog, distributor *PublishDistributor, node cluster.MultiNode) RPCServer

type RoutedMessage

type RoutedMessage struct {
	// contains filtered or unexported fields
}

type Scheduler

type Scheduler struct {
	ID uint64
	// contains filtered or unexported fields
}

Scheduler schedules messages to local recipients.

func (*Scheduler) Schedule

func (pdist *Scheduler) Schedule(ctx context.Context, offset uint64, publish *packet.Publish) error

Schedule distributes the message to local subscribers.

type Writer

type Writer interface {
	Register(sessionID string, enc transport.TimeoutReadWriteCloser)
	Unregister(sessionID string)
	Run(ctx context.Context, log messageLog) error
	Schedule(ctx context.Context, offset uint64)
	Send(ctx context.Context, recipients []string, qosses []int32, p *packet.Publish)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL