server

package
v0.0.0-...-740c5c8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const AssetsDir = "assets"

AssetsDir is the assets directory relative to the module root

Variables

View Source
var (
	// Version of executable
	Version = "unknown version"
	// Commit of executable
	Commit = "unknown commit"
)
View Source
var Assets fs.FS

Assets is the embedded assets file system

Functions

func FromControlChannel

func FromControlChannel(in <-chan ControlSnip) <-chan interface{}

FromControlChannel adapts a chan ControlSnip to chan interface

func FromSnipChannel

func FromSnipChannel(in <-chan QuerySnip) <-chan interface{}

FromSnipChannel adapts a chan QuerySnip to chan interface

func NewControlRunner

func NewControlRunner(run func(c <-chan ControlSnip)) func(c <-chan interface{})

NewControlRunner adapts a chan ControlSnip to chan interface

func NewMqttOptions

func NewMqttOptions(
	broker string,
	user string,
	password string,
	clientID string,
) *MQTT.ClientOptions

NewMqttOptions creates MQTT client options

func NewSnipRunner

func NewSnipRunner(run func(c <-chan QuerySnip)) func(c <-chan interface{})

NewSnipRunner adapts a chan QuerySnip to chan interface

func ServeWebsocket

func ServeWebsocket(hub *SocketHub, w http.ResponseWriter, r *http.Request)

ServeWebsocket handles websocket requests from the peer.

func ToControlChannel

func ToControlChannel(in <-chan interface{}) <-chan ControlSnip

ToControlChannel adapts a chan interface to chan ControlSnip

Types

type Broadcaster

type Broadcaster struct {
	sync.Mutex // guard recipients
	// contains filtered or unexported fields
}

Broadcaster acts as hub for broadcating snips to multiple recipients

func NewBroadcaster

func NewBroadcaster(in <-chan interface{}) *Broadcaster

NewBroadcaster creates a Broadcaster that implements a hub and spoke message replication pattern

func (*Broadcaster) Attach

func (b *Broadcaster) Attach() <-chan interface{}

Attach creates and attaches a channel to the broadcaster

func (*Broadcaster) AttachRunner

func (b *Broadcaster) AttachRunner(runner func(<-chan interface{}))

AttachRunner attaches a Run method as broadcast receiver and adds it to the waitgroup

func (*Broadcaster) Done

func (b *Broadcaster) Done() <-chan struct{}

Done returns a channel signalling when broadcasting has stopped

func (*Broadcaster) Run

func (b *Broadcaster) Run()

Run executes the broadcaster

type Cache

type Cache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Cache caches and aggregates meter reasings

func NewCache

func NewCache(maxAge time.Duration, status *Status, verbose bool) *Cache

NewCache creates new meter reading cache

func (*Cache) Average

func (mc *Cache) Average(device string) (*Readings, error)

Average returns averaged sets of meter readings

func (*Cache) Current

func (mc *Cache) Current(device string) (res *Readings, err error)

Current returns the latest set of meter reading

func (*Cache) Purge

func (mc *Cache) Purge(device string) error

Purge removes accumulated data for specified device

func (*Cache) Run

func (mc *Cache) Run(in <-chan QuerySnip)

Run consumes meter readings into snip cache

func (*Cache) SortedIDs

func (mc *Cache) SortedIDs() []string

SortedIDs returns the sorted list of cache ids

type ControlSnip

type ControlSnip struct {
	Device string
	Status RuntimeInfo
}

ControlSnip wraps device status information

type DeviceInfo

type DeviceInfo interface {
	DeviceDescriptorByID(id string) meters.DeviceDescriptor
}

DeviceInfo returns device descriptor by device id

type DeviceStatus

type DeviceStatus struct {
	Device string
	Type   string
	Online bool
	ModbusStatus
}

DeviceStatus represents a devices runtime status

type Handler

type Handler struct {
	ID      int
	Manager *meters.Manager
	// contains filtered or unexported fields
}

Handler is responsible for querying a single connection

func NewHandler

func NewHandler(id int, m *meters.Manager) *Handler

NewHandler creates a connection handler. The handler is responsible for querying all devices attached to the connection.

func (*Handler) Run

func (h *Handler) Run(
	ctx context.Context,
	control chan<- ControlSnip,
	results chan<- QuerySnip,
)

Run initializes and queries every device attached to the handler's connection

type HomieRunner

type HomieRunner struct {
	// contains filtered or unexported fields
}

HomieRunner publishes query results as homie mqtt topics

func NewHomieRunner

func NewHomieRunner(qe DeviceInfo, cc <-chan ControlSnip, options *MQTT.ClientOptions, qos byte, rootTopic string, verbose bool) *HomieRunner

NewHomieRunner create new runner for homie IoT spec

func (*HomieRunner) Run

func (hr *HomieRunner) Run(in <-chan QuerySnip)

Run MQTT client publisher

type Httpd

type Httpd struct {
	// contains filtered or unexported fields
}

Httpd is an http server

func NewHttpd

func NewHttpd(hub *SocketHub, s *Status, qe DeviceInfo, mc *Cache) *Httpd

NewHttpd creates HTTP daemon

func (*Httpd) Router

func (h *Httpd) Router() *mux.Router

Router returns the root router

func (*Httpd) Run

func (h *Httpd) Run(url string)

Run executes the http server

type Influx

type Influx struct {
	// contains filtered or unexported fields
}

Influx is an InfluxDB v2 publisher

func NewInfluxClient

func NewInfluxClient(
	url string,
	database string,
	measurement string,
	org string,
	token string,
	user string,
	password string,
) *Influx

NewInfluxClient creates new publisher for influx

func (*Influx) Run

func (m *Influx) Run(in <-chan QuerySnip)

Run Influx publisher

type MemoryStatus

type MemoryStatus struct {
	Alloc     uint64
	HeapAlloc uint64
}

MemoryStatus represents daemon memory allocation

type MeterReadings

type MeterReadings struct {
	sync.Mutex
	Current  Readings
	Historic []*Readings
}

MeterReadings holds entire sets of current and recent meter readings for a single device

func NewMeterReadings

func NewMeterReadings(maxAge time.Duration) *MeterReadings

NewMeterReadings container for current and recent meter readings

func (*MeterReadings) Add

func (mr *MeterReadings) Add(snip QuerySnip)

Add adds a meter reading for specified device

func (*MeterReadings) Average

func (mr *MeterReadings) Average(timestamp time.Time) *Readings

Average averages historic readings after given timestamp

func (*MeterReadings) Purge

func (mr *MeterReadings) Purge()

Purge clears meter readings

func (*MeterReadings) TrimBefore

func (mr *MeterReadings) TrimBefore(timestamp time.Time)

TrimBefore removes historic readings older than timestamp

type ModbusStatus

type ModbusStatus struct {
	Requests          uint64
	RequestsPerMinute float64
	Errors            uint64
	ErrorsPerMinute   float64
}

ModbusStatus represents device request and error status

type MqttClient

type MqttClient struct {
	Client MQTT.Client
	// contains filtered or unexported fields
}

MqttClient is a MQTT publisher

func NewMqttClient

func NewMqttClient(
	options *MQTT.ClientOptions,
	qos byte,
	verbose bool,
) *MqttClient

NewMqttClient creates new publisher for MQTT

func (*MqttClient) Publish

func (m *MqttClient) Publish(topic string, retained bool, message interface{})

Publish MQTT message with error handling

func (*MqttClient) WaitForToken

func (m *MqttClient) WaitForToken(token MQTT.Token)

WaitForToken synchronously waits until token operation completed

type MqttRunner

type MqttRunner struct {
	*MqttClient
	// contains filtered or unexported fields
}

MqttRunner allows to attach an MqttClient as broadcast receiver

func NewMqttRunner

func NewMqttRunner(options *MQTT.ClientOptions, qos byte, topic string, verbose bool) *MqttRunner

NewMqttRunner create a new runer for plain MQTT

func (*MqttRunner) Run

func (m *MqttRunner) Run(in <-chan QuerySnip)

Run MqttClient publisher

type QueryEngine

type QueryEngine struct {
	// contains filtered or unexported fields
}

QueryEngine executes queries on connections and attached devices

func NewQueryEngine

func NewQueryEngine(managers map[string]*meters.Manager) *QueryEngine

NewQueryEngine creates new query engine

func (*QueryEngine) DeviceDescriptorByID

func (q *QueryEngine) DeviceDescriptorByID(id string) (res meters.DeviceDescriptor)

DeviceDescriptorByID implements DeviceInfo interface

func (*QueryEngine) Run

func (q *QueryEngine) Run(
	ctx context.Context,
	rate time.Duration,
	control chan<- ControlSnip,
	results chan<- QuerySnip,
)

Run executes the query engine to produce measurement results

type QuerySnip

type QuerySnip struct {
	Device string
	meters.MeasurementResult
}

QuerySnip wraps query results

func (*QuerySnip) MarshalJSON

func (q *QuerySnip) MarshalJSON() ([]byte, error)

MarshalJSON converts QuerySnip to json, replacing Timestamp with unix time representation

func (*QuerySnip) String

func (q *QuerySnip) String() string

String representation

type Readings

type Readings struct {
	sync.Mutex
	Timestamp time.Time
	Values    map[meters.Measurement]float64
}

Readings combines readings of all measurements into one data structure

func (*Readings) Add

func (r *Readings) Add(q QuerySnip)

Add adds the values represented by the QuerySnip to the Readings and updates the current time stamp

func (*Readings) Clone

func (r *Readings) Clone() *Readings

Clone clones a Readings including its values map

func (*Readings) String

func (r *Readings) String() string

type RuntimeInfo

type RuntimeInfo struct {
	Online   bool
	Requests uint64
	Errors   uint64
	// contains filtered or unexported fields
}

RuntimeInfo represents a single modbus device status

func (*RuntimeInfo) Available

func (r *RuntimeInfo) Available(online bool)

Available sets the device online status

func (*RuntimeInfo) IsQueryable

func (r *RuntimeInfo) IsQueryable() (queryable bool, elapsed bool)

IsQueryable determines if a device can be queries. This is the case if either the device is online or the device is offline and the retryTimeout has elapsed. Returns queryable status and if the offline timeout has elapsed.

type SocketClient

type SocketClient struct {
	// contains filtered or unexported fields
}

SocketClient is a middleman between the websocket connection and the hub.

type SocketHub

type SocketHub struct {
	// contains filtered or unexported fields
}

SocketHub maintains the set of active clients and broadcasts messages to the clients.

func NewSocketHub

func NewSocketHub(status *Status) *SocketHub

NewSocketHub creates a web socket hub that distributes meter status and query results for the ui or other clients

func (*SocketHub) Run

func (h *SocketHub) Run(in <-chan QuerySnip)

Run starts data and status distribution

type Status

type Status struct {
	sync.Mutex

	StartTime  time.Time
	UpTime     float64
	Goroutines int
	Memory     MemoryStatus
	Meters     []DeviceStatus
	// contains filtered or unexported fields
}

Status represents the daemon and device status. It is updated when marshaled to JSON

func NewStatus

func NewStatus(qe DeviceInfo, control <-chan ControlSnip) *Status

NewStatus creates status cache that collects device status from control channel. It needs to be Update()d in order to refresh its data for consumption

func (*Status) MarshalJSON

func (s *Status) MarshalJSON() ([]byte, error)

MarshalJSON will syncronize access to the status object see http://choly.ca/post/go-json-marshalling/ for avoiding infinite loop

func (*Status) Online

func (s *Status) Online(device string) bool

Online returns device's online status or false if the device does not exist

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL