nanodm

package module
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2021 License: MIT Imports: 10 Imported by: 0

README

nanodm

A nanomsg based data model library. Allows multiple sources to feed a single data model.

Overview

Multiple data model sources can be implemented and connected to the coordinator server to implement a data model. The coordinator server then knows how to route Get and Set requests to the source that handles each data model object.

A bit about routing

If the following sources were registered with the coordinator server:

  • Source 1 objects:
    • Device.DeviceInfo.MemoryStatus.Total
    • Device.DeviceInfo.MemoryStatus.Free
  • Source 2 objects:
    • Device.WiFi.RadioNumberOfEntries
    • Device.WiFi.SSIDNumberOfEntries
    • Device.WiFi.Radio.0.Enable

Any calls to get/set the object Device.DeviceInfo.MemoryStatus.Total would be routed to source 1 and any calls to get/set the object Device.WiFi.RadioNumberOfEntries would be routed to source 2.

Source Example

A source must implement the GetObjects/SetObjects/AddRow/DeleteRow handlers interface.

type ExampleSource struct {}

// Called by the coordinator to get objects registered by this source
func (ex *ExampleSource) GetObjects(objectNames []string) (objects []nanodm.Object, err error) {
    return objects, err
}

// Called by the coordinator to set objects registered by this source
func (ex *ExampleSource) SetObjects(objects []nanodm.Object) error {
    return nil
}

// Called to add a row to a dynamic list.  For example:  Device.NAT.PortMapping.{i}.
// Should return the name of the newly created row
func (ex *ExampleSource) AddRow(objects nanodm.Object) (row string, err error) {
    return row, err
}

// Called to delete a row to a dynamic list.  For example:  Device.NAT.PortMapping.{i}.
func (ex *ExampleSource) DeleteRow(row nanodm.Object) error {
    return nil
}

Start the source to start receiving callbacks on the interface above.

log := logrus.NewEntry(logrus.New())
sourceName := "ExampleSource"
coordinatorUrl := "tcp://127.0.0.1:4500"
sourceUrl := "tcp://127.0.0.1:4501"
exampleSource := ExampleSource{}

// Create the new source passing the custom SourceHandler
source := source.NewSource(log, sourceName, coordinatorUrl, sourceUrl, exampleSource)
// Connect
source.Connect()

// Register a static DM entry, and a dynamic table/list
exampleObjects := []nanodm.Object{{
		Name:   "Device.DeviceInfo.MemoryStatus.Total",
		Access: nanodm.AccessRO,
		Type:   nanodm.TypeInt,
	},
    {
		Name:   "Device.Custom.Dynamic.",
		Access: nanodm.AccessRW,
		Type:   nanodm.TypeDynamicList,
	},
}
// Call register with a list of objects the source owns
err := source.Register(exampleObjects)

Once the example above is registered the server side will route all requests Get for the object Device.DeviceInfo.MemoryStatus.Total to this source.

Further all requests (Set/Get/AddRow/DeleteRow) for Device.NAT.PortMapping.* will be routed to this source.

Coordinator Server Example

A Coordinator must implement the Registered/Unregistered/UpdateObjects interface. For example:

type ExampleCoordinator struct {}

func (ch *ExampleCoordinator) Registered(server *coordinator.Server, sourceName string, objects []nanodm.Object) error {
	return nil
}

func (ch *ExampleCoordinator) Unregistered(server *coordinator.Server, sourceName string, objects []nanodm.Object) error {
	return nil
}

func (ch *ExampleCoordinator) UpdateObjects(server *Server, sourceName string, objects []nanodm.Object, deletedObjects map[string]nanodm.Object) error {
	return nil
}

Once the handler interface is implemented the server can be started:

url := "tcp://127.0.0.1:4500"

log := logrus.NewEntry(logrus.New())

exCoordinator := &ExampleCoordinator{}

server := coordinator.NewServer(log, url, exCoordinator)
err := server.Start()

Once the server is running the following APIs can be called to access registered sources:

Get an object (or list objects):

objs, errs := server.Get([]string{"Device.DeviceInfo.MemoryStatus.Total"})

Set an object:

err := server.Set(nanodm.Object{
    Name:  "Device.WiFi.Radio.0.Enable",
    Value: true,
    Type:  nanodm.TypeString,
})

Add a row to a dynamic list of a source:

// Define the new row
newRow := map[string]interface{}{
    "Description":          "Test",
    "Enable":               "false",
    "ExternalPort":         "210",
    "ExternalPortEndRange": "210",
    "InternalClient":       "10.0.0.48",
    "Protocol":             "BOTH",
}

// Add Row to dynamic list entry
row, err := server.AddRow(nanodm.Object{
    Name:  "Device.NAT.PortMapping.",
    Value: newRow,
    Type:  nanodm.TypeRow,
})
// row will have the new row path including the dynamically allocated index

Deletes a row to a dynamic list of a source:

err := server.DeleteRow(nanodm.Object{
    Name: "Device.NAT.PortMapping.1",
    Type: nanodm.TypeRow,
})

Development

Running tests:

go test ./... 

Documentation

Index

Constants

View Source
const (
	RETRY_PERIOD     = 2 * time.Second
	MAX_RETRY_PERIOD = time.Minute
)

Variables

This section is empty.

Functions

func GetTransactionUID

func GetTransactionUID() uuid.UUID

Types

type ConcurrentMessageMap

type ConcurrentMessageMap struct {
	// contains filtered or unexported fields
}

A thread save map for tracking transaction IDs

func NewConcurrentMessageMap

func NewConcurrentMessageMap() *ConcurrentMessageMap

func (*ConcurrentMessageMap) Delete

func (cm *ConcurrentMessageMap) Delete(key string)

func (*ConcurrentMessageMap) Get

func (cm *ConcurrentMessageMap) Get(key string) *Message

func (*ConcurrentMessageMap) Set

func (cm *ConcurrentMessageMap) Set(key string, message Message)

func (*ConcurrentMessageMap) WaitForKey

func (cm *ConcurrentMessageMap) WaitForKey(key string, timeout time.Duration) (*Message, error)

type Message

type Message struct {
	Type           MessageType `json:"type"`
	TransactionUID uuid.UUID   `json:"transactionUID,omitempty"`
	SourceName     string      `json:"sourceName,omitempty"`
	Source         string      `json:"source,omitempty"`
	Destination    string      `json:"destination,omitempty"`
	Objects        []Object    `json:"object,omitempty"`
	Error          string      `json:"error,omitempty"`
}

type MessageType

type MessageType uint
const (
	RegisterMessageType MessageType = iota
	UnregisterMessageType
	UpdateObjectsMessageType
	SetMessageType
	GetMessageType
	AckMessageType
	NackMessageType
	ListMessagesType
	PingMessageType
	AddRowMessageType
	DeleteRowMessageType
)

type Object

type Object struct {
	Name          string       `json:"name"`
	Access        ObjectAccess `json:"access"`
	Type          ObjectType   `json:"type"`
	IndexableFrom string       `json:"indexablefrom,omitempty"`
	Value         interface{}  `json:"value,omitempty"`
}

func GetObjectsFromMap

func GetObjectsFromMap(objMap map[string]Object) (objects []Object)

type ObjectAccess

type ObjectAccess uint
const (
	AccessRW ObjectAccess = iota
	AccessRO
)

type ObjectType

type ObjectType uint
const (
	TypeString ObjectType = iota
	TypeInt
	TypeUnsignedInt
	TypeBool
	TypeDateTime
	TypeBase64
	TypeLong
	TypeUnsignedLong
	TypeFloat
	TypeDouble
	TypeByte
	TypeRow         ObjectType = 10000000
	TypeDynamicList ObjectType = 100000000
)

type Puller

type Puller struct {
	// contains filtered or unexported fields
}

func NewPuller

func NewPuller(log *logrus.Entry, url string, messageChan chan Message) *Puller

func (*Puller) Start

func (pu *Puller) Start() error

func (*Puller) Stop

func (pu *Puller) Stop() error

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(log *logrus.Entry, url string, messageChan chan Message) *Pusher

func (*Pusher) Start

func (pu *Pusher) Start() error

func (*Pusher) Stop

func (pu *Pusher) Stop() error

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL