casng

package v0.0.0-...-ebb4f00
Published: Nov 9, 2023 License: Apache-2.0

Documentation

Overview

Package casng provides a CAS client implementation with the following incomplete list of features:

  • Streaming interface to upload files during the digestion process rather than after.
  • Unified uploads and downloads.
  • Simplified public API.

Constants

const (

	// DefaultGRPCConcurrentCallsLimit is set arbitrarily to 256 as a power of 2.
	DefaultGRPCConcurrentCallsLimit = 256

	// DefaultGRPCBytesLimit is the same as the default gRPC request size limit of 4MiB.
	// See: https://pkg.go.dev/google.golang.org/grpc#MaxCallRecvMsgSize and https://github.com/grpc/grpc-go/blob/2997e84fd8d18ddb000ac6736129b48b3c9773ec/clientconn.go#L96
	DefaultGRPCBytesLimit = 4 * megaByte

	// DefaultGRPCItemsLimit is one tenth of DefaultMaxGRPCItems.
	DefaultGRPCItemsLimit = 1000

	// DefaultMaxGRPCItems is heuristically (with Google's RBE) set to 10k.
	DefaultMaxGRPCItems = 10_000

	// DefaultRPCTimeout is arbitrarily set to what is reasonable for a large action.
	DefaultRPCTimeout = time.Minute

	// DefaultOpenFilesLimit is based on GCS recommendations.
	// See: https://cloud.google.com/compute/docs/disks/optimizing-pd-performance#io-queue-depth
	DefaultOpenFilesLimit = 32

	// DefaultOpenLargeFilesLimit is arbitrarily set.
	DefaultOpenLargeFilesLimit = 2

	// DefaultCompressionSizeThreshold effectively disables compression by default.
	DefaultCompressionSizeThreshold = math.MaxInt64

	// DefaultBufferSize is based on GCS recommendations.
	// See: https://cloud.google.com/compute/docs/disks/optimizing-pd-performance#io-size
	DefaultBufferSize = 4 * megaByte

	// DefaultSmallFileSizeThreshold is set to 1MiB.
	DefaultSmallFileSizeThreshold = megaByte

	// DefaultLargeFileSizeThreshold is set to 256MiB.
	DefaultLargeFileSizeThreshold = 256 * megaByte
)

Variables

var (
	// ErrNegativeLimit indicates an invalid value that is < 0.
	ErrNegativeLimit = errors.New("limit value must be >= 0")

	// ErrZeroOrNegativeLimit indicates an invalid value that is <= 0.
	ErrZeroOrNegativeLimit = errors.New("limit value must be > 0")
)
var (
	// ErrNilClient indicates an invalid nil argument.
	ErrNilClient = errors.New("client cannot be nil")

	// ErrCompression indicates an error in the compression routine.
	ErrCompression = errors.New("compression error")

	// ErrIO indicates an error in an IO routine.
	ErrIO = errors.New("io error")

	// ErrGRPC indicates an error in a gRPC routine.
	ErrGRPC = errors.New("grpc error")

	// ErrOversizedItem indicates an item that is too large to fit into the set byte limit for the corresponding gRPC call.
	ErrOversizedItem = errors.New("oversized item")

	// ErrTerminatedUploader indicates an attempt to use a terminated uploader.
	ErrTerminatedUploader = errors.New("cannot use a terminated uploader")
)

Functions

func IsCompressedWriteResourceName

func IsCompressedWriteResourceName(name string) bool

IsCompressedWriteResourceName returns true if the name was generated using MakeCompressedWriteResourceName.

func MakeCompressedWriteResourceName

func MakeCompressedWriteResourceName(instanceName, hash string, size int64) string

MakeCompressedWriteResourceName returns a valid resource name for writing a compressed blob.

func MakeWriteResourceName

func MakeWriteResourceName(instanceName, hash string, size int64) string

MakeWriteResourceName returns a valid resource name for writing an uncompressed blob.
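
For illustration, a small sketch combining these helpers; the instance name, hash, and size are placeholder values:

// Sketch: the hash would normally be the hex-encoded digest of the blob.
uncompressed := casng.MakeWriteResourceName("my-instance", "<hex-hash>", 1024)
compressed := casng.MakeCompressedWriteResourceName("my-instance", "<hex-hash>", 1024)

casng.IsCompressedWriteResourceName(uncompressed) // expected to be false
casng.IsCompressedWriteResourceName(compressed)   // expected to be true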

func ReplaceWorkingDir

func ReplaceWorkingDir(path, root impath.Absolute, workingDir, remoteWorkingDir impath.Relative) (impath.Absolute, error)

ReplaceWorkingDir swaps remoteWorkingDir for workingDir in path, which must be prefixed by root. workingDir is assumed to be prefixed by root, and the returned path will be a descendant of root, but not necessarily a descendant of remoteWorkingDir. Example: path=/root/out/foo.c, root=/root, workingDir=out/reclient, remoteWorkingDir=set_by_reclient/a, result=/root/set_by_reclient/foo.c

Types

type BatchingUploader

type BatchingUploader struct {
	// contains filtered or unexported fields
}

BatchingUploader provides a blocking interface to query and upload to the CAS.

func NewBatchingUploader

func NewBatchingUploader(
	ctx context.Context, cas regrpc.ContentAddressableStorageClient, byteStream bsgrpc.ByteStreamClient, instanceName string,
	queryCfg, batchCfg, streamCfg GRPCConfig, ioCfg IOConfig,
) (*BatchingUploader, error)

NewBatchingUploader creates a new instance of the batching uploader. WIP: While this is intended to replace the uploader in the client and cas packages, it is not yet ready for production environments.

The specified configs must be compatible with the capabilities of the server that the specified clients are connected to. ctx is used to make unified calls and terminate saturated throttlers and in-flight workers. ctx must be cancelled after all batching calls have returned to properly shut down the uploader. It is only used for cancellation (not used with remote calls). gRPC timeouts are multiplied by retries. Batched RPCs are retried per batch. Streaming RPCs are retried per chunk.
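
A construction sketch, not a prescriptive setup: casClient and bsClient are assumed to be generated gRPC clients backed by an established connection, "my-instance" is a placeholder instance name, and grpcCfg and ioCfg are populated as sketched under GRPCConfig and IOConfig below (here the same gRPC config is reused for the query, batch, and stream endpoints):

ctx, cancel := context.WithCancel(context.Background())
u, err := casng.NewBatchingUploader(ctx, casClient, bsClient, "my-instance", grpcCfg, grpcCfg, grpcCfg, ioCfg)
if err != nil {
	log.Fatal(err)
}

// ... make batching calls on u ...

// Cancel only after all batching calls have returned, to shut down the uploader.
cancel()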

func (*BatchingUploader) DigestTree

func (u *BatchingUploader) DigestTree(ctx context.Context, root impath.Absolute, slo symlinkopts.Options, exclude walker.Filter) (digest.Digest, Stats, error)

DigestTree returns the digest of the merkle tree for root.

func (*BatchingUploader) MissingBlobs

func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Digest) ([]digest.Digest, error)

MissingBlobs queries the CAS for digests and returns a slice of the missing ones.

This method is useful when a large number of digests is already known. For other use cases, consider the streaming uploader. This method does not use internal processors and does not use the uploader's context. It is safe to use even if the uploader's context is cancelled.

The digests are batched based on the ItemsLimit of the gRPC config. BytesLimit and BundleTimeout are not used in this method. Errors from a batch do not affect other batches, but all digests from such bad batches will be reported as missing by this call. In other words, if an error is returned, any digest that is not in the returned slice is not missing. If no error is returned, the returned slice contains all the missing digests.
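
A usage sketch, with ctx and u as in the NewBatchingUploader sketch above; digest.NewFromBlob is assumed to be the digest constructor from the SDK's digest package:

blobs := [][]byte{[]byte("hello"), []byte("world")}
digests := make([]digest.Digest, 0, len(blobs))
for _, b := range blobs {
	digests = append(digests, digest.NewFromBlob(b)) // assumed SDK helper
}

missing, err := u.MissingBlobs(ctx, digests)
if err != nil {
	// Digests absent from missing are known to exist in the CAS; digests in missing
	// may include ones whose batch failed.
	log.Printf("query error: %v", err)
}
for _, d := range missing {
	log.Printf("missing: %s", d)
}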

func (BatchingUploader) Node

func (u BatchingUploader) Node(req UploadRequest) proto.Message

Node looks up a node from the node cache which is populated during digestion. The node is either a repb.FileNode, repb.DirectoryNode, or repb.SymlinkNode.

Returns nil if no node corresponds to req.

func (*BatchingUploader) Upload

func (u *BatchingUploader) Upload(ctx context.Context, reqs ...UploadRequest) ([]digest.Digest, Stats, error)

Upload processes reqs for upload.

Blobs that already exist in the CAS are not uploaded. Any path that is not a regular file, a directory, or a symlink is skipped (e.g. sockets and pipes).

Cancelling ctx gracefully aborts the upload process.

Requests are unified across a window of time defined by the BundleTimeout value of the gRPC configuration. The unification is affected by the order of the requests, bundle limits (length, size, timeout) and the upload speed. With infinite speed and limits, every blob will be uploaded exactly once. On the other extreme, every blob is uploaded alone and no unification takes place. In the average case, blobs that make it into the same bundle will be unified (deduplicated).

Returns a slice of the digests of the blobs that were uploaded (did not exist in the CAS). If the returned error is nil, any digest that is not in the returned slice was already in the CAS. If the returned error is not nil, the returned slice may be incomplete (fatal error) and every digest in it may or may not have been successfully uploaded (individual errors). The returned error wraps a number of errors proportional to the length of the specified slice.

This method must not be called after cancelling the uploader's context.
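
A blocking upload sketch; impath.Abs is assumed to be the constructor for absolute paths in the SDK's impath package, and the paths are placeholders:

filePath, err := impath.Abs("/src/project/main.go") // assumed constructor; placeholder path
if err != nil {
	log.Fatal(err)
}
dirPath, err := impath.Abs("/src/project/assets")
if err != nil {
	log.Fatal(err)
}

uploaded, stats, err := u.Upload(ctx,
	casng.UploadRequest{Path: filePath},
	casng.UploadRequest{Path: dirPath},
)
if err != nil {
	log.Printf("upload error: %v", err)
}
log.Printf("uploaded %d blobs: batched=%d streamed=%d cached bytes=%d",
	len(uploaded), stats.BatchedCount, stats.StreamedCount, stats.LogicalBytesCached)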

func (*BatchingUploader) UploadTree

func (u *BatchingUploader) UploadTree(ctx context.Context, execRoot impath.Absolute, workingDir, remoteWorkingDir impath.Relative, reqs ...UploadRequest) (rootDigest digest.Digest, uploaded []digest.Digest, stats Stats, err error)

UploadTree is a convenient method to upload a tree described with multiple requests.

This is useful when the list of paths is already known and the root has too many descendants for traversing and filtering to be worth the overhead.

All requests must share the same filter. Digest fields on the requests are ignored to ensure proper hierarchy caching via the internal digestion process. remoteWorkingDir replaces workingDir inside the merkle tree such that the server is only aware of remoteWorkingDir.
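
A sketch with placeholder paths; impath.Abs and impath.Rel are assumed to be the absolute and relative path constructors in the SDK's impath package (error handling elided for brevity):

execRoot, _ := impath.Abs("/exec/root")
workingDir, _ := impath.Rel("out/reclient")
remoteWorkingDir, _ := impath.Rel("set_by_reclient/a")
fooPath, _ := impath.Abs("/exec/root/out/reclient/foo.c")

rootDigest, uploaded, stats, err := u.UploadTree(ctx, execRoot, workingDir, remoteWorkingDir,
	casng.UploadRequest{Path: fooPath},
)
if err != nil {
	log.Fatal(err)
}
log.Printf("root %s: %d blobs uploaded, %d digests processed", rootDigest, len(uploaded), stats.DigestCount)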

func (*BatchingUploader) WriteBytes

func (u *BatchingUploader) WriteBytes(ctx context.Context, name string, r io.Reader, size, offset int64) (Stats, error)

WriteBytes uploads all the bytes of r directly to the resource name starting remotely at offset.

r must return io.EOF to terminate the call.

ctx is used to make and cancel remote calls. This method does not use the uploader's context which means it is safe to call even after that context is cancelled.

Compression is turned on based on the resource name. size is used to report stats; it must reflect the actual number of bytes r has to give. The server is notified to finalize the resource name, so subsequent writes may not succeed. The errors returned are either from the context, ErrGRPC, ErrIO, or ErrCompression. More errors may be wrapped inside. If an error was returned, the returned stats may indicate that all the bytes were sent, but that does not guarantee that the server committed all of them.
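
A streaming-write sketch; digest.NewFromBlob is assumed to be the digest constructor from the SDK's digest package (with Hash and Size fields), and the instance name is a placeholder:

blob := []byte("some blob content")
d := digest.NewFromBlob(blob) // assumed SDK helper
name := casng.MakeWriteResourceName("my-instance", d.Hash, d.Size)

stats, err := u.WriteBytes(ctx, name, bytes.NewReader(blob), d.Size, 0)
if err != nil {
	log.Printf("write error: %v", err)
}
log.Printf("moved %d bytes over the wire", stats.EffectiveBytesMoved)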

func (*BatchingUploader) WriteBytesPartial

func (u *BatchingUploader) WriteBytesPartial(ctx context.Context, name string, r io.Reader, size, offset int64) (Stats, error)

WriteBytesPartial is the same as WriteBytes, but does not notify the server to finalize the resource name.

type GRPCConfig

type GRPCConfig struct {
	// ConcurrentCallsLimit sets the upper bound of concurrent calls.
	// Must be > 0.
	ConcurrentCallsLimit int

	// BytesLimit sets the upper bound for the size of each request.
	// Comparisons against this value may not be exact due to padding and other serialization nuances.
	// Clients should choose a value that is sufficiently lower than the max size limit for the corresponding gRPC connection.
	// Any blob that does not fit in a batching request based on this value will be streamed using the ByteStream API.
	// Must be > 0.
	// This is defined as int rather than int64 because gRPC uses int for its limit.
	BytesLimit int

	// ItemsLimit sets the upper bound for the number of items per request.
	// Must be > 0.
	ItemsLimit int

	// BundleTimeout sets the maximum duration a call is delayed while bundling.
	// Bundling is used to amortize the cost of a gRPC call over time. Instead of sending
	// many requests with few items, bundling attempts to maximize the number of items sent in a single request.
	// This includes waiting for a bit to see if more items are requested.
	BundleTimeout time.Duration

	// Timeout sets the upper bound of the total time spent processing a request.
	// For streaming calls, this applies to each Send/Recv call individually, not the whole streaming session.
	// This does not take into account the time it takes to abort the request upon timeout.
	Timeout time.Duration

	// RetryPolicy sets the retry policy for calls using this config.
	RetryPolicy retry.BackoffPolicy

	// RetryPredicate is called to determine if the error is retryable. If not set, nothing is retried.
	RetryPredicate func(error) bool
}

GRPCConfig specifies the configuration for a gRPC endpoint.
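
An illustrative config sketch based on the package defaults. The retry predicate below is a plain function over gRPC status codes (using google.golang.org/grpc/status and codes), not the SDK's retry package; RetryPolicy is left unset here, and a real client would populate it from that package:

queryCfg := casng.GRPCConfig{
	ConcurrentCallsLimit: casng.DefaultGRPCConcurrentCallsLimit, // 256
	BytesLimit:           casng.DefaultGRPCBytesLimit,           // 4 MiB
	ItemsLimit:           casng.DefaultGRPCItemsLimit,           // 1000
	BundleTimeout:        10 * time.Millisecond,                 // illustrative
	Timeout:              casng.DefaultRPCTimeout,               // per Send/Recv for streaming calls
	RetryPredicate: func(err error) bool {
		// Retry only transient gRPC codes.
		switch status.Code(err) {
		case codes.Unavailable, codes.ResourceExhausted, codes.DeadlineExceeded:
			return true
		}
		return false
	},
}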

type IOConfig

type IOConfig struct {
	// ConcurrentWalksLimit sets the upper bound of concurrent filesystem tree traversals.
	// This affects the number of concurrent upload requests for the uploader since each one requires a walk.
	// Must be > 0.
	ConcurrentWalksLimit int

	// OpenFilesLimit sets the upper bound for the number of files being simultaneously processed.
	// Must be > 0.
	OpenFilesLimit int

	// OpenLargeFilesLimit sets the upper bound for the number of large files being simultaneously processed.
	//
	// This value counts towards open files. I.e. the following inequality is always effectively true:
	// OpenFilesLimit >= OpenLargeFilesLimit
	// Must be > 0.
	OpenLargeFilesLimit int

	// SmallFileSizeThreshold sets the upper bound (inclusive) for the file size to be considered a small file.
	//
	// Files that are larger than this value (medium and large files) are uploaded via the streaming API.
	//
	// Small files are buffered entirely in memory and uploaded via the batching API.
	// However, it is still possible for a file to be small in size yet result in a request that is larger than the gRPC size limit.
	// In that case, the file is uploaded via the streaming API instead.
	//
	// The amount of memory used to buffer files is affected by this value and OpenFilesLimit as well as bundling limits for gRPC.
	// The uploader will stop buffering once the OpenFilesLimit is reached, before which the number of buffered files is bound by
	// the number of blobs buffered for uploading (and whatever the GC hasn't freed yet).
	// In the extreme case, the number of buffered bytes for small files (not including streaming buffers) equals
	// the concurrency limit for the upload gRPC call, times the bytes limit per call, times this value.
	// Note that the amount of memory used to buffer bytes of generated proto messages is not included in this estimate.
	//
	// Must be >= 0.
	SmallFileSizeThreshold int64

	// LargeFileSizeThreshold sets the lower bound (inclusive) for the file size to be considered a large file.
	// Such files are uploaded in chunks using the file streaming API.
	// Must be >= 0.
	LargeFileSizeThreshold int64

	// CompressionSizeThreshold sets the lower bound for the chunk size before it is subject to compression.
	// A value of 0 enables compression for any chunk size. To disable compression, use math.MaxInt64.
	// Must be >= 0.
	CompressionSizeThreshold int64

	// BufferSize sets the buffer size for IO read/write operations.
	// Must be > 0.
	BufferSize int

	// OptimizeForDiskLocality enables sorting files by path before they are written to disk to optimize for disk locality.
	// Assuming files under the same directory are located close to each other on disk, then such files are batched together.
	OptimizeForDiskLocality bool
}

IOConfig specifies the configuration for IO operations.
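
An illustrative config sketch using the package defaults; the walk concurrency value is arbitrary:

ioCfg := casng.IOConfig{
	ConcurrentWalksLimit:     8,                                     // illustrative
	OpenFilesLimit:           casng.DefaultOpenFilesLimit,           // 32
	OpenLargeFilesLimit:      casng.DefaultOpenLargeFilesLimit,      // 2, counts towards OpenFilesLimit
	SmallFileSizeThreshold:   casng.DefaultSmallFileSizeThreshold,   // <= 1 MiB is batched
	LargeFileSizeThreshold:   casng.DefaultLargeFileSizeThreshold,   // >= 256 MiB is streamed in chunks
	CompressionSizeThreshold: casng.DefaultCompressionSizeThreshold, // math.MaxInt64 keeps compression disabled
	BufferSize:               casng.DefaultBufferSize,               // 4 MiB
	OptimizeForDiskLocality:  false,
}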

type MissingBlobsResponse

type MissingBlobsResponse struct {
	Digest  digest.Digest
	Missing bool
	Err     error
}

MissingBlobsResponse represents a query result for a single digest.

If Err is not nil, Missing is false.

type Stats

type Stats struct {
	// BytesRequested is the total number of bytes in a request.
	// It does not necessarily equal the total number of bytes uploaded/downloaded.
	BytesRequested int64

	// LogicalBytesMoved is the amount of BytesRequested that was processed.
	// It cannot be larger than BytesRequested, but may be smaller in case of a partial response.
	// The quantity is more granular for streaming than it is for batching. In streaming, it is an increment of the buffer size.
	// For batching, it is a sum of the size of items that were batched.
	LogicalBytesMoved int64

	// TotalBytesMoved is the total number of bytes moved over the wire.
	// This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one.
	// It may be larger (retries) or smaller (compression, cache hits, or partial response) than BytesRequested.
	TotalBytesMoved int64

	// EffectiveBytesMoved is the total number of bytes moved over the wire, excluding retries.
	// This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one.
	// For failures, this is reported as 0.
	// It may be higher than BytesRequested (compression headers), but never higher than TotalBytesMoved.
	EffectiveBytesMoved int64

	// LogicalBytesCached is the total number of bytes not moved over the wire due to caching (either remotely or locally).
	// For failures, this is reported as 0.
	LogicalBytesCached int64

	// LogicalBytesStreamed is the total number of logical bytes moved by the streaming API.
	// It may be larger (retries) or smaller (cache hits or partial response) than the requested size.
	// For failures, this is reported as 0.
	LogicalBytesStreamed int64

	// LogicalBytesBatched is the total number of logical bytes moved by the batching API.
	// It may be larger (retries) or smaller (cache hits or partial response) than the requested size.
	// For failures, this is reported as 0.
	LogicalBytesBatched int64

	// InputFileCount is the number of processed regular files.
	InputFileCount int64

	// InputDirCount is the number of processed directories.
	InputDirCount int64

	// InputSymlinkCount is the number of processed symlinks (not the number of symlinks in the uploaded merkle tree which may be lower).
	InputSymlinkCount int64

	// CacheHitCount is the number of cache hits.
	CacheHitCount int64

	// CacheMissCount is the number of cache misses.
	CacheMissCount int64

	// DigestCount is the number of processed digests.
	// The counter is incremented regardless of digestion failures.
	DigestCount int64

	// BatchedCount is the number of batched files.
	BatchedCount int64

	// StreamedCount is the number of streamed files.
	// For methods that accept bytes, the value is 1 upon success, 0 otherwise.
	StreamedCount int64
}

Stats represents potential metrics reported by various methods. Not all fields are populated by every method.

func (*Stats) Add

func (s *Stats) Add(other Stats)

Add mutates the stats by adding all the corresponding fields of the specified instance.

func (*Stats) ToCacheHit

func (s *Stats) ToCacheHit() Stats

ToCacheHit returns a copy of the stats that represents a cache hit of the original. All "bytes moving" stats are zeroed-out and cache stats are updated based on other values. Everything else remains the same. A pointer receiver allows this method to work on nil values of this type.
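
A small aggregation sketch, assuming reqs1 and reqs2 are slices of UploadRequest and u and ctx are as in the sketches above:

var total casng.Stats

_, s1, err := u.Upload(ctx, reqs1...)
if err != nil {
	log.Printf("upload error: %v", err)
}
total.Add(s1)

_, s2, err := u.Upload(ctx, reqs2...)
if err != nil {
	log.Printf("upload error: %v", err)
}
total.Add(s2)

asHit := s1.ToCacheHit() // a copy of s1 with "bytes moving" stats zeroed and cache stats updated
_ = asHit

log.Printf("requested=%d moved=%d cached=%d hits=%d misses=%d",
	total.BytesRequested, total.EffectiveBytesMoved, total.LogicalBytesCached,
	total.CacheHitCount, total.CacheMissCount)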

type StreamingUploader

type StreamingUploader struct {
	// contains filtered or unexported fields
}

StreamingUploader provides a non-blocking interface to query and upload to the CAS.

func NewStreamingUploader

func NewStreamingUploader(
	ctx context.Context, cas regrpc.ContentAddressableStorageClient, byteStream bsgrpc.ByteStreamClient, instanceName string,
	queryCfg, batchCfg, streamCfg GRPCConfig, ioCfg IOConfig,
) (*StreamingUploader, error)

NewStreamingUploader creates a new instance of the streaming uploader. WIP: While this is intended to replace the uploader in the client and cas packages, it is not yet ready for production environments.

The specified configs must be compatible with the capabilities of the server that the specified clients are connected to. ctx is used to make unified calls and terminate saturated throttlers and in-flight workers. ctx must be cancelled after all response channels have been closed to properly shut down the uploader. It is only used for cancellation (not used with remote calls). gRPC timeouts are multiplied by retries. Batched RPCs are retried per batch. Streaming RPCs are retried per chunk.

func (*StreamingUploader) MissingBlobs

func (u *StreamingUploader) MissingBlobs(ctx context.Context, in <-chan digest.Digest) <-chan MissingBlobsResponse

MissingBlobs is a non-blocking call that queries the CAS for incoming digests.

This method is useful when digests are calculated and dispatched on the fly. For a large list of known digests, consider using the batching uploader.

To properly stop this call, close in and cancel ctx, then wait for the returned channel to close. The channel in must be closed as a termination signal. Cancelling ctx is not enough. The uploader's context is used to make remote calls using metadata from ctx. Metadata unification assumes all requests share the same correlated invocation ID.

The digests are unified (aggregated/bundled) based on ItemsLimit, BytesLimit and BundleTimeout of the gRPC config. The returned channel is unbuffered and will be closed after the input channel is closed and all sent requests get their corresponding responses. This could indicate completion or cancellation (in case the context was canceled). Slow consumption speed on the returned channel affects the consumption speed on in.

This method must not be called after cancelling the uploader's context.
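
A usage sketch, assuming su is a *StreamingUploader, blobs is a [][]byte, and digest.NewFromBlob is the digest constructor from the SDK's digest package:

in := make(chan digest.Digest)
out := su.MissingBlobs(ctx, in)

go func() {
	defer close(in) // closing in is the termination signal
	for _, b := range blobs {
		in <- digest.NewFromBlob(b) // assumed SDK helper
	}
}()

for r := range out { // out closes after in is closed and all responses are delivered
	if r.Err != nil {
		log.Printf("query error for %s: %v", r.Digest, r.Err)
		continue
	}
	if r.Missing {
		log.Printf("missing: %s", r.Digest)
	}
}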

func (StreamingUploader) Node

func (u StreamingUploader) Node(req UploadRequest) proto.Message

Node looks up a node from the node cache which is populated during digestion. The node is either a repb.FileNode, repb.DirectoryNode, or repb.SymlinkNode.

Returns nil if no node corresponds to req.

func (*StreamingUploader) Upload

func (u *StreamingUploader) Upload(ctx context.Context, in <-chan UploadRequest) <-chan UploadResponse

Upload is a non-blocking call that uploads incoming files to the CAS if necessary.

To properly stop this call, close in and cancel ctx, then wait for the returned channel to close. The channel in must be closed as a termination signal. Cancelling ctx is not enough. The uploader's context is used to make remote calls using metadata from ctx. Metadata unification assumes all requests share the same correlated invocation ID.

The consumption speed is subject to the concurrency and timeout configurations of the gRPC call. All received requests will have corresponding responses sent on the returned channel.

Requests are unified across a window of time defined by the BundleTimeout value of the gRPC configuration. The unification is affected by the order of the requests, bundle limits (length, size, timeout) and the upload speed. With infinite speed and limits, every blob will be uploaded exactly once. On the other extreme, every blob is uploaded alone and no unification takes place. In the average case, blobs that make it into the same bundle will be grouped by digest. Once a digest is processed, each requester of that digest receives a copy of the corresponding UploadResponse.

This method must not be called after cancelling the uploader's context.
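
A usage sketch, assuming su is a *StreamingUploader, paths is a []string of absolute paths, and impath.Abs is the absolute path constructor from the SDK's impath package:

in := make(chan casng.UploadRequest)
out := su.Upload(ctx, in)

go func() {
	defer close(in) // closing in is the termination signal
	for _, p := range paths {
		ap, err := impath.Abs(p) // assumed constructor
		if err != nil {
			continue
		}
		in <- casng.UploadRequest{Path: ap}
	}
}()

for r := range out {
	if r.Err != nil {
		log.Printf("upload error for %s: %v", r.Digest, r.Err)
		continue
	}
	log.Printf("done %s: cache hits=%d misses=%d", r.Digest, r.Stats.CacheHitCount, r.Stats.CacheMissCount)
}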

type UploadRequest

type UploadRequest struct {
	// Digest is for pre-digested requests. This digest is trusted to be the one for the associated Bytes or Path.
	//
	// If not set, it will be calculated.
	// If set, it implies that this request is a single blob. I.e. either Bytes is set or Path is a regular file and both SymlinkOptions and Exclude are ignored.
	Digest digest.Digest

	// Bytes is meant for small blobs. Using a large slice of bytes might cause memory thrashing.
	//
	// If Bytes is nil, BytesFileMode is ignored and Path is used for traversal.
	// If Bytes is not nil (may be empty), Path is used as the corresponding path for the bytes content and is not used for traversal.
	Bytes []byte

	// BytesFileMode describes the bytes content. It is ignored if Bytes is not set.
	BytesFileMode fs.FileMode

	// Path is used to access and read files if Bytes is nil. Otherwise, Bytes is assumed to be the path's content (even if empty).
	//
	// This must not be equal to impath.Root since this is considered a zero value (Path not set).
	// If Bytes is not nil and Path is not set, a node cannot be constructed and therefore no node is cached.
	Path impath.Absolute

	// SymlinkOptions are used to handle symlinks when Path is set and Bytes is not.
	SymlinkOptions slo.Options

	// Exclude is used to exclude paths during traversal when Path is set and Bytes is not.
	//
	// The filter ID is used in the keys of the node cache, even when Bytes is set.
	// Using the same ID for effectively different filters will cause erroneous cache hits.
	// Using a different ID for effectively identical filters will reduce cache hit rates and increase digestion compute cost.
	Exclude walker.Filter
	// contains filtered or unexported fields
}

UploadRequest represents a path to start uploading from.

If the path is a directory, its entire tree is traversed and only files that are not excluded by the filter are uploaded. Symlinks are handled according to the SymlinkOptions field.
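
Two illustrative requests, one carrying the blob bytes directly and one pointing at a directory. digest.NewFromBlob and impath.Abs are assumed SDK helpers, the paths are placeholders, and the symlink options and exclusion filter are assumed to be constructed via their own packages (their zero values appear here only as a sketch):

blob := []byte("generated file content")
blobPath, _ := impath.Abs("/exec/root/gen/out.txt") // path associated with the bytes so a node can be cached
byteReq := casng.UploadRequest{
	Bytes:         blob,
	BytesFileMode: 0644,
	Path:          blobPath,
	Digest:        digest.NewFromBlob(blob), // optional; skips digestion for this request
}

dir, _ := impath.Abs("/exec/root/src")
var symOpts slo.Options   // see the symlinkopts package
var exclude walker.Filter // assumed: the zero value excludes nothing
dirReq := casng.UploadRequest{Path: dir, SymlinkOptions: symOpts, Exclude: exclude}

if _, _, err := u.Upload(ctx, byteReq, dirReq); err != nil {
	log.Fatal(err)
}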

type UploadResponse

type UploadResponse struct {
	// Digest identifies the blob associated with this response.
	// May be empty (created from an empty byte slice or from a composite literal), in which case Err is set.
	Digest digest.Digest

	// Stats may be zero if this response has not been updated yet. It should be ignored if Err is set.
	// If this response has been processed, then either CacheHitCount or CacheMissCount is not zero.
	Stats Stats

	// Err indicates the error encountered while processing the request associated with Digest.
	// If set, Stats should be ignored.
	Err error
	// contains filtered or unexported fields
}

UploadResponse represents an upload result for a single request (which may represent a tree of files).
