legacy

package
v0.0.0-...-99fe3a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: GPL-3.0 Imports: 35 Imported by: 0

Documentation

Overview

Package legacy contains logic around managing worker threads.

Index

Constants

View Source
const UserCreateAction = "create"

UserCreateAction represents the action of creating a user. This constant is used with the snapd api.

View Source
const UserRemoveAction = "remove"

UserRemoveAction represents the action of removing a user. This constant is used with the snapd api.

Variables

This section is empty.

Functions

func MockHTTPClient

func MockHTTPClient(mockedHTTPClient snapdapi.HTTPClient)

func MockSnapdClient

func MockSnapdClient(mockedSnapd snapdapi.SnapdClient)

func NewTaskSet

func NewTaskSet() *taskSet

Types

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func New

func New(mqttConn *mqtt.Connection, enrollment *identity.Enrollment) (*Handler, error)

func (*Handler) Health

func (h *Handler) Health()

Health publishes a health message to indicate that the device is still active

func (*Handler) IsConnected

func (h *Handler) IsConnected() bool

func (*Handler) Metrics

func (h *Handler) Metrics()

Metrics publishes a metrics message so that the device can be monitored.

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, task *TaskInput) (*TaskOutput, error)

func (*Handler) Run

func (h *Handler) Run()

func (*Handler) Send

func (h *Handler) Send(ctx context.Context, output *TaskOutput) error

func (*Handler) SetMQTTConnection

func (h *Handler) SetMQTTConnection(m *mqtt.Connection)

func (*Handler) ShouldSkipTask

func (h *Handler) ShouldSkipTask(task *TaskInput) (bool, error)

func (*Handler) SubscribeToActions

func (h *Handler) SubscribeToActions(mqttClient MQTT.Client) error

SubscribeToActions subscribes to the action topic

type HandlerIFace

type HandlerIFace interface {
	SubscribeToActions(mqttClient MQTT.Client) error
	Health()
	Metrics()
	IsConnected() bool
	Run()
}

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

MockClient mocks the MQTT client

func (*MockClient) AddRoute

func (cli *MockClient) AddRoute(topic string, callback MQTT.MessageHandler)

AddRoute mocks routing

func (*MockClient) Connect

func (cli *MockClient) Connect() MQTT.Token

Connect mocks connecting to broker

func (*MockClient) Disconnect

func (cli *MockClient) Disconnect(quiesce uint)

Disconnect mocks client close

func (*MockClient) IsConnected

func (cli *MockClient) IsConnected() bool

IsConnected mocks the connect status

func (*MockClient) IsConnectionOpen

func (cli *MockClient) IsConnectionOpen() bool

IsConnectionOpen mocks the connect status

func (*MockClient) OptionsReader

func (cli *MockClient) OptionsReader() MQTT.ClientOptionsReader

OptionsReader mocks the options reader (badly)

func (*MockClient) Publish

func (cli *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) MQTT.Token

Publish mocks a publish message

func (*MockClient) Subscribe

func (cli *MockClient) Subscribe(topic string, qos byte, callback MQTT.MessageHandler) MQTT.Token

Subscribe mocks a subscribe message

func (*MockClient) SubscribeMultiple

func (cli *MockClient) SubscribeMultiple(filters map[string]byte, callback MQTT.MessageHandler) MQTT.Token

SubscribeMultiple mocks subscribe messages

func (*MockClient) Unsubscribe

func (cli *MockClient) Unsubscribe(topics ...string) MQTT.Token

Unsubscribe mocks an unsubscribe message

type MockHTTP

type MockHTTP struct {
	DoFunc func(req *http.Request) (*http.Response, error)
}

MockHTTP mocks the http.Client

func NewMockHTTP

func NewMockHTTP() *MockHTTP

func (*MockHTTP) Do

func (client *MockHTTP) Do(req *http.Request) (*http.Response, error)

Do mocks the Do method of http.Client. It is structured so that tests can set DoFunc to whatever is needed, and MockHTTP will run that function as if it were a request.

type MockMessage

type MockMessage struct {
	// contains filtered or unexported fields
}

MockMessage implements an MQTT message

func (*MockMessage) Ack

func (m *MockMessage) Ack()

Ack mocks the message ack

func (*MockMessage) Duplicate

func (m *MockMessage) Duplicate() bool

Duplicate mocks a duplicate message check

func (*MockMessage) MessageID

func (m *MockMessage) MessageID() uint16

MessageID mocks the message ID

func (*MockMessage) Payload

func (m *MockMessage) Payload() []byte

Payload mocks the payload retrieval

func (*MockMessage) Qos

func (m *MockMessage) Qos() byte

Qos mocks the QoS flag

func (*MockMessage) Retained

func (m *MockMessage) Retained() bool

Retained mocks the retained flag

func (*MockMessage) Topic

func (m *MockMessage) Topic() string

Topic mocks the topic

type MockToken

type MockToken struct {
	// contains filtered or unexported fields
}

MockToken implements a Token

func (*MockToken) Done

func (t *MockToken) Done() <-chan struct{}

Done mocks a token done channel

func (*MockToken) Error

func (t *MockToken) Error() error

Error mocks a token error check

func (*MockToken) Wait

func (t *MockToken) Wait() bool

Wait mocks the token wait

func (*MockToken) WaitTimeout

func (t *MockToken) WaitTimeout(time.Duration) bool

WaitTimeout mocks the token wait timeout

type Queue

type Queue struct {
	// Tasks that any worker thread can pick up and perform
	IncomingEvents chan Task

	// serialized messages that should be sent back out on MQTT
	ProcessedEvents chan TaskResult
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(actions QueueActions, opts QueueOptions) (*Queue, error)

NewQueue initializes the channels and sets the function that will be used for the worker goroutines; in this case it will be something along the lines of legacy.performAction.

func (*Queue) AddTask

func (q *Queue) AddTask(input *TaskInput)

AddTask is called within a subscribe handler for MQTT.

func (*Queue) RunServer

func (q *Queue) RunServer(ctx context.Context)

RunServer starts the worker threads and runs a loop until cancelled by the owner, sending out any processed events over MQTT.

func (*Queue) RunWorker

func (q *Queue) RunWorker(ctx context.Context)

type QueueActions

type QueueActions interface {
	// The function that should be used for the worker threads
	Process(context.Context, *TaskInput) (*TaskOutput, error)

	// The function that should be used for the results returned by the worker threads
	Send(context.Context, *TaskOutput) error
}

type QueueOptions

type QueueOptions struct {
	NumWorkerThreads int
}

func ViperQueueOptions

func ViperQueueOptions() QueueOptions

ViperQueueOptions pulls the options for the queue from viper and puts them into a QueueOptions struct.

type SubscribeAction

type SubscribeAction struct {
	messages.SubscribeAction
}

SubscribeAction is the message format for the action topic

func (*SubscribeAction) Device

func (act *SubscribeAction) Device(orgId, deviceId string) messages.PublishDevice

Device gets details of the device

func (*SubscribeAction) RetrieveLogs

func (act *SubscribeAction) RetrieveLogs() messages.PublishResponse

RetrieveLogs pulls syslog logs from the snapd api and uploads them to an accessible S3 url

func (*SubscribeAction) SSHUserCreate

func (act *SubscribeAction) SSHUserCreate(data *messages.DeviceUser) messages.PublishResponse

func (*SubscribeAction) SSHUserRemove

func (act *SubscribeAction) SSHUserRemove(data *messages.DeviceUser) messages.PublishResponse

func (*SubscribeAction) ShouldRunOnce

func (act *SubscribeAction) ShouldRunOnce() (bool, error)

func (*SubscribeAction) SnapAck

func (act *SubscribeAction) SnapAck() messages.PublishResponse

SnapAck adds an assertion to the device

func (*SubscribeAction) SnapConf

func (act *SubscribeAction) SnapConf() messages.PublishResponse

SnapConf gets the config for a snap

func (*SubscribeAction) SnapDisable

func (act *SubscribeAction) SnapDisable() messages.PublishSnapTask

SnapDisable disables an existing snap

func (*SubscribeAction) SnapEnable

func (act *SubscribeAction) SnapEnable() messages.PublishSnapTask

SnapEnable enables an existing snap

func (*SubscribeAction) SnapInfo

func (act *SubscribeAction) SnapInfo() messages.PublishSnap

SnapInfo gets the info for a snap

func (*SubscribeAction) SnapInstall

func (act *SubscribeAction) SnapInstall() messages.PublishSnapTask

SnapInstall installs a new snap

func (*SubscribeAction) SnapList

func (act *SubscribeAction) SnapList(deviceId string) messages.PublishSnapsV2

SnapList lists installed snaps

func (*SubscribeAction) SnapRefresh

func (act *SubscribeAction) SnapRefresh() messages.PublishSnapTask

SnapRefresh refreshes an existing snap

func (*SubscribeAction) SnapRemove

func (act *SubscribeAction) SnapRemove() messages.PublishSnapTask

SnapRemove removes an existing snap

func (*SubscribeAction) SnapRestart

func (act *SubscribeAction) SnapRestart() messages.PublishSnapTask

SnapRestart restarts an existing snap

func (*SubscribeAction) SnapRevert

func (act *SubscribeAction) SnapRevert() messages.PublishSnapTask

SnapRevert reverts an existing snap

func (*SubscribeAction) SnapServerVersion

func (act *SubscribeAction) SnapServerVersion(deviceId string) messages.PublishDeviceVersion

SnapServerVersion gets the version details of the device

func (*SubscribeAction) SnapSetConf

func (act *SubscribeAction) SnapSetConf() messages.PublishSnapTask

SnapSetConf sets the config for a snap

func (*SubscribeAction) SnapSnapshot

func (act *SubscribeAction) SnapSnapshot() messages.PublishResponse

SnapSnapshot creates a snapshot of a snap and uploads it to an S3 url

func (*SubscribeAction) SnapStart

func (act *SubscribeAction) SnapStart() messages.PublishSnapTask

SnapStart starts an existing snap

func (*SubscribeAction) SnapStop

func (act *SubscribeAction) SnapStop() messages.PublishSnapTask

SnapStop stops an existing snap

func (*SubscribeAction) SnapSwitch

func (act *SubscribeAction) SnapSwitch() messages.PublishSnapTask

SnapSwitch switches the channel tracked by an existing snap

func (*SubscribeAction) Unregister

func (act *SubscribeAction) Unregister(orgId, deviceId string) messages.PublishDevice

func (*SubscribeAction) User

type Task

type Task struct {
	// contains filtered or unexported fields
}

type TaskId

type TaskId uint

type TaskInput

type TaskInput struct {
	// contains filtered or unexported fields
}

type TaskOutput

type TaskOutput struct {
	// contains filtered or unexported fields
}

type TaskResult

type TaskResult struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL