gomavproxy

package module
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2023 License: MIT Imports: 21 Imported by: 0

README

gomavproxy

pipeline status coverage report go report go reference

gomavproxy is a library that implements a proxy and router functionality for Mavlink protocol. It has a minimalistic and developer-friendly interface for manipulation multiple mavlink streams in runtime. Also, it has flexible filtering functionality which helps to change mavlink streams. Filters support removing, rewriting, and injecting messages.

MAVLink is a very lightweight messaging protocol for communicating with drones (and between onboard drone components).

Features

  • Creating multiple listeners and multiple filters in runtime
  • Listener supports one upstream and multiple downstream
  • Listener can be launched without downstreams. Only for filter processing.
  • Upstreams support TCP, UDP, and SERIAL device source
  • Downstreams support TCP and UDP in server mode
  • Listener can have multiple filters
  • Filters can be chained to the listeners by name or label set. It allows flexible configure handler sequences for each listener.
  • Filters support rewriting and removing mavlink frames
  • Listeners support injecting mavlink frame
  • Filters can have multiple mavlink frame handlers
  • Frame handlers can be sync and async
  • Frame handlers subscribe to frames by payload types. Supports next payload types: any, message id, command id, command id ack.
  • Frame handlers sorting by priority when we have several handlers with the same subscription. Handlers with the highest priority will be executed first.
  • Frame sequence id control. It rewrites sequence id in a frame for saving packet loss detection behavior when some frames were injected and removed.
  • Mavlink dialect agnostic**

** Only if you want to remove or inject frames in some mavlink component that uses custom dialect. You must specify this dialect. In all other cases, it is not necessary.

Install

go get mavpoint.io/gomavproxy

Usage

Creating proxy

proxy := gomavproxy.New(logger)

Adding listener which proxing mavlink stream from serial device to udp port on localhost (port will be selected automatically if it not specify)

listener, err := proxy.RegisterListener(
    &gomavproxy.ListenerMetadata{
        Name: "superlistener",
        Labels: map[string]string{"group":"somegroup","tag":"sometag"},
    },
    &gomavproxy.ListenerSpec{
        Upstream: &gomavproxy.ListenerUpstreamSerial{
            Device: "/path/to/device",
            Baudrate: 57600,
        },
        Downstreams: []gomavproxy.ListenerDownstream{
            &gomavproxy.ListenerDownstreamUDP{Host: "127.0.0.1"},
        },
    },
)

Add filter which will be chained with all listeners (exists and further created) which have label with name "tag" and value "sometag". Filter has one handler which will be processing all frames with heartbeat payload from upstream.

filter, err := proxy.RegisterFilter(
    &gomavproxy.FilterMetadata{Name: "first"},
    &gomavproxy.FilterSpec{
        Listeners: &gomavproxy.ObjectSelector{
            Labels: &metav1.LabelSelector{
                MatchLabels: map[string]string{"tag":"sometag"},
            },
        },
        Handlers: []*gomavproxy.FilterHandler{{
            Spec: &gomavproxy.HandlerSpec{
                Async: false, 
                Priority: 0,
            },
            Frame: &gomavproxy.HandlerFrame{
                Source: gomavproxy.FrameSourceUpstream,
                Payload: gomavproxy.FramePayloadTypeMessage((&common.MessageHeartbeat{}).GetID()),
            },
            Exec: func(
                lm *gomavproxy.ListenerMetadata,
                frm *gomavproxy.Frame,
            ) (gomavproxy.HandlerExecResponse, error) {
                return gomavproxy.HandlerExecResponseRewrite(frm.Payload), nil
            },
        }},
    },
)

Add filter which will be chained with all listeners (exists and further created) which have name "superlistener". Filter has one handler which will be removing all frames with request video stream information payload from downstreams.

filter, err := proxy.RegisterFilter(
    &gomavproxy.FilterMetadata{Name: "first"},
    &gomavproxy.FilterSpec{
        Listeners: &gomavproxy.ObjectSelector{
            Names: []string{"superlistener"},
        },
        Handlers: []*gomavproxy.FilterHandler{{
            Spec: &gomavproxy.HandlerSpec{
                Async: false, 
                Priority: 0,
            },
            Frame: &gomavproxy.HandlerFrame{
                Source: gomavproxy.FrameSourceDownstream,
                Payload: gomavproxy.FramePayloadTypeCommandRequestMessage((&common.MessageVideoStreamInformation{}).GetID()),
            },
            Exec: func(
                lm *gomavproxy.ListenerMetadata,
                frm *gomavproxy.Frame,
            ) (gomavproxy.HandlerExecResponse, error) {
                return gomavproxy.HandlerExecResponseRemove, nil
            },
        }},
    },
)

Frame sequence ID control and checksum recalculation

Each mavlink frame has a sequence id field that takes part in the frame loss detection process. Each vehicle component that produces mavlink messages has its pair systemID and componentID. When it sent a message, the sequenceID counter is incrementing. If the consumer sees a difference greater than one between the sequence id current frame and sequence id previous frame within one component then we lost some frame or frames.

That behavior is very simple and useful. But if we want to implement drop or inject frames we need to deal something with that.

That project has an implementation that stores the shift of sequenceIDs for each component separately and rewrites it if it is necessary. When we rewrite some fields in the frame then it becomes invalid. Because each mavlink frame has a checksum field. This leads to checksum recalculation by the proxy for all frames within components with a non-zero shift of sequence id. But if we make inject after dropping within one component then sequence id will become zero again. The more components with non-zero sequence shifts, the higher the delay will be.

TODO List

  • SequenceID auto adjustment by heartbeats
  • More flexible handler subscription on frame

Related projects

Mavlink

Documentation

Index

Constants

View Source
const FramePayloadTypeAny = framePayloadTypeAny(true)
View Source
const HandlerExecResponseRemove = handlerExecResponseRemove(true)

Variables

View Source
var ErrEmptyResult = errors.New("empty result")

Functions

This section is empty.

Types

type Filter

type Filter struct {
	Metadata *FilterMetadata
	Spec     *FilterSpec
}

func (*Filter) DeepCopy

func (f *Filter) DeepCopy() *Filter

type FilterHandler

type FilterHandler struct {
	Spec  *HandlerSpec
	Frame *HandlerFrame
	Exec  HandlerExecFn
}

func (*FilterHandler) DeepCopy

func (fh *FilterHandler) DeepCopy() *FilterHandler

type FilterMetadata

type FilterMetadata struct {
	Name        string
	Labels      map[string]string
	Description string
}

func (*FilterMetadata) DeepCopy

func (m *FilterMetadata) DeepCopy() *FilterMetadata

func (*FilterMetadata) Match

func (m *FilterMetadata) Match(selector *ObjectSelector) bool

type FilterSpec

type FilterSpec struct {
	Listeners *ObjectSelector
	Handlers  []*FilterHandler
}

func (*FilterSpec) DeepCopy

func (fs *FilterSpec) DeepCopy() *FilterSpec

type Frame

type Frame struct {
	SystemID    uint8
	ComponentID uint8
	MessageID   uint32
	Payload     []byte
	Version     uint8
	// contains filtered or unexported fields
}

func (*Frame) DeepCopy

func (f *Frame) DeepCopy() *Frame

type FramePayloadType

type FramePayloadType interface {
	String() string
	// contains filtered or unexported methods
}

type FramePayloadTypeCommand

type FramePayloadTypeCommand uint32

func (FramePayloadTypeCommand) String

func (t FramePayloadTypeCommand) String() string

type FramePayloadTypeCommandAck

type FramePayloadTypeCommandAck uint32

func (FramePayloadTypeCommandAck) String

type FramePayloadTypeCommandRequestMessage

type FramePayloadTypeCommandRequestMessage uint32

func (FramePayloadTypeCommandRequestMessage) String

type FramePayloadTypeMessage

type FramePayloadTypeMessage uint32

func (FramePayloadTypeMessage) String

func (t FramePayloadTypeMessage) String() string

type FrameSource

type FrameSource uint8
const (
	FrameSourceUnknown FrameSource = iota
	FrameSourceUpstream
	FrameSourceDownstream
	FrameSourceAny
)

func (FrameSource) String

func (fs FrameSource) String() string

type Handler

type Handler struct {
	Filter *FilterMetadata
	Spec   *HandlerSpec
	Frame  *HandlerFrame
	// contains filtered or unexported fields
}

type HandlerExecFn added in v0.5.1

type HandlerExecFn func(lm *ListenerMetadata, frm *Frame) (HandlerExecResponse, error)

type HandlerExecResponse added in v0.5.1

type HandlerExecResponse interface {
	// contains filtered or unexported methods
}

type HandlerExecResponseRewrite added in v0.5.1

type HandlerExecResponseRewrite []byte

type HandlerFrame added in v0.5.1

type HandlerFrame struct {
	Source  FrameSource
	Payload FramePayloadType
}

func (*HandlerFrame) DeepCopy added in v0.5.1

func (fhf *HandlerFrame) DeepCopy() *HandlerFrame

type HandlerSpec added in v0.5.1

type HandlerSpec struct {
	Async    bool
	Priority int32
}

func (*HandlerSpec) DeepCopy added in v0.5.1

func (fhs *HandlerSpec) DeepCopy() *HandlerSpec

type Interface

type Interface interface {
	GetListener(name string) (*Listener, error)
	ListListener(selector *ObjectSelector) ([]*Listener, error)
	RegisterListener(metadata *ListenerMetadata, spec *ListenerSpec) (*Listener, error)
	UnregisterListener(selector *ObjectSelector) error

	GetFilter(name string) (*Filter, error)
	ListFilter(selector *ObjectSelector) ([]*Filter, error)
	ListFilterByListener(selector *ObjectSelector) ([]*Filter, error)
	RegisterFilter(metadata *FilterMetadata, spec *FilterSpec) (*Filter, error)
	UnregisterFilter(selector *ObjectSelector) error
	PruneFilters() error

	ListHandlerByFilter(selector *ObjectSelector, source FrameSource, payload FramePayloadType) ([]*Handler, error)
	ListHandlerByListener(selector *ObjectSelector, source FrameSource, payload FramePayloadType) ([]*Handler, error)
}

func New

func New(logger logr.Logger) Interface

type Listener

type Listener struct {
	Metadata *ListenerMetadata
	Spec     *ListenerSpec
	// contains filtered or unexported fields
}

func (*Listener) DeepCopy

func (l *Listener) DeepCopy() *Listener

func (*Listener) InjectFrame

func (l *Listener) InjectFrame(source FrameSource, frame Frame, exceptFilters []string)

type ListenerDownstream

type ListenerDownstream interface {
	DeepCopy() ListenerDownstream
	// contains filtered or unexported methods
}

type ListenerDownstreamTCP

type ListenerDownstreamTCP struct {
	Host string
	Port uint32
}

func (*ListenerDownstreamTCP) DeepCopy

type ListenerDownstreamUDP

type ListenerDownstreamUDP struct {
	Host string
	Port uint32
}

func (*ListenerDownstreamUDP) DeepCopy

type ListenerMavlink struct {
	SystemID          uint8
	ComponentID       uint8
	HeartbeatInterval time.Duration
}

func (*ListenerMavlink) DeepCopy

func (lsm *ListenerMavlink) DeepCopy() *ListenerMavlink

type ListenerMetadata

type ListenerMetadata struct {
	Name   string
	Labels map[string]string
}

func (*ListenerMetadata) DeepCopy

func (m *ListenerMetadata) DeepCopy() *ListenerMetadata

func (*ListenerMetadata) Match

func (m *ListenerMetadata) Match(selector *ObjectSelector) bool

type ListenerSpec

type ListenerSpec struct {
	Upstream    ListenerUpstream
	Downstreams []ListenerDownstream
	Mavlink     *ListenerMavlink
}

func (*ListenerSpec) DeepCopy

func (ls *ListenerSpec) DeepCopy() *ListenerSpec

type ListenerStatusChannel

type ListenerStatusChannel struct {
	Name   string
	Status string
}

type ListenerStatusPhase

type ListenerStatusPhase int8
const (
	ListenerStatusPhaseUnknown ListenerStatusPhase = iota
	ListenerStatusPhaseCreated
	ListenerStatusPhaseRunning
	ListenerStatusPhaseTerminating
	ListenerStatusPhaseError
)

type ListenerUpstream

type ListenerUpstream interface {
	DeepCopy() ListenerUpstream
	// contains filtered or unexported methods
}

type ListenerUpstreamSerial

type ListenerUpstreamSerial struct {
	Device   string
	Baudrate uint32
}

func (*ListenerUpstreamSerial) DeepCopy

type ListenerUpstreamTCP

type ListenerUpstreamTCP struct {
	Host string
	Port uint32
}

func (*ListenerUpstreamTCP) DeepCopy

func (u *ListenerUpstreamTCP) DeepCopy() ListenerUpstream

type ListenerUpstreamUDP

type ListenerUpstreamUDP struct {
	Host string
	Port uint32
}

func (*ListenerUpstreamUDP) DeepCopy

func (u *ListenerUpstreamUDP) DeepCopy() ListenerUpstream

type ObjectSelector

type ObjectSelector struct {
	Names  []string
	Labels *metav1.LabelSelector
}

func Everything

func Everything() *ObjectSelector

func (*ObjectSelector) DeepCopy

func (os *ObjectSelector) DeepCopy() *ObjectSelector

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL