processor

package
v0.0.0-...-479019f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// InboundManifestCacheHitCounter is a prometheus.CounterVec.
	InboundManifestCacheHitCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "keppel_inbound_manifest_cache_hits",
			Help: "Counter for manifests pulled by Keppel from external registries where the inbound cache had a hit and no external request was made.",
		},
		[]string{"external_hostname"},
	)
	// InboundManifestCacheMissCounter is a prometheus.CounterVec.
	InboundManifestCacheMissCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "keppel_inbound_manifest_cache_misses",
			Help: "Counter for manifests pulled by Keppel from external registries where the inbound cache had a cache miss and therefore an external request had to be made.",
		},
		[]string{"external_hostname"},
	)
)
View Source
var (
	// ErrConcurrentReplication is returned from Processor.ReplicateBlob() when the
	// same blob is already being replicated by another worker.
	ErrConcurrentReplication = errors.New("currently replicating")
)

Functions

This section is empty.

Types

type AuditAccount

type AuditAccount struct {
	Account models.Account
}

AuditAccount is an audittools.TargetRenderer.

func (AuditAccount) Render

func (a AuditAccount) Render() cadf.Resource

Render implements the audittools.TargetRenderer interface.

type IncomingManifest

type IncomingManifest struct {
	Reference models.ManifestReference
	MediaType string
	Contents  []byte
	PushedAt  time.Time // usually time.Now(), but can be different in unit tests
}

IncomingManifest contains information about a manifest uploaded by the user (or downloaded from a peer registry in the case of replication).

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a higher-level interface wrapping keppel.DB and keppel.StorageDriver. It abstracts DB accesses into high-level interactions and keeps DB updates in lockstep with StorageDriver accesses.

func New

New creates a new Processor.

func (*Processor) AppendToBlob

func (p *Processor) AppendToBlob(account models.Account, upload *models.Upload, contents io.Reader, lengthBytes *uint64) error

AppendToBlob appends bytes to a blob upload, and updates the upload's SizeBytes and NumChunks fields appropriately. Chunking of large uploads is implemented at this level, to accommodate storage drivers that have a size restriction on blob chunks.

Warning: The upload's Digest field is *not* read or written. For chunked uploads, the caller is responsible for performing and validating the digest computation.

func (*Processor) CheckManifestOnPrimary

func (p *Processor) CheckManifestOnPrimary(ctx context.Context, account models.Account, repo models.Repository, reference models.ManifestReference) (bool, error)

CheckManifestOnPrimary checks if the given manifest exists on its account's upstream registry. If not, false is returned, An error is returned only if the account is not a replica, or if the upstream registry cannot be queried.

func (*Processor) CreateOrUpdateAccount

func (p *Processor) CreateOrUpdateAccount(ctx context.Context, account keppel.Account, fd keppel.FederationDriver, userInfo audittools.UserInfo, r *http.Request, getSubleaseToken func(models.Peer) (string, *keppel.RegistryV2Error)) (models.Account, *keppel.RegistryV2Error)

CreateOrUpdate can be used on an API account and returns the database representation of it.

func (*Processor) DeleteManifest

func (p *Processor) DeleteManifest(account models.Account, repo models.Repository, manifestDigest digest.Digest, actx keppel.AuditContext) error

DeleteManifest deletes the given manifest from both the database and the backing storage.

If the manifest does not exist, sql.ErrNoRows is returned.

func (*Processor) DeleteTag

func (p *Processor) DeleteTag(account models.Account, repo models.Repository, tagName string, actx keppel.AuditContext) error

DeleteTag deletes the given tag from the database. The manifest is not deleted. If the tag does not exist, sql.ErrNoRows is returned.

func (*Processor) FindBlobOrInsertUnbackedBlob

func (p *Processor) FindBlobOrInsertUnbackedBlob(desc distribution.Descriptor, account models.Account) (*models.Blob, error)

FindBlobOrInsertUnbackedBlob is used by the replication code path. If the requested blob does not exist, a blob record with an empty storage ID will be inserted into the DB. This indicates to the registry API handler that this blob shall be replicated when it is first pulled.

func (*Processor) GetPlatformFilterFromPrimaryAccount

func (p *Processor) GetPlatformFilterFromPrimaryAccount(ctx context.Context, peer models.Peer, replicaAccount models.Account) (models.PlatformFilter, error)

GetPlatformFilterFromPrimaryAccount takes a replica account and queries the peer holding the primary account for that account.

func (*Processor) OverrideGenerateStorageID

func (p *Processor) OverrideGenerateStorageID(generateStorageID func() string) *Processor

OverrideGenerateStorageID replaces keppel.GenerateStorageID with a test double.

func (*Processor) OverrideTimeNow

func (p *Processor) OverrideTimeNow(timeNow func() time.Time) *Processor

OverrideTimeNow replaces time.Now with a test double.

func (*Processor) ReplicateBlob

func (p *Processor) ReplicateBlob(ctx context.Context, blob models.Blob, account models.Account, repo models.Repository, w http.ResponseWriter) (responseWasWritten bool, returnErr error)

ReplicateBlob replicates the given blob from its account's upstream registry.

If a ResponseWriter is given, the response to the GET request to the upstream registry is also copied into it as the blob contents are being streamed into our local registry. The result value `responseWasWritten` indicates whether this happened. It may be false if an error occurred before writing into the ResponseWriter took place.

func (*Processor) ReplicateManifest

func (p *Processor) ReplicateManifest(ctx context.Context, account models.Account, repo models.Repository, reference models.ManifestReference, actx keppel.AuditContext) (*models.Manifest, []byte, error)

ReplicateManifest replicates the manifest from its account's upstream registry. On success, the manifest's metadata and contents are returned.

func (*Processor) ValidateAndStoreManifest

func (p *Processor) ValidateAndStoreManifest(account models.Account, repo models.Repository, m IncomingManifest, actx keppel.AuditContext) (*models.Manifest, error)

ValidateAndStoreManifest validates the given manifest and stores it under the given reference. If the reference is a digest, it is validated. Otherwise, a tag with that name is created that points to the new manifest.

func (*Processor) ValidateExistingBlob

func (p *Processor) ValidateExistingBlob(account models.Account, blob models.Blob) (returnErr error)

ValidateExistingBlob validates the given blob that already exists in the DB. Validation includes computing the digest of the blob contents and comparing to the digest in the DB. On success, nil is returned.

func (*Processor) ValidateExistingManifest

func (p *Processor) ValidateExistingManifest(account models.Account, repo models.Repository, manifest *models.Manifest, now time.Time) error

ValidateExistingManifest validates the given manifest that already exists in the DB. The `now` argument will be used instead of time.Now() to accommodate unit tests that use a different clock.

func (*Processor) WithLowlevelAccess

func (p *Processor) WithLowlevelAccess(action func(*keppel.DB, keppel.StorageDriver) error) error

WithLowlevelAccess lets the caller access the low-level interfaces wrapped by this Processor instance. The existence of this method means that the low-level interfaces are basically public, but having to use this method makes it more obvious when code bypasses the interface of Processor.

NOTE: This method is not used widely at the moment because callers usually have direct access to `db` and `sd`, but my plan is to convert most or all DB accesses into methods on type Processor eventually.

type UpstreamManifestMissingError

type UpstreamManifestMissingError struct {
	Ref   models.ManifestReference
	Inner error
}

UpstreamManifestMissingError is returned from ReplicateManifest when a manifest is legitimately nonexistent on upstream (i.e. returning a valid 404 error in the correct format).

func (UpstreamManifestMissingError) Error

Error implements the builtin/error interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL