Documentation ¶
Index ¶
- Constants
- Variables
- func GetNonLoopbackIPs() ([]net.IP, error)
- func MakeHTTPAPIRequest(c *http.Client, endpoint Endpoint, apiMethod Method, params url.Values, ...) error
- func PublishDistributor(port uint16) (*mdns.Server, error)
- type AnnounceResponse
- type Calculator
- type Endpoint
- type ExecutionStatus
- type HTTPDistributorServer
- type HTTPNodeServer
- type Method
- type NodeServer
- type NodeStatistics
- type QueueStatistics
- type Rater
- type SerializableHandInEntry
- type Task
- type TaskDistributor
- type TaskID
- type TaskRating
- type TaskType
- type WorkerID
Constants ¶
const ( JSON returnFormat = iota BSON )
format constants.
const ( // Success implies that the task has been executed and all results have // been reported. Success ExecutionStatus = "success" // Failure indicates that the execution of the task has failed. Failure = "failure" // NotExecuted indicates that the task has not been executed. NotExecuted = "notExecuted" // UnknownExecutionStatus indicates an invalid value. UnknownExecutionStatus = "unknown" )
ExecutionStatus constants.
const ( // Localization tasks localize events in a frame. Localization TaskType = "localization" // Reconstruction tasks reconstruction an image from localization data. Reconstruction = "reconstruction" // Recipe tasks are generic image-processing recipes. Recipe = "recipe" // UnknownTaskType indicates an invalid TaskType. UnknownTaskType = "unknown" )
TaskType constants.
Variables ¶
var ( NodeServerGetTasks = Method{http.MethodGet, nodePrefix + "/tasks"} NodeServerPostHandIn = Method{http.MethodPost, nodePrefix + "/handin"} NodeServerPostRate = Method{http.MethodPost, nodePrefix + "/rate"} DistributorGetTasks = Method{http.MethodGet, distributorPrefix + "/tasks"} DistributorPostTasks = Method{http.MethodPost, distributorPrefix + "/tasks"} DistributorPostHandIn = Method{http.MethodPost, distributorPrefix + "/handin"} DistributorPostAnnounce = Method{http.MethodPost, distributorPrefix + "/announce"} DistributorGetQueues = Method{http.MethodGet, distributorPrefix + "/queues"} )
API Methods
var ErrContentWithGet = errors.New("content with GET")
ErrContentWithGet is returned if it is attempted to post content with a method that uses HTTP GET.
var ErrInternalServerError = errors.New("internal server error")
ErrInternalServerError is the error used for recovered API calls.
var ErrInvalidIP = errors.New("invalid IP")
ErrInvalidIP is returned if an invalid IP was specified.
var ErrInvalidNumWant = errors.New("invalid numWant")
ErrInvalidNumWant is returned if an invalid numWant was specified.
var ErrInvalidPort = errors.New("invalid port")
ErrInvalidPort is returned if an invalid port was specified.
var ErrInvalidTimeout = errors.New("invalid timeout")
ErrInvalidTimeout is returned if an invalid timeout was specified.
var ErrNoExecutionStatus = errors.New("no execution status")
ErrNoExecutionStatus is returned if no execution status was specified.
var ErrNoIP = errors.New("no IP")
ErrNoIP is returned if no IP was specified.
var ErrNoNodeID = errors.New("no node ID")
ErrNoNodeID is returned if no node ID was specified.
var ErrNoNumWant = errors.New("no numWant")
ErrNoNumWant is returned if no numWant was specified.
var ErrNoPort = errors.New("no port")
ErrNoPort is returned if no port was specified.
var ErrNoQueue = errors.New("no queue")
ErrNoQueue is returned if no queue was specified.
var ErrNoTaskID = errors.New("no task ID")
ErrNoTaskID is returned if no task ID was specified.
var ErrNoTimeout = errors.New("no timeout")
ErrNoTimeout is returned if no timeout was specified.
var ErrNoWorkerID = errors.New("no worker ID")
ErrNoWorkerID is returned if no worker ID was specified.
var ErrUnknownMethod = errors.New("unknown method")
ErrUnknownMethod is returned for unknown API methods.
Functions ¶
func GetNonLoopbackIPs ¶
GetNonLoopbackIPs returns a list of IP addresses of the local machine that are up and not loopback interfaces.
func MakeHTTPAPIRequest ¶
func MakeHTTPAPIRequest(c *http.Client, endpoint Endpoint, apiMethod Method, params url.Values, payload interface{}, result interface{}) error
MakeHTTPAPIRequest makes a request to the HTTP API of the node at endpoint. The payload must be JSON serializable, the result must be a pointer to a struct ready for JSON deserialization. ErrUnknownMethod will be returned for an unknown apiMethod. ErrContentWithGet will be returned if payload != nil and apiMethod specifies GET as the HTTP verb.
Types ¶
type AnnounceResponse ¶
type AnnounceResponse struct { // NodeID is the NodeID of the distributor. NodeID pyme.NodeID `json:"nodeID"` }
AnnounceResponse represents the response to an Announce.
type Calculator ¶
type Calculator interface { // Calculate estimates the execution cost of a task. Calculate(Task) (float64, error) // TaskType returns the TaskType this calculator is responsible for. TaskType() TaskType }
Calculator is the interface for a calculator. A calculator calculates the execution costs for a certain type of tasks by constructing URIs for all required resources and calling a Rater for each of them.
type Endpoint ¶
type Endpoint struct { // IP is the IP of the machine. IP net.IP // Port is the TCP port of the endpoint. Port uint16 }
Endpoint describes a TCP endpoint on a single machine.
func DiscoverDistributors ¶
DiscoverDistributors tries to discover Distributors via mDNS.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus is the status of a task that has been distributed.
func ExecutionStatusFromString ¶
func ExecutionStatusFromString(s string) (ExecutionStatus, error)
ExecutionStatusFromString tries to determine the execution status indicated by s and returns an error if the execution status is unknown.
type HTTPDistributorServer ¶
type HTTPDistributorServer struct {
// contains filtered or unexported fields
}
HTTPDistributorServer is the HTTP wrapper for a Distributor.
func NewHTTPDistributorServer ¶
func NewHTTPDistributorServer(s TaskDistributor, apiKey string, endpoint string, debug bool) *HTTPDistributorServer
NewHTTPDistributorServer creates a new HTTPDistributorServer.
func (*HTTPDistributorServer) ListenAndServe ¶
func (s *HTTPDistributorServer) ListenAndServe() error
ListenAndServe starts serving requests on the endpoint specified.
type HTTPNodeServer ¶
type HTTPNodeServer struct {
// contains filtered or unexported fields
}
HTTPNodeServer is the HTTP wrapper for a NodeServer.
func NewHTTPNodeServer ¶
func NewHTTPNodeServer(s NodeServer, apiKey string, endpoint string, debug bool) *HTTPNodeServer
NewHTTPNodeServer creates a new HTTPNodeServer.
func (*HTTPNodeServer) ListenAndServe ¶
func (s *HTTPNodeServer) ListenAndServe() error
ListenAndServe starts serving requests on the specified endpoint.
type Method ¶
type Method struct { // HTTPMethod is the HTTP method (verb) for the method. HTTPMethod string // Path is the HTTP path for the method. Path string }
Method describes an HTTP API method.
type NodeServer ¶
type NodeServer interface { // GetTasks gets up to numWant task to be executed by a worker. // numWant <= 0 defaults to one task. GetTasks(id WorkerID, numWant int) ([]Task, error) // HandIn notifies the task server that the execution of the task with // the given TaskID is finished. // If the ExecutionStatus is Success, the task has been completed and // all results have been submitted. // If the ExecutionStatus is Failure, the task server can decide whether // to report the failure to the node that assigned the task or whether // to try executing it again on a local worker. HandIn(TaskID, ExecutionStatus) error // RateTask rates the given task for this node. // Rating must assign a value in [0,1] for each resource necessary // to complete the task. The rating for the complete task must be in // [0,n] for n resources required to execute the task or -1. // A rating of -1 indicates that this node does not rate tasks at the // moment and rating should be done on the distributor instead. RateTask(Task) (TaskRating, error) // Stop stops the NodeServer. // This will stop any open requests for GetTask and will stop announcing // regularly to its Distributors. They will register the node as being // inactive and redistribute tasks given to it. Stop() }
NodeServer describes the node-local server for task distribution.
type NodeStatistics ¶
type NodeStatistics struct { // TasksAccepted is the number of tasks the node has received as a // result of calling GetTasks. TasksAccepted uint64 `json:"tasksAccepted"` // TasksCompleted is the number of tasks the node has reported as // executed successfully. TasksCompleted uint64 `json:"tasksCompleted"` // TasksCompleted is the number of tasks the node has reported as // failed. TasksFailed uint64 `json:"tasksFailed"` // TasksCompleted is the sum of the number of tasks the node has // reported as not executed and tasks that were withdrawn from a node // as a result of the node timing out. TasksNotExecuted uint64 `json:"tasksNotExecuted"` }
NodeStatistics represent statistics about a node.
type QueueStatistics ¶
type QueueStatistics struct { // TasksPosted is the number of tasks posted to the queue. TasksPosted uint64 `json:"tasksPosted"` // TasksRunning is the number of tasks that have been rated and // assigned to a node. TasksRunning uint64 `json:"tasksRunning"` // TasksCompleted is the number of tasks that have completed // successfully. TasksCompleted uint64 `json:"tasksCompleted"` // TasksFailed is the number of tasks that have failed. TasksFailed uint64 `json:"tasksFailed"` // AverageExecutionCost is the average execution cost of a task in this // queue, determined by the execution cost known at the time of // assigning the task to a node. AverageExecutionCost float64 `json:"averageExecutionCost"` }
QueueStatistics represent statistics about a queue.
type Rater ¶
type Rater interface { // Rate estimates the cost of fetching or storing the resource identified // by URI. Rate(URI *url.URL) (float64, error) // Scheme returns the URI scheme this rater is responsible for. Scheme() string }
Rater is the interface for a rater. A rater estimates the cost of fetching or storing a resource identified by a URI. A rater is responsible for one URI scheme.
type SerializableHandInEntry ¶
type SerializableHandInEntry struct { // TaskID is the TaskID of the referenced Task. TaskID TaskID `json:"taskID"` // ExecutionStatus ExecutionStatus string `json:"status"` }
SerializableHandInEntry describes a change in the execution status of a task. The ExecutionStatus must not be assumed to be a valid ExecutionStatus and must be parsed using ExecutionStatusFromString before use.
type Task ¶
type Task struct { // ID identifies the task uniquely. ID TaskID `json:"id"` // Type determines the type of the task. Type TaskType `json:"type"` // Taskdef is the definition of a task. // It must not be present if TaskdefRef is not empty. Taskdef map[string]string `json:"taskdef,omitempty"` // TaskdefRef references the task definition through a resource URI. // It must not be present if Taskdef is not nil. TaskdefRef string `json:"taskdefRef,omitempty"` // Inputs describes how to obtain input data. Inputs map[string]string `json:"inputs"` // Outputs describes how to store results. Outputs map[string]string `json:"outputs"` }
Task represents a single task to be executed.
type TaskDistributor ¶
type TaskDistributor interface { // PostTask adds the given task to the set of tasks to be processed. // The task will be assigned to a node based on the location of the data // needed to process the task. // The ID of the given task needs to consist only of the unique // identifier within the queue. The queue name and distributor node's ID // will be prepended by the distributor. // This is typically called from the analysis software. PostTask(queue string, task Task) error // GetTasks gets up to numWant tasks to be executed on the node identified // by nodeID. // The call blocks until tasks are available or timeout has elapsed. // It is not guaranteed that numWant tasks will be returned. The task // server may (and should!) hold tasks for a node in a queue and can // return multiple tasks at once. // A task server is allowed to request more tasks than it immediately // needs, i.e. more tasks than it has workers to give them to. If the // task server shuts down or otherwise stops executing tasks, it must // report tasks not executed to the node that reported them with an // ExecutionStatus of NotExecuted. GetTasks(nodeID pyme.NodeID, numWant int, timeout time.Duration) ([]Task, error) // HandIn notifies the task server of the ExecutionStatus of the task // identified by the given TaskID. // If the ExecutionStatus is Success, the task has been completed and // all results have been submitted. // If the ExecutionStatus is NotExecuted, the task has not been // attempted to execute and must be redistributed to the cluster. // Including the NodeID prevents nodes from wrongfully handing in tasks // for another node. HandIn(pyme.NodeID, TaskID, ExecutionStatus) error // QueueStatistics returns statistics about queues running on the // Distributor. QueueStatistics() map[string]QueueStatistics // Announce announces the endpoint of a local node server to a // distributor. // A node has to reannounce at a certain interval - if a distributor // does not receive an announce in a set amount of time it presumes // the node defunct. // It will be taken out of the set of active nodes and all tasks // enqueued for this node will be redistributed. Announce(nodeID pyme.NodeID, ip net.IP, port uint16) (AnnounceResponse, error) }
TaskDistributor describes the central instance for task distribution.
type TaskID ¶
type TaskID string
TaskID is a unique identifier for a task. It consists of the NodeID of the distributor node followed by a dash, the name of the queue this task belongs to, followed by a dash and a unique identifier within the queue.
An example TaskID could look like this: hU5k4sAeOnAhf93nD8f5-series5025-loc25409
func (TaskID) Queue ¶
Queue extracts the queue name from the TaskID. If the TaskID is malformed or the queue name can not be extracted the whole TaskID is returned.
func (TaskID) TaskIdentifier ¶
TaskIdentifier extracts the queue-local task identifier from the TaskID. If the TaskID is malformed or the task identifier can not be extracted the whole TaskID is returned.
type TaskRating ¶
type TaskRating struct { // ID is the TaskID of the referenced task. ID TaskID `json:"id"` // Cost is the estimated execution cost for the referenced task. Cost float64 `json:"cost"` }
TaskRating represents the rating of a task for a single node.
type TaskType ¶
type TaskType string
TaskType is the type of a task.
func TaskTypeFromString ¶
TaskTypeFromString tries to determine the task type from s and returns an error if the task type is unknown.