digdispatch

package module
v0.0.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2019 License: MIT Imports: 8 Imported by: 0

README

DigDispatch

A dispatch queue for a websocet server.

Documentation

Index

Constants

View Source
const (
	ACTION_ID        = iota // ACTION_ID == The "identify" action
	ACTION_SUBSCRIBE = iota
	ACTION_PUBLISH   = iota
)
View Source
const (
	DRIVE_SET = iota // reciever should set physical motors to this state.
	DRIVE_GET = iota // reciever should report the motor state to UX.
)

Variables

View Source
var MessageQueue *goconcurrentqueue.FIFO

MessageQueue is a structure to handle inbound messages if a queue is needed.

Functions

This section is empty.

Types

type ActionMessage

type ActionMessage struct {
	ActionType int     // which action is this?
	Payload    Message // The needed data to handle the message.
}

ActionMessage is a struct to simplify client/server communication

func TryParseActionMessage

func TryParseActionMessage(stream []byte) (*ActionMessage, error)

TryParseActionMessage receives a byte stream, tries to make an actionmessage from it.

func (*ActionMessage) FromBytes

func (webAction *ActionMessage) FromBytes(stream []byte)

FromBytes is to comply with the Serializable interface.

func (*ActionMessage) ToBytes

func (webAction *ActionMessage) ToBytes(clientID string) []byte

ToBytes -- to comply with the serializable interface.

type ActionQueue added in v0.0.3

type ActionQueue struct {
	// contains filtered or unexported fields
}

ActionQueue is a struct to simplify internal client dispatches

func (*ActionQueue) AddCallback added in v0.0.13

func (queueInstance *ActionQueue) AddCallback(key string, callback func(msg *Message, params ...interface{}))

AddCallback adds a callback to the subscription queue.

func (*ActionQueue) Identify added in v0.0.3

func (queueInstance *ActionQueue) Identify(clientID string)

Identify creates an action message, which it then sends via the link.

func (*ActionQueue) Init added in v0.0.3

func (queueInstance *ActionQueue) Init(massagePump chan []byte)

Init prepares the action queue for work

func (*ActionQueue) ProcessMessage added in v0.0.3

func (queueInstance *ActionQueue) ProcessMessage(msg *Message)

ProcessMessage a message by Generating a key, calling the correct function callback.

func (*ActionQueue) PublishMessage added in v0.0.4

func (queueInstance *ActionQueue) PublishMessage(msg Serializable)

PublishMessage actually publishes the message...

func (*ActionQueue) Subscribe added in v0.0.3

func (queueInstance *ActionQueue) Subscribe(Notify string, Topic string, callback func(msg *Message, params ...interface{}))

Subscribe generates a subscription message and sends it to the service, then holds a callback for those messages when receieved from a server.

Notify -- that's the clientID for my caller
Topic -- the topic the caller is interested in
callback -- the function to call when the server pushes the right message to me

type DriveCommand

type DriveCommand struct {
	Command string // what do we want to send?
}

DriveCommand is Serializable, sent from controllers to robots.

func (DriveCommand) FromBytes

func (robot DriveCommand) FromBytes(stream []byte)

FromBytes finishes up the Serializable interface

func (DriveCommand) ToBytes

func (robot DriveCommand) ToBytes() []byte

ToBytes required

type DriveState

type DriveState struct {
	Direction       int // is this a set ( 0 ) or a get ( 1 )
	LeftMotorPower  int // from -100 .. 100, % power to left motor.
	RightMotorPower int // same as left
}

DriveState is a serializable information struct, sent from robot to the controller.

func (DriveState) FromBytes

func (robot DriveState) FromBytes(stream []byte)

FromBytes finishes up the Serializable interface

func (DriveState) ToBytes

func (robot DriveState) ToBytes() []byte

ToBytes calls a "generic" serializer.

type Message

type Message struct {
	MetaData      MessageMetaData // info to process the message
	MessageBuffer []byte          // the message itself
}

Message is the interchange type. Everything should .

func NewMessage

func NewMessage(sender string, topic string, buffer []byte) *Message

NewMessage creates is a simpification function to create a Message with fields

func TryParseMessage

func TryParseMessage(byteStream []byte) (*Message, error)

TryParseMessage attempts to convert the bytestream to a message.

func (*Message) Copy added in v0.0.5

func (thisMessage *Message) Copy(msg Message)

Copy makes a copy of a message.

func (*Message) Pickup

func (thisMessage *Message) Pickup(who string) bool

Pickup sets the pickup time of a message, determines if this string is already in the pickup table.

func (Message) ToBytes

func (thisMessage Message) ToBytes() ([]byte, error)

ToBytes creates a bytestream from this structure.

type MessageMetaData

type MessageMetaData struct {
	Sender        string    // who sent this?
	Topic         string    // What are we informing?
	TemporalShake TimeShake // when did things happen?
	IsPickedUp    bool      // Has this been picked up?
	// contains filtered or unexported fields
}

MessageMetaData hold metadata about any message, can be composited to anything that may need this data for processing.

func NewMessageMetaData

func NewMessageMetaData() MessageMetaData

NewMessageMetaData Makes a new, initialized message metadata item.

type NetworkedTopicMap

type NetworkedTopicMap map[string][]string

NetworkedTopicMap represents nodes in a connection graph and what topics those nodes want notifications for.

func (NetworkedTopicMap) GetKeys

func (myMap NetworkedTopicMap) GetKeys() []string

GetKeys provides the network "ids" of nodes. in the connetion graph.

type PublishMessage added in v0.0.24

type PublishMessage struct {
	PublishedTopic string   // who is the publisher of this message?
	MessageData    *Message // what's the message?
}

PublishMessage manages new publications

type Reducable added in v0.0.25

type Reducable interface {
	AddToReductionQueue(element interface{})
	AddManyToReductionQueue(list []interface{})
	Reduce()
}

Reducable aka "reduce-able" is a way for a message to collapse an array to a single element.

type Serializable

type Serializable interface {
	ToBytes() []byte
	FromBytes([]byte)
}

Serializable requires that all message data can go to/from byte slices

type TimeShake

type TimeShake struct {
	EnquedTime       time.Time // when did we enque this?
	AcknowledgedTime time.Time // when did we acknowledge the task?
}

TimeShake is a time handshake, used to track when an object gets recieved.

func NewTimeShake

func NewTimeShake() TimeShake

NewTimeShake creates a timeshake instance and returns it.

type Weblink struct {
	Server        string          // the web server we need
	SecurityToken string          // a random GUID we need to always transmit on each command.
	URL           url.URL         // Whereto?
	Conn          *websocket.Conn // Let's keep this connection alive...
	// contains filtered or unexported fields
}

Weblink is the information we need to send/recieve data from the web.

func (*Weblink) Close

func (webhook *Weblink) Close()

Close closes the weblink.

func (*Weblink) Connect

func (webhook *Weblink) Connect(header http.Header)

Connect starts the Connection

func (*Weblink) Init

func (webhook *Weblink) Init(webServer string)

Init initializes the type/struct

func (*Weblink) RunActionListener added in v0.0.3

func (webhook *Weblink) RunActionListener(queueInsance *ActionQueue)

RunActionListener listens for messages

func (*Weblink) TryReconnect added in v0.0.26

func (webhook *Weblink) TryReconnect()

TryReconnect attempts to recontact a server if the connection is broken

func (*Weblink) WriteText

func (webhook *Weblink) WriteText(message chan []byte)

WriteText sends a text message to the server

type WorkQueue

type WorkQueue struct {
	PublishChannel chan PublishMessage  // a channel used to handle produce/drain of published messages
	Publishers     map[string]time.Time // A map of connected IDs and when they last published
	Subscribers    NetworkedTopicMap    // A map of connected IDs and a list what topics they want messages about.
	Messages       map[string]*Message  // all messages from all robots, key is catenation  of (sender+topic)
}

WorkQueue actually manages what each robot is doing/saying...

func (*WorkQueue) AddSubscriber

func (workItems *WorkQueue) AddSubscriber(Notify string, From string, Topic string)

AddSubscriber adds a subscriber to the list of who to notify when.

func (*WorkQueue) ExecuteAction

func (workItems *WorkQueue) ExecuteAction(action *ActionMessage)

ExecuteAction takes an action message and modifies the work queue as needed.

func (*WorkQueue) Init

func (workItems *WorkQueue) Init() bool

Init initializes all the information we need.

func (*WorkQueue) PublishActionMessage added in v0.0.5

func (workItems *WorkQueue) PublishActionMessage(action *ActionMessage)

PublishActionMessage sets the various structs in the workque for the drainer go-routine. WARNING: this routine is meant for services with a reducer goroutine to drain the queue! Do not call this from a client!! This function blocks until a reducer picks up the publish message, and will stall the calling thread.

func (*WorkQueue) ReceiveData

func (workItems *WorkQueue) ReceiveData(stream []byte) *ActionMessage

ReceiveData takes a bytestream, figures out what it is, and adds to appropriate queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL