server

package
v1.0.0
Warning

This package is not in the latest version of its module.

Published: Sep 7, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewServer

func NewServer(proto string, path string) *server

func Print

func Print()

func Regsiter

func Regsiter(name string, f Handle)

func Server

func Server() *server

func Service

func Service(ctx context.Context, name string, data []byte) (p []byte, err error)

Types

type Handle

type Handle func(ctx context.Context, data []byte) ([]byte, error)

type Leaf

type Leaf struct {
	Message MessagePacket    // a message which has been retained for a specific topic.
	Key     string           // the key that was used to create the leaf.
	Filter  string           // the path of the topic filter being matched.
	Parent  *Leaf            // a pointer to the parent node for the leaf.
	Leaves  map[string]*Leaf // a map of child nodes, keyed on particle id.
	Clients map[string]byte  // a map of client ids subscribed to the topic.
}

Leaf is a child node on the tree.

type MessagePacket

type MessagePacket struct {
	Topics    []string
	Payload   []byte
	TopicName string
	Retain    bool
}

type MuxBroker

type MuxBroker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MuxBroker is responsible for brokering multiplexed connections by unique ID.

It is used by plugins to multiplex multiple RPC connections and data streams on top of a single connection between the plugin process and the host process.

This allows a plugin to request a channel with a specific ID to connect to or accept a connection from, and the broker handles the details of holding these channels open while they're being negotiated.

The Plugin interface has access to these for both Server and Client. The broker can be used by either (optionally) to reserve and connect to new multiplexed streams. This is useful for complex args and return values, or anything else you might need a data stream for.

func (*MuxBroker) Accept

func (m *MuxBroker) Accept(id uint32) (net.Conn, error)

Accept accepts a connection by ID.

This should not be called multiple times with the same ID at one time.

func (*MuxBroker) AcceptAndServe

func (m *MuxBroker) AcceptAndServe(id uint32, v interface{})

AcceptAndServe is used to accept a specific stream ID and immediately serve an RPC server on that stream ID. This is used to easily serve complex arguments.

The served interface is always registered to the "Plugin" name.

func (*MuxBroker) Close

func (m *MuxBroker) Close() error

Close closes the connection and all sub-connections.

func (*MuxBroker) Dial

func (m *MuxBroker) Dial(id uint32) (net.Conn, error)

Dial opens a connection by ID.
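
The pairing of Dial and Accept by ID can be pictured with an in-process stand-in. This sketch is not the MuxBroker implementation: `idBroker`, `slot`, and `roundTrip` are hypothetical, `net.Pipe` replaces the multiplexed session, and the one-second accept timeout is an assumption.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

// idBroker is a hypothetical in-process stand-in that pairs Dial and Accept by ID.
type idBroker struct {
	mu      sync.Mutex
	pending map[uint32]chan net.Conn
}

func newIDBroker() *idBroker { return &idBroker{pending: make(map[uint32]chan net.Conn)} }

// slot returns the rendezvous channel for id, creating it on first use.
func (b *idBroker) slot(id uint32) chan net.Conn {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.pending[id]
	if !ok {
		ch = make(chan net.Conn, 1)
		b.pending[id] = ch
	}
	return ch
}

// Dial creates a stream for id and leaves the far end for whoever accepts that id.
func (b *idBroker) Dial(id uint32) (net.Conn, error) {
	near, far := net.Pipe()
	select {
	case b.slot(id) <- far:
		return near, nil
	default:
		near.Close()
		far.Close()
		return nil, fmt.Errorf("id %d already has a pending stream", id)
	}
}

// Accept waits (with a timeout, an assumption) for the stream dialed with the same id.
func (b *idBroker) Accept(id uint32) (net.Conn, error) {
	select {
	case c := <-b.slot(id):
		return c, nil
	case <-time.After(time.Second):
		return nil, fmt.Errorf("timeout waiting for id %d", id)
	}
}

// roundTrip dials id, accepts it, and sends one message across the stream.
func roundTrip(b *idBroker, id uint32, msg string) (string, error) {
	client, err := b.Dial(id)
	if err != nil {
		return "", err
	}
	defer client.Close()
	server, err := b.Accept(id)
	if err != nil {
		return "", err
	}
	defer server.Close()
	go client.Write([]byte(msg)) // net.Pipe writes block until the peer reads
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(server, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	fmt.Println(roundTrip(newIDBroker(), 7, "ping"))
}
```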

func (*MuxBroker) NextId

func (m *MuxBroker) NextId() uint32

NextId returns a unique ID to use next.

It is possible for very long-running plugin hosts to wrap this value, though it would require a very large number of RPC calls. In practice we've never seen it happen.
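
The wrap-around mentioned above is ordinary `uint32` overflow. A small sketch, assuming the ID comes from an atomically incremented counter (`idSource` and `nextID` are hypothetical; the broker's real field is unexported):

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// idSource is a hypothetical counter standing in for the broker's internal state.
type idSource struct{ next uint32 }

// nextID atomically increments and returns the counter, so concurrent
// callers never receive the same ID until the counter wraps.
func (s *idSource) nextID() uint32 { return atomic.AddUint32(&s.next, 1) }

func main() {
	s := &idSource{}
	fmt.Println(s.nextID(), s.nextID()) // 1 2

	// Start just below the maximum to show the wrap-around.
	s.next = math.MaxUint32 - 1
	fmt.Println(s.nextID()) // 4294967295
	fmt.Println(s.nextID()) // 0, the counter has wrapped
}
```

Reaching the wrap takes 2^32 calls, which is why it is unlikely to matter in practice.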

func (*MuxBroker) Run

func (m *MuxBroker) Run()

Run starts the brokering and should be executed in a goroutine, since it blocks forever, or until the session closes.

Users of MuxBroker never need to call this. It is called internally by the plugin host/client.

type Plugin

type Plugin struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewPlugin

func NewPlugin(path string, id string, cfg common.PluginCfg) *Plugin

NewPlugin creates a new plugin ready to be started.

The first argument specifies the protocol. It can be set to either "unix" for communication over an ephemeral local socket, or "tcp" for network communication on the local host (using a random unprivileged port).

This constructor will panic if the proto argument is neither "unix" nor "tcp".

The path to the plugin executable should be absolute. Any path accepted by the "exec" package in the standard library is accepted and the same rules for execution are applied.

Optionally some parameters might be passed to the plugin executable.

func (*Plugin) GetId

func (p *Plugin) GetId() string

func (*Plugin) SetSocketDirectory

func (p *Plugin) SetSocketDirectory(dir string)

func (*Plugin) SetTimeout

func (p *Plugin) SetTimeout(t time.Duration)

SetTimeout sets the maximum time a plugin is allowed to take to start up and to shut down. A zero timeout is not allowed; the default is used instead.

Default is two seconds.

Panics if called after Start.
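
The three rules for SetTimeout (zero falls back to the default, the default is two seconds, and calling it after Start panics) can be sketched as follows. `proc`, `setTimeout`, and `panics` are hypothetical names; the real Plugin fields are unexported.

```go
package main

import (
	"fmt"
	"time"
)

const defaultTimeout = 2 * time.Second // the documented default

// proc is a hypothetical stand-in for Plugin's unexported state.
type proc struct {
	timeout time.Duration
	started bool
}

func newProc() *proc { return &proc{timeout: defaultTimeout} }

// setTimeout applies t, falls back to the default for zero, and panics once started.
func (p *proc) setTimeout(t time.Duration) {
	if p.started {
		panic("setTimeout called after start")
	}
	if t == 0 {
		t = defaultTimeout
	}
	p.timeout = t
}

// panics reports whether f panicked.
func panics(f func()) (did bool) {
	defer func() { did = recover() != nil }()
	f()
	return false
}

func main() {
	p := newProc()
	p.setTimeout(0)
	fmt.Println(p.timeout) // 2s
	p.setTimeout(500 * time.Millisecond)
	fmt.Println(p.timeout) // 500ms
	p.started = true
	fmt.Println(panics(func() { p.setTimeout(time.Second) })) // true
}
```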

func (*Plugin) Start

func (p *Plugin) Start()

Start executes the plugin as a subprocess and returns immediately. The first call to the plugin will reveal any errors that occurred during initialization.

Calls subsequent to Start will hang until the plugin has been properly initialized.

func (*Plugin) Stop

func (p *Plugin) Stop()

Stop attempts to stop the running plugin cleanly, or kills it if that fails, and then frees all resources. Stop returns when the plugin has been shut down and related routines have exited.

func (*Plugin) String

func (p *Plugin) String() string

String returns the default string representation of the plugin.

func (*Plugin) Wait

func (p *Plugin) Wait(ctx context.Context, pidCh chan<- int, exe string, params string)

type PublicMsg

type PublicMsg struct {
	// contains filtered or unexported fields
}

type Session

type Session interface {
	Version() int
	// ConnectedAt returns the connected time
	ConnectedAt() time.Time
	// Connection returns the raw net.Conn
	Connection() net.Conn
	// Close closes the client connection.
	Close()
	// Disconnect sends a disconnect packet to the client; it is used to close a v5 client.
	Disconnect()
}

type Subscriptions

type Subscriptions map[string]byte

Subscriptions is a map of subscriptions keyed on client.

type Topic

type Topic struct {
	Root *Leaf // a leaf containing a message and more leaves.
	// contains filtered or unexported fields
}

Topic is a prefix/trie tree containing topic subscribers and retained messages.

func TopicNew

func TopicNew() *Topic

TopicNew returns a pointer to a new instance of Topic.

func (*Topic) Messages

func (x *Topic) Messages(filter string) []MessagePacket

Messages returns a slice of retained topic messages which match a filter.
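
The page does not define what "match a filter" means. Assuming MQTT-style filters, where `+` matches exactly one level and `#` matches the remaining levels, matching could look like the sketch below. It scans a flat slice rather than walking the trie, and `matches` and `messages` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// MessagePacket copies the documented fields so the sketch is self-contained.
type MessagePacket struct {
	Topics    []string
	Payload   []byte
	TopicName string
	Retain    bool
}

// matches reports whether topic satisfies filter under MQTT-style wildcard rules.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, part := range f {
		if part == "#" {
			return true // multi-level wildcard: matches the rest, including nothing
		}
		if i >= len(t) {
			return false // filter is longer than the topic
		}
		if part != "+" && part != t[i] {
			return false // literal level differs
		}
	}
	return len(f) == len(t)
}

// messages returns the retained packets whose topic name matches filter.
func messages(retained []MessagePacket, filter string) []MessagePacket {
	var out []MessagePacket
	for _, m := range retained {
		if matches(filter, m.TopicName) {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	retained := []MessagePacket{
		{TopicName: "home/kitchen/temp", Payload: []byte("21"), Retain: true},
		{TopicName: "home/hall/temp", Payload: []byte("19"), Retain: true},
		{TopicName: "garden/temp", Payload: []byte("12"), Retain: true},
	}
	for _, m := range messages(retained, "home/+/temp") {
		fmt.Println(m.TopicName, string(m.Payload))
	}
}
```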

func (*Topic) RetainMessage

func (x *Topic) RetainMessage(msg MessagePacket) int64

RetainMessage saves a message payload to the end of a topic branch. Returns 1 if a retained message was added, and -1 if the retained message was removed. 0 is returned if sequential empty payloads are received.
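
The three return values can be illustrated with a map in place of the trie. This is a sketch of the documented contract only; `store` and `retain` are hypothetical, and returning 1 when an existing retained message is replaced is an assumption.

```go
package main

import "fmt"

// store is a hypothetical flat replacement for the topic trie.
type store struct{ retained map[string][]byte }

// retain returns 1 when a payload is stored, -1 when an empty payload
// clears an existing retained message, and 0 when there was nothing to clear.
func (s *store) retain(topic string, payload []byte) int64 {
	_, had := s.retained[topic]
	if len(payload) > 0 {
		s.retained[topic] = payload
		return 1
	}
	if had {
		delete(s.retained, topic)
		return -1
	}
	return 0
}

func main() {
	s := &store{retained: map[string][]byte{}}
	fmt.Println(s.retain("a/b", []byte("on"))) // 1
	fmt.Println(s.retain("a/b", nil))          // -1
	fmt.Println(s.retain("a/b", nil))          // 0
}
```

A caller can sum these return values to keep a running count of retained messages.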

func (*Topic) Subscribe

func (x *Topic) Subscribe(filter, client string, qos byte) bool

Subscribe creates a subscription filter for a client. Returns true if the subscription was new.

func (*Topic) Subscribers

func (x *Topic) Subscribers(topic string) Subscriptions

Subscribers returns a map of clients who are subscribed to matching filters.
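
A reduced version of the trie shows how Subscribe and Subscribers could relate. The sketch assumes MQTT-style `+` and `#` wildcards and that the highest QoS wins when several filters match the same client; neither is stated on this page. `node`, `scan`, and `collect` are hypothetical simplifications of Leaf.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a simplified, hypothetical Leaf: child nodes plus subscribed clients.
type node struct {
	leaves  map[string]*node
	clients map[string]byte // client id -> QoS
}

func newNode() *node { return &node{leaves: map[string]*node{}, clients: map[string]byte{}} }

// subscribe walks (and creates) one node per filter level, then records the
// client. It returns true if the subscription was new.
func (root *node) subscribe(filter, client string, qos byte) bool {
	n := root
	for _, part := range strings.Split(filter, "/") {
		child, ok := n.leaves[part]
		if !ok {
			child = newNode()
			n.leaves[part] = child
		}
		n = child
	}
	_, existed := n.clients[client]
	n.clients[client] = qos
	return !existed
}

// subscribers collects the clients whose filters match topic.
func (root *node) subscribers(topic string) map[string]byte {
	out := map[string]byte{}
	root.scan(strings.Split(topic, "/"), out)
	return out
}

// scan descends through literal and "+" children, and picks up "#" at every level.
func (n *node) scan(parts []string, out map[string]byte) {
	if hash, ok := n.leaves["#"]; ok {
		collect(hash, out) // "#" matches this level and everything below it
	}
	if len(parts) == 0 {
		collect(n, out)
		return
	}
	for _, key := range []string{parts[0], "+"} {
		if child, ok := n.leaves[key]; ok {
			child.scan(parts[1:], out)
		}
	}
}

// collect merges a node's clients into out, keeping the highest QoS per client.
func collect(n *node, out map[string]byte) {
	for id, qos := range n.clients {
		if cur, ok := out[id]; !ok || qos > cur {
			out[id] = qos
		}
	}
}

func main() {
	root := newNode()
	fmt.Println(root.subscribe("home/+/temp", "c1", 0)) // true
	fmt.Println(root.subscribe("home/+/temp", "c1", 1)) // false, already subscribed
	root.subscribe("home/#", "c2", 2)
	fmt.Println(root.subscribers("home/kitchen/temp")) // map[c1:1 c2:2]
}
```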

func (*Topic) Unsubscribe

func (x *Topic) Unsubscribe(filter, client string) bool

Unsubscribe removes a subscription filter for a client. Returns true if the unsubscribe action was successful and the subscription existed.
