line

package
v0.0.0-...-eee359e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2017 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//ErrAllocExists means a alloc exists while it was expected not to
	ErrAllocExists = errors.New("alloc already exists")

	//ErrAllocNotExists means a alloc was not found while expecting it to exist
	ErrAllocNotExists = errors.New("alloc doesn't exist")
)
View Source
var (
	//ErrPoolExists means a pool exists while it was expected not to
	ErrPoolExists = errors.New("pool already exists")

	//ErrPoolNotExists means a pool was not found while expecting it to exist
	ErrPoolNotExists = errors.New("pool doesn't exist")
)
View Source
var (
	//ErrWorkerExists means a worker exists while it was expected not to
	ErrWorkerExists = errors.New("worker already exists")

	//ErrWorkerNotExists means a worker was not found while expecting it to exist
	ErrWorkerNotExists = errors.New("worker doesn't exist")
)
View Source
var Handlers = map[*regexp.Regexp]Handler{
	regexp.MustCompile(`-schedule$`): HandleSchedule,
	regexp.MustCompile(`-release$`):  HandleRelease,
	regexp.MustCompile(`-gateway$`):  HandleGateway,
}

Handlers map arn suffixes to actual event handlers

Functions

func DeleteReplica

func DeleteReplica(conf *Conf, db DB, pk ReplicaPK) (err error)

DeleteReplica deletes a replica by pk

func DeleteWorker

func DeleteWorker(conf *Conf, db DB, pk WorkerPK) (err error)

DeleteWorker deletes a worker by pk

func FmtPoolQueueName

func FmtPoolQueueName(conf *Conf, poolID string) string

FmtPoolQueueName will format a sqs queue name consistently

func FmtPoolQueueURL

func FmtPoolQueueURL(conf *Conf, poolID string) string

FmtPoolQueueURL is able to "predict" an sqs queue url from configurations

func FmtReplicaID

func FmtReplicaID(datasetID, workerID string) string

FmtReplicaID formats the combined pool and worker id of a replica

func FmtWorkerQueueName

func FmtWorkerQueueName(conf *Conf, poolID, workerID string) string

FmtWorkerQueueName will format a sqs queue name consistently

func FmtWorkerQueueURL

func FmtWorkerQueueURL(conf *Conf, poolID, workerID string) string

FmtWorkerQueueURL is able to "predict" an sqs queue url from configurations

func HandleGateway

func HandleGateway(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)

HandleGateway takes invocations from the API Gateway and handles them as HTTP requests to return HTTP responses based on restful principles

func HandleRelease

func HandleRelease(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)

HandleRelease is a Lambda handler that periodically queries a pool's expired allocations, replicas and workers

func HandleSchedule

func HandleSchedule(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)

HandleSchedule is a Lambda handler that periodically reads from the scheduling queue and queries the workers table for available capacity. If the capacity can be claimed an allocation is created.

func Mux

func Mux(conf *Conf, svc *Services) http.Handler

Mux sets up the HTTP multiplexer

func PutNewAlloc

func PutNewAlloc(conf *Conf, db DB, alloc *Alloc) (err error)

PutNewAlloc will put an alloc with the condition the pk doesn't exist yet

func PutNewPool

func PutNewPool(conf *Conf, db DB, pool *Pool) (err error)

PutNewPool will put an pool with the condition the pk doesn't exist yet

func PutNewWorker

func PutNewWorker(conf *Conf, db DB, worker *Worker) (err error)

PutNewWorker will put an worker with the condition the pk doesn't exist yet

func PutReplica

func PutReplica(conf *Conf, db DB, replica *Replica) (err error)

PutReplica will put an replica with the condition the pk doesn't exist yet

func ReceiveEvals

func ReceiveEvals(conf *Conf, svc *Services, pool *Pool) (err error)

ReceiveEvals will long poll for scheduling messages on the scheduling queue of the pool

func UpdateAllocTTL

func UpdateAllocTTL(conf *Conf, db DB, ttl int64, apk AllocPK) (err error)

UpdateAllocTTL under the condition that it exists

func UpdatePoolTTL

func UpdatePoolTTL(conf *Conf, db DB, ttl int64, pk PoolPK) (err error)

UpdatePoolTTL under the condition that it exists

func UpdateWorkerTTL

func UpdateWorkerTTL(conf *Conf, db DB, ttl int64, pk WorkerPK) (err error)

UpdateWorkerTTL under the condition that it exists

Types

type Alloc

type Alloc struct {
	AllocPK
	TTL      int64  `dynamodbav:"ttl"`
	WorkerID string `dynamodbav:"wrk"`
	Eval     *Eval  `dynamodbav:"eval"`
}

Alloc represents a planned execution

func GetAlloc

func GetAlloc(conf *Conf, db DB, pk AllocPK) (alloc *Alloc, err error)

GetAlloc returns a pool by its primary key

func Schedule

func Schedule(conf *Conf, svc *Services, eval *Eval, pool *Pool, replicas []*Replica) (alloc *Alloc, err error)

Schedule will try to query the workers table for available room and conditionally update their capacity if it fits.

type AllocPK

type AllocPK struct {
	PoolID  string `dynamodbav:"pool"`
	AllocID string `dynamodbav:"alloc"`
}

AllocPK describes the alloc's primary key in the base table

type Conf

type Conf struct {
	Deployment         string `envconfig:"DEPLOYMENT"`
	AWSAccountID       string `envconfig:"AWS_ACCOUNT_ID"`
	AWSAccessKeyID     string `envconfig:"AWS_ACCESS_KEY_ID"`
	AWSSecretAccessKey string `envconfig:"AWS_SECRET_ACCESS_KEY"`
	AWSRegion          string `envconfig:"AWS_REGION"`
	StripBaseMappings  int    `envconfig:"STRIP_BASE_MAPPINGS"`

	PoolTTL            int64  `envconfig:"POOL_TTL"`
	WorkerTTL          int64  `envconfig:"WORKER_TTL"`
	ReplicaTTL         int64  `envconfig:"REPLICA_TTL"`
	AllocTTL           int64  `envconfig:"ALLOC_TTL"`
	MaxRetry           int    `envconfig:"MAX_RETRY"`
	ScheduleDLQueueURL string `envconfig:"SCHEDULE_DLQUEUE_URL"`

	PoolsTableName     string `envconfig:"TABLE_NAME_POOLS"`
	ReplicasTableName  string `envconfig:"TABLE_NAME_REPLICAS"`
	ReplicasTTLIdxName string `envconfig:"TABLE_IDX_REPLICAS_TTL"`
	WorkersTTLIdxName  string `envconfig:"TABLE_IDX_WORKERS_TTL"`
	WorkersTableName   string `envconfig:"TABLE_NAME_WORKERS"`
	WorkersCapIdxName  string `envconfig:"TABLE_IDX_WORKERS_CAP"`
	AllocsTableName    string `envconfig:"TABLE_NAME_ALLOCS"`
	AllocsTTLIdxName   string `envconfig:"TABLE_IDX_ALLOCS_TTL"`
}

Conf holds our configuration taken from the environment

type DB

DB is our alias for the dynamodb iface

type Eval

type Eval struct {
	Dataset string `dynamodbav:"set"`  //certain dataset must be available
	Size    int    `dynamodbav:"size"` //certain capacity must be available
	Retry   int    `dynamodbav:"try"`
}

Eval is a scheduling evaluation

type GatewayRequest

type GatewayRequest struct {
	HTTPMethod            string
	Headers               map[string]string
	Resource              string
	PathParameters        map[string]string
	Path                  string
	QueryStringParameters map[string]string
	Body                  string
	IsBase64Encoded       bool
	StageVariables        map[string]string
}

GatewayRequest represents an Amazon API Gateway Proxy Event.

type GatewayResponse

type GatewayResponse struct {
	StatusCode int               `json:"statusCode"`
	Body       string            `json:"body"`
	Headers    map[string]string `json:"headers"`
}

GatewayResponse is returned to the API Gateway

type Handler

type Handler func(conf *Conf, svc *Services, ev json.RawMessage) (interface{}, error)

Handler describes a Lambda handler that matches a specific suffic

type Pool

type Pool struct {
	PoolPK
	QueueURL string `dynamodbav:"que"`
	TTL      int64  `dynamodbav:"ttl"`
}

Pool represents capacity provided by pools

func GetActivePool

func GetActivePool(conf *Conf, db DB, pk PoolPK) (pool *Pool, err error)

GetActivePool will get a pool by its pk but errors if it's disbanded

func GetPool

func GetPool(conf *Conf, db DB, pk PoolPK) (pool *Pool, err error)

GetPool returns a pool by its primary key

type PoolPK

type PoolPK struct {
	PoolID string `dynamodbav:"pool"`
}

PoolPK describes the pool's primary key in the base table

type Replica

type Replica struct {
	ReplicaPK
	TTL int64 `dynamodbav:"ttl"`
}

Replica represents the clone of a dataset available on a certain worker

func FindReplicas

func FindReplicas(conf *Conf, svc *Services, eval *Eval, pool *Pool) ([]*Replica, error)

FindReplicas returns locality information for an evaluation

type ReplicaPK

type ReplicaPK struct {
	PoolID    string `dynamodbav:"pool"`
	ReplicaID string `dynamodbav:"rpl"`
}

ReplicaPK describes the replicas PK in the base tample

type Services

type Services struct {
	SQS  sqsiface.SQSAPI           //message queues
	DB   dynamodbiface.DynamoDBAPI //dynamodb nosql database
	Logs *zap.Logger               //logging service
}

Services hold our backend services

type Worker

type Worker struct {
	WorkerPK
	Capacity int    `dynamodbav:"cap"`
	QueueURL string `dynamodbav:"que"`
	TTL      int64  `dynamodbav:"ttl"`
}

Worker represents a source of capacity

func GetWorker

func GetWorker(conf *Conf, db DB, pk WorkerPK) (worker *Worker, err error)

GetWorker returns a worker by its primary key

type WorkerPK

type WorkerPK struct {
	PoolID   string `dynamodbav:"pool"`
	WorkerID string `dynamodbav:"wrk"`
}

WorkerPK uniquely identifies a worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL