occamy

Version: v0.3.0
Published: May 26, 2022 License: MIT Imports: 7 Imported by: 0

README

Occamy

Servers that utilise spare resources for computationally intensive distributed tasks.

Elastic horizontal scaling of services in cloud infrastructure is a convenient and effective way to provide the resources required to run a service. For computationally intensive requests that arrive on irregular and unpredictable schedules, resources often end up overprovisioned and underutilised to ensure that requests can be handled promptly.

This library contains a server that utilises spare provisioned resources. The server

  1. ensures resources can promptly handle any incoming request, and
  2. uses spare resources to assist in processing current requests.

The core idea is to have computational tasks that expand by creating assistant tasks to help with the computation, and that can be stopped to free resources for new incoming requests, with the server providing coordination.

Documentation
  • The concept document contains the concept for an Occamy server. It is written independently of the implementation and is a useful starting point for understanding the core design.

  • The implementation document contains comments on the implementation of the Occamy server as well as useful advice for using this library and implementing the required interfaces.

  • The examples module contains example implementations of interfaces appearing in this module as well as some example use cases. This provides a practical demonstration of how different aspects of occamy work.

    It is recommended that new users start by copying the relevant code from the examples as a way to quickly get started.

Documentation

Overview

Package occamy ... TODO: Complete package description.

Index

Constants

const (
	ProcessHandleRequestMessage = "handle_request_message"
	ProcessHandleControlMessage = "handle_control_message"
	ProcessExpansion            = "expansion_process"
)
const (
	TaskGroupNone string = ""
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BasicError

type BasicError string
const (
	// ErrInternalHandlerError covers situations where the
	// handler encounters an internal error.
	ErrInternalHandlerError BasicError = "internal_handler_error"

	// ErrTaskInterrupted is exclusively for when a task
	// (generated by a handler) is interrupted by the occamy
	// server through the context being cancelled.
	ErrTaskInterrupted BasicError = "task_interrupted"

	// ErrInvalidBody covers situations where a message
	// was received with invalid bodies.
	ErrInvalidBody BasicError = "invalid_body"

	// ErrInvalidHeader covers situations where a message
	// was received with invalid headers.
	ErrInvalidHeader BasicError = "invalid_headers"

	// ErrInvalidTask covers situations where the task
	// requested is invalid.
	ErrInvalidTask BasicError = "invalid_task"

	// ErrMessageNotAcked is for when the ack method
	// fails for a message.
	ErrMessageNotAcked BasicError = "message_not_acked"

	// ErrMessageNotNacked is for when the nack method
	// fails for a message.
	ErrMessageNotNacked BasicError = "message_not_nacked"

	// ErrTaskNotAdded is exclusively for when a task
	// (generated by a handler) could not be added.
	ErrTaskNotAdded BasicError = "task_not_added"

	// ErrTaskNotKilled is exclusively for when a task
	// (generated by a handler) could not be killed.
	ErrTaskNotKilled BasicError = "task_not_killed"
)

func (BasicError) Error

func (e BasicError) Error() string
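
Because BasicError is a string type, its values can be declared as constants and compared directly. The following is a minimal sketch of that pattern. It uses a local copy of the type instead of importing the module, it assumes that Error simply returns the underlying string (the method body is not shown on this page), and `validate` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// BasicError is a local stand-in mirroring the documented type.
type BasicError string

// Error returns the underlying string. Assumption: the real method body is
// not shown on this page.
func (e BasicError) Error() string { return string(e) }

// ErrInvalidTask mirrors the documented constant.
const ErrInvalidTask BasicError = "invalid_task"

// validate is a hypothetical helper that rejects empty task names.
func validate(name string) error {
	if name == "" {
		return ErrInvalidTask
	}
	return nil
}

func main() {
	err := validate("")
	// String-based errors are comparable, so errors.Is matches by value.
	fmt.Println(errors.Is(err, ErrInvalidTask))
	fmt.Println(err)
}
```

Constant errors of this kind cannot be reassigned by other packages, which is one reason the pattern is sometimes preferred over variables created with errors.New.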

type DetailedError

type DetailedError struct {
	BasicErr BasicError
	Cause    string
}

func NewDetailedError

func NewDetailedError(err BasicError, cause string) *DetailedError

func (*DetailedError) Error

func (w *DetailedError) Error() string

type ErrorMonitor

type ErrorMonitor interface {
	RecordError(err error)
}

type Handler

type Handler func(header Headers, body []byte) (Task, error)

Handler is a function that takes the header and body of a message and generates a task that must be completed. The header should contain relevant details about the task type and how to decode the message. The header will always contain the cid in the field "X-Request-ID". The error returned should be one of the custom occamy errors.
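
A handler with this signature might look like the sketch below. The types are local stand-ins so that the example compiles on its own, Task is trimmed to its Do method for brevity, and `echoTask` and `handle` are hypothetical names. It also assumes the "X-Request-ID" value is stored as a string, which this page does not state.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the documented types.
type BasicError string

func (e BasicError) Error() string { return string(e) } // assumed body

const ErrInvalidHeader BasicError = "invalid_headers"

type Headers map[string]interface{}

// Task is trimmed to Do for brevity; the documented interface has more methods.
type Task interface {
	Do(ctx context.Context) error
}

type Handler func(header Headers, body []byte) (Task, error)

// echoTask is a hypothetical task that prints its payload.
type echoTask struct {
	requestID string
	payload   []byte
}

func (t echoTask) Do(ctx context.Context) error {
	fmt.Printf("%s: %s\n", t.requestID, t.payload)
	return nil
}

// handle reads the request ID from the header and builds a task, returning
// one of the occamy-style errors when the header is unusable.
func handle(header Headers, body []byte) (Task, error) {
	id, ok := header["X-Request-ID"].(string) // assumption: string value
	if !ok || id == "" {
		return nil, ErrInvalidHeader
	}
	return echoTask{requestID: id, payload: body}, nil
}

// Compile-time check that handle satisfies the Handler signature.
var _ Handler = handle

func main() {
	task, err := handle(Headers{"X-Request-ID": "req-1"}, []byte("hello"))
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	_ = task.Do(context.Background())
}
```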

type Headers

type Headers map[string]interface{}

Headers contain the header information of a message.

type LatencyMonitor

type LatencyMonitor interface {
	RecordProcessDuration(process string, duration time.Duration)
	RecordTaskDuration(group string, status SlotStatus, duration time.Duration)
}
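
As an illustration of implementing this interface, the sketch below accumulates durations in memory. `totalsMonitor`, its key format, and `demo` are hypothetical, and the types are local stand-ins. The mutex reflects an assumption that a monitor may be called from several goroutines; this page does not say whether it is.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Local stand-ins for the documented types.
type SlotStatus string

const SlotStatusProtected SlotStatus = "protected"

type LatencyMonitor interface {
	RecordProcessDuration(process string, duration time.Duration)
	RecordTaskDuration(group string, status SlotStatus, duration time.Duration)
}

// totalsMonitor is a hypothetical monitor that sums durations per key.
type totalsMonitor struct {
	mu     sync.Mutex
	totals map[string]time.Duration
}

func newTotalsMonitor() *totalsMonitor {
	return &totalsMonitor{totals: make(map[string]time.Duration)}
}

func (m *totalsMonitor) add(key string, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.totals[key] += d
}

func (m *totalsMonitor) RecordProcessDuration(process string, d time.Duration) {
	m.add("process/"+process, d)
}

func (m *totalsMonitor) RecordTaskDuration(group string, status SlotStatus, d time.Duration) {
	m.add("task/"+group+"/"+string(status), d)
}

// total returns the accumulated duration recorded under key.
func (m *totalsMonitor) total(key string) time.Duration {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.totals[key]
}

// Compile-time check that totalsMonitor satisfies LatencyMonitor.
var _ LatencyMonitor = (*totalsMonitor)(nil)

// demo records a few sample durations and returns the monitor.
func demo() *totalsMonitor {
	m := newTotalsMonitor()
	m.RecordProcessDuration("handle_request_message", 5*time.Millisecond)
	m.RecordTaskDuration("reports", SlotStatusProtected, 2*time.Second)
	m.RecordTaskDuration("reports", SlotStatusProtected, time.Second)
	return m
}

func main() {
	m := demo()
	fmt.Println(m.total("process/handle_request_message"))
	fmt.Println(m.total("task/reports/protected"))
}
```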

type Message

type Message interface {
	// Body must return the body of the message.
	Body() []byte

	// Headers must return the headers of the message.
	Headers() Headers

	// Ack must acknowledge the message as successfully handled, meaning it
	// does not need to be passed to another server.
	Ack() error

	// Reject must negatively acknowledge the message as uncompleted. The
	// requeue parameter determines whether the message should be requeued
	// and passed to another server.
	//
	// For example, if a server shuts down, the task associated with a message
	// will not be completed and will be passed to another server. On the other
	// hand if
	Reject(requeue bool) error
}

Message represents an asynchronous message.
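
For tests or local experiments, an in-memory implementation of this interface can be enough. The sketch below is one possibility. `memoryMessage` and its `outcome` field are hypothetical, and the interface is reproduced locally so that the example compiles without the module.

```go
package main

import "fmt"

// Local stand-ins for the documented types.
type Headers map[string]interface{}

type Message interface {
	Body() []byte
	Headers() Headers
	Ack() error
	Reject(requeue bool) error
}

// memoryMessage is a hypothetical in-memory message that records how it was
// settled instead of talking to a broker.
type memoryMessage struct {
	headers Headers
	body    []byte
	outcome string // "", "acked", "rejected" or "requeued"
}

func (m *memoryMessage) Body() []byte     { return m.body }
func (m *memoryMessage) Headers() Headers { return m.headers }

func (m *memoryMessage) Ack() error {
	m.outcome = "acked"
	return nil
}

func (m *memoryMessage) Reject(requeue bool) error {
	if requeue {
		m.outcome = "requeued"
	} else {
		m.outcome = "rejected"
	}
	return nil
}

// Compile-time check that memoryMessage satisfies Message.
var _ Message = (*memoryMessage)(nil)

func main() {
	msg := &memoryMessage{
		headers: Headers{"X-Request-ID": "req-1"},
		body:    []byte("payload"),
	}
	_ = msg.Reject(true)
	fmt.Println(msg.outcome)
}
```

A real implementation would forward Ack and Reject to the message broker in use and return any error the broker reports.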

type Monitors

type Monitors struct {
	Error    ErrorMonitor
	Latency  LatencyMonitor
	Resource ResourceMonitor
}

type NopErrorMonitor

type NopErrorMonitor struct{}

func (NopErrorMonitor) RecordError

func (NopErrorMonitor) RecordError(_ error)

type NopLatencyMonitor

type NopLatencyMonitor struct{}

func (NopLatencyMonitor) RecordProcessDuration

func (NopLatencyMonitor) RecordProcessDuration(_ string, _ time.Duration)

func (NopLatencyMonitor) RecordTaskDuration

func (NopLatencyMonitor) RecordTaskDuration(_ string, _ SlotStatus, _ time.Duration)

type NopResourceMonitor

type NopResourceMonitor struct{}

func (NopResourceMonitor) RecordTaskStarting

func (NopResourceMonitor) RecordTaskStarting(_ string, _ SlotStatus)

func (NopResourceMonitor) RecordTaskStopping

func (NopResourceMonitor) RecordTaskStopping(_ string, _ SlotStatus)

type ResourceMonitor

type ResourceMonitor interface {
	RecordTaskStarting(group string, status SlotStatus)
	RecordTaskStopping(group string, status SlotStatus)
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles incoming messages and passes them to the handler (only one is allowed). The tasks associated with the messages are performed and, when possible, spare resources are used by the handler for additional tasks. The server must always be created using the NewServer function.

func NewServer

func NewServer(config ServerConfig) *Server

NewServer creates a new occamy server using the configuration provided. The expansion process is automatically started.

func (*Server) ExpandTasks added in v0.3.0

func (server *Server) ExpandTasks()

ExpandTasks calls expand on running tasks and runs externally added tasks provided there is sufficient space.

It is generally recommended to have the server use periodic expansion. This method is included so that custom expansion schedules can be implemented.

func (*Server) HandleControlMsg

func (server *Server) HandleControlMsg(msg Message)

HandleControlMsg handles a control message.

Control messages with a task ID set in the header will be passed on to ALL tasks which have matching IDs. If there is no task ID set then the message will be interpreted as an optional external request and stored to be used in the expansion process.
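
The routing rule described above can be pictured with the following sketch. It is not the server's implementation: `router`, `newRouter`, `route`, and the "task_id" header key are all hypothetical, and tasks are represented by plain names to keep the example short.

```go
package main

import "fmt"

// Headers is a local stand-in for the documented type.
type Headers map[string]interface{}

// router is a hypothetical stand-in for the server's internal state.
type router struct {
	headerKeyTaskID string
	running         map[string][]string // task ID -> names of running tasks
	external        []Headers           // stored optional external requests
}

// newRouter returns a router with two running tasks that share one ID.
func newRouter() *router {
	return &router{
		headerKeyTaskID: "task_id", // illustrative key, see HeaderKeyTaskID
		running:         map[string][]string{"t1": {"main", "assistant"}},
	}
}

// route returns the names of every task that would receive the control
// message. Without a task ID the message is stored for the expansion process.
func (r *router) route(h Headers) []string {
	id, ok := h[r.headerKeyTaskID].(string)
	if !ok || id == "" {
		r.external = append(r.external, h)
		return nil
	}
	return r.running[id]
}

func main() {
	r := newRouter()
	fmt.Println(r.route(Headers{"task_id": "t1"}))
	fmt.Println(r.route(Headers{}), len(r.external))
}
```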

func (*Server) HandleRequestMsg

func (server *Server) HandleRequestMsg(msg Message)

HandleRequestMsg handles an incoming request message.

The handler defined in the server is used to generate a task. The task should be started immediately, unless an error is encountered in which case the message will be nacked.

func (*Server) Shutdown

func (server *Server) Shutdown(ctx context.Context)

Shutdown stops the expansion process and ends every task. It will allow some time for the tasks to gracefully stop.

type ServerConfig

type ServerConfig struct {
	// Slots sets how many slots will be created, i.e. the maximum number of
	// simultaneous tasks allowed.
	Slots int

	// ExpansionSlotBuffer sets how many
	ExpansionSlotBuffer int

	// ExpansionPeriod sets how often the expansion process will be run. A value
	// of zero or less will mean that the expansion process will never be run.
	ExpansionPeriod time.Duration

	// KillTimeout sets the maximum duration allowed for a task to be
	// killed in. This is important to set for allowing unprotected tasks
	// to be killed and replaced by protected ones.
	KillTimeout time.Duration

	// HeaderKeyTaskID sets the header key used to obtain the task ID for any
	// control message received. In other words the value in the header
	// corresponding to this key will be treated as the task ID.
	HeaderKeyTaskID string

	Handler Handler

	Monitors Monitors
}

ServerConfig contains the necessary data to create an occamy server.
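
The sketch below shows how some of these fields might be filled in. ServerConfig here is a trimmed local copy (Handler, Monitors and ExpansionSlotBuffer are omitted), the values are illustrative rather than recommended defaults, and `expansionEnabled` is a hypothetical helper that restates the documented rule for ExpansionPeriod.

```go
package main

import (
	"fmt"
	"time"
)

// ServerConfig is a trimmed local copy of the documented struct.
type ServerConfig struct {
	Slots           int
	ExpansionPeriod time.Duration
	KillTimeout     time.Duration
	HeaderKeyTaskID string
}

// expansionEnabled restates the documented rule that a period of zero or
// less means the expansion process is never run. Hypothetical helper.
func expansionEnabled(cfg ServerConfig) bool {
	return cfg.ExpansionPeriod > 0
}

// exampleConfig returns illustrative values, not recommended defaults.
func exampleConfig() ServerConfig {
	return ServerConfig{
		Slots:           4,
		ExpansionPeriod: 10 * time.Second,
		KillTimeout:     500 * time.Millisecond,
		HeaderKeyTaskID: "task_id", // illustrative header key
	}
}

func main() {
	cfg := exampleConfig()
	fmt.Println(cfg.Slots, expansionEnabled(cfg))

	// Disabling periodic expansion leaves ExpandTasks as the manual option.
	cfg.ExpansionPeriod = 0
	fmt.Println(expansionEnabled(cfg))
}
```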

type SlotStatus

type SlotStatus string
const (
	SlotStatusEmpty               SlotStatus = "empty"
	SlotStatusProtected           SlotStatus = "protected"
	SlotStatusUnprotectedInternal SlotStatus = "unprotected_internal"
	SlotStatusUnprotectedExternal SlotStatus = "unprotected_external"
)

type Task

type Task interface {
	// Do should perform the task and if a failure is encountered
	// one of the custom occamy errors should be returned.
	//
	// If the context provided is canceled the process must be stopped
	// immediately and ungracefully, and return the error
	// `ErrTaskInterrupted` which will cause the task to be run on
	// another server.
	Do(ctx context.Context) error

	Details() TaskDetails

	// Expand creates additional tasks. These tasks will be unprotected
	// and may be cancelled at any time.
	Expand(n int) []Task

	// Handle handles a control message. The error return should be
	// one of the custom occamy errors.
	Handle(ctx context.Context, headers Headers, body []byte) error
}

Task represents a task that must be done.
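
A minimal sketch of a type satisfying this interface follows. The types are local stand-ins, and `countTask` and `runCancelled` are hypothetical. Giving assistant tasks the same ID as their parent is an assumption made here so that control messages for that ID would reach all of them.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins for the documented types.
type BasicError string

func (e BasicError) Error() string { return string(e) } // assumed body

const ErrTaskInterrupted BasicError = "task_interrupted"

type Headers map[string]interface{}

type TaskDetails struct {
	Deadline time.Time
	ID       string
	Group    string
}

type Task interface {
	Do(ctx context.Context) error
	Details() TaskDetails
	Expand(n int) []Task
	Handle(ctx context.Context, headers Headers, body []byte) error
}

// countTask is a hypothetical task that works through a number of steps.
type countTask struct {
	id    string
	steps int
}

// Do checks for cancellation before each step and stops immediately.
func (t *countTask) Do(ctx context.Context) error {
	for i := 0; i < t.steps; i++ {
		select {
		case <-ctx.Done():
			return ErrTaskInterrupted
		default:
			// One unit of work would go here.
		}
	}
	return nil
}

func (t *countTask) Details() TaskDetails {
	return TaskDetails{ID: t.id, Group: "count"}
}

// Expand offers up to n assistant tasks that share the parent's ID.
func (t *countTask) Expand(n int) []Task {
	if n <= 0 {
		return nil
	}
	assistants := make([]Task, 0, n)
	for i := 0; i < n; i++ {
		assistants = append(assistants, &countTask{id: t.id, steps: t.steps})
	}
	return assistants
}

// Handle ignores control messages in this sketch.
func (t *countTask) Handle(ctx context.Context, headers Headers, body []byte) error {
	return nil
}

// Compile-time check that countTask satisfies Task.
var _ Task = (*countTask)(nil)

// runCancelled runs a task with an already cancelled context, simulating the
// server reclaiming the slot.
func runCancelled(t Task) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return t.Do(ctx)
}

func main() {
	task := &countTask{id: "task-1", steps: 3}
	fmt.Println(runCancelled(task))
	fmt.Println(len(task.Expand(2)))
}
```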

type TaskDetails

type TaskDetails struct {
	// Deadline is the deadline of the task, if it is known,
	// and is only relevant for tasks that arrived via
	// the control channel.
	Deadline time.Time

	// ID is the ID of the task. Any control messages
	// for this task must have this ID included in the header.
	ID string

	// Group denotes the general group that this task belongs to. This
	// is used for monitoring usage.
	Group string
}

type WrappedError

type WrappedError struct {
	BasicErr BasicError
	InnerErr error
}

func NewWrappedError

func NewWrappedError(basicErr BasicError, innerErr error) *WrappedError

func (*WrappedError) Error

func (w *WrappedError) Error() string
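
Since no Unwrap method is listed on this page, a caller that needs the BasicError category of a wrapped error can reach it through the exported BasicErr field. The sketch below does this with errors.As. The types are local stand-ins, the message format produced by Error is an assumption, and `basicKind` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented types.
type BasicError string

func (e BasicError) Error() string { return string(e) } // assumed body

const ErrInvalidBody BasicError = "invalid_body"

type WrappedError struct {
	BasicErr BasicError
	InnerErr error
}

func NewWrappedError(basicErr BasicError, innerErr error) *WrappedError {
	return &WrappedError{BasicErr: basicErr, InnerErr: innerErr}
}

// Error joins both parts. Assumption: the real message format is not shown
// on this page.
func (w *WrappedError) Error() string {
	return fmt.Sprintf("%s: %v", w.BasicErr, w.InnerErr)
}

// basicKind is a hypothetical helper that extracts the BasicError category
// from either a bare BasicError or a *WrappedError.
func basicKind(err error) (BasicError, bool) {
	var basic BasicError
	if errors.As(err, &basic) {
		return basic, true
	}
	var wrapped *WrappedError
	if errors.As(err, &wrapped) {
		return wrapped.BasicErr, true
	}
	return "", false
}

func main() {
	err := NewWrappedError(ErrInvalidBody, errors.New("unexpected end of input"))
	kind, ok := basicKind(err)
	fmt.Println(kind, ok)
}
```

The same approach would apply to DetailedError, which also exposes a BasicErr field.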
