orchestrator

package
v1.0.0-beta.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2023 License: GPL-3.0 Imports: 28 Imported by: 0

README

Orchestrator

The orchestrator module is responsible for triggering analysis, autofix and transformer jobs on the configured architecture — Kubernetes, Nomad, etc.

Flows

flowchart LR
    http(HTTP) & grpc(gRPC) & rmq(RMQ) --> protocol("protocol (Interface)")
    protocol --> command(command)
    command --> driver("driver (Interface)")
    driver --> Kubernetes(Kubernetes) & Nomad(Nomad)

Implementation Specs

  • The orchestrator module will be used for initializing a server that is configured via the runner config file.

  • The config file will specify which protocol is being used to communicate with the server - HTTP, gRPC, etc., and what infrastructure the jobs will be spawned on - Kubernetes, Nomad, etc.

  • The orchestrator will initialize and split off into a separate goroutine and keep listening for requests for the configured protocol.

  • The requests will be of the following types (as currently configured in atlas):

    • analysis_run
    • autofix_run
    • transformer_run
    • ssh_verify_run
    • cancel_check_run
  • These requests will be translated via the protocol interface into their respective implementation: HTTP, gRPC, etc.

  • Once the request has been validated and sanitized, it will call the respective command in the command module.

  • The command module will then call the driver module to execute the command on the configured driver: Kubernetes, Nomad, etc.

SAMPLE PAYLOAD

Analysis /tasks/analysis

{"run_id": "f231fc3d-d586-4d65-a478-402005e1736f", "run_serial": "0", "config": {"version": 1, "analyzers": [{"name": "python", "enabled": true, "meta": {"runtime_version": "3.x.x"}}], "transformers": [{"name": "autopep8", "enabled": true}, {"name": "isort", "enabled": true}, {"name": "black", "enabled": true}]}, "ds_config_updated": false, "vcs_meta": {"remote_url": "https://***:***@github.com/t3st-org/monitors-test-repo.git", "is_for_default_analysis_branch": false, "base_branch": "main", "base_oid": "5cd7c55b65720b5251d85e53dfb1dd7c6fb6561e", "checkout_oid": "7d8dd9f8f0b1d1c7aaafda6f4adde8ca13db185c", "clone_submodules": false}, "keys": {}, "checks": [{"check_seq": "1", "artifacts": [], "analyzer_meta": {"name": "python", "analyzer_type": "core", "command": "/app/marvin-python", "version": "dev", "cpu_limit": "2400", "memory_limit": "6501"}, "processors": ["source_code_load", "skip_cq"], "diff_meta_commits": []}], "_meta": {"resource_pool": "plan-pf"}}

Autofix /tasks/autofix

{"run_id": "277adfe1-f7ea-4767-bace-8e84e2556b69", "run_serial": "0", "config": {"version": 1, "analyzers": [{"meta": {"runtime_version": "3.x.x"}, "name": "python", "enabled": true}], "transformers": [{"name": "autopep8", "enabled": true}]}, "vcs_meta": {"remote_url": "https://***:***@github.com/t3st-org/demo-python.git", "checkout_oid": "2b9caeb6f21d9f13bb1cc1bdb984ebdccb1956f2", "base_branch": "master", "clone_submodules": false}, "keys": {}, "autofixer": {"autofix_meta": {"name": "python", "command": "/app/autofix.sh", "version": "dev", "cpu_limit": "2400", "memory_limit": "6501"}, "autofixes": [{"issue_code": "PTC-W0014", "occurrences": [{"path": "demo_code.py", "position": {"begin": {"line": 56, "column": 4}, "end": {"line": 56, "column": 4}}}]}]}, "_meta": {"resource_pool": "plan-pf"}}

Documentation

Index

Constants

View Source
const (
	CoatVersion = "latest"

	LabelNameManager  = "manager"
	LabelNameRole     = "role"
	LabelNameApp      = "application"
	LabelNameAnalyzer = "analyzer"

	EnvNameCodePath                 = "CODE_PATH"
	EnvNameToolboxPath              = "TOOLBOX_PATH"
	EnvNameMemoryLimit              = "MEMORY_LIMIT"
	EnvNameCPULimit                 = "CPU_LIMIT"
	EnvNameTimeLimit                = "TIME_LIMIT"
	EnvNameOnPrem                   = "ON_PREM"
	EnvNameSSHPrivateKey            = "SSH_PRIVATE_KEY"
	EnvNameSSHPublicKey             = "SSH_PUBLIC_KEY"
	EnvNamePublisher                = "PUBLISHER"
	EnvNamePublisherURL             = "RESULT_HTTP_URL"
	EnvNamePublisherToken           = "RESULT_HTTP_TOKEN"
	EnvNameResultTask               = "RESULT_RMQ_TASK"
	EnvNameArtifactsCredentialsPath = "ARTIFACTS_CREDENTIALS_PATH"
	EnvNameArtifactsSecretName      = "TASK_ARTIFACT_SECRET_NAME"
	EnvNameSentryDSN                = "SENTRY_DSN"

	MarvinCmdCpy       = "cp /marvin/marvin /toolbox &&"
	MarvinCmdBase      = "/toolbox/marvin"
	MarvinCmdArgConfig = "--config"

	MarvinModeAnalyze   = "--analyze"
	MarvinModeAutofix   = "--autofix"
	MarvinModeTransform = "--transform"

	MarvinSnippetStorageType   = "--snippet-storage-type"
	MarvinSnippetStorageBucket = "--snippet-storage-bucket"

	CoatCmdName = "/app/coat"

	CoatArgNameRunID            = "--run-id"
	CoatArgNameCheckSeq         = "--check-seq"
	CoatArgNameRemoteURL        = "--remote-url"
	CoatArgNameCheckoutOid      = "--checkout-oid"
	CoatArgNameCloneSubmodules  = "--clone-submodules"
	CoatArgTestCoverageArtifact = "--artifacts"
	CoatArgNameDecryptRemote    = "--decrypt-remote-url"
	CoatArgSnippetStorageBucket = "--snippet-storage-bucket"
	CoatArgSnippetStorageType   = "--snippet-storage-type"
	CoatArgNameBaseBranch       = "--base-branch"
	CoatArgPatchMeta            = "--patch-meta"
	CoatArgArtifacts            = "--artifacts"

	CoatCPULimit      = "1400m"
	CoatMemoryLimit   = "4000Mi"
	CoatCPURequest    = "300m"
	CoatMemoryRequest = "500Mi"

	ScopeAnalysis  = "analysis.*"
	ScopeAutofix   = "autofix.*"
	ScopeTransform = "transform.*"

	AnalysisResultTask    = "contrib.atlas.tasks.store_analysis_run_result"
	AutofixResultTask     = "contrib.atlas.tasks.store_autofix_run_result"
	TransformerResultTask = "contrib.atlas.tasks.store_transformer_run_result"
	CancelCheckResultTask = "contrib.atlas.tasks.confirm_check_cancellation"
	PatcherResultTask     = "contrib.runner.tasks.store_autofix_committer_result"
)
View Source
const (
	KindJob      = "Job"
	APIVersionV1 = "batch/v1"
)
View Source
const (
	DriverTypePrinter = "printer"
	DriverTypeK8s     = "k8s"
)
View Source
const CleanupInterval = -1 * time.Hour
View Source
const DefaultK8sTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
View Source
const (
	DriverPrinter = "printer"
)

Variables

View Source
var (
	ErrMissingOpts = errors.New("missing opts")
)
View Source
var ErrUnknownStorageProvider = errors.New("unknown storage provider")
View Source
var VolumeMounts = map[string]string{
	"codedir":      "/code",
	"artifactsdir": "/artifacts",
	"ssh":          "/home/runner/.ssh",
	"marvindir":    "/marvin",
}

Functions

This section is empty.

Types

type AnalysisDriverJob

type AnalysisDriverJob struct {
	// contains filtered or unexported fields
}

AnalysisDriverJob is a struct that implements the IDriverJob interface.

func (*AnalysisDriverJob) Container

func (j *AnalysisDriverJob) Container() *Container

func (*AnalysisDriverJob) ImagePullSecrets

func (j *AnalysisDriverJob) ImagePullSecrets() []string

func (*AnalysisDriverJob) InitContainer

func (j *AnalysisDriverJob) InitContainer() *Container

func (*AnalysisDriverJob) JobLabels

func (j *AnalysisDriverJob) JobLabels() map[string]string

func (*AnalysisDriverJob) Name

func (j *AnalysisDriverJob) Name() string

func (*AnalysisDriverJob) Namespace

func (j *AnalysisDriverJob) Namespace() string

func (*AnalysisDriverJob) NodeSelector

func (j *AnalysisDriverJob) NodeSelector() map[string]string

func (*AnalysisDriverJob) PodLabels

func (j *AnalysisDriverJob) PodLabels() map[string]string

func (*AnalysisDriverJob) Volumes

func (*AnalysisDriverJob) Volumes() []string

type AnalysisOpts

type AnalysisOpts struct {
	PublisherURL         string
	PublisherToken       string
	SnippetStorageType   string
	SnippetStorageBucket string

	SentryDSN string

	KubernetesOpts *KubernetesOpts
}

type AnalysisRunRequest

type AnalysisRunRequest struct {
	Run            *artifact.AnalysisRun
	AppID          string
	InstallationID string
}

type AnalysisTask

type AnalysisTask struct {
	// contains filtered or unexported fields
}

func NewAnalysisTask

func NewAnalysisTask(runner *Runner, opts *TaskOpts, driver Driver, provider Provider, signer Signer) *AnalysisTask

func (*AnalysisTask) Run

Run executes the analysis task for the given analysis run. For each check in the run, it creates a new analysis driver job and triggers the job in a separate goroutine. The function waits for all jobs to complete before returning.

The context is used to control the overall execution of the task. The run parameter contains the information about the analysis run to be executed.

If any of the driver jobs fail, Run logs the error and returns it. If all jobs complete successfully, Run returns nil.

Example usage:

err := task.Run(ctx, run)
if err != nil {
  log.Fatal(err)
}

Run is safe for concurrent use.

type AutofixConfig

type AutofixConfig struct {
	*artifact.MarvinAutofixConfig
}

func NewAutofixConfig

func NewAutofixConfig(run *artifact.AutofixRun) (*AutofixConfig, error)

func (*AutofixConfig) Bytes

func (c *AutofixConfig) Bytes() ([]byte, error)

type AutofixDriverJob

type AutofixDriverJob struct {
	// contains filtered or unexported fields
}

func (*AutofixDriverJob) Container

func (j *AutofixDriverJob) Container() *Container

func (*AutofixDriverJob) ImagePullSecrets

func (j *AutofixDriverJob) ImagePullSecrets() []string

func (*AutofixDriverJob) InitContainer

func (j *AutofixDriverJob) InitContainer() *Container

func (*AutofixDriverJob) JobLabels

func (j *AutofixDriverJob) JobLabels() map[string]string

func (*AutofixDriverJob) Name

func (j *AutofixDriverJob) Name() string

func (*AutofixDriverJob) Namespace

func (j *AutofixDriverJob) Namespace() string

func (*AutofixDriverJob) NodeSelector

func (j *AutofixDriverJob) NodeSelector() map[string]string

func (*AutofixDriverJob) PodLabels

func (j *AutofixDriverJob) PodLabels() map[string]string

func (*AutofixDriverJob) Volumes

func (*AutofixDriverJob) Volumes() []string

type AutofixOpts

type AutofixOpts struct {
	PublisherURL         string
	PublisherToken       string
	SnippetStorageType   string
	SnippetStorageBucket string

	SentryDSN string

	KubernetesOpts *KubernetesOpts
}

type AutofixRunRequest

type AutofixRunRequest struct {
	Run            *artifact.AutofixRun
	AppID          string
	InstallationID string
}

type AutofixTask

type AutofixTask struct {
	// contains filtered or unexported fields
}

func NewAutofixTask

func NewAutofixTask(runner *Runner, opts *TaskOpts, driver Driver, provider Provider, signer Signer) *AutofixTask

func (*AutofixTask) Run

type CancelCheckDriverJob

type CancelCheckDriverJob struct {
	// contains filtered or unexported fields
}

func (*CancelCheckDriverJob) Name

func (j *CancelCheckDriverJob) Name() string

func (*CancelCheckDriverJob) Namespace

func (j *CancelCheckDriverJob) Namespace() string

type CancelCheckOpts

type CancelCheckOpts struct {
	KubernetesOpts *KubernetesOpts
}

type CancelCheckTask

type CancelCheckTask struct {
	// contains filtered or unexported fields
}

func NewCancelCheckTask

func NewCancelCheckTask(runner *Runner, opts *TaskOpts, driver Driver, signer Signer, client *http.Client) *CancelCheckTask

NewCancelCheckTask creates a new cancel check task from the supplied runner, task options, driver, signer and HTTP client.

func (*CancelCheckTask) Run

Run creates the template for the cancel check job and triggers the cancel check job.

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

func NewCleaner

func NewCleaner(driver Driver, opts *CleanerOpts) *Cleaner

func (*Cleaner) Start

func (c *Cleaner) Start(ctx context.Context)

type CleanerOpts

type CleanerOpts struct {
	Namespace string
	Interval  *time.Duration
}

type Container

type Container struct {
	Env          map[string]string
	VolumeMounts map[string]string
	Limit        Resource
	Requests     Resource
	Name         string
	Image        string
	Cmd          []string
	Args         []string
}

type Driver

type Driver interface {
	TriggerJob(ctx context.Context, request JobCreator) error
	DeleteJob(ctx context.Context, request JobDeleter) error
	CleanExpiredJobs(ctx context.Context, namespace string, interval *time.Duration) error
}

func NewK8sDriver

func NewK8sDriver(tokenPath string) (Driver, error)

func NewK8sPrinterDriver

func NewK8sPrinterDriver() Driver

type Facade

type Facade struct {
	OrchestratorHandler *Handler
	Cleaner             *Cleaner
}

func New

func New(opts *Opts) (*Facade, error)

func (*Facade) AddRoutes

func (f *Facade) AddRoutes(router Router, middleware []echo.MiddlewareFunc) Router

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(
	opts *TaskOpts,
	driver Driver,
	provider Provider,
	signer Signer,
	runner *Runner,
) *Handler

func (*Handler) HandleAnalysis

func (h *Handler) HandleAnalysis(c echo.Context) error

func (*Handler) HandleAutofix

func (h *Handler) HandleAutofix(c echo.Context) error

func (*Handler) HandleCancelCheck

func (h *Handler) HandleCancelCheck(c echo.Context) error

HandleCancelCheck handles the cancel check workflow.

func (*Handler) HandlePatcher

func (h *Handler) HandlePatcher(c echo.Context) error

HandlePatcher handles the patching job workflow.

func (*Handler) HandleTransformer

func (h *Handler) HandleTransformer(c echo.Context) error

type JobCreator

type JobCreator interface {
	Name() string
	Namespace() string

	JobLabels() map[string]string
	PodLabels() map[string]string

	Volumes() []string

	Container() *Container
	InitContainer() *Container

	NodeSelector() map[string]string

	ImagePullSecrets() []string
}

JobCreator defines the methods used to access the data required to create a job, irrespective of the driver implementation.

func NewAnalysisDriverJob

func NewAnalysisDriverJob(run *artifact.AnalysisRun, check artifact.Check, opts *AnalysisOpts) (JobCreator, error)

func NewAutofixDriverJob

func NewAutofixDriverJob(run *artifact.AutofixRun, opts *AutofixOpts) (JobCreator, error)

func NewPatcherDriverJob

func NewPatcherDriverJob(run *artifact.PatcherRun, opts *PatcherJobOpts) (JobCreator, error)

NewPatcherDriverJob is responsible for creating the patcher run config and then returning an instance of the patcher job.

func NewTransformerJob

func NewTransformerJob(run *artifact.TransformerRun, opts *TransformerOpts) (JobCreator, error)

type JobDeleter

type JobDeleter interface {
	Name() string
	Namespace() string
}

JobDeleter defines the methods used to access the data required to delete a job, irrespective of the driver implementation.

func NewCancelCheckDriverJob

func NewCancelCheckDriverJob(run *artifact.CancelCheckRun, opts *CancelCheckOpts) (JobDeleter, error)

type K8sDriver

type K8sDriver struct {
	// contains filtered or unexported fields
}

func (*K8sDriver) CleanExpiredJobs

func (d *K8sDriver) CleanExpiredJobs(ctx context.Context, namespace string, interval *time.Duration) error

func (*K8sDriver) DeleteJob

func (d *K8sDriver) DeleteJob(ctx context.Context, job JobDeleter) error

DeleteJob deletes the kubernetes job supplied as a parameter.

func (*K8sDriver) TriggerJob

func (d *K8sDriver) TriggerJob(ctx context.Context, job JobCreator) error

TriggerJob creates the kubernetes job supplied as a parameter.

type K8sPrinterDriver

type K8sPrinterDriver struct{}

func (*K8sPrinterDriver) CleanExpiredJobs

func (*K8sPrinterDriver) CleanExpiredJobs(_ context.Context, _ string, _ *time.Duration) error

func (*K8sPrinterDriver) DeleteJob

func (*K8sPrinterDriver) DeleteJob(_ context.Context, _ JobDeleter) error

func (*K8sPrinterDriver) TriggerJob

func (*K8sPrinterDriver) TriggerJob(_ context.Context, job JobCreator) error

type KubernetesOpts

type KubernetesOpts struct {
	Namespace        string
	NodeSelector     map[string]string
	ImageURL         url.URL
	ImagePullSecrets []string
}

type MarvinAnalysisConfig

type MarvinAnalysisConfig struct {
	*artifact.MarvinAnalysisConfig
}

func NewMarvinAnalysisConfig

func NewMarvinAnalysisConfig(run *artifact.AnalysisRun, check artifact.Check) *MarvinAnalysisConfig

func (*MarvinAnalysisConfig) Bytes

func (c *MarvinAnalysisConfig) Bytes() ([]byte, error)

type MarvinK8sJob

type MarvinK8sJob struct {
	JobCreator
}

func (*MarvinK8sJob) Job

func (j *MarvinK8sJob) Job() (*batchv1.Job, error)

type Opts

type Opts struct {
	*TaskOpts
	*CleanerOpts
	Provider
	Signer
	Driver
	*Runner
}

type PatcherDriverJob

type PatcherDriverJob struct {
	// contains filtered or unexported fields
}

PatcherDriverJob represents the patcher job and the data required by it in the form of config or opts.

func (*PatcherDriverJob) Container

func (j *PatcherDriverJob) Container() *Container

func (*PatcherDriverJob) ImagePullSecrets

func (j *PatcherDriverJob) ImagePullSecrets() []string

func (*PatcherDriverJob) InitContainer

func (*PatcherDriverJob) InitContainer() *Container

func (*PatcherDriverJob) JobLabels

func (j *PatcherDriverJob) JobLabels() map[string]string

func (*PatcherDriverJob) Name

func (j *PatcherDriverJob) Name() string

func (*PatcherDriverJob) Namespace

func (j *PatcherDriverJob) Namespace() string

func (*PatcherDriverJob) NodeSelector

func (j *PatcherDriverJob) NodeSelector() map[string]string

func (*PatcherDriverJob) PodLabels

func (j *PatcherDriverJob) PodLabels() map[string]string

func (*PatcherDriverJob) Volumes

func (*PatcherDriverJob) Volumes() []string

type PatcherJobOpts

type PatcherJobOpts struct {
	PublisherURL         string
	PublisherToken       string
	SnippetStorageType   string
	SnippetStorageBucket string

	SentryDSN string

	KubernetesOpts *KubernetesOpts
}

PatcherJobOpts represents the data that needs to be passed to the patcher job like the results URL.

type PatcherRunRequest

type PatcherRunRequest struct {
	Run            *artifact.PatcherRun
	AppID          string
	InstallationID string
}

PatcherRunRequest represents the data corresponding to the patcher run including the AppID and InstallationID of the client.

type PatcherTask

type PatcherTask struct {
	// contains filtered or unexported fields
}

PatcherTask represents the patcher job task structure.

func NewPatcherTask

func NewPatcherTask(runner *Runner, opts *TaskOpts, driver Driver, provider Provider, signer Signer) *PatcherTask

NewPatcherTask creates a new patching job task based on PatcherTask structure and returns it.

func (*PatcherTask) Run

PatcherTask.Run creates a new patcher job based on the data passed to it and then triggers that job using the specified driver in the task.

type Provider

type Provider interface {
	AuthenticatedRemoteURL(appID, installationID string, srcURL string) (string, error)
}

type Resource

type Resource struct {
	CPU    string
	Memory string
}

type Router

type Router interface {
	AddRoute(method string, path string, handlerFunc echo.HandlerFunc, middleware ...echo.MiddlewareFunc)
}

type Runner

type Runner struct {
	ID string
}

type Signer

type Signer interface {
	GenerateToken(issuer string, scope []string, claims map[string]interface{}, expiry time.Duration) (string, error)
}

type Storer

type Storer interface {
	GenerateURL(bucket, object string) (string, error)
}

type TaskOpts

type TaskOpts struct {
	RemoteHost           string
	SnippetStorageType   string
	SnippetStorageBucket string

	SentryDSN string

	KubernetesOpts *KubernetesOpts
}

type TransformerJob

type TransformerJob struct {
	// contains filtered or unexported fields
}

func (*TransformerJob) Container

func (j *TransformerJob) Container() *Container

func (*TransformerJob) ImagePullSecrets

func (j *TransformerJob) ImagePullSecrets() []string

func (*TransformerJob) InitContainer

func (j *TransformerJob) InitContainer() *Container

func (*TransformerJob) JobLabels

func (j *TransformerJob) JobLabels() map[string]string

func (*TransformerJob) Name

func (j *TransformerJob) Name() string

func (*TransformerJob) Namespace

func (j *TransformerJob) Namespace() string

func (*TransformerJob) NodeSelector

func (j *TransformerJob) NodeSelector() map[string]string

func (*TransformerJob) PodLabels

func (j *TransformerJob) PodLabels() map[string]string

func (*TransformerJob) Volumes

func (*TransformerJob) Volumes() []string

type TransformerMarvinConfig

type TransformerMarvinConfig struct {
	*artifact.MarvinTransformerConfig
}

func NewTransformerMarvinConfig

func NewTransformerMarvinConfig(run *artifact.TransformerRun) *TransformerMarvinConfig

func (*TransformerMarvinConfig) Bytes

func (c *TransformerMarvinConfig) Bytes() ([]byte, error)

type TransformerOpts

type TransformerOpts struct {
	PublisherURL   string
	PublisherToken string

	SentryDSN string

	KubernetesOpts *KubernetesOpts
}

type TransformerRunRequest

type TransformerRunRequest struct {
	Run            *artifact.TransformerRun
	AppID          string
	InstallationID string
}

type TransformerTask

type TransformerTask struct {
	// contains filtered or unexported fields
}

func NewTransformerTask

func NewTransformerTask(runner *Runner, opts *TaskOpts, driver Driver, provider Provider, signer Signer) *TransformerTask

func (*TransformerTask) Run

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL