otto

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Sensor Station

Sensor Station gathers data via MQTT from any number of publishers, which are typically battery powered, wireless sensors spread around a certain location.

Overview

MQTT is the key

MQTT Broker
  • Run MQTT broker, e.g. mosquitto

  • Base topic "ss//data/"

Example: ss/00:95:fb:3f:34:95/data/tempc 25.00

Web Sockets

We sockets or HTTP/2 will be used to send data to and from the IOTe device (otto) in our case.

Subscribe to Topics
  • announce/station - announces stations that control or collect

  • announce/hub - announces hubs, typ

  • data/tempc/ - data can have option /index at the end

  • data/humidity

  • control/relay/idx - control can have option /index at the end

REST API

  • GET /api/config

  • PUT /api/config data => { config: id, ... }

  • GET /api/data

  • GET /api/stations

Station Manager

  • Collection of stations
  • Stations can age out
Stations
  • ID (name, IP and mac address)
  • Capabilities
    • sensors
    • relay

Data

Data can be optimized and we expect we will want to optimize different data for all kinds of reasons and we won't preclude that from happening, we'll give applications the flexibility to handle data elements as they see fit (can optimize).

We will take an memory expensive approach, every data point can be handled on it's own. The data structure will be:

struct Data
    Source ID
    Type
    Timestamp
    Value

User Interface

Build

  1. Install Go
  2. go get ./...
  3. cd ss; go build

That should leave the execuable 'sensors' in the 'sensors' directory as so:

./station/sensors/sensors

Deploy

  1. Install and run an MQTT broker on the sensors host (e.g. mosquitto).

  2. Start the sensors program ensuring the sensor station has connected to a wifi network.

  3. Put batteries in sensors and let the network build itself.

Testing

Fake Websocket Data
% ./ss -fake-ws
% ./ss -help

This will open the following URL for the fake websocket data:

http://localhost:8011/ws

Replace localhost with a hostname or IP if needed. Have the websocket connect to the URL and start spitting out fake data formatted like this:

{"year":2020,"month":12,"day":10,"hour":20,"minute":48,"second":8,"action":"setTime"}
{"K":"tempf","V":88}
{"K":"soil","V":0.49}
{"K":"light","V":0.62}
{"K":"humid","V":0.12}

Adding Automated Builds

Automated builds will be using Github events.

Documentation

Overview

The iote package was designed to help build IoT edge software for on premise edge devices managing a large number of IoT Stations.

The package provides

- MQTT messaging amoung IoT stations and control software - HTTP REST Server for data gathering and configuration - Websockets for realtime bidirectional communication with UI - Web server for mondern web based User Interface - Station manager to track a variety of IoT stations

Messaging Based

The primary communication model for IoTe is a messaging system based on MQTT. These messages can be broke into the following categories

- Meta Data that help describe the "network" infrastructure - Sensor Data that produces data gathered by sensors - Control Data that represents actions to be performed by stations

The topic format used by MQTT is flexible but generally follows the following formats:

## ss/m/<source>/<type> -> { station-informaiton }

Where ss/m == sensor station, <source> is the station Id or source of the message and type represents the specific type of information.

### Meta Data (Station Information)

For example when a station comes alive it can provide some information about itself using the topic:

```ss/m/be:ef:ca:fe:02/station```

The station will announce itself along with some meta information and it's capabilities. The body of the message might look something like this:

```json

{
	"id": "be:ef:ca:fe:02",
	"ip": "10.11.24.24",
    "sensors": [
		"tempc",
		"humidity",
		"light"
	],
	"relays": [
		"heater",
		"light"
	],
}

```

### Sensor Data

Sensor data takes on the form:

```ss/d/<source>/<sensor>/<index>```

Where the source is the Station ID publishing the respective data. The sensor is the type of data being produced (temp, humidity, lidar, GPS).

The index is optional in situations where they may be more than one similar device or sensor, for example a couple of rotation counters on wheels.

The value published by the sensors is typically going to be floating point, however these values may also be integer or string values, including nema-0183.

### Control Data

```ss/c/<source>/<device>/<index>```

This is essentially the same as the sensor except that control commands are used to have a particular device change, for example turning a relay on or off.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config interface {
	GetAddr() string
	GetAppdir() string
	GetBroker() string
}

type Dispatcher

type Dispatcher struct {
	InQ    chan *Msg
	StoreQ chan *Msg
	WebQ   map[chan *Station]chan *Station
}

Dispatcher accepts

func NewDispatcher

func NewDispatcher() (d *Dispatcher)

func (*Dispatcher) AddWebQ

func (d *Dispatcher) AddWebQ() chan *Station

func (*Dispatcher) FreeWebQ

func (d *Dispatcher) FreeWebQ(c chan *Station)

type MQTT

type MQTT struct {
	ID     string
	Broker string
	Debug  bool

	gomqtt.Client
	// contains filtered or unexported fields
}

func (*MQTT) Connect

func (m *MQTT) Connect()

func (MQTT) Publish

func (m MQTT) Publish(topic string, value interface{})

Publish will publish a value to the given channel

func (*MQTT) Start

func (m *MQTT) Start()

func (*MQTT) Subscribe

func (m *MQTT) Subscribe(id string, path string, f gomqtt.MessageHandler)

ss/<ethaddr>/<data>/tempf value ss/<ethaddr>/<data>/humidity value

type Msg

type Msg struct {
	ID   int64      `json:"id"`
	Type string     `json:"type"`
	Data MsgStation `json:"station"`

	time.Time `json:"time"`
}

Msg holds a value and some type of meta data to be pass around in the system.

func MsgFromMQTT

func MsgFromMQTT(topic string, payload []byte) (m *Msg, err error)

func (*Msg) String

func (m *Msg) String() string

type MsgStation

type MsgStation struct {
	ID      string             `json:"id"`
	Sensors map[string]float64 `json:"sensors"`
	Relays  map[string]bool    `json:"relays"`
}

type OttO

type OttO struct {
	*Dispatcher
	*MQTT
	*Server
	*Store

	Plugins []string

	Done chan bool
}
var (
	O *OttO = nil
)

func NewOttO

func NewOttO() *OttO

func (*OttO) LoadPlugin

func (o *OttO) LoadPlugin(p string)

func (*OttO) LoadPlugins

func (o *OttO) LoadPlugins(plugins []string)

func (*OttO) Publish

func (o *OttO) Publish(t string, v interface{})

func (*OttO) Register

func (o *OttO) Register(path string, h http.Handler)

func (*OttO) Start

func (o *OttO) Start()

func (*OttO) Subscribe

func (o *OttO) Subscribe(topic string, s Sub)

Subscribe to MQTT Message the Sub (Subscriber) will is an Interface that must have a Callback(topic string, payload []byte) signature

type Ping

type Ping struct {
}

Ping is a full fledged Request handler, You can write your own!

func (Ping) ServeHTTP

func (p Ping) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP will respond to the writer with 'Pong'

type Rando

type Rando struct {
	F float64
}

PeriodicRandomData will collected a new random piece of data every period and transmit it to the given mqtt channel

func NewRando

func NewRando() (r *Rando)

func (Rando) Get

func (p Rando) Get() interface{}

func (Rando) GetFloat

func (p Rando) GetFloat() float64

func (Rando) ServeHTTP

func (p Rando) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Sensor

type Sensor struct {
	ID        string `json:"id"`
	LastValue interface{}
	LastHeard time.Time `json:"-"`
}

func (*Sensor) Update

func (s *Sensor) Update(value interface{}, t time.Time)

type Server

type Server struct {
	Addr   string
	Appdir string
}

Server serves up HTTP on Addr (default 0.0.0.0:8011) It takes care of REST API, serving the web app if Appdir does not equal nil and initial Websocket upgrade

func (*Server) Register

func (s *Server) Register(p string, h http.Handler)

Register to handle HTTP requests for particular paths in the URL or MQTT channel.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP provides a REST interface to the config structure

func (*Server) Start

func (s *Server) Start()

Start the HTTP server after registering REST API callbacks and initializing the Web application directory

type Station

type Station struct {
	ID         string        `json:"id"`
	LastHeard  time.Time     `json:"last-heard"`
	Expiration time.Duration `json:"expiration"` // how long to timeout a station

	Sensors map[string]float64 `json:"sensors"`
	Relays  map[string]bool    `json:"relays"`
	// contains filtered or unexported fields
}

Station is the primary structure that holds an array of Sensors which in turn hold a timeseries of datapoints.

func NewStation

func NewStation(id string) (st *Station)

NewStation creates a new Station with an ID as provided by the first parameter

func (*Station) Relay

func (s *Station) Relay(id string, v string)

func (*Station) Stop

func (s *Station) Stop()

Stop the station from advertising

func (*Station) Update

func (s *Station) Update(msg *Msg)

Update() will append a new data value to the series of data points.

type StationEvent

type StationEvent struct {
	Type      string `json:"type"`
	Device    string `json:"device"`
	StationID string `json:"stationid"`
	Value     string `json:"value"`
}

type StationManager

type StationManager struct {
	Stations map[string]*Station `json:"stations"`
	Stale    map[string]*Station `json:"stale"`
	EventQ   chan *StationEvent
	// contains filtered or unexported fields
}

StationManager keeps track of all the stations we have seen

var (
	Stations *StationManager
)

func NewStationManager

func NewStationManager() (sm *StationManager)

func (*StationManager) Add

func (sm *StationManager) Add(st string) (station *Station, err error)

func (*StationManager) Count

func (sm *StationManager) Count() int

func (*StationManager) Get

func (sm *StationManager) Get(stid string) *Station

func (StationManager) ServeHTTP

func (sm StationManager) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*StationManager) Update

func (sm *StationManager) Update(msg *Msg) (st *Station)

type Store

type Store struct {
	Source map[string]map[string]float64
	StoreQ chan *Msg
}

func NewStore

func NewStore() *Store

func (*Store) Store

func (s *Store) Store(msg *Msg) error

type Sub

type Sub interface {
	Callback(string, []byte)
}

type Subscriber

type Subscriber struct {
	ID   string
	Path string
	gomqtt.MessageHandler
}

func (*Subscriber) String

func (sub *Subscriber) String() string

type Websock

type Websock struct {
	// contains filtered or unexported fields
}

func (Websock) ServeHTTP

func (ws Websock) ServeHTTP(w http.ResponseWriter, r *http.Request)

Directories

Path Synopsis
plugins

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL