Documentation ¶
Index ¶
- Variables
- type AuditAccount
- type IncomingManifest
- type Processor
- func (p *Processor) AppendToBlob(account models.Account, upload *models.Upload, contents io.Reader, ...) error
- func (p *Processor) CheckManifestOnPrimary(ctx context.Context, account models.Account, repo models.Repository, ...) (bool, error)
- func (p *Processor) CreateOrUpdateAccount(ctx context.Context, account keppel.Account, fd keppel.FederationDriver, ...) (models.Account, *keppel.RegistryV2Error)
- func (p *Processor) DeleteManifest(account models.Account, repo models.Repository, manifestDigest digest.Digest, ...) error
- func (p *Processor) DeleteTag(account models.Account, repo models.Repository, tagName string, ...) error
- func (p *Processor) FindBlobOrInsertUnbackedBlob(desc distribution.Descriptor, account models.Account) (*models.Blob, error)
- func (p *Processor) GetPlatformFilterFromPrimaryAccount(ctx context.Context, peer models.Peer, replicaAccount models.Account) (models.PlatformFilter, error)
- func (p *Processor) OverrideGenerateStorageID(generateStorageID func() string) *Processor
- func (p *Processor) OverrideTimeNow(timeNow func() time.Time) *Processor
- func (p *Processor) ReplicateBlob(ctx context.Context, blob models.Blob, account models.Account, ...) (responseWasWritten bool, returnErr error)
- func (p *Processor) ReplicateManifest(ctx context.Context, account models.Account, repo models.Repository, ...) (*models.Manifest, []byte, error)
- func (p *Processor) ValidateAndStoreManifest(account models.Account, repo models.Repository, m IncomingManifest, ...) (*models.Manifest, error)
- func (p *Processor) ValidateExistingBlob(account models.Account, blob models.Blob) (returnErr error)
- func (p *Processor) ValidateExistingManifest(account models.Account, repo models.Repository, manifest *models.Manifest, ...) error
- func (p *Processor) WithLowlevelAccess(action func(*keppel.DB, keppel.StorageDriver) error) error
- type UpstreamManifestMissingError
Constants ¶
This section is empty.
Variables ¶
var ( // InboundManifestCacheHitCounter is a prometheus.CounterVec. InboundManifestCacheHitCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "keppel_inbound_manifest_cache_hits", Help: "Counter for manifests pulled by Keppel from external registries where the inbound cache had a hit and no external request was made.", }, []string{"external_hostname"}, ) // InboundManifestCacheMissCounter is a prometheus.CounterVec. InboundManifestCacheMissCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "keppel_inbound_manifest_cache_misses", Help: "Counter for manifests pulled by Keppel from external registries where the inbound cache had a cache miss and therefore an external request had to be made.", }, []string{"external_hostname"}, ) )
var ( // ErrConcurrentReplication is returned from Processor.ReplicateBlob() when the // same blob is already being replicated by another worker. ErrConcurrentReplication = errors.New("currently replicating") )
Functions ¶
This section is empty.
Types ¶
type AuditAccount ¶
AuditAccount is an audittools.TargetRenderer.
func (AuditAccount) Render ¶
func (a AuditAccount) Render() cadf.Resource
Render implements the audittools.TargetRenderer interface.
type IncomingManifest ¶
type IncomingManifest struct { Reference models.ManifestReference MediaType string Contents []byte PushedAt time.Time // usually time.Now(), but can be different in unit tests }
IncomingManifest contains information about a manifest uploaded by the user (or downloaded from a peer registry in the case of replication).
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a higher-level interface wrapping keppel.DB and keppel.StorageDriver. It abstracts DB accesses into high-level interactions and keeps DB updates in lockstep with StorageDriver accesses.
func New ¶
func New(cfg keppel.Configuration, db *keppel.DB, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, auditor keppel.Auditor) *Processor
New creates a new Processor.
func (*Processor) AppendToBlob ¶
func (p *Processor) AppendToBlob(account models.Account, upload *models.Upload, contents io.Reader, lengthBytes *uint64) error
AppendToBlob appends bytes to a blob upload, and updates the upload's SizeBytes and NumChunks fields appropriately. Chunking of large uploads is implemented at this level, to accommodate storage drivers that have a size restriction on blob chunks.
Warning: The upload's Digest field is *not* read or written. For chunked uploads, the caller is responsible for performing and validating the digest computation.
func (*Processor) CheckManifestOnPrimary ¶
func (p *Processor) CheckManifestOnPrimary(ctx context.Context, account models.Account, repo models.Repository, reference models.ManifestReference) (bool, error)
CheckManifestOnPrimary checks if the given manifest exists on its account's upstream registry. If not, false is returned, An error is returned only if the account is not a replica, or if the upstream registry cannot be queried.
func (*Processor) CreateOrUpdateAccount ¶
func (p *Processor) CreateOrUpdateAccount(ctx context.Context, account keppel.Account, fd keppel.FederationDriver, userInfo audittools.UserInfo, r *http.Request, getSubleaseToken func(models.Peer) (string, *keppel.RegistryV2Error)) (models.Account, *keppel.RegistryV2Error)
CreateOrUpdate can be used on an API account and returns the database representation of it.
func (*Processor) DeleteManifest ¶
func (p *Processor) DeleteManifest(account models.Account, repo models.Repository, manifestDigest digest.Digest, actx keppel.AuditContext) error
DeleteManifest deletes the given manifest from both the database and the backing storage.
If the manifest does not exist, sql.ErrNoRows is returned.
func (*Processor) DeleteTag ¶
func (p *Processor) DeleteTag(account models.Account, repo models.Repository, tagName string, actx keppel.AuditContext) error
DeleteTag deletes the given tag from the database. The manifest is not deleted. If the tag does not exist, sql.ErrNoRows is returned.
func (*Processor) FindBlobOrInsertUnbackedBlob ¶
func (p *Processor) FindBlobOrInsertUnbackedBlob(desc distribution.Descriptor, account models.Account) (*models.Blob, error)
FindBlobOrInsertUnbackedBlob is used by the replication code path. If the requested blob does not exist, a blob record with an empty storage ID will be inserted into the DB. This indicates to the registry API handler that this blob shall be replicated when it is first pulled.
func (*Processor) GetPlatformFilterFromPrimaryAccount ¶
func (p *Processor) GetPlatformFilterFromPrimaryAccount(ctx context.Context, peer models.Peer, replicaAccount models.Account) (models.PlatformFilter, error)
GetPlatformFilterFromPrimaryAccount takes a replica account and queries the peer holding the primary account for that account.
func (*Processor) OverrideGenerateStorageID ¶
OverrideGenerateStorageID replaces keppel.GenerateStorageID with a test double.
func (*Processor) OverrideTimeNow ¶
OverrideTimeNow replaces time.Now with a test double.
func (*Processor) ReplicateBlob ¶
func (p *Processor) ReplicateBlob(ctx context.Context, blob models.Blob, account models.Account, repo models.Repository, w http.ResponseWriter) (responseWasWritten bool, returnErr error)
ReplicateBlob replicates the given blob from its account's upstream registry.
If a ResponseWriter is given, the response to the GET request to the upstream registry is also copied into it as the blob contents are being streamed into our local registry. The result value `responseWasWritten` indicates whether this happened. It may be false if an error occurred before writing into the ResponseWriter took place.
func (*Processor) ReplicateManifest ¶
func (p *Processor) ReplicateManifest(ctx context.Context, account models.Account, repo models.Repository, reference models.ManifestReference, actx keppel.AuditContext) (*models.Manifest, []byte, error)
ReplicateManifest replicates the manifest from its account's upstream registry. On success, the manifest's metadata and contents are returned.
func (*Processor) ValidateAndStoreManifest ¶
func (p *Processor) ValidateAndStoreManifest(account models.Account, repo models.Repository, m IncomingManifest, actx keppel.AuditContext) (*models.Manifest, error)
ValidateAndStoreManifest validates the given manifest and stores it under the given reference. If the reference is a digest, it is validated. Otherwise, a tag with that name is created that points to the new manifest.
func (*Processor) ValidateExistingBlob ¶
func (p *Processor) ValidateExistingBlob(account models.Account, blob models.Blob) (returnErr error)
ValidateExistingBlob validates the given blob that already exists in the DB. Validation includes computing the digest of the blob contents and comparing to the digest in the DB. On success, nil is returned.
func (*Processor) ValidateExistingManifest ¶
func (p *Processor) ValidateExistingManifest(account models.Account, repo models.Repository, manifest *models.Manifest, now time.Time) error
ValidateExistingManifest validates the given manifest that already exists in the DB. The `now` argument will be used instead of time.Now() to accommodate unit tests that use a different clock.
func (*Processor) WithLowlevelAccess ¶
WithLowlevelAccess lets the caller access the low-level interfaces wrapped by this Processor instance. The existence of this method means that the low-level interfaces are basically public, but having to use this method makes it more obvious when code bypasses the interface of Processor.
NOTE: This method is not used widely at the moment because callers usually have direct access to `db` and `sd`, but my plan is to convert most or all DB accesses into methods on type Processor eventually.
type UpstreamManifestMissingError ¶
type UpstreamManifestMissingError struct { Ref models.ManifestReference Inner error }
UpstreamManifestMissingError is returned from ReplicateManifest when a manifest is legitimately nonexistent on upstream (i.e. returning a valid 404 error in the correct format).
func (UpstreamManifestMissingError) Error ¶
func (e UpstreamManifestMissingError) Error() string
Error implements the builtin/error interface.