handlers

package
v0.0.0-...-dc29129
This package is not in the latest version of its module.
Published: Nov 9, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Constants

const (
	GRACE_PERIOD int64 = 1 // grace period in seconds before a job is deleted
)
const (
	MAX_RETRIES int = 3
)

Variables

This section is empty.

Functions

func CheckIndexingJobsImageConfig

func CheckIndexingJobsImageConfig(jobConfigs []JobConfig) error

CheckIndexingJobsImageConfig checks that every "indexing" job has both Indexd and Metadata Service credentials configured, and returns an error otherwise.

func GetCleanupTime

func GetCleanupTime() int

GetCleanupTime returns the cleanup time for completed jobs

func GetMaxJobConfig

func GetMaxJobConfig() int

GetMaxJobConfig returns the maximum number of jobs allowed to run simultaneously

func GetRandString

func GetRandString(n int) string

GetRandString returns a random string of length n
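
The package does not document how GetRandString picks its characters. As a minimal sketch of the general technique only, the following builds a string of a given length from a fixed alphabet. The name `randString` and the alphabet `letters` are assumptions made for illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// letters is a hypothetical alphabet; the real character set is not documented.
const letters = "abcdefghijklmnopqrstuvwxyz0123456789"

// randString returns a string of n characters drawn at random from letters.
// It returns an empty string when n is zero or negative.
func randString(n int) string {
	if n <= 0 {
		return ""
	}
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func main() {
	s := randString(8)
	fmt.Println(s, len(s))
}
```

math/rand is adequate for generating job-name suffixes and similar identifiers; it is not suitable for secrets.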

func GetValueFromJSON

func GetValueFromJSON(jsonBytes []byte, keys []string) (interface{}, error)

func LookupCredFile

func LookupCredFile() string

LookupCredFile looks up the credential file

func NewJobHandler

func NewJobHandler() *jobHandler

func NewSQSClient

func NewSQSClient() (sqsiface.SQSAPI, error)

NewSQSClient creates a new SQSAPI client

func ReadFile

func ReadFile(path string) ([]byte, error)

func RegisterJob

func RegisterJob()

func RegisterSystem

func RegisterSystem()

RegisterSystem

func StringContainsPrefixInSlice

func StringContainsPrefixInSlice(s string, prefixList []string) bool

Types

type AWSCredentials

type AWSCredentials struct {
	// contains filtered or unexported fields
}

AWSCredentials contains AWS credentials

type JobConfig

type JobConfig struct {
	Name           string
	Pattern        string
	Image          string
	ImageConfig    interface{}
	RequestCPU     string
	RequestMem     string
	DeadLine       int64
	ServiceAccount string
}

type JobInfo

type JobInfo struct {
	UID    string `json:"uid"`
	Name   string `json:"name"`
	Status string `json:"status"`
	URL    string `json:"url"`

	SQSMessage *sqs.Message
	// contains filtered or unexported fields
}

func CreateK8sJob

func CreateK8sJob(inputURL string, jobConfig JobConfig) (*JobInfo, error)

CreateK8sJob creates a Kubernetes job to handle an S3 object

func (*JobInfo) DetailedStatus

func (j *JobInfo) DetailedStatus() string

type JobsArray

type JobsArray struct {
	JobInfo []JobInfo `json:"jobs"`
}
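
The struct tags on JobInfo and JobsArray imply a JSON shape with a top-level "jobs" array. As a hedged sketch, the following decodes such a payload into local mirror types. The types `jobInfo` and `jobsArray`, the helper `decodeJobs`, and the sample values (including the status "Running") are assumptions for illustration; the mirror omits the SQSMessage field and the unexported fields of the real type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobInfo mirrors the tagged fields of JobInfo only.
type jobInfo struct {
	UID    string `json:"uid"`
	Name   string `json:"name"`
	Status string `json:"status"`
	URL    string `json:"url"`
}

// jobsArray mirrors JobsArray: a list of jobs under the "jobs" key.
type jobsArray struct {
	JobInfo []jobInfo `json:"jobs"`
}

// decodeJobs parses a JSON document shaped like JobsArray.
func decodeJobs(data []byte) (jobsArray, error) {
	var out jobsArray
	err := json.Unmarshal(data, &out)
	return out, err
}

func main() {
	// Sample payload with made-up values.
	payload := []byte(`{"jobs":[{"uid":"abc","name":"usersync","status":"Running","url":"s3://bucket/usersync.yaml"}]}`)
	jobs, err := decodeJobs(payload)
	if err != nil {
		panic(err)
	}
	for _, j := range jobs.JobInfo {
		fmt.Println(j.UID, j.Name, j.Status, j.URL)
	}
}
```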

type RetryMessage

type RetryMessage struct {
	Bucket string
	Key    string
}

type SQSHandler

type SQSHandler struct {
	QueueURL   string
	Start      bool
	JobConfigs []JobConfig
	// contains filtered or unexported fields
}

func NewSQSHandler

func NewSQSHandler(queueURL string) *SQSHandler

NewSQSHandler creates a new SQSHandler instance

func (*SQSHandler) GetIndexingJobStatus

func (handler *SQSHandler) GetIndexingJobStatus(w http.ResponseWriter, r *http.Request)

GetIndexingJobStatus gets the status of an indexing job

func (*SQSHandler) HandleDispatchJob

func (handler *SQSHandler) HandleDispatchJob(w http.ResponseWriter, r *http.Request)

HandleDispatchJob dispatches a job

func (*SQSHandler) HandleJobConfig

func (handler *SQSHandler) HandleJobConfig(w http.ResponseWriter, r *http.Request)

HandleJobConfig handles the job config endpoints. To add a jobConfig:

curl -X POST http://localhost/jobConfig -d '{"name": "usersync", "pattern": "s3://bucket/usersync.yaml", "image": "quay.io/cdis/fence:master", "imageConfig":{}}'

To delete a jobConfig:

curl -X DELETE 'http://localhost/jobConfig?pattern=s3://bucket/usersync.yaml'
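
The same two requests can be built from Go with net/http. This is a minimal sketch: the type `jobConfigPayload` and the helpers `newAddRequest` and `newDeleteRequest` are hypothetical names, and the JSON keys are copied from the curl example rather than from the handler's source. The program only constructs the requests; sending them would need a running service and `http.DefaultClient.Do`.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// jobConfigPayload mirrors the JSON keys used in the curl example.
type jobConfigPayload struct {
	Name        string      `json:"name"`
	Pattern     string      `json:"pattern"`
	Image       string      `json:"image"`
	ImageConfig interface{} `json:"imageConfig"`
}

// newAddRequest builds the POST request that adds a jobConfig.
func newAddRequest(baseURL string, cfg jobConfigPayload) (*http.Request, error) {
	body, err := json.Marshal(cfg)
	if err != nil {
		return nil, err
	}
	return http.NewRequest(http.MethodPost, baseURL+"/jobConfig", bytes.NewReader(body))
}

// newDeleteRequest builds the DELETE request; the pattern is query-escaped.
func newDeleteRequest(baseURL, pattern string) (*http.Request, error) {
	q := url.Values{"pattern": {pattern}}
	return http.NewRequest(http.MethodDelete, baseURL+"/jobConfig?"+q.Encode(), nil)
}

func main() {
	cfg := jobConfigPayload{
		Name:        "usersync",
		Pattern:     "s3://bucket/usersync.yaml",
		Image:       "quay.io/cdis/fence:master",
		ImageConfig: map[string]interface{}{},
	}
	add, err := newAddRequest("http://localhost", cfg)
	if err != nil {
		panic(err)
	}
	del, err := newDeleteRequest("http://localhost", cfg.Pattern)
	if err != nil {
		panic(err)
	}
	fmt.Println(add.Method, add.URL)
	fmt.Println(del.Method, del.URL)
}
```

Escaping the pattern with url.Values avoids problems when a pattern contains characters such as `&` or `#` that would otherwise be interpreted as part of the URL syntax.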

func (*SQSHandler) HandleSQSMessage

func (handler *SQSHandler) HandleSQSMessage(message *sqs.Message) error

HandleSQSMessage handles SQS message

The function takes an SQS message as input, extracts the object URLs, and decides which image needs to be pulled to handle each S3 object based on its URL.

An SQS message may contain multiple records. The service goes through all the records and computes how many need to be processed, based on their URLs and the jobConfig list. If that number exceeds the available capacity of the job pool, the service sleeps until resources are available.

If the function returns a non-nil error, the message is put back on the queue and retried later (handled by the `md` library). This ensures the message is properly handled before it is actually deleted.
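
The counting and waiting steps described above can be sketched roughly as follows. This is not the package's code: `jobConfig`, `countMatches`, and `waitForCapacity` are hypothetical names, the `free` callback stands in for whatever reports job pool capacity, and treating Pattern as a regular expression is an assumption, since the matching rule is not documented here.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

// jobConfig keeps only the two JobConfig fields this sketch needs.
type jobConfig struct {
	Name    string
	Pattern string
}

// countMatches returns how many object URLs match at least one config pattern.
// Assumption: patterns are interpreted as regular expressions.
func countMatches(urls []string, configs []jobConfig) (int, error) {
	n := 0
	for _, u := range urls {
		for _, c := range configs {
			ok, err := regexp.MatchString(c.Pattern, u)
			if err != nil {
				return 0, err
			}
			if ok {
				n++
				break // count each URL once
			}
		}
	}
	return n, nil
}

// waitForCapacity polls free until at least need slots are available,
// sleeping between polls, and gives up after the given number of attempts.
func waitForCapacity(need int, free func() int, interval time.Duration, attempts int) bool {
	for i := 0; i < attempts; i++ {
		if free() >= need {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	configs := []jobConfig{{Name: "usersync", Pattern: "s3://bucket/usersync.yaml"}}
	urls := []string{"s3://bucket/usersync.yaml", "s3://bucket/other.txt"}
	need, err := countMatches(urls, configs)
	if err != nil {
		panic(err)
	}
	slots := 0
	free := func() int { slots++; return slots } // pretend one slot frees up per poll
	fmt.Println(need, waitForCapacity(need, free, time.Millisecond, 5))
}
```

Bounding the number of polling attempts is a choice made for this sketch so that the example terminates; the description above only says the service sleeps until resources are available.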

func (*SQSHandler) RegisterSQSHandler

func (handler *SQSHandler) RegisterSQSHandler()

RegisterSQSHandler registers endpoints

func (*SQSHandler) RemoveCompletedJobsProcess

func (handler *SQSHandler) RemoveCompletedJobsProcess()

RemoveCompletedJobsProcess starts the process to remove completed jobs

func (*SQSHandler) RemoveSQSMessage

func (handler *SQSHandler) RemoveSQSMessage(message *sqs.Message) error

RemoveSQSMessage removes SQS message

func (*SQSHandler) RetryCreateIndexingJob

func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error

RetryCreateIndexingJob manually creates an indexing job

func (*SQSHandler) StartConsumingProcess

func (handler *SQSHandler) StartConsumingProcess() error

StartConsumingProcess starts consuming the queue

func (*SQSHandler) StartMonitoringProcess

func (handler *SQSHandler) StartMonitoringProcess()

StartMonitoringProcess starts the process to monitor the created job

func (*SQSHandler) StartServer

func (handler *SQSHandler) StartServer() error

StartServer starts a server
