tasks

package
v0.0.0-...-7592909 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2016 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Return format constants: the values of the unexported returnFormat type,
// numbered from zero via iota.
const (
	// JSON is the first returnFormat value (0).
	JSON returnFormat = iota
	// BSON is the second returnFormat value (1); it implicitly repeats the
	// type and iota expression of the line above.
	BSON
)

format constants.

View Source
// ExecutionStatus constants.
//
// Every constant states the ExecutionStatus type explicitly. A const spec
// with its own expression does not inherit the type of the preceding spec,
// so omitting the type would leave the constant an untyped string.
const (
	// Success implies that the task has been executed and all results have
	// been reported.
	Success ExecutionStatus = "success"

	// Failure indicates that the execution of the task has failed.
	Failure ExecutionStatus = "failure"

	// NotExecuted indicates that the task has not been executed.
	NotExecuted ExecutionStatus = "notExecuted"

	// UnknownExecutionStatus indicates an invalid value.
	UnknownExecutionStatus ExecutionStatus = "unknown"
)

ExecutionStatus constants.

View Source
// TaskType constants.
//
// Every constant states the TaskType type explicitly. A const spec with its
// own expression does not inherit the type of the preceding spec, so omitting
// the type would leave the constant an untyped string.
const (
	// Localization tasks localize events in a frame.
	Localization TaskType = "localization"

	// Reconstruction tasks reconstruct an image from localization data.
	Reconstruction TaskType = "reconstruction"

	// Recipe tasks are generic image-processing recipes.
	Recipe TaskType = "recipe"

	// UnknownTaskType indicates an invalid TaskType.
	UnknownTaskType TaskType = "unknown"
)

TaskType constants.

Variables

View Source
// API methods. Each Method pairs an HTTP verb with a path built from either
// nodePrefix or distributorPrefix.
var (
	// Methods whose path starts with nodePrefix.
	NodeServerGetTasks      = Method{http.MethodGet, nodePrefix + "/tasks"}
	NodeServerPostHandIn    = Method{http.MethodPost, nodePrefix + "/handin"}
	NodeServerPostRate      = Method{http.MethodPost, nodePrefix + "/rate"}
	// Methods whose path starts with distributorPrefix. DistributorGetTasks
	// and DistributorPostTasks share the "/tasks" path and differ only in
	// the HTTP verb.
	DistributorGetTasks     = Method{http.MethodGet, distributorPrefix + "/tasks"}
	DistributorPostTasks    = Method{http.MethodPost, distributorPrefix + "/tasks"}
	DistributorPostHandIn   = Method{http.MethodPost, distributorPrefix + "/handin"}
	DistributorPostAnnounce = Method{http.MethodPost, distributorPrefix + "/announce"}
	DistributorGetQueues    = Method{http.MethodGet, distributorPrefix + "/queues"}
)

API Methods

View Source
var ErrContentWithGet = errors.New("content with GET")

ErrContentWithGet is returned if a payload is supplied for a method that uses HTTP GET.

View Source
var ErrInternalServerError = errors.New("internal server error")

ErrInternalServerError is the error used for recovered API calls.

View Source
var ErrInvalidIP = errors.New("invalid IP")

ErrInvalidIP is returned if an invalid IP was specified.

View Source
var ErrInvalidNumWant = errors.New("invalid numWant")

ErrInvalidNumWant is returned if an invalid numWant was specified.

View Source
var ErrInvalidPort = errors.New("invalid port")

ErrInvalidPort is returned if an invalid port was specified.

View Source
var ErrInvalidTimeout = errors.New("invalid timeout")

ErrInvalidTimeout is returned if an invalid timeout was specified.

View Source
var ErrNoExecutionStatus = errors.New("no execution status")

ErrNoExecutionStatus is returned if no execution status was specified.

View Source
var ErrNoIP = errors.New("no IP")

ErrNoIP is returned if no IP was specified.

View Source
var ErrNoNodeID = errors.New("no node ID")

ErrNoNodeID is returned if no node ID was specified.

View Source
var ErrNoNumWant = errors.New("no numWant")

ErrNoNumWant is returned if no numWant was specified.

View Source
var ErrNoPort = errors.New("no port")

ErrNoPort is returned if no port was specified.

View Source
var ErrNoQueue = errors.New("no queue")

ErrNoQueue is returned if no queue was specified.

View Source
var ErrNoTaskID = errors.New("no task ID")

ErrNoTaskID is returned if no task ID was specified.

View Source
var ErrNoTimeout = errors.New("no timeout")

ErrNoTimeout is returned if no timeout was specified.

View Source
var ErrNoWorkerID = errors.New("no worker ID")

ErrNoWorkerID is returned if no worker ID was specified.

View Source
var ErrUnknownMethod = errors.New("unknown method")

ErrUnknownMethod is returned for unknown API methods.

Functions

func GetNonLoopbackIPs

func GetNonLoopbackIPs() ([]net.IP, error)

GetNonLoopbackIPs returns the IP addresses of the local machine's network interfaces that are up and are not loopback interfaces.

func MakeHTTPAPIRequest

func MakeHTTPAPIRequest(c *http.Client, endpoint Endpoint, apiMethod Method, params url.Values, payload interface{}, result interface{}) error

MakeHTTPAPIRequest makes a request to the HTTP API of the node at endpoint. The payload must be JSON serializable, the result must be a pointer to a struct ready for JSON deserialization. ErrUnknownMethod will be returned for an unknown apiMethod. ErrContentWithGet will be returned if payload != nil and apiMethod specifies GET as the HTTP verb.

func PublishDistributor

func PublishDistributor(port uint16) (*mdns.Server, error)

PublishDistributor publishes a Distributor running on the local machine on the specified port via mDNS.

Types

type AnnounceResponse

// AnnounceResponse represents the response to an Announce.
type AnnounceResponse struct {
	// NodeID is the NodeID of the distributor.
	// It is serialized under the JSON key "nodeID".
	NodeID pyme.NodeID `json:"nodeID"`
}

AnnounceResponse represents the response to an Announce.

type Calculator

// Calculator is the interface for a calculator. A calculator estimates the
// execution cost for one type of task.
type Calculator interface {
	// Calculate estimates the execution cost of a task.
	// It returns the estimated cost, or an error if no estimate could be
	// produced.
	Calculate(Task) (float64, error)

	// TaskType returns the TaskType this calculator is responsible for.
	TaskType() TaskType
}

Calculator is the interface for a calculator. A calculator calculates the execution cost for a certain type of task by constructing URIs for all required resources and calling a Rater for each of them.

type Endpoint

// Endpoint describes a TCP endpoint on a single machine.
type Endpoint struct {
	// IP is the IP of the machine.
	IP net.IP
	// Port is the TCP port of the endpoint.
	Port uint16
}

Endpoint describes a TCP endpoint on a single machine.

func DiscoverDistributors

func DiscoverDistributors() ([]Endpoint, error)

DiscoverDistributors tries to discover Distributors via mDNS.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus is the status of a task that has been distributed.

func ExecutionStatusFromString

func ExecutionStatusFromString(s string) (ExecutionStatus, error)

ExecutionStatusFromString tries to determine the execution status indicated by s and returns an error if the execution status is unknown.

type HTTPDistributorServer

type HTTPDistributorServer struct {
	// contains filtered or unexported fields
}

HTTPDistributorServer is the HTTP wrapper for a Distributor.

func NewHTTPDistributorServer

func NewHTTPDistributorServer(s TaskDistributor, apiKey string, endpoint string, debug bool) *HTTPDistributorServer

NewHTTPDistributorServer creates a new HTTPDistributorServer.

func (*HTTPDistributorServer) ListenAndServe

func (s *HTTPDistributorServer) ListenAndServe() error

ListenAndServe starts serving requests on the specified endpoint.

func (*HTTPDistributorServer) Stop

func (s *HTTPDistributorServer) Stop()

Stop stops the server.

type HTTPNodeServer

type HTTPNodeServer struct {
	// contains filtered or unexported fields
}

HTTPNodeServer is the HTTP wrapper for a NodeServer.

func NewHTTPNodeServer

func NewHTTPNodeServer(s NodeServer, apiKey string, endpoint string, debug bool) *HTTPNodeServer

NewHTTPNodeServer creates a new HTTPNodeServer.

func (*HTTPNodeServer) ListenAndServe

func (s *HTTPNodeServer) ListenAndServe() error

ListenAndServe starts serving requests on the specified endpoint.

func (*HTTPNodeServer) Stop

func (s *HTTPNodeServer) Stop()

Stop stops the server.

type Method

// Method describes an HTTP API method as a pair of HTTP verb and path.
type Method struct {
	// HTTPMethod is the HTTP method (verb) for the method,
	// e.g. http.MethodGet or http.MethodPost.
	HTTPMethod string
	// Path is the HTTP path for the method.
	Path string
}

Method describes an HTTP API method.

type NodeServer

// NodeServer describes the node-local server for task distribution.
type NodeServer interface {
	// GetTasks gets up to numWant tasks to be executed by a worker.
	// numWant <= 0 defaults to one task.
	GetTasks(id WorkerID, numWant int) ([]Task, error)

	// HandIn notifies the task server that the execution of the task with
	// the given TaskID is finished.
	// If the ExecutionStatus is Success, the task has been completed and
	// all results have been submitted.
	// If the ExecutionStatus is Failure, the task server can decide whether
	// to report the failure to the node that assigned the task or whether
	// to try executing it again on a local worker.
	HandIn(TaskID, ExecutionStatus) error

	// RateTask rates the given task for this node.
	// Rating must assign a value in [0,1] for each resource necessary
	// to complete the task. The rating for the complete task must be in
	// [0,n] for n resources required to execute the task or -1.
	// A rating of -1 indicates that this node does not rate tasks at the
	// moment and rating should be done on the distributor instead.
	RateTask(Task) (TaskRating, error)

	// Stop stops the NodeServer.
	// This will stop any open requests for GetTasks and will stop announcing
	// regularly to its Distributors. They will register the node as being
	// inactive and redistribute tasks given to it.
	Stop()
}

NodeServer describes the node-local server for task distribution.

type NodeStatistics

// NodeStatistics represents statistics about a node.
type NodeStatistics struct {
	// TasksAccepted is the number of tasks the node has received as a
	// result of calling GetTasks.
	TasksAccepted uint64 `json:"tasksAccepted"`

	// TasksCompleted is the number of tasks the node has reported as
	// executed successfully.
	TasksCompleted uint64 `json:"tasksCompleted"`

	// TasksFailed is the number of tasks the node has reported as
	// failed.
	TasksFailed uint64 `json:"tasksFailed"`

	// TasksNotExecuted is the sum of the number of tasks the node has
	// reported as not executed and tasks that were withdrawn from a node
	// as a result of the node timing out.
	TasksNotExecuted uint64 `json:"tasksNotExecuted"`
}

NodeStatistics represents statistics about a node.

type QueueStatistics

// QueueStatistics represents statistics about a queue.
type QueueStatistics struct {
	// TasksPosted is the number of tasks posted to the queue.
	TasksPosted uint64 `json:"tasksPosted"`

	// TasksRunning is the number of tasks that have been rated and
	// assigned to a node.
	TasksRunning uint64 `json:"tasksRunning"`

	// TasksCompleted is the number of tasks that have completed
	// successfully.
	TasksCompleted uint64 `json:"tasksCompleted"`

	// TasksFailed is the number of tasks that have failed.
	TasksFailed uint64 `json:"tasksFailed"`

	// AverageExecutionCost is the average execution cost of a task in this
	// queue, determined by the execution cost known at the time of
	// assigning the task to a node.
	AverageExecutionCost float64 `json:"averageExecutionCost"`
}

QueueStatistics represents statistics about a queue.

type Rater

// Rater is the interface for a rater. A rater estimates the cost of fetching
// or storing a resource identified by a URI and is responsible for one URI
// scheme.
type Rater interface {
	// Rate estimates the cost of fetching or storing the resource identified
	// by URI.
	Rate(URI *url.URL) (float64, error)

	// Scheme returns the URI scheme this rater is responsible for.
	Scheme() string
}

Rater is the interface for a rater. A rater estimates the cost of fetching or storing a resource identified by a URI. A rater is responsible for one URI scheme.

type SerializableHandInEntry

// SerializableHandInEntry describes a change in the execution status of a
// task in a form suitable for JSON serialization.
type SerializableHandInEntry struct {
	// TaskID is the TaskID of the referenced Task.
	TaskID TaskID `json:"taskID"`

	// ExecutionStatus is the reported status as a raw string, serialized
	// under the JSON key "status". It must not be assumed to be a valid
	// ExecutionStatus and must be parsed using ExecutionStatusFromString
	// before use.
	ExecutionStatus string `json:"status"`
}

SerializableHandInEntry describes a change in the execution status of a task. The ExecutionStatus must not be assumed to be a valid ExecutionStatus and must be parsed using ExecutionStatusFromString before use.

type Task

// Task represents a single task to be executed.
//
// Taskdef and TaskdefRef are mutually exclusive: at most one of them may be
// set. Both are omitted from the JSON encoding when empty.
type Task struct {
	// ID identifies the task uniquely.
	ID TaskID `json:"id"`

	// Type determines the type of the task.
	Type TaskType `json:"type"`

	// Taskdef is the definition of a task.
	// It must not be present if TaskdefRef is not empty.
	Taskdef map[string]string `json:"taskdef,omitempty"`

	// TaskdefRef references the task definition through a resource URI.
	// It must not be present if Taskdef is not nil.
	TaskdefRef string `json:"taskdefRef,omitempty"`

	// Inputs describes how to obtain input data.
	Inputs map[string]string `json:"inputs"`

	// Outputs describes how to store results.
	Outputs map[string]string `json:"outputs"`
}

Task represents a single task to be executed.

type TaskDistributor

// TaskDistributor describes the central instance for task distribution.
type TaskDistributor interface {
	// PostTask adds the given task to the set of tasks to be processed.
	// The task will be assigned to a node based on the location of the data
	// needed to process the task.
	// The ID of the given task needs to consist only of the unique
	// identifier within the queue. The queue name and distributor node's ID
	// will be prepended by the distributor.
	// This is typically called from the analysis software.
	PostTask(queue string, task Task) error

	// GetTasks gets up to numWant tasks to be executed on the node identified
	// by nodeID.
	// The call blocks until tasks are available or timeout has elapsed.
	// It is not guaranteed that numWant tasks will be returned. The task
	// server may (and should!) hold tasks for a node in a queue and can
	// return multiple tasks at once.
	// A task server is allowed to request more tasks than it immediately
	// needs, i.e. more tasks than it has workers to give them to. If the
	// task server shuts down or otherwise stops executing tasks, it must
	// report tasks not executed to the node that reported them with an
	// ExecutionStatus of NotExecuted.
	GetTasks(nodeID pyme.NodeID, numWant int, timeout time.Duration) ([]Task, error)

	// HandIn notifies the task server of the ExecutionStatus of the task
	// identified by the given TaskID.
	// If the ExecutionStatus is Success, the task has been completed and
	// all results have been submitted.
	// If the ExecutionStatus is NotExecuted, the task has not been
	// attempted to execute and must be redistributed to the cluster.
	// Including the NodeID prevents nodes from wrongfully handing in tasks
	// for another node.
	// NOTE(review): the handling of an ExecutionStatus of Failure is not
	// described here - confirm against the implementation.
	HandIn(pyme.NodeID, TaskID, ExecutionStatus) error

	// QueueStatistics returns statistics about queues running on the
	// Distributor.
	// NOTE(review): the map is presumably keyed by queue name - confirm.
	QueueStatistics() map[string]QueueStatistics

	// Announce announces the endpoint of a local node server to a
	// distributor.
	// A node has to reannounce at a certain interval - if a distributor
	// does not receive an announce in a set amount of time it presumes
	// the node defunct.
	// It will be taken out of the set of active nodes and all tasks
	// enqueued for this node will be redistributed.
	Announce(nodeID pyme.NodeID, ip net.IP, port uint16) (AnnounceResponse, error)
}

TaskDistributor describes the central instance for task distribution.

type TaskID

type TaskID string

TaskID is a unique identifier for a task. It consists of the NodeID of the distributor node followed by a dash, the name of the queue this task belongs to, followed by a dash and a unique identifier within the queue.

An example TaskID could look like this: hU5k4sAeOnAhf93nD8f5-series5025-loc25409

func (TaskID) NodeID

func (t TaskID) NodeID() pyme.NodeID

NodeID extracts the distributor node's ID from the TaskID.

func (TaskID) Queue

func (t TaskID) Queue() string

Queue extracts the queue name from the TaskID. If the TaskID is malformed or the queue name can not be extracted the whole TaskID is returned.

func (TaskID) TaskIdentifier

func (t TaskID) TaskIdentifier() string

TaskIdentifier extracts the queue-local task identifier from the TaskID. If the TaskID is malformed or the task identifier can not be extracted the whole TaskID is returned.

type TaskRating

// TaskRating represents the rating of a task for a single node.
type TaskRating struct {
	// ID is the TaskID of the referenced task.
	ID TaskID `json:"id"`

	// Cost is the estimated execution cost for the referenced task.
	Cost float64 `json:"cost"`
}

TaskRating represents the rating of a task for a single node.

type TaskType

type TaskType string

TaskType is the type of a task.

func TaskTypeFromString

func TaskTypeFromString(s string) (TaskType, error)

TaskTypeFromString tries to determine the task type from s and returns an error if the task type is unknown.

type WorkerID

type WorkerID string

WorkerID is a node-unique identifier for a worker.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL