server

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2021 License: GPL-3.0 Imports: 33 Imported by: 0

Documentation

Overview

Package server implements RNNR main logic to manage tasks and worker nodes.

Index

Constants

View Source
const (
	// TaskCollection is the collection name for tasks
	TaskCollection = "tasks"
	// NodeCollection is the collection name for nodes
	NodeCollection = "nodes"
)

Variables

This section is empty.

Functions

func CheckTask added in v1.2.1

func CheckTask(task *models.Task, node *models.Node, res chan<- *models.Task, wg *sync.WaitGroup)

CheckTask remotely check a running task.

func GetNodeResources added in v1.2.1

func GetNodeResources(node *models.Node) (*proto.Info, error)

GetNodeResources gets node resource information.

func RemoteCancel added in v1.2.1

func RemoteCancel(task *models.Task, node *models.Node) error

RemoteCancel cancels remotely a task.

func RemoteCheck added in v1.2.1

func RemoteCheck(task *models.Task, address string) error

RemoteCheck checks remotely a task.

func RemoteRun added in v1.2.1

func RemoteRun(task *models.Task, address string) error

RemoteRun remotely runs a task as a container.

Types

type DB added in v1.2.1

type DB struct {
	// contains filtered or unexported fields
}

DB wraps MongoDB client to provides task- and node-related operations.

func MongoConnect added in v1.2.1

func MongoConnect(uri, database string) (*DB, error)

MongoConnect creates a MongoDB client.

func (*DB) AddNode added in v1.2.1

func (d *DB) AddNode(n *models.Node) error

AddNode activates a node. If already registered it updates node fields with same ID.

func (*DB) GetNode added in v1.2.1

func (d *DB) GetNode(host string) (*models.Node, error)

GetNode retrieves a computing node by its server address.

func (*DB) GetTask added in v1.2.1

func (d *DB) GetTask(id string, view models.View) (*models.Task, error)

GetTask finds a task by its ID.

func (*DB) ListNodes added in v1.2.1

func (d *DB) ListNodes(active *bool) ([]*models.Node, error)

ListNodes returns worker nodes (disabled included). Set active to return active (enabled) or disable nodes.

func (*DB) ListTasks added in v1.2.1

func (d *DB) ListTasks(limit, skip int64, view models.View, nodes []string, states []models.State) ([]*models.Task, error)

ListTasks retrieves tasks that match given worker nodes and states. Pagination is done via limit and skip parameters. view defines task fields to be returned.

Minimal returns only task ID and state.

Basic returns all fields except Logs.ExecutorLogs.Stdout, Logs.ExecutorLogs.Stderr, Inputs.Content and Logs.SystemLogs.

Full returns all fields.

func (*DB) SaveTask added in v1.2.1

func (d *DB) SaveTask(t *models.Task) error

SaveTask stores a task. It will set Task.Created and Task.Updated to current local time.

func (*DB) UpdateNode added in v1.2.1

func (d *DB) UpdateNode(n *models.Node) error

UpdateNode updates node information.

func (*DB) UpdateTask added in v1.2.1

func (d *DB) UpdateTask(t *models.Task) error

UpdateTask saves task changes in database. It will set Task.Updated to current local time.

type Docker added in v1.2.1

type Docker struct {
	// contains filtered or unexported fields
}

Docker struct wraps Docker client

func DockerConnect added in v1.2.1

func DockerConnect(volumes []string, user, group string) (*Docker, error)

DockerConnect creates a Docker client using environment variables

func (*Docker) Check added in v1.2.1

func (d *Docker) Check(ctx context.Context, container *proto.Container) (*proto.State, error)

Check verifies if container is still running.

func (*Docker) RemoveContainer added in v1.2.1

func (d *Docker) RemoveContainer(ctx context.Context, id string)

RemoveContainer removes a container.

func (*Docker) Run added in v1.2.1

func (d *Docker) Run(ctx context.Context, container *proto.Container) error

Run runs a container

func (*Docker) Stop added in v1.2.1

func (d *Docker) Stop(ctx context.Context, id string) error

Stop stops a container

type Main added in v1.2.1

type Main struct {
	Router      *mux.Router
	DB          *DB
	ServiceInfo *models.ServiceInfo
}

Main is a main instance.

func NewMain added in v1.2.1

func NewMain(database string, sleepTime time.Duration) (*Main, error)

NewMain creates a server and initializes Task and Node endpoints. database is URI to MongoDB (without database name, which is rnnr). sleepTimes defines the time in seconds that main will sleep after task management iteration.

func (*Main) CancelTask added in v1.2.1

func (m *Main) CancelTask(id string) error

CancelTask cancels a task by its ID.

func (*Main) CheckTasks added in v1.2.1

func (m *Main) CheckTasks() error

CheckTasks will iterate over running tasks checking if they have been completed well or not. It runs concurrently.

func (*Main) CreateTask added in v1.2.1

func (m *Main) CreateTask(t *models.Task) error

CreateTask creates a task with new ID and queue state.

func (*Main) DisableNode added in v1.2.1

func (m *Main) DisableNode(host string, cancel bool) error

DisableNode updates node availability removing usage information. Cancel argument will cancels remote tasks and puts them back to queue.

func (*Main) EnableNode added in v1.2.1

func (m *Main) EnableNode(node *models.Node) error

EnableNode inserts or enables a worker node. Use the current computational resources of the node if it had not been defined.

func (*Main) GetTask added in v1.2.1

func (m *Main) GetTask(id string, view models.View) (*models.Task, error)

GetTask returns a task by its ID.

func (*Main) InitializeTasks added in v1.2.1

func (m *Main) InitializeTasks() error

InitializeTasks iterates over all Queued tasks requesting a computing node for each task. The selected node is assigned to perform the task. The task changes to the Initializing state. If no active node has enough computing resources to perform the task the same is kept in queue.

func (*Main) ListNodes added in v1.2.1

func (m *Main) ListNodes(active *bool) ([]*models.Node, error)

ListNodes returns worker nodes (disabled included). Set active to return active (enabled) or disable nodes.

func (*Main) ListTasks added in v1.2.1

func (m *Main) ListTasks(namePrefix string, limit int64, start int64, view models.View, nodes []string, states []models.State) (*models.ListTasksResponse, error)

ListTasks returns all tasks.

func (*Main) RequestNode added in v1.2.1

func (m *Main) RequestNode(resources *models.Resources) (*models.Node, error)

RequestNode selects a node that have enough computing resource to execute task. If there is no active node it returns NoActiveNodes error. If there is some active node but none of them is able to process then it returns NoEnoughResources error. Once found a node it will update in database.

func (*Main) RunTask added in v1.2.1

func (m *Main) RunTask(task *models.Task, node *models.Node, res chan<- *models.Task, wg *sync.WaitGroup)

RunTask remotely starts a task.

func (*Main) RunTasks added in v1.2.1

func (m *Main) RunTasks() error

RunTasks tries to start initialized tasks.

func (*Main) StartTaskManager added in v1.2.1

func (m *Main) StartTaskManager(sleepTime time.Duration)

StartTaskManager starts task management. It will iterate over: 1) queued tasks; 2) initialized tasks; and 3) running tasks. Then it will sleepTime seconds and start over.

func (*Main) UpdateNodesWorkload added in v1.2.1

func (m *Main) UpdateNodesWorkload(nodes []*models.Node) error

UpdateNodesWorkload gets active tasks (Initializing or Running) and update node usage.

type NetworkError added in v1.2.1

type NetworkError struct {
	// contains filtered or unexported fields
}

NetworkError represents a network error.

type NoActiveNodes added in v1.2.1

type NoActiveNodes struct {
	// contains filtered or unexported fields
}

NoActiveNodes error is returned when there is no active node for processing tasks remotely.

type NoEnoughResources added in v1.2.1

type NoEnoughResources struct {
	// contains filtered or unexported fields
}

NoEnoughResources error is returned when none os active node have enough computing resources to process task.

type Worker added in v1.2.1

type Worker struct {
	proto.UnimplementedWorkerServer
	Info   *proto.Info
	Docker *Docker
}

Worker struct wraps service info and Docker connection.

func NewWorker added in v1.2.1

func NewWorker(cpuCores int32, ramGb float64, volumes []string, user, group string) (*Worker, error)

NewWorker creates a Worker. If cpuCores or ramGb is not defined (equal to 0) it will guess the available resources. It will warn if the defined values are bigger than guessed values.

func (*Worker) CheckContainer added in v1.2.1

func (w *Worker) CheckContainer(ctx context.Context, container *proto.Container) (*proto.State, error)

CheckContainer checks if container is running.

func (*Worker) GetInfo added in v1.2.1

func (w *Worker) GetInfo(context.Context, *empty.Empty) (*proto.Info, error)

GetInfo returns service info.

func (*Worker) RunContainer added in v1.2.1

func (w *Worker) RunContainer(ctx context.Context, container *proto.Container) (*empty.Empty, error)

RunContainer starts a Docker container.

func (*Worker) StopContainer added in v1.2.1

func (w *Worker) StopContainer(ctx context.Context, container *proto.Container) (*empty.Empty, error)

StopContainer stops and removes container.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL