worker

package
v0.0.0-...-2c9ee18
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2017 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Constants

const ImageMetadataFile = "metadata.json"
const RawRootFSScheme = "raw"

Variables

var (
	ErrNoWorkers     = errors.New("no workers")
	ErrMissingWorker = errors.New("worker for container is missing")
)
var ErrBaseResourceTypeNotFound = errors.New("base-resource-type-not-found")
var ErrCreatedContainerNotFound = errors.New("container-in-created-state-not-found-in-garden")
var ErrCreatedVolumeNotFound = errors.New("failed-to-find-created-volume-in-baggageclaim")
var ErrDesiredWorkerNotRunning = errors.New("desired-garden-worker-is-not-known-to-be-running")
var ErrIncompatiblePlatform = errors.New("incompatible platform")
var ErrMismatchedTags = errors.New("mismatched tags")
var ErrMissingVolume = errors.New("volume mounted to container is missing")
var ErrNoVolumeManager = errors.New("worker does not support volume management")
var ErrNotImplemented = errors.New("Not implemented")
var ErrTeamMismatch = errors.New("mismatched team")
var ErrUnsupportedResourceType = errors.New("unsupported resource type")
var ErrVolumeExpiredImmediately = errors.New("volume expired immediately after saving")

Functions

func NewHardcoded

func NewHardcoded(
	logger lager.Logger, workerDB SaveWorkerDB, clock c.Clock,
	gardenAddr string, baggageclaimURL string, resourceTypesNG []atc.WorkerResourceType,
) ifrit.RunFunc

Types

type ArtifactDestination

type ArtifactDestination interface {
	// StreamIn is called with a destination directory and the tar stream to
	// expand into the destination directory.
	StreamIn(string, io.Reader) error
}

ArtifactDestination is the inverse of ArtifactSource. This interface allows the receiving end to determine the location of the data, e.g. based on a task's input configuration.

type ArtifactName

type ArtifactName string

ArtifactName is just a string, with its own type to make interfaces using it more self-documenting.

type ArtifactRepository

type ArtifactRepository struct {
	// contains filtered or unexported fields
}

ArtifactRepository is the mapping from an ArtifactName to an ArtifactSource. Steps both populate this map with new artifacts (e.g. the resource fetched by a Get step) and look up required artifacts (e.g. the inputs configured for a Task step).

There is only one ArtifactRepository for the duration of a build plan's execution.

ArtifactRepository is, itself, an ArtifactSource. As an ArtifactSource it acts as the set of all ArtifactSources it contains, as if they were each in subdirectories corresponding to their ArtifactName.

func NewArtifactRepository

func NewArtifactRepository() *ArtifactRepository

NewArtifactRepository constructs a new repository.

func (*ArtifactRepository) AsMap

func (repo *ArtifactRepository) AsMap() map[ArtifactName]ArtifactSource

AsMap extracts the current contents of the ArtifactRepository into a new map and returns it. Changes to the returned map or the ArtifactRepository will not affect each other.

func (*ArtifactRepository) RegisterSource

func (repo *ArtifactRepository) RegisterSource(name ArtifactName, source ArtifactSource)

RegisterSource inserts an ArtifactSource into the map under the given ArtifactName. Producers of artifacts, e.g. the Get step and the Task step, will call this after they've successfully produced their artifact(s).

func (*ArtifactRepository) ScopedTo

func (repo *ArtifactRepository) ScopedTo(names ...ArtifactName) (*ArtifactRepository, error)

ScopedTo returns a new ArtifactRepository restricted to the given set of ArtifactNames. This is used by the Put step to stream in the sources that did not have a volume available on its destination.

func (*ArtifactRepository) SourceFor

func (repo *ArtifactRepository) SourceFor(name ArtifactName) (ArtifactSource, bool)

SourceFor looks up an ArtifactSource for the given ArtifactName. Consumers of artifacts, e.g. the Task step, will call this to locate their dependencies.
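
The registration, lookup, copy, and scoping behaviour described for AsMap, RegisterSource, ScopedTo, and SourceFor can be sketched with a mutex-guarded map. This is an illustration only: repository and newRepository are hypothetical stand-ins, ArtifactSource is reduced to an empty interface, and both the locking and the error returned for an unknown name in ScopedTo are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// ArtifactName mirrors the type documented above.
type ArtifactName string

// ArtifactSource is reduced to a marker interface for this sketch.
type ArtifactSource interface{}

// repository is a hypothetical stand-in for ArtifactRepository.
type repository struct {
	mu      sync.RWMutex
	sources map[ArtifactName]ArtifactSource
}

func newRepository() *repository {
	return &repository{sources: map[ArtifactName]ArtifactSource{}}
}

// RegisterSource stores source under name, replacing any previous entry.
func (r *repository) RegisterSource(name ArtifactName, source ArtifactSource) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sources[name] = source
}

// SourceFor reports the source registered under name, if any.
func (r *repository) SourceFor(name ArtifactName) (ArtifactSource, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	source, found := r.sources[name]
	return source, found
}

// AsMap returns a copy, so callers cannot mutate the repository through it.
func (r *repository) AsMap() map[ArtifactName]ArtifactSource {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[ArtifactName]ArtifactSource, len(r.sources))
	for name, source := range r.sources {
		out[name] = source
	}
	return out
}

// ScopedTo builds a new repository holding only the named sources.
// Assumption: asking for an unregistered name is reported as an error.
func (r *repository) ScopedTo(names ...ArtifactName) (*repository, error) {
	scoped := newRepository()
	for _, name := range names {
		source, found := r.SourceFor(name)
		if !found {
			return nil, fmt.Errorf("source not registered: %s", name)
		}
		scoped.RegisterSource(name, source)
	}
	return scoped, nil
}

func main() {
	repo := newRepository()
	repo.RegisterSource("repo", "git checkout")
	repo.RegisterSource("version", "semver file")

	snapshot := repo.AsMap()
	delete(snapshot, "repo") // only the copy changes

	_, stillRegistered := repo.SourceFor("repo")
	fmt.Println(stillRegistered)

	scoped, err := repo.ScopedTo("version")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(scoped.AsMap()))
}
```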

func (*ArtifactRepository) StreamFile

func (repo *ArtifactRepository) StreamFile(path string) (io.ReadCloser, error)

StreamFile streams a single file out of the repository, using the first path segment to determine the ArtifactSource to stream out of. For example, StreamFile("a/b.yml") will look up the "a" ArtifactSource and return the result of StreamFile("b.yml") on it.

If the ArtifactSource determined by the path is not present, FileNotFoundError will be returned.

func (*ArtifactRepository) StreamTo

func (repo *ArtifactRepository) StreamTo(dest ArtifactDestination) error

StreamTo will stream all currently registered artifacts to the destination. This is used by the Put step, which currently does not have an explicit set of dependencies, and instead just pulls in everything.

Each ArtifactSource will be streamed to a subdirectory matching its ArtifactName.

func (*ArtifactRepository) VolumeOn

func (repo *ArtifactRepository) VolumeOn(worker Worker) (Volume, bool, error)

VolumeOn never reports a volume, since no single volume can represent every ArtifactSource in the repository.

type ArtifactSource

type ArtifactSource interface {
	// StreamTo copies the data from the source to the destination. Note that
	// this potentially uses a lot of network transfer, for larger artifacts, as
	// the ATC will effectively act as a middleman.
	StreamTo(ArtifactDestination) error

	// StreamFile returns the contents of a single file in the artifact source.
	// This is used for loading a task's configuration at runtime.
	//
	// If the file cannot be found, FileNotFoundError should be returned.
	StreamFile(path string) (io.ReadCloser, error)

	// VolumeOn attempts to locate a volume equivalent to this source on the
	// given worker. If a volume can be found, it will be used directly. If not,
	// `StreamTo` will be used to copy the data to the destination instead.
	VolumeOn(Worker) (Volume, bool, error)
}

ArtifactSource represents data produced by steps that can be transferred to other steps.
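
The VolumeOn comment describes a fallback: use an existing volume on the worker when one is found, otherwise copy the data with StreamTo. A caller following that contract might look like the sketch below. The interfaces are trimmed to the methods the example needs, and placeArtifact and all fake types are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Trimmed-down versions of the documented interfaces.
type Volume interface{ Handle() string }

type Worker interface{ Name() string }

type ArtifactDestination interface {
	StreamIn(string, io.Reader) error
}

type ArtifactSource interface {
	StreamTo(ArtifactDestination) error
	VolumeOn(Worker) (Volume, bool, error)
}

// placeArtifact prefers a volume already present on the worker and only
// streams the data when no such volume is found.
func placeArtifact(src ArtifactSource, w Worker, dest ArtifactDestination) (Volume, bool, error) {
	volume, found, err := src.VolumeOn(w)
	if err != nil {
		return nil, false, err
	}
	if found {
		return volume, true, nil
	}
	return nil, false, src.StreamTo(dest)
}

type fakeVolume string

func (v fakeVolume) Handle() string { return string(v) }

type fakeWorker string

func (w fakeWorker) Name() string { return string(w) }

// fakeSource has volumes only on the workers listed in its map.
type fakeSource struct {
	volumes map[string]Volume
}

func (s fakeSource) VolumeOn(w Worker) (Volume, bool, error) {
	volume, found := s.volumes[w.Name()]
	return volume, found, nil
}

func (s fakeSource) StreamTo(dest ArtifactDestination) error {
	return dest.StreamIn(".", strings.NewReader("tar stream placeholder"))
}

// countingDestination counts how many streams it received.
type countingDestination struct {
	streams int
}

func (d *countingDestination) StreamIn(string, io.Reader) error {
	d.streams++
	return nil
}

func main() {
	src := fakeSource{volumes: map[string]Volume{"worker-a": fakeVolume("vol-1")}}
	dest := &countingDestination{}

	volume, found, err := placeArtifact(src, fakeWorker("worker-a"), dest)
	if err != nil {
		panic(err)
	}
	fmt.Println(found, volume.Handle(), dest.streams) // true vol-1 0

	_, found, err = placeArtifact(src, fakeWorker("worker-b"), dest)
	if err != nil {
		panic(err)
	}
	fmt.Println(found, dest.streams) // false 1
}
```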

type Client

type Client interface {
	FindOrCreateBuildContainer(
		lager.Logger,
		<-chan os.Signal,
		ImageFetchingDelegate,
		Identifier,
		Metadata,
		ContainerSpec,
		atc.ResourceTypes,
		map[string]string,
	) (Container, error)

	FindOrCreateResourceGetContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		outputPaths map[string]string,
		resourceType string,
		version atc.Version,
		source atc.Source,
		params atc.Params,
	) (Container, error)

	FindOrCreateResourceCheckContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		resourceType string,
		source atc.Source,
	) (Container, error)

	FindOrCreateResourceTypeCheckContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		resourceType string,
		source atc.Source,
	) (Container, error)

	FindOrCreateContainerForIdentifier(
		logger lager.Logger,
		id Identifier,
		metadata Metadata,
		containerSpec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		imageFetchingDelegate ImageFetchingDelegate,
		resourceSources map[string]ArtifactSource,
	) (Container, []string, error)

	FindOrCreateVolumeForResourceCache(
		logger lager.Logger,
		vs VolumeSpec,
		resourceCache *dbng.UsedResourceCache,
	) (Volume, error)

	FindInitializedVolumeForResourceCache(
		logger lager.Logger,
		resourceCache *dbng.UsedResourceCache,
	) (Volume, bool, error)

	FindContainerForIdentifier(lager.Logger, Identifier) (Container, bool, error)
	FindContainerByHandle(lager.Logger, string, int) (Container, bool, error)
	ValidateResourceCheckVersion(container db.SavedContainer) (bool, error)
	FindResourceTypeByPath(path string) (atc.WorkerResourceType, bool)
	LookupVolume(lager.Logger, string) (Volume, bool, error)

	Satisfying(WorkerSpec, atc.ResourceTypes) (Worker, error)
	AllSatisfying(WorkerSpec, atc.ResourceTypes) ([]Worker, error)
	RunningWorkers() ([]Worker, error)
	GetWorker(workerName string) (Worker, error)
}

func NewPool

func NewPool(provider WorkerProvider) Client

type Container

type Container interface {
	garden.Container

	Destroy() error

	VolumeMounts() []VolumeMount

	WorkerName() string

	MarkAsHijacked() error
}

type ContainerProvider

type ContainerProvider interface {
	FindContainerByHandle(
		logger lager.Logger,
		handle string,
		teamID int,
	) (Container, bool, error)

	FindOrCreateBuildContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		outputPaths map[string]string,
	) (Container, error)

	FindOrCreateResourceCheckContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		resourceType string,
		source atc.Source,
	) (Container, error)

	FindOrCreateResourceTypeCheckContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		resourceTypeName string,
		source atc.Source,
	) (Container, error)

	FindOrCreateResourceGetContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		id Identifier,
		metadata Metadata,
		spec ContainerSpec,
		resourceTypes atc.ResourceTypes,
		outputPaths map[string]string,
		resourceTypeName string,
		version atc.Version,
		source atc.Source,
		params atc.Params,
	) (Container, error)
}

type ContainerProviderFactory

type ContainerProviderFactory interface {
	ContainerProviderFor(Worker) ContainerProvider
}

func NewContainerProviderFactory

func NewContainerProviderFactory(
	gardenClient garden.Client,
	baggageclaimClient baggageclaim.Client,
	volumeClient VolumeClient,
	imageFactory ImageFactory,
	dbVolumeFactory dbng.VolumeFactory,
	dbResourceCacheFactory dbng.ResourceCacheFactory,
	dbResourceConfigFactory dbng.ResourceConfigFactory,
	dbTeamFactory dbng.TeamFactory,
	db GardenWorkerDB,
	httpProxyURL string,
	httpsProxyURL string,
	noProxy string,
	clock clock.Clock,
) ContainerProviderFactory

type ContainerRootFSStrategy

type ContainerRootFSStrategy struct {
	Parent Volume
}

type ContainerSpec

type ContainerSpec struct {
	Platform  string
	Tags      []string
	TeamID    int
	ImageSpec ImageSpec
	Ephemeral bool
	Env       []string

	// Not Copy-on-Write. Used for a single mount in Get containers.
	Inputs []VolumeMount

	// volumes that need to be mounted to container
	Mounts []VolumeMount

	// Optional user to run processes as. Overwrites the one specified in the docker image.
	User string
}

func (ContainerSpec) WorkerSpec

func (spec ContainerSpec) WorkerSpec() WorkerSpec

type FetchedImage

type FetchedImage struct {
	Metadata ImageMetadata
	Version  atc.Version
	URL      string
}

type FileNotFoundError

type FileNotFoundError struct {
	Path string
}

FileNotFoundError is the error to return from StreamFile when the given path does not exist.

func (FileNotFoundError) Error

func (err FileNotFoundError) Error() string

Error prints a helpful message including the file path. The user will see this message if e.g. their task config path does not exist.

type GardenConnectionFactory

type GardenConnectionFactory interface {
	BuildConnection() gconn.Connection
}

func NewGardenConnectionFactory

func NewGardenConnectionFactory(
	db transport.TransportDB,
	logger lager.Logger,
	workerName string,
	workerHost *string,
	retryBackOffFactory retryhttp.BackOffFactory,
) GardenConnectionFactory

type GardenWorkerDB

type GardenWorkerDB interface {
	CreateContainerToBeRemoved(container db.Container, maxLifetime time.Duration, volumeHandles []string) (db.SavedContainer, error)
	UpdateContainerTTLToBeRemoved(container db.Container, maxLifetime time.Duration) (db.SavedContainer, error)
	GetContainer(handle string) (db.SavedContainer, bool, error)
	ReapContainer(string) error
	GetPipelineByID(pipelineID int) (db.SavedPipeline, error)
	AcquireVolumeCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
	AcquireContainerCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
}

type HostRootFSStrategy

type HostRootFSStrategy struct {
	Path       string
	WorkerName string
	Version    *string
}

type Identifier

type Identifier db.ContainerIdentifier

type Image

type Image interface {
	FetchForContainer(
		logger lager.Logger,
		container dbng.CreatingContainer,
	) (FetchedImage, error)
}

type ImageArtifactReplicationStrategy

type ImageArtifactReplicationStrategy struct {
	Name string
}

type ImageFactory

type ImageFactory interface {
	GetImage(
		lager.Logger,
		Worker,
		VolumeClient,
		ImageSpec,
		int,
		<-chan os.Signal,
		ImageFetchingDelegate,
		Identifier,
		Metadata,
		atc.ResourceTypes,
	) (Image, error)
}

type ImageFetchingDelegate

type ImageFetchingDelegate interface {
	Stderr() io.Writer
	ImageVersionDetermined(ResourceCacheIdentifier) error
}

type ImageMetadata

type ImageMetadata struct {
	Env  []string `json:"env"`
	User string   `json:"user"`
}

type ImageSpec

type ImageSpec struct {
	ResourceType        string
	ImageURL            string
	ImageResource       *atc.ImageResource
	ImageArtifactSource ArtifactSource
	ImageArtifactName   ArtifactName
	Privileged          bool
}

type MalformedMetadataError

type MalformedMetadataError struct {
	UnmarshalError error
}

func (MalformedMetadataError) Error

func (err MalformedMetadataError) Error() string

type Metadata

type Metadata db.ContainerMetadata

type NoCompatibleWorkersError

type NoCompatibleWorkersError struct {
	Spec    WorkerSpec
	Workers []Worker
}

func (NoCompatibleWorkersError) Error

func (err NoCompatibleWorkersError) Error() string

type NoopImageFetchingDelegate

type NoopImageFetchingDelegate struct{}

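func (NoopImageFetchingDelegate) ImageVersionDetermined

func (NoopImageFetchingDelegate) ImageVersionDetermined(ResourceCacheIdentifier) error

func (NoopImageFetchingDelegate) Stderr

func (NoopImageFetchingDelegate) Stderr() io.Writer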

type OutputStrategy

type OutputStrategy struct {
	Name string
}

type ResourceCacheIdentifier

type ResourceCacheIdentifier db.ResourceCacheIdentifier

type ResourceCacheStrategy

type ResourceCacheStrategy struct {
	ResourceHash    string
	ResourceVersion atc.Version
}

type RetryableConnection

type RetryableConnection struct {
	gconn.Connection
}

func NewRetryableConnection

func NewRetryableConnection(connection gconn.Connection) *RetryableConnection

func (*RetryableConnection) Attach

func (conn *RetryableConnection) Attach(handle string, processID string, processIO garden.ProcessIO) (garden.Process, error)

func (*RetryableConnection) Run

func (conn *RetryableConnection) Run(handle string, processSpec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error)

type SaveWorkerDB

type SaveWorkerDB interface {
	SaveWorker(db.WorkerInfo, time.Duration) (db.SavedWorker, error)
}

type Sleeper

type Sleeper interface {
	Sleep(time.Duration)
}

type Strategy

type Strategy interface {
	// contains filtered or unexported methods
}

type Volume

type Volume interface {
	Handle() string
	Path() string

	SetProperty(key string, value string) error
	Properties() (baggageclaim.VolumeProperties, error)

	StreamIn(path string, tarStream io.Reader) error
	StreamOut(path string) (io.ReadCloser, error)

	COWStrategy() baggageclaim.COWStrategy

	IsInitialized() (bool, error)
	Initialize() error

	CreateChildForContainer(dbng.CreatingContainer, string) (dbng.CreatingVolume, error)

	Destroy() error
}

func NewVolume

func NewVolume(
	bcVolume baggageclaim.Volume,
	dbVolume dbng.CreatedVolume,
) Volume

type VolumeClient

type VolumeClient interface {
	FindOrCreateVolumeForResourceCache(
		lager.Logger,
		VolumeSpec,
		*dbng.UsedResourceCache,
	) (Volume, error)
	FindOrCreateVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		dbng.CreatingContainer,
		int,
		string,
	) (Volume, error)
	FindOrCreateVolumeForBaseResourceType(
		lager.Logger,
		VolumeSpec,
		int,
		string,
	) (Volume, error)
	FindInitializedVolumeForResourceCache(
		lager.Logger,
		*dbng.UsedResourceCache,
	) (Volume, bool, error)
	LookupVolume(lager.Logger, string) (Volume, bool, error)
}

func NewVolumeClient

func NewVolumeClient(
	baggageclaimClient baggageclaim.Client,
	db GardenWorkerDB,
	dbVolumeFactory dbng.VolumeFactory,
	dbBaseResourceTypeFactory dbng.BaseResourceTypeFactory,
	clock clock.Clock,
	dbWorker *dbng.Worker,
) VolumeClient

type VolumeFactoryDB

type VolumeFactoryDB interface {
	ReapVolume(handle string) error
}

type VolumeMount

type VolumeMount struct {
	Volume    Volume
	MountPath string
}

type VolumeProperties

type VolumeProperties map[string]string

type VolumeSpec

type VolumeSpec struct {
	Strategy   Strategy
	Properties VolumeProperties
	Privileged bool
	TTL        time.Duration
}

type Worker

type Worker interface {
	Client

	ActiveContainers() int

	Description() string
	Name() string
	Address() *string
	ResourceTypes() []atc.WorkerResourceType
	Tags() atc.Tags
	Uptime() time.Duration
	IsOwnedByTeam() bool
}

func NewGardenWorker

func NewGardenWorker(
	containerProviderFactory ContainerProviderFactory,
	volumeClient VolumeClient,
	pipelineDBFactory db.PipelineDBFactory,
	db GardenWorkerDB,
	provider WorkerProvider,
	clock clock.Clock,
	activeContainers int,
	resourceTypes []atc.WorkerResourceType,
	platform string,
	tags atc.Tags,
	teamID int,
	name string,
	addr string,
	startTime int64,
) Worker

type WorkerDB

type WorkerDB interface {
	Workers() ([]db.SavedWorker, error)
	GetWorker(string) (db.SavedWorker, bool, error)
	CreateContainerToBeRemoved(container db.Container, maxLifetime time.Duration, volumeHandles []string) (db.SavedContainer, error)
	UpdateContainerTTLToBeRemoved(container db.Container, maxLifetime time.Duration) (db.SavedContainer, error)
	GetContainer(string) (db.SavedContainer, bool, error)
	FindContainerByIdentifier(db.ContainerIdentifier) (db.SavedContainer, bool, error)
	ReapContainer(handle string) error
	GetPipelineByID(pipelineID int) (db.SavedPipeline, error)
	ReapVolume(handle string) error
	AcquireVolumeCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
	AcquireContainerCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
}

type WorkerProvider

type WorkerProvider interface {
	RunningWorkers() ([]Worker, error)
	GetWorker(string) (Worker, bool, error)
	FindContainerForIdentifier(Identifier) (db.SavedContainer, bool, error)
	GetContainer(string) (db.SavedContainer, bool, error)
	ReapContainer(string) error
}

func NewDBWorkerProvider

func NewDBWorkerProvider(
	logger lager.Logger,
	db WorkerDB,
	dialer gconn.DialerFunc,
	retryBackOffFactory retryhttp.BackOffFactory,
	imageFactory ImageFactory,
	dbResourceCacheFactory dbng.ResourceCacheFactory,
	dbResourceConfigFactory dbng.ResourceConfigFactory,
	dbBaseResourceTypeFactory dbng.BaseResourceTypeFactory,
	dbVolumeFactory dbng.VolumeFactory,
	dbTeamFactory dbng.TeamFactory,
	pipelineDBFactory db.PipelineDBFactory,
	workerFactory dbng.WorkerFactory,
) WorkerProvider

type WorkerSpec

type WorkerSpec struct {
	Platform     string
	ResourceType string
	Tags         []string
	TeamID       int
}

func (WorkerSpec) Description

func (spec WorkerSpec) Description() string

Directories

Path Synopsis
imagefakes
This file was generated by counterfeiter
transportfakes
This file was generated by counterfeiter
