publisher

package
v0.0.0-...-162ef27 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 22, 2020 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package publisher with handling of input commands

Package publisher with updating and publishing of node outputs

Package publisher ...

  • publishes updates to nodes, inputs and outputs when they are (re)discovered
  • configuration of nodes
  • control of inputs
  • update of security keys and identity signature

Thread-safe. All public functions can be invoked from multiple goroutines.

Package publisher with facade functions for nodes, inputs and outputs that work with nodeIDs instead of full addresses on the internal Nodes, Inputs and Outputs collections. Mostly intended to reduce boilerplate code when managing nodes, inputs and outputs.

Package publisher with handling of configuration commands

Package publisher with handling of publisher discovery

Package publisher with functions for managing the publisher's identity

Index

Constants

const (
	// DefaultDiscoveryInterval is the interval, in seconds, at which node discovery information is republished
	DefaultDiscoveryInterval = 24 * 3600
	// DefaultPollInterval is the interval, in seconds, at which output values are queried for polling-based sources
	DefaultPollInterval = 24 * 3600
)

reserved keywords

const (
	SigningMethodNone = ""
	SigningMethodJWS  = "jws"
)

Message signing methods

Variables

This section is empty.

Functions

func CreateIdentity

func CreateIdentity(domain string, publisherID string) (
	fullIdentity *iotc.PublisherFullIdentity, privKey *ecdsa.PrivateKey)

CreateIdentity creates a new identity for a domain publisher. The validity is 1 year.

func IsIdentityExpired

func IsIdentityExpired(identity *iotc.PublisherPublicIdentity) bool

IsIdentityExpired tests if the given identity is expired
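As a rough illustration of how a one-year identity and its expiry test fit together, here is a minimal, self-contained sketch. The `sketchIdentity` type, its `ValidUntil` field, the helper names, and the choice of the P-256 curve are all assumptions made for this example; they are not taken from the iotc types.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"fmt"
	"time"
)

// sketchIdentity is a hypothetical stand-in for the iotc identity types.
type sketchIdentity struct {
	Domain      string
	PublisherID string
	ValidUntil  time.Time // end of the validity period
}

// createIdentity generates an ECDSA key pair (P-256 is an assumption) and an
// identity record that is valid for one year from now.
func createIdentity(domain, publisherID string) (*sketchIdentity, *ecdsa.PrivateKey, error) {
	privKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	ident := &sketchIdentity{
		Domain:      domain,
		PublisherID: publisherID,
		ValidUntil:  time.Now().AddDate(1, 0, 0),
	}
	return ident, privKey, nil
}

// isExpired reports whether the validity period has ended at the given time.
func isExpired(ident *sketchIdentity, at time.Time) bool {
	return at.After(ident.ValidUntil)
}

func main() {
	ident, _, err := createIdentity("local", "demo")
	if err != nil {
		panic(err)
	}
	fmt.Println("expired now:", isExpired(ident, time.Now()))
	fmt.Println("expired in two years:", isExpired(ident, time.Now().AddDate(2, 0, 0)))
}
```

Passing the reference time into `isExpired` keeps the check easy to test without waiting for a real year to pass.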

func LoadIdentity

func LoadIdentity(folder string, publisherID string) (fullIdentity *iotc.PublisherFullIdentity, privateKey *ecdsa.PrivateKey, err error)

LoadIdentity loads the publisher identity and private key from file in the given folder. The expected identity file is named <publisherID>-identity.json. Returns the identity with the corresponding ECDSA private key, or nil if no identity is found. If anything goes wrong, err will contain the error and a nil identity is returned.

func SaveIdentity

func SaveIdentity(folder string, publisherID string, identity *iotc.PublisherFullIdentity) error

SaveIdentity saves the full identity of the publisher and its keys in the given folder. The identity is saved as a JSON file. See also https://stackoverflow.com/questions/21322182/how-to-store-ecdsa-private-key-in-go
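The following sketch shows one way an ECDSA key can be stored in and recovered from a JSON identity file, in the spirit of the linked discussion. The `storedIdentity` layout, its JSON field names, and the helper functions are hypothetical; only the `<publisherID>-identity.json` file name comes from the documentation above.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"encoding/json"
	"encoding/pem"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// storedIdentity is a hypothetical file layout, not the package's schema.
type storedIdentity struct {
	Domain      string `json:"domain"`
	PublisherID string `json:"publisherId"`
	PrivateKey  string `json:"privateKey"` // PEM-encoded EC private key
}

// identityPath follows the documented <publisherID>-identity.json naming.
func identityPath(folder, publisherID string) string {
	return filepath.Join(folder, publisherID+"-identity.json")
}

// saveIdentity writes the identity and its PEM-encoded key as a JSON file.
func saveIdentity(folder, domain, publisherID string, key *ecdsa.PrivateKey) error {
	der, err := x509.MarshalECPrivateKey(key)
	if err != nil {
		return err
	}
	pemKey := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: der})
	data, err := json.MarshalIndent(storedIdentity{
		Domain: domain, PublisherID: publisherID, PrivateKey: string(pemKey),
	}, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(identityPath(folder, publisherID), data, 0600)
}

// loadIdentity reads the JSON file back and decodes the private key.
func loadIdentity(folder, publisherID string) (*storedIdentity, *ecdsa.PrivateKey, error) {
	data, err := os.ReadFile(identityPath(folder, publisherID))
	if err != nil {
		return nil, nil, err
	}
	var ident storedIdentity
	if err := json.Unmarshal(data, &ident); err != nil {
		return nil, nil, err
	}
	block, _ := pem.Decode([]byte(ident.PrivateKey))
	if block == nil {
		return nil, nil, errors.New("identity file contains no PEM block")
	}
	key, err := x509.ParseECPrivateKey(block.Bytes)
	if err != nil {
		return nil, nil, err
	}
	return &ident, key, nil
}

// roundTrip saves a fresh key in a temporary folder and loads it back,
// reporting whether the loaded key equals the original.
func roundTrip() (bool, error) {
	folder, err := os.MkdirTemp("", "identity-sketch")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(folder)
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return false, err
	}
	if err := saveIdentity(folder, "local", "demo", key); err != nil {
		return false, err
	}
	_, loaded, err := loadIdentity(folder, "demo")
	if err != nil {
		return false, err
	}
	return loaded.Equal(key), nil
}

func main() {
	same, err := roundTrip()
	fmt.Println(same, err)
}
```

The file is written with mode 0600 because it contains the private key; whether the package itself restricts permissions this way is not stated in the documentation.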

func SetupPublisherIdentity

func SetupPublisherIdentity(identityFolder string, domain string, publisherID string) (
	fullIdentity *iotc.PublisherFullIdentity, privKey *ecdsa.PrivateKey)

SetupPublisherIdentity loads the publisher identity and keys from file in the identityFolder. If no identity and keys are found, a self-signed identity is created. If the loaded identity is invalid, due to a domain/publisher/address mismatch, or its public key is missing, a new identity is also created. See SaveIdentity for info on how the identity is saved.

identityFolder is the folder with the identity files. Use "" for the default config folder (.config/iotc).

domain and publisherID are used to define the identity address.
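The load-or-create decision described above can be sketched as follows. The `identity` type and the helper names are hypothetical, the address comparison is left out for brevity, and `newSelfSigned` is only a placeholder for real key generation and signing.

```go
package main

import "fmt"

// identity is a hypothetical, simplified identity record.
type identity struct {
	Domain      string
	PublisherID string
	PublicKey   string // empty means the public key is missing
}

// isUsable reports whether a loaded identity can be kept for the given
// domain and publisherID. The address check mentioned in the text is omitted.
func isUsable(id *identity, domain, publisherID string) bool {
	if id == nil {
		return false
	}
	return id.Domain == domain && id.PublisherID == publisherID && id.PublicKey != ""
}

// setupIdentity keeps the loaded identity when it is usable and otherwise
// calls create. The second result reports whether a new identity was created.
func setupIdentity(loaded *identity, domain, publisherID string,
	create func(domain, publisherID string) *identity) (*identity, bool) {
	if isUsable(loaded, domain, publisherID) {
		return loaded, false
	}
	return create(domain, publisherID), true
}

// newSelfSigned is a placeholder for real key generation and self-signing.
func newSelfSigned(domain, publisherID string) *identity {
	return &identity{Domain: domain, PublisherID: publisherID, PublicKey: "placeholder-key"}
}

func main() {
	stale := &identity{Domain: "other", PublisherID: "demo", PublicKey: "k"}
	id, created := setupIdentity(stale, "local", "demo", newSelfSigned)
	fmt.Println(id.Domain, created) // local true
}
```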

Types

type NodeConfigHandler

type NodeConfigHandler func(node *iotc.NodeDiscoveryMessage, config iotc.NodeAttrMap) iotc.NodeAttrMap

NodeConfigHandler is the callback invoked when a command to update a node's configuration is received.

type NodeInputHandler

type NodeInputHandler func(input *iotc.InputDiscoveryMessage, message *iotc.SetInputMessage)

NodeInputHandler is the callback invoked when a command to update a node input is received.
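To show the shape of these two callbacks, here is a small sketch using local stand-in types rather than the iotc ones. All type names, the example input address, and in particular the interpretation of the config handler's return value as "the configuration to apply" are assumptions; the documentation does not state what the returned map means.

```go
package main

import "fmt"

// Hypothetical stand-ins for the iotc message types.
type nodeAttrMap map[string]string
type node struct{ ID string }
type input struct{ Address string }
type setInputMessage struct{ Value string }

// Callback shapes mirroring NodeConfigHandler and NodeInputHandler.
type nodeConfigHandler func(n *node, config nodeAttrMap) nodeAttrMap
type nodeInputHandler func(in *input, msg *setInputMessage)

// acceptKnownAttrs returns a config handler that keeps only the attributes
// listed in allowed. Treating the result as the configuration to apply is an
// assumption of this sketch.
func acceptKnownAttrs(allowed ...string) nodeConfigHandler {
	return func(n *node, config nodeAttrMap) nodeAttrMap {
		accepted := nodeAttrMap{}
		for _, name := range allowed {
			if value, ok := config[name]; ok {
				accepted[name] = value
			}
		}
		return accepted
	}
}

func main() {
	onConfig := acceptKnownAttrs("name", "alias")
	applied := onConfig(&node{ID: "node1"}, nodeAttrMap{"name": "Porch light", "color": "red"})
	fmt.Println(applied) // map[name:Porch light]

	var last string
	var onInput nodeInputHandler = func(in *input, msg *setInputMessage) {
		last = in.Address + "=" + msg.Value
	}
	onInput(&input{Address: "node1/switch/0"}, &setInputMessage{Value: "on"})
	fmt.Println(last) // node1/switch/0=on
}
```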

type Publisher

type Publisher struct {
	Nodes           *nodes.NodeList           // discovered nodes published by this publisher
	Inputs          *nodes.InputList          // discovered inputs published by this publisher
	Outputs         *nodes.OutputList         // discovered outputs published by this publisher
	OutputForecasts *nodes.OutputForecastList // output forecasts values published by this publisher
	OutputValues    *nodes.OutputValueList    // output values published by this publisher
	// contains filtered or unexported fields
}

Publisher carries the operating state of 'this' publisher

func NewAppPublisher

func NewAppPublisher(appID string, configFolder string, cacheFolder string, appConfig interface{}, persistNodes bool) (*Publisher, error)

NewAppPublisher takes care of the boilerplate of setting up a publisher. It:

  1. Loads the messenger config and creates a messenger instance
  2. Loads the app config from <appID>.yaml and extracts the field PublisherID
  3. Creates a publisher using the domain from the messenger config and the publisherID from <appID>.yaml
  4. Sets the publisher to persist nodes and loads previously saved nodes

'appID' is the application ID, used as the publisher ID unless overridden in <appID>.yaml.

'configFolder' is the location of the messenger and application configuration files. Use "" for the default location (.config/iotc).

'cacheFolder' is the location of the saved publisher, nodes, inputs and outputs. Use "" for the default location (.cache/iotc).

'appConfig' is an optional application object to load the <appID>.yaml configuration into. If it contains a field named 'PublisherID', that field overrides the default publisherID.

'persistNodes' flags whether to save discovered nodes and their configuration changes.

This returns the publisher instance, or an error if the messenger fails to load.
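The PublisherID override can be pictured with the sketch below, which inspects an arbitrary configuration object for a string field named `PublisherID`. Using reflection for this is an assumption about how such an override could work, not a description of the package's code, and `resolvePublisherID`, `appConfig` and `otherConfig` are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// resolvePublisherID returns appID unless appConfig is a struct, or a pointer
// to a struct, with a non-empty string field named PublisherID.
func resolvePublisherID(appID string, appConfig interface{}) string {
	if appConfig == nil {
		return appID
	}
	v := reflect.ValueOf(appConfig)
	if v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return appID
		}
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return appID
	}
	field := v.FieldByName("PublisherID")
	if field.IsValid() && field.Kind() == reflect.String && field.String() != "" {
		return field.String()
	}
	return appID
}

// appConfig is an example application configuration with the override field.
type appConfig struct {
	PublisherID string
}

// otherConfig is an example configuration without the override field.
type otherConfig struct {
	Port int
}

func main() {
	fmt.Println(resolvePublisherID("myapp", nil))                              // myapp
	fmt.Println(resolvePublisherID("myapp", &appConfig{}))                     // myapp
	fmt.Println(resolvePublisherID("myapp", &appConfig{PublisherID: "custom"})) // custom
	fmt.Println(resolvePublisherID("myapp", otherConfig{Port: 1}))             // myapp
}
```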

func NewPublisher

func NewPublisher(
	identityFolder string,
	cacheFolder string,
	domain string,
	publisherID string,
	signingMethod string,
	messenger messenger.IMessenger,
) *Publisher

NewPublisher creates a publisher instance. This is used for all publications.

The identityFolder contains the publisher identity file <publisherID>-identity.json. Use "" for the default config folder. The identity is written here when it is first created or is renewed by the domain security service. This file only needs to be accessible during publisher startup.

The cacheFolder contains stored discovered nodes, inputs, outputs, and external publishers. It also contains node configuration so deleting these files will remove custom node configuration, for example configuration of alias and name.

domain and publisherID identify this publisher. If the identity file does not match these, it is discarded and a new identity is created. If the publisher has joined the domain and the domain security service (DSS) has issued the identity, then changing the domain or publisherID invalidates the publisher and it has to rejoin the domain. If no domain is provided, the default 'local' is used.

signingMethod indicates if and how publications must be signed. The default is jws. For testing 'none' can be used.

messenger is used for publishing onto the message bus.

func (*Publisher) Address

func (publisher *Publisher) Address() string

Address returns the publisher's identity address

func (*Publisher) Domain

func (publisher *Publisher) Domain() string

Domain returns the publication domain

func (*Publisher) GetNodeByID

func (publisher *Publisher) GetNodeByID(nodeID string) (node *iotc.NodeDiscoveryMessage)

GetNodeByID returns a node from this publisher, or nil if the id isn't found in this publisher. This is a convenience function, as publishers tend to do this quite often.

func (*Publisher) GetNodeConfigBool

func (publisher *Publisher) GetNodeConfigBool(
	nodeID string, attrName iotc.NodeAttr, defaultValue bool) (value bool, err error)

GetNodeConfigBool is a convenience function to get a node configuration value as a boolean. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigFloat

func (publisher *Publisher) GetNodeConfigFloat(
	nodeID string, attrName iotc.NodeAttr, defaultValue float32) (value float32, err error)

GetNodeConfigFloat is a convenience function to get a node configuration value as a float number. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigInt

func (publisher *Publisher) GetNodeConfigInt(
	nodeID string, attrName iotc.NodeAttr, defaultValue int) (value int, err error)

GetNodeConfigInt is a convenience function to get a node configuration value as an integer. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigString

func (publisher *Publisher) GetNodeConfigString(
	nodeID string, attrName iotc.NodeAttr, defaultValue string) (value string, err error)

GetNodeConfigString is a convenience function to get a node configuration value as a string. This returns the given default if no configuration value exists and no configuration default is set.
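The fallback order shared by the GetNodeConfig functions (configured value, then configuration default, then the caller's default) can be sketched as below. The `configAttr` type with its `Value` and `Default` fields is a hypothetical reduction, and returning the caller's default together with a conversion error is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// configAttr is a hypothetical, reduced version of a node configuration entry.
type configAttr struct {
	Value   string // value set through configuration, "" if unset
	Default string // default declared by the publisher, "" if unset
}

// configString resolves in order: configured value, configuration default,
// caller-supplied default.
func configString(config map[string]configAttr, attrName, defaultValue string) string {
	attr, found := config[attrName]
	if !found {
		return defaultValue
	}
	if attr.Value != "" {
		return attr.Value
	}
	if attr.Default != "" {
		return attr.Default
	}
	return defaultValue
}

// configInt resolves the value like configString and converts it to an int.
// Returning defaultValue together with the error is an assumption.
func configInt(config map[string]configAttr, attrName string, defaultValue int) (int, error) {
	text := configString(config, attrName, "")
	if text == "" {
		return defaultValue, nil
	}
	value, err := strconv.Atoi(text)
	if err != nil {
		return defaultValue, err
	}
	return value, nil
}

func main() {
	config := map[string]configAttr{
		"pollInterval": {Default: "60"},
		"name":         {Value: "Porch light", Default: "unnamed"},
	}
	fmt.Println(configString(config, "name", "n/a"))    // Porch light
	fmt.Println(configInt(config, "pollInterval", 300)) // 60 <nil>
	fmt.Println(configInt(config, "missing", 300))      // 300 <nil>
}
```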

func (*Publisher) GetNodeStatus

func (publisher *Publisher) GetNodeStatus(nodeID string, attrName iotc.NodeStatus) (value string, exists bool)

GetNodeStatus returns a node's status attribute. This is a convenience function. See NodeList.GetNodeStatus for details.

func (*Publisher) GetOutputByType

func (publisher *Publisher) GetOutputByType(nodeID string, outputType iotc.OutputType, instance string) *iotc.OutputDiscoveryMessage

GetOutputByType returns a node output object using the node id, output type and instance. This is a convenience function using the publisher's output list.

func (*Publisher) GetPublisherKey

func (publisher *Publisher) GetPublisherKey(address string) *ecdsa.PublicKey

GetPublisherKey returns the public key of the publisher contained in the given address. The address must at least contain a domain, publisherId and message type.

func (*Publisher) Identity

func (publisher *Publisher) Identity() *iotc.PublisherFullIdentity

Identity returns this publisher's full identity.

func (*Publisher) LoadFromCache

func (publisher *Publisher) LoadFromCache(folder string, autosave bool) error

LoadFromCache loads previously cached nodes of this publisher, and discovered publishers in the domain. If a node file exists in the given folder the nodes will be added/updated. Existing nodes will be replaced. If autosave is set then save this publisher's nodes and configs when updated.

  • folder with the cache files. Use "" for default, which is persist.DefaultCacheFolder: <userhome>/.cache/iotc
  • autosave indicates to save updates to node configuration

Returns an error if the folder doesn't exist.

func (*Publisher) Logger

func (publisher *Publisher) Logger() *log.Logger

Logger returns the publication logger

func (*Publisher) MakeNodeDiscoveryAddress

func (publisher *Publisher) MakeNodeDiscoveryAddress(nodeID string) string

MakeNodeDiscoveryAddress makes the node discovery address using the publisher domain and publisherID

func (*Publisher) NewInput

func (publisher *Publisher) NewInput(nodeID string, inputType iotc.InputType, instance string) *iotc.InputDiscoveryMessage

NewInput creates a new node input and adds it to this publisher's inputs list. Returns the input to allow for easy updates.

func (*Publisher) NewNode

func (publisher *Publisher) NewNode(nodeID string, nodeType iotc.NodeType) string

NewNode creates a new node and adds it to this publisher's discovered nodes. This is a convenience function that uses the publisher domain and id to create a node in its node list. Returns the node's address.

func (*Publisher) NewOutput

func (publisher *Publisher) NewOutput(nodeID string, outputType iotc.OutputType, instance string) *iotc.OutputDiscoveryMessage

NewOutput creates a new node output and adds it to this publisher's outputs list. This is a convenience function for the publisher.Outputs list. Returns the output object to allow for easy updates.

func (*Publisher) PublishConfigureNode

func (publisher *Publisher) PublishConfigureNode(remoteNodeAddress string, attr iotc.NodeAttrMap, encryptionKey *ecdsa.PublicKey)

PublishConfigureNode updates the configuration of a remote node by this publisher. The signed message will be encrypted with the given encryption key.

func (*Publisher) PublishIdentity

func (publisher *Publisher) PublishIdentity()

PublishIdentity publishes this publisher's identity on startup or update

func (*Publisher) PublishRaw

func (publisher *Publisher) PublishRaw(output *iotc.OutputDiscoveryMessage, sign bool, value []byte)

PublishRaw immediately publishes the given value of a node, output type and instance on the $raw output address. The content can be signed but is not encrypted. This is intended for publishing large values that should not be stored, for example images.

func (*Publisher) PublishSetInput

func (publisher *Publisher) PublishSetInput(remoteNodeInputAddress string, value string, encryptionKey *ecdsa.PublicKey)

PublishSetInput sets the input of a remote node by this publisher. The signed message will be encrypted with the given encryption key.

func (*Publisher) PublishUpdatedDiscoveries

func (publisher *Publisher) PublishUpdatedDiscoveries()

PublishUpdatedDiscoveries publishes updated node, input and output discovery messages. If updates are available then the nodes are saved.

func (*Publisher) PublishUpdatedOutputValues

func (publisher *Publisher) PublishUpdatedOutputValues()

PublishUpdatedOutputValues publishes pending updates to output values. This function is not thread-safe; use it within a locked section.

func (*Publisher) PublisherID

func (publisher *Publisher) PublisherID() string

PublisherID returns the publisher's ID

func (*Publisher) SetDiscoveryInterval

func (publisher *Publisher) SetDiscoveryInterval(interval int, handler func(publisher *Publisher))

SetDiscoveryInterval is a convenience function for periodic update of discovered nodes, inputs and outputs. Intended for publishers that need to poll for discovery.

interval is the time in seconds until the next discovery. Default is DefaultDiscoveryInterval.

handler is the callback, invoked with the publisher, for publishing discovery.

func (*Publisher) SetLogging

func (publisher *Publisher) SetLogging(levelName string, filename string)

SetLogging sets the logging level and output file for this publisher. Intended for setting logging from configuration.

levelName is the requested logging level: error, warning, info, debug.

filename is the full name of the output log file, including path.

func (*Publisher) SetNodeAttr

func (publisher *Publisher) SetNodeAttr(nodeID string, attrParams map[iotc.NodeAttr]string) (changed bool)

SetNodeAttr sets one or more attributes of the node. This only updates the node if the status or lastError message changes.

func (*Publisher) SetNodeConfigHandler

func (publisher *Publisher) SetNodeConfigHandler(
	handler func(node *iotc.NodeDiscoveryMessage, config iotc.NodeAttrMap) iotc.NodeAttrMap)

SetNodeConfigHandler sets the handler for updating node configuration.

func (*Publisher) SetNodeErrorStatus

func (publisher *Publisher) SetNodeErrorStatus(nodeID string, status string, lastError string)

SetNodeErrorStatus sets the node RunState to the given status with a lastError message. Use NodeRunStateError for errors and NodeRunStateReady to clear the error. This only updates the node if the status or lastError message changes.

func (*Publisher) SetNodeInputHandler

func (publisher *Publisher) SetNodeInputHandler(
	handler func(input *iotc.InputDiscoveryMessage, message *iotc.SetInputMessage))

SetNodeInputHandler sets the handler for updating node inputs.

func (*Publisher) SetNodeStatus

func (publisher *Publisher) SetNodeStatus(nodeID string, status map[iotc.NodeStatus]string) (changed bool)

SetNodeStatus sets one or more status attributes of the node. This only updates the node if the status or lastError message changes.

func (*Publisher) SetPollInterval

func (publisher *Publisher) SetPollInterval(seconds int, handler func(publisher *Publisher))

SetPollInterval is a convenience function for periodic update of output values. Intended for publishers that need to poll for values.

seconds is the interval in seconds until the next poll. Default (0) is DefaultPollInterval.

handler is the callback, invoked with the publisher, for polling the values.
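A minimal sketch of interval-driven polling, including the fallback to the default when 0 is given. Driving the poller from a once-per-second heartbeat tick is an assumption of this sketch (the package mentions a heartbeat loop but does not document its mechanics), and `poller`, `newPoller` and `tick` are hypothetical names.

```go
package main

import "fmt"

// defaultPollInterval mirrors DefaultPollInterval, in seconds.
const defaultPollInterval = 24 * 3600

// poller invokes handler once every interval seconds, counted in ticks.
type poller struct {
	interval  int    // seconds between polls
	countdown int    // seconds until the next poll
	handler   func() // invoked when a poll is due
}

// newPoller creates a poller; a non-positive interval selects the default.
func newPoller(seconds int, handler func()) *poller {
	if seconds <= 0 {
		seconds = defaultPollInterval
	}
	return &poller{interval: seconds, handler: handler}
}

// tick is meant to be called once per second by a heartbeat loop. The first
// tick polls immediately, after which polls are interval seconds apart.
func (p *poller) tick() {
	p.countdown--
	if p.countdown <= 0 {
		p.handler()
		p.countdown = p.interval
	}
}

func main() {
	polls := 0
	p := newPoller(3, func() { polls++ })
	for second := 0; second < 7; second++ {
		p.tick()
	}
	fmt.Println(polls)                      // 3: polls at ticks 1, 4 and 7
	fmt.Println(newPoller(0, nil).interval) // 86400
}
```

Counting ticks instead of starting a separate timer per interval keeps all periodic work on one goroutine, which is one way to avoid extra locking; it is a design choice of this sketch only.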

func (*Publisher) SetSigningMethod

func (publisher *Publisher) SetSigningMethod(signingMethod string)

SetSigningMethod sets the signing method: JWS or none for publications. Default is SigningMethodJWS

func (*Publisher) Start

func (publisher *Publisher) Start()

Start starts publishing and listens for configuration and input messages. This creates the publisher node and loads previously saved nodes. Start will fail if no messenger has been provided. If persistNodes is set, previously saved nodes are loaded at startup and saved on configuration change.

func (*Publisher) Stop

func (publisher *Publisher) Stop()

Stop stops publishing and waits until the heartbeat loop has finished processing messages.

func (*Publisher) UpdateNodeConfig

func (publisher *Publisher) UpdateNodeConfig(nodeID string, attrName iotc.NodeAttr, configAttr *iotc.ConfigAttr)

UpdateNodeConfig updates a node's configuration and publishes the updated node.

If a config already exists then its value is retained but its configuration parameters are replaced. Nodes are immutable. A new node is created and published and the old node instance is discarded.
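The copy-on-update behaviour described above can be sketched like this: the old node is left untouched, the new node gets the replaced configuration parameters, and an existing value is carried over. The `node` and `configAttr` types and their fields are hypothetical simplifications, not the iotc definitions.

```go
package main

import "fmt"

// configAttr is a hypothetical configuration entry: descriptive parameters
// plus the currently configured value.
type configAttr struct {
	Description string
	Default     string
	Value       string
}

// node is a hypothetical, simplified node with its configuration.
type node struct {
	ID     string
	Config map[string]configAttr
}

// updateNodeConfig returns a new node in which the parameters of attrName are
// replaced by attr. An existing value for attrName is retained. The old node
// and its map are not modified.
func updateNodeConfig(old *node, attrName string, attr configAttr) *node {
	updated := &node{ID: old.ID, Config: make(map[string]configAttr, len(old.Config)+1)}
	for name, existing := range old.Config {
		updated.Config[name] = existing
	}
	if existing, found := old.Config[attrName]; found {
		attr.Value = existing.Value
	}
	updated.Config[attrName] = attr
	return updated
}

func main() {
	old := &node{ID: "node1", Config: map[string]configAttr{
		"name": {Description: "Name", Value: "Porch light"},
	}}
	updated := updateNodeConfig(old, "name",
		configAttr{Description: "Friendly name", Default: "unnamed"})
	fmt.Println(updated.Config["name"].Value, "|", updated.Config["name"].Description)
	// Porch light | Friendly name
	fmt.Println(old.Config["name"].Description) // Name
}
```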

func (*Publisher) UpdateOutputValue

func (publisher *Publisher) UpdateOutputValue(nodeID string, outputType iotc.OutputType, instance string, newValue string) bool

UpdateOutputValue adds the new node output value to the front of the value history. See NodeList.UpdateOutputValue for more details.

func (*Publisher) WaitForSignal

func (publisher *Publisher) WaitForSignal()

WaitForSignal waits until a TERM or INT signal is received
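Waiting for TERM or INT is commonly done with the standard `os/signal` package, roughly as sketched below. This is a generic pattern, not the package's own code; the helper names are hypothetical, and `simulateTerm` injects the signal into the channel itself so that the example finishes without outside input.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// notifyOnTermination returns a channel that receives INT and TERM signals.
func notifyOnTermination() chan os.Signal {
	ch := make(chan os.Signal, 2)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	return ch
}

// waitForSignal blocks until a signal arrives on the channel.
func waitForSignal(ch <-chan os.Signal) os.Signal {
	return <-ch
}

// simulateTerm injects a TERM signal into the channel instead of waiting for
// the operating system, so that the sketch terminates on its own.
func simulateTerm() bool {
	ch := notifyOnTermination()
	defer signal.Stop(ch)
	ch <- syscall.SIGTERM
	return waitForSignal(ch) == syscall.SIGTERM
}

func main() {
	fmt.Println("received TERM:", simulateTerm())
}
```

In a real publisher application the call to `waitForSignal` would typically sit between starting and stopping the publisher, so that the process keeps running until it is asked to terminate.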
