pachyderm: Index | Files | Directories

package obj

import ""


Package Files

amazon_client.go cache_client.go google_client.go limited_client.go local_client.go microsoft_client.go minio_client.go monkey_client.go obj.go tracing.go util.go


const (
    Minio     = "MINIO"
    Amazon    = "AMAZON"
    Google    = "GOOGLE"
    Microsoft = "MICROSOFT"
    Local     = "LOCAL"
)

Valid object storage backends

const (
    GoogleBucketEnvVar = "GOOGLE_BUCKET"
    GoogleCredEnvVar   = "GOOGLE_CRED"
)

Google environment variables

const (
    MicrosoftContainerEnvVar = "MICROSOFT_CONTAINER"
    MicrosoftIDEnvVar        = "MICROSOFT_ID"
    MicrosoftSecretEnvVar    = "MICROSOFT_SECRET"
)

Microsoft environment variables

const (
    MinioBucketEnvVar    = "MINIO_BUCKET"
    MinioEndpointEnvVar  = "MINIO_ENDPOINT"
    MinioIDEnvVar        = "MINIO_ID"
    MinioSecretEnvVar    = "MINIO_SECRET"
    MinioSecureEnvVar    = "MINIO_SECURE"
    MinioSignatureEnvVar = "MINIO_SIGNATURE"
)

Minio environment variables

const (
    AmazonRegionEnvVar       = "AMAZON_REGION"
    AmazonBucketEnvVar       = "AMAZON_BUCKET"
    AmazonIDEnvVar           = "AMAZON_ID"
    AmazonSecretEnvVar       = "AMAZON_SECRET"
    AmazonTokenEnvVar        = "AMAZON_TOKEN"
    AmazonVaultAddrEnvVar    = "AMAZON_VAULT_ADDR"
    AmazonVaultRoleEnvVar    = "AMAZON_VAULT_ROLE"
    AmazonVaultTokenEnvVar   = "AMAZON_VAULT_TOKEN"
    AmazonDistributionEnvVar = "AMAZON_DISTRIBUTION"
    CustomEndpointEnvVar     = "CUSTOM_ENDPOINT"
)

Amazon environment variables

const (
    RetriesEnvVar        = "RETRIES"
    TimeoutEnvVar        = "TIMEOUT"
    UploadACLEnvVar      = "UPLOAD_ACL"
    ReverseEnvVar        = "REVERSE"
    PartSizeEnvVar       = "PART_SIZE"
    MaxUploadPartsEnvVar = "MAX_UPLOAD_PARTS"
    DisableSSLEnvVar     = "DISABLE_SSL"
    NoVerifySSLEnvVar    = "NO_VERIFY_SSL"
    LogOptionsEnvVar     = "OBJ_LOG_OPTS"
)

Advanced configuration environment variables

const (
    // DefaultRetries is the default number of retries for object storage requests.
    DefaultRetries = 10
    // DefaultTimeout is the default timeout for object storage requests.
    DefaultTimeout = "5m"
    // DefaultUploadACL is the default upload ACL for object storage uploads.
    DefaultUploadACL = "bucket-owner-full-control"
    // DefaultReverse is the default for whether to reverse object storage paths or not.
    DefaultReverse = true
    // DefaultPartSize is the default part size for object storage uploads.
    DefaultPartSize = 5242880
    // DefaultMaxUploadParts is the default maximum number of upload parts.
    DefaultMaxUploadParts = 10000
    // DefaultDisableSSL is the default for whether SSL should be disabled.
    DefaultDisableSSL = false
    // DefaultNoVerifySSL is the default for whether SSL certificate verification should be disabled.
    DefaultNoVerifySSL = false
    // DefaultAwsLogOptions is the default set of enabled S3 client log options
    DefaultAwsLogOptions = ""
)

const (
    StorageBackendEnvVar = "STORAGE_BACKEND"
)

Environment variables for determining storage backend and pathing


var EnvVarToSecretKey = []struct {
    Key   string
    Value string
}{
    {Key: GoogleBucketEnvVar, Value: "google-bucket"},
    {Key: GoogleCredEnvVar, Value: "google-cred"},
    {Key: MicrosoftContainerEnvVar, Value: "microsoft-container"},
    {Key: MicrosoftIDEnvVar, Value: "microsoft-id"},
    {Key: MicrosoftSecretEnvVar, Value: "microsoft-secret"},
    {Key: MinioBucketEnvVar, Value: "minio-bucket"},
    {Key: MinioEndpointEnvVar, Value: "minio-endpoint"},
    {Key: MinioIDEnvVar, Value: "minio-id"},
    {Key: MinioSecretEnvVar, Value: "minio-secret"},
    {Key: MinioSecureEnvVar, Value: "minio-secure"},
    {Key: MinioSignatureEnvVar, Value: "minio-signature"},
    {Key: AmazonRegionEnvVar, Value: "amazon-region"},
    {Key: AmazonBucketEnvVar, Value: "amazon-bucket"},
    {Key: AmazonIDEnvVar, Value: "amazon-id"},
    {Key: AmazonSecretEnvVar, Value: "amazon-secret"},
    {Key: AmazonTokenEnvVar, Value: "amazon-token"},
    {Key: AmazonVaultAddrEnvVar, Value: "amazon-vault-addr"},
    {Key: AmazonVaultRoleEnvVar, Value: "amazon-vault-role"},
    {Key: AmazonVaultTokenEnvVar, Value: "amazon-vault-token"},
    {Key: AmazonDistributionEnvVar, Value: "amazon-distribution"},
    {Key: CustomEndpointEnvVar, Value: "custom-endpoint"},
    {Key: RetriesEnvVar, Value: "retries"},
    {Key: TimeoutEnvVar, Value: "timeout"},
    {Key: UploadACLEnvVar, Value: "upload-acl"},
    {Key: ReverseEnvVar, Value: "reverse"},
    {Key: PartSizeEnvVar, Value: "part-size"},
    {Key: MaxUploadPartsEnvVar, Value: "max-upload-parts"},
    {Key: DisableSSLEnvVar, Value: "disable-ssl"},
    {Key: NoVerifySSLEnvVar, Value: "no-verify-ssl"},
    {Key: LogOptionsEnvVar, Value: "log-options"},
}

EnvVarToSecretKey is an environment variable name to secret key mapping. This is being used to temporarily bridge the gap as we transition to a model where object storage access in the workers is based on environment variables and a library, rather than mounting a secret to a sidecar container which accesses object storage.

func Copy Uses

func Copy(ctx context.Context, src, dst Client, srcPath, dstPath string) (retErr error)

Copy copies an object from src at srcPath to dst at dstPath.

func DisableMonkeyTest Uses

func DisableMonkeyTest()

DisableMonkeyTest disables sporadic request failures.

func EnableMonkeyTest Uses

func EnableMonkeyTest()

EnableMonkeyTest enables sporadic request failures.

func InitMonkeyTest Uses

func InitMonkeyTest(seed int64)

InitMonkeyTest sets up this package for monkey testing. Object storage clients will be wrapped with a client that sporadically fails requests.

func IsMonkeyError Uses

func IsMonkeyError(err error) bool

IsMonkeyError checks if an error was caused by a monkey client.

func IsRetryable Uses

func IsRetryable(client Client, err error) bool

IsRetryable determines if an operation should be retried given an error

func NewExponentialBackOffConfig Uses

func NewExponentialBackOffConfig() *backoff.ExponentialBackOff

NewExponentialBackOffConfig creates an exponential back-off config with longer wait times than the default.

func StorageRootFromEnv Uses

func StorageRootFromEnv(storageRoot string) (string, error)

StorageRootFromEnv gets the storage root based on environment variables.

func TestStorage Uses

func TestStorage(ctx context.Context, c Client) error

TestStorage is a defensive method for checking to make sure that storage is properly configured.

func WithLocalClient Uses

func WithLocalClient(f func(objC Client) error) (retErr error)

WithLocalClient constructs a local object storage client for testing during the lifetime of the callback. Deprecated: WithLocalClient implements a testing pattern deprecated since Go 1.14; consider switching to NewTestClient. TODO: delete this function.

type AmazonAdvancedConfiguration Uses

type AmazonAdvancedConfiguration struct {
    Retries int    `env:"RETRIES, default=10"`
    Timeout string `env:"TIMEOUT, default=5m"`
    // By default, objects uploaded to a bucket are only accessible to the
    // uploader, and not the owner of the bucket. Using the default ensures that
    // the owner of the bucket can access the objects as well.
    UploadACL      string `env:"UPLOAD_ACL, default=bucket-owner-full-control"`
    Reverse        bool   `env:"REVERSE, default=false"`
    PartSize       int64  `env:"PART_SIZE, default=5242880"`
    MaxUploadParts int    `env:"MAX_UPLOAD_PARTS, default=10000"`
    DisableSSL     bool   `env:"DISABLE_SSL, default=false"`
    NoVerifySSL    bool   `env:"NO_VERIFY_SSL, default=false"`
    LogOptions     string `env:"OBJ_LOG_OPTS, default="`
}

AmazonAdvancedConfiguration contains the advanced configuration for the amazon client.

type AmazonCreds Uses

type AmazonCreds struct {
    // Direct credentials. Only applicable if Pachyderm is given its own permanent
    // AWS credentials
    ID     string // Access Key ID
    Secret string // Secret Access Key
    Token  string // Access token (if using temporary security credentials)

    // Vault options (if getting AWS credentials from Vault)
    VaultAddress string // normally addresses come from env, but don't have vault service name
    VaultRole    string
    VaultToken   string
}

AmazonCreds are options that are applicable specifically to Pachd's credentials in an AWS deployment

type BackoffReadCloser Uses

type BackoffReadCloser struct {
    // contains filtered or unexported fields
}

BackoffReadCloser retries with exponential backoff in the case of failures

func (*BackoffReadCloser) Close Uses

func (b *BackoffReadCloser) Close() (retErr error)

Close closes the ReadCloser contained in b.

func (*BackoffReadCloser) Read Uses

func (b *BackoffReadCloser) Read(data []byte) (retN int, retErr error)

type BackoffWriteCloser Uses

type BackoffWriteCloser struct {
    // contains filtered or unexported fields
}

BackoffWriteCloser retries with exponential backoff in the case of failures

func (*BackoffWriteCloser) Close Uses

func (b *BackoffWriteCloser) Close() (retErr error)

Close closes the WriteCloser contained in b.

func (*BackoffWriteCloser) Write Uses

func (b *BackoffWriteCloser) Write(data []byte) (retN int, retErr error)

type Client Uses

type Client interface {
    // Writer returns a writer which writes to an object.
    // It should error if the object already exists or we don't have sufficient
    // permissions to write it.
    Writer(ctx context.Context, name string) (io.WriteCloser, error)
    // Reader returns a reader which reads from an object.
    // If `size == 0`, the reader should read from the offset till the end of the object.
    // It should error if the object doesn't exist or we don't have sufficient
    // permission to read it.
    Reader(ctx context.Context, name string, offset uint64, size uint64) (io.ReadCloser, error)
    // Delete deletes an object.
    // It should error if the object doesn't exist or we don't have sufficient
    // permission to delete it.
    Delete(ctx context.Context, name string) error
    // Walk calls `fn` with the names of objects which can be found under `prefix`.
    Walk(ctx context.Context, prefix string, fn func(name string) error) error
    // Exists checks if a given object already exists
    Exists(ctx context.Context, name string) bool
    // IsRetryable determines if an operation should be retried given an error
    IsRetryable(err error) bool
    // IsNotExist returns true if err is a non existence error
    IsNotExist(err error) bool
    // IsIgnorable returns true if the error can be ignored
    IsIgnorable(err error) bool
}

Client is an interface to object storage.

func NewAmazonClient Uses

func NewAmazonClient(region, bucket string, creds *AmazonCreds, distribution string, endpoint string, reverse ...bool) (c Client, err error)

NewAmazonClient creates an amazon client with the following credentials:

bucket - S3 bucket name
distribution - cloudfront distribution ID
id     - AWS access key id
secret - AWS secret access key
token  - AWS access token
region - AWS region
endpoint - Custom endpoint (generally used for S3 compatible object stores)
reverse - Reverse object storage paths (overwrites configured value)

func NewAmazonClientFromEnv Uses

func NewAmazonClientFromEnv() (Client, error)

NewAmazonClientFromEnv creates an Amazon client based on environment variables.

func NewAmazonClientFromSecret Uses

func NewAmazonClientFromSecret(bucket string, reverse ...bool) (Client, error)

NewAmazonClientFromSecret constructs an amazon client by reading credentials from a mounted AmazonSecret. You may pass "" for bucket in which case it will read the bucket from the secret.

func NewCacheClient Uses

func NewCacheClient(slow, fast Client, size int) Client

NewCacheClient returns slow wrapped in an LRU write-through cache using fast for storing cached data.

func NewClientFromEnv Uses

func NewClientFromEnv(storageRoot string) (c Client, err error)

NewClientFromEnv creates a client based on environment variables.

func NewClientFromSecret Uses

func NewClientFromSecret(storageRoot string) (c Client, err error)

NewClientFromSecret creates a client based on mounted secret files.

func NewClientFromURLAndSecret Uses

func NewClientFromURLAndSecret(url *ObjectStoreURL, reverse ...bool) (c Client, err error)

NewClientFromURLAndSecret constructs a client by parsing `URL` and then constructing the correct client for that URL using secrets.

func NewGoogleClient Uses

func NewGoogleClient(bucket string, opts []option.ClientOption) (c Client, err error)

NewGoogleClient creates a google client with the given bucket name.

func NewGoogleClientFromEnv Uses

func NewGoogleClientFromEnv() (Client, error)

NewGoogleClientFromEnv creates a Google client based on environment variables.

func NewGoogleClientFromSecret Uses

func NewGoogleClientFromSecret(bucket string) (Client, error)

NewGoogleClientFromSecret creates a google client by reading credentials from a mounted GoogleSecret. You may pass "" for bucket in which case it will read the bucket from the secret.

func NewLimitedClient Uses

func NewLimitedClient(client Client, maxReaders, maxWriters int) Client

NewLimitedClient constructs a Client which will only ever have

<= maxReaders objects open for reading
<= maxWriters objects open for writing

if either is < 1 then that constraint is ignored.

func NewLocalClient Uses

func NewLocalClient(root string) (c Client, err error)

NewLocalClient returns a Client that stores data on the local file system

func NewMicrosoftClient Uses

func NewMicrosoftClient(container string, accountName string, accountKey string) (c Client, err error)

NewMicrosoftClient creates a microsoft client:

container   - Azure Blob Container name
accountName - Azure Storage Account name
accountKey  - Azure Storage Account key

func NewMicrosoftClientFromEnv Uses

func NewMicrosoftClientFromEnv() (Client, error)

NewMicrosoftClientFromEnv creates a Microsoft client based on environment variables.

func NewMicrosoftClientFromSecret Uses

func NewMicrosoftClientFromSecret(container string) (Client, error)

NewMicrosoftClientFromSecret creates a microsoft client by reading credentials from a mounted MicrosoftSecret. You may pass "" for container in which case it will read the container from the secret.

func NewMinioClient Uses

func NewMinioClient(endpoint, bucket, id, secret string, secure, isS3V2 bool) (c Client, err error)

NewMinioClient creates an s3 compatible client with the following credentials:

endpoint - S3 compatible endpoint
bucket - S3 bucket name
id     - AWS access key id
secret - AWS secret access key
secure - Set to true if connection is secure.
isS3V2 - Set to true if client follows S3V2

func NewMinioClientFromEnv Uses

func NewMinioClientFromEnv() (Client, error)

NewMinioClientFromEnv creates a Minio client based on environment variables.

func NewMinioClientFromSecret Uses

func NewMinioClientFromSecret(bucket string) (Client, error)

NewMinioClientFromSecret constructs an s3 compatible client by reading credentials from a mounted AmazonSecret. You may pass "" for bucket in which case it will read the bucket from the secret.

func NewTestClient Uses

func NewTestClient(t testing.TB) Client

NewTestClient creates a Client which is cleaned up after the test exits.

func TracingObjClient Uses

func TracingObjClient(provider string, c Client) Client

TracingObjClient wraps the given object client 'c', adding tracing to all calls made by the returned interface

type ObjectStoreURL Uses

type ObjectStoreURL struct {
    // The object store, e.g. s3, gcs, as...
    Store string
    // The "bucket" (in AWS parlance) or the "container" (in Azure parlance).
    Bucket string
    // The object itself.
    Object string
}

ObjectStoreURL represents a parsed URL to an object in an object store.

func ParseURL Uses

func ParseURL(urlStr string) (*ObjectStoreURL, error)

ParseURL parses a URL into an ObjectStoreURL.

type RetryError Uses

type RetryError struct {
    Err               string
    TimeTillNextRetry string
    BytesProcessed    int
}

RetryError is used to log retry attempts.



Package obj imports 50 packages (graph) and is imported by 13 packages. Updated 2021-01-21. Refresh now. Tools for package owners.