storage

package
v5.3.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2021 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// LocalURIPrefix represents the local storage prefix.
	LocalURIPrefix = "file://"
)

Variables

This section is empty.

Functions

func DefineFlags

func DefineFlags(flags *pflag.FlagSet)

DefineFlags adds flags to the flag set corresponding to all backend options.

func ExtractQueryParameters

func ExtractQueryParameters(u *url.URL, options interface{})

ExtractQueryParameters moves the query parameters of the URL into the options using reflection.

The options must be a pointer to a struct which contains only string or bool fields (more types will be supported in the future), and tagged for JSON serialization.

All of the URL's query parameters will be removed after calling this method.

func FormatBackendURL

func FormatBackendURL(backend *backuppb.StorageBackend) (u url.URL)

FormatBackendURL obtains the raw URL which can be used to reconstruct the backend. The returned URL does not contain options for further configuring the backend. This is to avoid exposing secret tokens.

func ParseBackend

func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error)

ParseBackend constructs a structured backend description from the storage URL.

func ParseRawURL

func ParseRawURL(rawURL string) (*url.URL, error)

ParseRawURL parses a raw URL string into a url.URL object.

Types

type BackendOptions

type BackendOptions struct {
	S3  S3BackendOptions  `json:"s3" toml:"s3"`
	GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
}

BackendOptions further configures the storage backend not expressed by the storage URL.

func (*BackendOptions) ParseFromFlags

func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error

ParseFromFlags obtains the backend options from the flag set.

type BytesWriter

type BytesWriter struct {
	// contains filtered or unexported fields
}

BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.

func NewBufferWriter

func NewBufferWriter() *BytesWriter

NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing).

func (*BytesWriter) Bytes

func (u *BytesWriter) Bytes() []byte

Bytes delegates to bytes.Buffer.

func (*BytesWriter) Close

func (u *BytesWriter) Close(ctx context.Context) error

Close delegates to bytes.Buffer.

func (*BytesWriter) Reset

func (u *BytesWriter) Reset()

Reset delegates to bytes.Buffer.

func (*BytesWriter) String

func (u *BytesWriter) String() string

String delegates to bytes.Buffer.

func (*BytesWriter) Write

func (u *BytesWriter) Write(ctx context.Context, p []byte) (int, error)

Write delegates to bytes.Buffer.

type CompressType

type CompressType uint8

CompressType represents the type of compression.

const (
	// NoCompression won't compress given bytes.
	NoCompression CompressType = iota
	// Gzip will compress given bytes in gzip format.
	Gzip
)

type ExternalFileReader

type ExternalFileReader interface {
	io.ReadCloser
	io.Seeker
}

ExternalFileReader represents the streaming external file reader.

type ExternalFileWriter

type ExternalFileWriter interface {
	// Write writes to buffer and if chunk is filled will upload it
	Write(ctx context.Context, p []byte) (int, error)
	// Close writes final chunk and completes the upload
	Close(ctx context.Context) error
}

ExternalFileWriter represents the streaming external file writer.

func NewUploaderWriter

func NewUploaderWriter(writer ExternalFileWriter, chunkSize int, compressType CompressType) ExternalFileWriter

NewUploaderWriter wraps the Writer interface over an uploader.

type ExternalStorage

type ExternalStorage interface {
	// WriteFile writes a complete file to storage, similar to os.WriteFile
	WriteFile(ctx context.Context, name string, data []byte) error
	// ReadFile reads a complete file from storage, similar to os.ReadFile
	ReadFile(ctx context.Context, name string) ([]byte, error)
	// FileExists return true if file exists
	FileExists(ctx context.Context, name string) (bool, error)
	// Open a Reader by file path. path is relative path to storage base path
	Open(ctx context.Context, path string) (ExternalFileReader, error)
	// WalkDir traverse all the files in a dir.
	//
	// fn is the function called for each regular file visited by WalkDir.
	// The argument `path` is the file path that can be used in `Open`
	// function; the argument `size` is the size in byte of the file determined
	// by path.
	WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error

	// URI returns the base path as a URI
	URI() string

	// Create opens a file writer by path. path is relative path to storage base path
	Create(ctx context.Context, path string) (ExternalFileWriter, error)
}

ExternalStorage represents a kind of file system storage.

func Create

func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error)

Create creates ExternalStorage.

Please consider using `New` in the future.

func New

New creates an ExternalStorage with options.

func WithCompression

func WithCompression(inner ExternalStorage, compressionType CompressType) ExternalStorage

WithCompression returns an ExternalStorage with a compression option.

func WrapNew

func WrapNew(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions, hdfsConfig *HdfsConfig) (ExternalStorage, error)

type ExternalStorageOptions

type ExternalStorageOptions struct {
	// SendCredentials marks whether to send credentials downstream.
	//
	// This field should be set to false if the credentials are provided to
	// downstream via external key managers, e.g. on K8s or cloud provider.
	SendCredentials bool

	// NoCredentials means that no cloud credentials are supplied to BR
	NoCredentials bool

	// Deprecated: use CheckPermissions to specify the permissions to check instead.
	SkipCheckPath bool

	// HTTPClient to use. The created storage may ignore this field if it is not
	// directly using HTTP (e.g. the local storage).
	HTTPClient *http.Client

	// CheckPermissions check the given permission in New() function.
	// make sure we can access the storage correctly before execute tasks.
	CheckPermissions []Permission
}

ExternalStorageOptions are backend-independent options provided to New.

type GCSBackendOptions

type GCSBackendOptions struct {
	Endpoint        string `json:"endpoint" toml:"endpoint"`
	StorageClass    string `json:"storage-class" toml:"storage-class"`
	PredefinedACL   string `json:"predefined-acl" toml:"predefined-acl"`
	CredentialsFile string `json:"credentials-file" toml:"credentials-file"`
}

GCSBackendOptions are options for configuring the GCS storage.

type HdfsConfig

type HdfsConfig struct {
	Address string
}

func WrapParseBackend

func WrapParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, *HdfsConfig, error)

type HdfsReader

type HdfsReader hdfs.FileReader

HdfsReader implements the storage.ExternalFileReader interface.

type HdfsStorage

type HdfsStorage struct {
	// contains filtered or unexported fields
}

HdfsStorage implements the storage.ExternalStorage interface.

func (*HdfsStorage) Create

func (s *HdfsStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error)

func (*HdfsStorage) FileExists

func (s *HdfsStorage) FileExists(ctx context.Context, name string) (bool, error)

func (*HdfsStorage) Open

func (s *HdfsStorage) Open(ctx context.Context, path string) (ExternalFileReader, error)

func (*HdfsStorage) ReadFile

func (s *HdfsStorage) ReadFile(ctx context.Context, name string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents

func (*HdfsStorage) URI

func (s *HdfsStorage) URI() string

func (*HdfsStorage) WalkDir

func (s *HdfsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error

WalkDir traverses all the files in a directory.

func (*HdfsStorage) WriteFile

func (s *HdfsStorage) WriteFile(ctx context.Context, name string, data []byte) error

WriteFile writes data to a file in the storage.

type HdfsWriter

type HdfsWriter struct {
	// contains filtered or unexported fields
}

TODO: add this writer to the CSV write path. HdfsWriter implements the storage.ExternalFileWriter interface.

func (*HdfsWriter) Close

func (w *HdfsWriter) Close(ctx context.Context) error

func (*HdfsWriter) Write

func (w *HdfsWriter) Write(ctx context.Context, p []byte) (int, error)

type LocalStorage

type LocalStorage struct {
	// contains filtered or unexported fields
}

LocalStorage represents local file system storage.

export for using in tests.

func NewLocalStorage

func NewLocalStorage(base string) (*LocalStorage, error)

NewLocalStorage return a LocalStorage at directory `base`.

export for test.

func (*LocalStorage) Create

func (l *LocalStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error)

Create implements ExternalStorage interface.

func (*LocalStorage) FileExists

func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error)

FileExists implements ExternalStorage.FileExists.

func (*LocalStorage) Open

Open a Reader by file path, path is a relative path to base path.

func (*LocalStorage) ReadFile

func (l *LocalStorage) ReadFile(ctx context.Context, name string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*LocalStorage) URI

func (l *LocalStorage) URI() string

URI returns the base path as a URI with a file:/// prefix.

func (*LocalStorage) WalkDir

func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a directory.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in `Open` function; the second argument is the size in byte of the file determined by path.

func (*LocalStorage) WriteFile

func (l *LocalStorage) WriteFile(ctx context.Context, name string, data []byte) error

WriteFile writes data to a file in the storage.

type Permission

type Permission string

Permission represents the permission we need to check in create storage.

const (
	// AccessBuckets represents bucket access permission
	// it replaces the original skip-check-path option.
	AccessBuckets Permission = "AccessBucket"

	// ListObjects represents listObjects permission
	ListObjects Permission = "ListObjects"
	// GetObject represents GetObject permission
	GetObject Permission = "GetObject"
	// PutObject represents PutObject permission
	PutObject Permission = "PutObject"
)

type RangeInfo

type RangeInfo struct {
	// Start is the absolute position of the first byte of the byte range,
	// starting from 0.
	Start int64
	// End is the absolute position of the last byte of the byte range. This end
	// offset is inclusive, e.g. if the Size is 1000, the maximum value of End
	// would be 999.
	End int64
	// Size is the total size of the original file.
	Size int64
}

RangeInfo represents an HTTP Content-Range header value of the form `bytes [Start]-[End]/[Size]`.

func ParseRangeInfo

func ParseRangeInfo(info *string) (ri RangeInfo, err error)

ParseRangeInfo parses the Content-Range header and returns the offsets.

type ReadSeekCloser

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}

ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.

type S3BackendOptions

type S3BackendOptions struct {
	Endpoint              string `json:"endpoint" toml:"endpoint"`
	Region                string `json:"region" toml:"region"`
	StorageClass          string `json:"storage-class" toml:"storage-class"`
	Sse                   string `json:"sse" toml:"sse"`
	SseKmsKeyID           string `json:"sse-kms-key-id" toml:"sse-kms-key-id"`
	ACL                   string `json:"acl" toml:"acl"`
	AccessKey             string `json:"access-key" toml:"access-key"`
	SecretAccessKey       string `json:"secret-access-key" toml:"secret-access-key"`
	Provider              string `json:"provider" toml:"provider"`
	ForcePathStyle        bool   `json:"force-path-style" toml:"force-path-style"`
	UseAccelerateEndpoint bool   `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"`
}

S3BackendOptions contains options for s3 storage.

func (*S3BackendOptions) Apply

func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error

Apply applies S3 options to backuppb.S3.

type S3Storage

type S3Storage struct {
	// contains filtered or unexported fields
}

S3Storage info for s3 storage.

func NewS3Storage deprecated

func NewS3Storage(
	backend *backuppb.S3,
	sendCredential bool,
) (*S3Storage, error)

NewS3Storage initialize a new s3 storage for metadata.

Deprecated: Create the storage via `New()` instead of using this.

func NewS3StorageForTest

func NewS3StorageForTest(svc s3iface.S3API, options *backuppb.S3) *S3Storage

NewS3StorageForTest creates a new S3Storage for testing only.

func (*S3Storage) Create

func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error)

Create creates a multipart upload request.

func (*S3Storage) CreateUploader

func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalFileWriter, error)

CreateUploader creates a multipart upload request.

func (*S3Storage) FileExists

func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)

FileExists check if file exists on s3 storage.

func (*S3Storage) Open

func (rs *S3Storage) Open(ctx context.Context, path string) (ExternalFileReader, error)

Open a Reader by file path.

func (*S3Storage) ReadFile

func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*S3Storage) URI

func (rs *S3Storage) URI() string

URI returns s3://<base>/<prefix>.

func (*S3Storage) WalkDir

func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a directory.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in `Open` function; the second argument is the size in byte of the file determined by path.

func (*S3Storage) WriteFile

func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error

WriteFile writes data to a file in the storage.

type S3Uploader

type S3Uploader struct {
	// contains filtered or unexported fields
}

S3Uploader does multi-part upload to s3.

func (*S3Uploader) Close

func (u *S3Uploader) Close(ctx context.Context) error

Close completes the multipart upload request.

func (*S3Uploader) Write

func (u *S3Uploader) Write(ctx context.Context, data []byte) (int, error)

Write uploads partial data to S3. CreateMultipartUpload must be called to start the upload, and CompleteMultipartUpload to finish it.

type Uploader

type Uploader interface {
	// UploadPart upload part of file data to storage
	UploadPart(ctx context.Context, data []byte) error
	// CompleteUpload make the upload data to a complete file
	CompleteUpload(ctx context.Context) error
}

Uploader upload file with chunks.

type WalkOption

type WalkOption struct {
	// walk on SubDir of specify directory
	SubDir string
	// ListCount is the number of entries per page.
	//
	// In cloud storages such as S3 and GCS, the files are listed and sent in pages.
	// Typically a page contains 1000 files, and if a folder has 3000 descendant
	// files, one would need 3 requests to retrieve all of them. This parameter
	// controls this size. Note that both S3 and GCS limit the maximum to 1000.
	//
	// Typically you want to leave this field unassigned (zero) to use the
	// default value (1000) to minimize the number of requests, unless you want
	// to reduce the possibility of timeout on an extremely slow connection, or
	// perform testing.
	ListCount int64
}

WalkOption is the option of storage.WalkDir.

type Writer

type Writer interface {
	// Write writes to buffer and if chunk is filled will upload it
	Write(ctx context.Context, p []byte) (int, error)
	// Close writes final chunk and completes the upload
	Close(ctx context.Context) error
}

Writer is like io.Writer but with Context; create a new writer on top of Uploader with NewUploaderWriter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL