dstore

package module
v0.1.1-0...-b7df14c Latest Latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

README

StreamingFast Storage Abstraction

dstore is a simple abstraction on top of local storage and cloud storage. It handles the functions commonly needed to store things (locally or on cloud storage providers): writing and reading objects, listing files, deleting them, and so on.

It is used by StreamingFast.

Features

It currently supports:

  • AWS S3 (s3://[bucket]/path?region=us-east-1, with AWS-specific env vars)
    • Minio (through the S3 interface)
  • Google Storage (gs://[bucket]/path, with GOOGLE_APPLICATION_CREDENTIALS env var set)
  • Azure Blob Storage (az://[account].[container]/path, with AZURE_STORAGE_KEY env var set)
  • Local file systems (including virtual or FUSE-based ones) (file:/// prefix)

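
The URL forms listed above all follow the usual scheme://host/path?query layout. As a rough illustration of what each part carries, here is a minimal sketch that splits such URLs using only the standard library. The storeLocation type and parseStoreURL function are hypothetical names for this example, and dstore's own parsing may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// storeLocation holds the pieces of a store URL that the schemes listed
// above encode. The type and field names are illustrative only.
type storeLocation struct {
	Scheme string // "s3", "gs", "az" or "file"
	Host   string // bucket, "account.container", or empty for file URLs
	Path   string // path inside the bucket or on disk
	Region string // value of the "region" query parameter, if any
}

// parseStoreURL splits a store URL with the standard library only.
func parseStoreURL(raw string) (storeLocation, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return storeLocation{}, fmt.Errorf("parsing %q: %w", raw, err)
	}
	return storeLocation{
		Scheme: u.Scheme,
		Host:   u.Host,
		Path:   u.Path,
		Region: u.Query().Get("region"),
	}, nil
}

func main() {
	for _, raw := range []string{
		"s3://my-bucket/blocks?region=us-east-1",
		"gs://my-bucket/blocks",
		"az://myaccount.mycontainer/blocks",
		"file:///tmp/blocks",
	} {
		loc, err := parseStoreURL(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%+v\n", loc)
	}
}
```
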
Testing

The storetests package contains all the integration tests we run against our store implementations. Some store implementations can be tested directly, while a few others, essentially those backed by cloud providers, require extra environment variables to run. Their tests are skipped if the required environment variables for the provider are not set.
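
The skip-when-unset behaviour described above can be sketched as follows. This is not the storetests code: providerStoreURL is a hypothetical helper, and the lookup function is injected only so the example can run without touching the real environment. In an actual Go test, the "not set" branch would typically call t.Skip.

```go
package main

import (
	"fmt"
	"os"
)

// providerStoreURL returns the store URL configured for a provider and
// whether its tests should run. lookup is injected so the sketch can be
// exercised without touching the real environment.
func providerStoreURL(lookup func(string) (string, bool), envVar string) (string, bool) {
	value, ok := lookup(envVar)
	if !ok || value == "" {
		return "", false // the caller would skip this provider's tests
	}
	return value, true
}

func main() {
	for _, envVar := range []string{
		"STORETESTS_GS_STORE_URL",
		"STORETESTS_S3_STORE_URL",
		"STORETESTS_S3_MINIO_STORE_URL",
	} {
		if storeURL, ok := providerStoreURL(os.LookupEnv, envVar); ok {
			fmt.Printf("%s set, would test against %s\n", envVar, storeURL)
		} else {
			fmt.Printf("%s not set, would skip\n", envVar)
		}
	}
}
```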

To run the full test suite, you will need to perform the following steps.

First, you will need a few dependencies installed locally:

Then, start the minio server:

mkdir -p /tmp/minio-tests/store-tests
cd /tmp/minio-tests
minio server .

Ensure you have access to a GCP Storage bucket and an S3 bucket, then run the full test suite:

STORETESTS_GS_STORE_URL="gs://streamingfast-developement-random/store-tests" \
STORETESTS_S3_STORE_URL="s3://streamingfast-customer-outbox/store-tests?region=us-east-2" \
STORETESTS_S3_MINIO_STORE_URL="s3://localhost:9000/store-tests?region=none&insecure=true&access_key_id=minioadmin&secret_access_key=minioadmin" \
go test ./...

Note: the S3 bucket above no longer exists; you need to provide your own.

Contributing

Issues and PRs in this repo relate strictly to the dstore library.

Report any protocol-specific issues in their respective repositories.

Please first refer to the general StreamingFast contribution guide, if you wish to contribute to this code base.

License

Apache 2.0

Documentation

Constants

This section is empty.

Variables

var ErrNotFound = errors.New("not found")

var NewStoreFromURL = NewStoreFromFileURL

Deprecated: Use NewStoreFromFileURL

var StopIteration = errors.New("stop iteration")

Functions

func ParseS3URL

func ParseS3URL(s3URL *url.URL) (config *aws.Config, bucket string, path string, err error)

func ReadObject

func ReadObject(ctx context.Context, fileURL string, opts ...Option) ([]byte, error)

ReadObject reads the object at the given file URL directly: it parses the URL, extracts the path and the filename from it, creates the store, opens the object, and returns its content.

This is a shortcut helper function that makes it simpler to read an object from a single file URL.

Types

type AzureStore

type AzureStore struct {
	// contains filtered or unexported fields
}

func NewAzureStore

func NewAzureStore(baseURL *url.URL, extension, compressionType string, overwrite bool) (*AzureStore, error)

func (*AzureStore) BaseURL

func (s *AzureStore) BaseURL() *url.URL

func (*AzureStore) Clone

func (s *AzureStore) Clone(_ context.Context) (Store, error)

The context is not used here.

func (*AzureStore) CopyObject

func (s *AzureStore) CopyObject(ctx context.Context, src, dest string) error

func (AzureStore) DeleteObject

func (s AzureStore) DeleteObject(ctx context.Context, base string) error

func (*AzureStore) FileExists

func (s *AzureStore) FileExists(ctx context.Context, base string) (bool, error)

func (*AzureStore) ListFiles

func (s *AzureStore) ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

func (*AzureStore) ObjectAttributes

func (s *AzureStore) ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

func (*AzureStore) ObjectPath

func (s *AzureStore) ObjectPath(name string) string

func (*AzureStore) ObjectURL

func (s *AzureStore) ObjectURL(name string) string

func (*AzureStore) OpenObject

func (s *AzureStore) OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)

func (AzureStore) Overwrite

func (c AzureStore) Overwrite() bool

func (*AzureStore) PushLocalFile

func (s *AzureStore) PushLocalFile(ctx context.Context, localFile, toBaseName string) error

func (AzureStore) SetMeter

func (c AzureStore) SetMeter(m Meter)

func (AzureStore) SetOverwrite

func (c AzureStore) SetOverwrite(in bool)

func (*AzureStore) SubStore

func (s *AzureStore) SubStore(subFolder string) (Store, error)

func (*AzureStore) Walk

func (s *AzureStore) Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error

func (*AzureStore) WalkFrom

func (s *AzureStore) WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

func (*AzureStore) WriteObject

func (s *AzureStore) WriteObject(ctx context.Context, base string, f io.Reader) (err error)

type BufferedFileReadCloser

type BufferedFileReadCloser struct {
	// contains filtered or unexported fields
}

func NewBufferedFileReadCloser

func NewBufferedFileReadCloser(file *os.File) *BufferedFileReadCloser

func (*BufferedFileReadCloser) Close

func (readCloser *BufferedFileReadCloser) Close() error

func (*BufferedFileReadCloser) Read

func (readCloser *BufferedFileReadCloser) Read(p []byte) (n int, err error)

type Clonable

type Clonable interface {
	Clone(context.Context) (Store, error)
}

type GSStore

type GSStore struct {
	// contains filtered or unexported fields
}

func NewGSStore

func NewGSStore(baseURL *url.URL, extension, compressionType string, overwrite bool) (*GSStore, error)

func (*GSStore) BaseURL

func (s *GSStore) BaseURL() *url.URL

func (*GSStore) Clone

func (s *GSStore) Clone(ctx context.Context) (Store, error)

func (*GSStore) CopyObject

func (s *GSStore) CopyObject(ctx context.Context, src, dest string) error

func (*GSStore) DeleteObject

func (s *GSStore) DeleteObject(ctx context.Context, base string) error

func (*GSStore) FileExists

func (s *GSStore) FileExists(ctx context.Context, base string) (bool, error)

func (*GSStore) ListFiles

func (s *GSStore) ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

func (*GSStore) ObjectAttributes

func (s *GSStore) ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

func (*GSStore) ObjectPath

func (s *GSStore) ObjectPath(name string) string

func (*GSStore) ObjectURL

func (s *GSStore) ObjectURL(name string) string

func (*GSStore) OpenObject

func (s *GSStore) OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)

func (GSStore) Overwrite

func (c GSStore) Overwrite() bool

func (*GSStore) PushLocalFile

func (s *GSStore) PushLocalFile(ctx context.Context, localFile, toBaseName string) error

func (GSStore) SetMeter

func (c GSStore) SetMeter(m Meter)

func (GSStore) SetOverwrite

func (c GSStore) SetOverwrite(in bool)

func (*GSStore) SubStore

func (s *GSStore) SubStore(subFolder string) (Store, error)

func (*GSStore) Walk

func (s *GSStore) Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error

func (*GSStore) WalkFrom

func (s *GSStore) WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

func (*GSStore) WriteObject

func (s *GSStore) WriteObject(ctx context.Context, base string, f io.Reader) (err error)

type GZipReadCloser

type GZipReadCloser struct {
	*gzip.Reader
	// contains filtered or unexported fields
}

func NewGZipReadCloser

func NewGZipReadCloser(src io.ReadCloser) (*GZipReadCloser, error)

func (*GZipReadCloser) Close

func (g *GZipReadCloser) Close() error
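
A wrapper of this shape usually exists so that closing the decompressing reader also closes the underlying source. The sketch below shows one plausible way to do that with the standard library. The lowercase gzipReadCloser, its src field, and the trackingCloser helper are assumptions made for this example, not dstore's actual internals.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipReadCloser embeds *gzip.Reader (which provides Read) and keeps the
// source so that Close can release both. The src field is an assumption.
type gzipReadCloser struct {
	*gzip.Reader
	src io.ReadCloser
}

func newGzipReadCloser(src io.ReadCloser) (*gzipReadCloser, error) {
	zr, err := gzip.NewReader(src)
	if err != nil {
		return nil, err
	}
	return &gzipReadCloser{Reader: zr, src: src}, nil
}

// Close closes the gzip reader first, then the underlying source, and
// reports the first error encountered.
func (g *gzipReadCloser) Close() error {
	zerr := g.Reader.Close()
	serr := g.src.Close()
	if zerr != nil {
		return zerr
	}
	return serr
}

// trackingCloser records whether Close was called on the source.
type trackingCloser struct {
	io.Reader
	closed bool
}

func (t *trackingCloser) Close() error { t.closed = true; return nil }

// roundTrip compresses text, reads it back through gzipReadCloser and
// reports whether the source was closed.
func roundTrip(text string) (string, bool, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(text)); err != nil {
		return "", false, err
	}
	if err := zw.Close(); err != nil {
		return "", false, err
	}
	src := &trackingCloser{Reader: &buf}
	rc, err := newGzipReadCloser(src)
	if err != nil {
		return "", false, err
	}
	out, err := io.ReadAll(rc)
	if err != nil {
		return "", false, err
	}
	if err := rc.Close(); err != nil {
		return "", false, err
	}
	return string(out), src.closed, nil
}

func main() {
	out, closed, err := roundTrip("hello dstore")
	fmt.Println(out, closed, err)
}
```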

type LocalStore

type LocalStore struct {
	// contains filtered or unexported fields
}

func NewLocalStore

func NewLocalStore(baseURL *url.URL, extension, compressionType string, overwrite bool) (*LocalStore, error)

func (*LocalStore) BaseURL

func (s *LocalStore) BaseURL() *url.URL

func (*LocalStore) Clone

func (s *LocalStore) Clone(ctx context.Context) (Store, error)

func (*LocalStore) CopyObject

func (s *LocalStore) CopyObject(ctx context.Context, src, dest string) error

func (*LocalStore) DeleteObject

func (s *LocalStore) DeleteObject(ctx context.Context, base string) error

func (*LocalStore) FileExists

func (s *LocalStore) FileExists(ctx context.Context, base string) (bool, error)

func (*LocalStore) ListFiles

func (s *LocalStore) ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

func (*LocalStore) ObjectAttributes

func (s *LocalStore) ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

func (*LocalStore) ObjectPath

func (s *LocalStore) ObjectPath(name string) string

func (*LocalStore) ObjectURL

func (s *LocalStore) ObjectURL(name string) string

func (*LocalStore) OpenObject

func (s *LocalStore) OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)

func (LocalStore) Overwrite

func (c LocalStore) Overwrite() bool

func (*LocalStore) PushLocalFile

func (s *LocalStore) PushLocalFile(ctx context.Context, localFile, toBaseName string) error

func (LocalStore) SetMeter

func (c LocalStore) SetMeter(m Meter)

func (LocalStore) SetOverwrite

func (c LocalStore) SetOverwrite(in bool)

func (*LocalStore) SubStore

func (s *LocalStore) SubStore(subFolder string) (Store, error)

func (*LocalStore) Walk

func (s *LocalStore) Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error

func (*LocalStore) WalkFrom

func (s *LocalStore) WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

func (*LocalStore) WriteObject

func (s *LocalStore) WriteObject(ctx context.Context, base string, reader io.Reader) (err error)

type Meter

type Meter interface {
	AddBytesRead(int)
	AddBytesWritten(int)

	AddBytesWrittenCtx(context.Context, int)
	AddBytesReadCtx(context.Context, int)
}
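
To illustrate how the Meter interface might be satisfied, here is a minimal counting implementation. The interface is redeclared locally so the example compiles on its own; countingMeter and demoMeter are hypothetical names, and ignoring the context in the Ctx variants is a simplification of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// Meter mirrors the interface documented above so the sketch compiles alone.
type Meter interface {
	AddBytesRead(int)
	AddBytesWritten(int)

	AddBytesWrittenCtx(context.Context, int)
	AddBytesReadCtx(context.Context, int)
}

// countingMeter accumulates byte counts; atomics make it safe to share
// between goroutines.
type countingMeter struct {
	read    atomic.Int64
	written atomic.Int64
}

// Compile-time check that countingMeter satisfies Meter.
var _ Meter = (*countingMeter)(nil)

func (m *countingMeter) AddBytesRead(n int)    { m.read.Add(int64(n)) }
func (m *countingMeter) AddBytesWritten(n int) { m.written.Add(int64(n)) }

// The context-aware variants ignore the context in this sketch.
func (m *countingMeter) AddBytesReadCtx(_ context.Context, n int)    { m.read.Add(int64(n)) }
func (m *countingMeter) AddBytesWrittenCtx(_ context.Context, n int) { m.written.Add(int64(n)) }

// demoMeter records a few operations and returns the totals.
func demoMeter() (read, written int64) {
	m := &countingMeter{}
	ctx := context.Background()
	m.AddBytesRead(100)
	m.AddBytesReadCtx(ctx, 20)
	m.AddBytesWritten(7)
	m.AddBytesWrittenCtx(ctx, 3)
	return m.read.Load(), m.written.Load()
}

func main() {
	read, written := demoMeter()
	fmt.Printf("read=%d written=%d\n", read, written)
}
```

A value like this could then be handed to a store through SetMeter.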

type MockStore

type MockStore struct {
	OpenObjectFunc       func(ctx context.Context, name string) (out io.ReadCloser, err error)
	WriteObjectFunc      func(ctx context.Context, base string, f io.Reader) error
	CopyObjectFunc       func(ctx context.Context, src, dest string) error
	DeleteObjectFunc     func(ctx context.Context, base string) error
	FileExistsFunc       func(ctx context.Context, base string) (bool, error)
	ObjectAttributesFunc func(ctx context.Context, base string) (*ObjectAttributes, error)
	ListFilesFunc        func(ctx context.Context, prefix string, max int) ([]string, error)
	WalkFunc             func(ctx context.Context, prefix string, f func(filename string) error) error
	PushLocalFileFunc    func(ctx context.Context, localFile string, toBaseName string) (err error)

	Files map[string][]byte
	// contains filtered or unexported fields
}

func NewMockStore

func NewMockStore(writeFunc func(base string, f io.Reader) (err error)) *MockStore

func (*MockStore) BaseURL

func (s *MockStore) BaseURL() *url.URL

func (*MockStore) CopyObject

func (s *MockStore) CopyObject(ctx context.Context, src, dest string) error

func (*MockStore) DeleteObject

func (s *MockStore) DeleteObject(ctx context.Context, base string) error

func (*MockStore) FileExists

func (s *MockStore) FileExists(ctx context.Context, base string) (bool, error)

func (*MockStore) ListFiles

func (s *MockStore) ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

func (*MockStore) ObjectAttributes

func (s *MockStore) ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

func (*MockStore) ObjectPath

func (s *MockStore) ObjectPath(base string) string

func (*MockStore) ObjectURL

func (s *MockStore) ObjectURL(base string) string

func (*MockStore) OpenObject

func (s *MockStore) OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)

func (*MockStore) Overwrite

func (s *MockStore) Overwrite() bool

func (*MockStore) PushLocalFile

func (s *MockStore) PushLocalFile(ctx context.Context, localFile string, toBaseName string) (err error)

func (*MockStore) SetFile

func (s *MockStore) SetFile(name string, content []byte)

SetFile sets the content of a file. Set the value "err" to trigger an error when reading this file.

func (*MockStore) SetMeter

func (s *MockStore) SetMeter(meter Meter)

func (*MockStore) SetOverwrite

func (s *MockStore) SetOverwrite(in bool)

func (*MockStore) SubStore

func (s *MockStore) SubStore(subFolder string) (Store, error)

func (*MockStore) Walk

func (s *MockStore) Walk(ctx context.Context, prefix string, f func(filename string) error) error

func (*MockStore) WalkFrom

func (s *MockStore) WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

func (*MockStore) WriteFiles

func (s *MockStore) WriteFiles(toDirectory string) error

WriteFiles dumps the currently known files to the given directory.

func (*MockStore) WriteObject

func (s *MockStore) WriteObject(ctx context.Context, base string, f io.Reader) (err error)

type ObjectAttributes

type ObjectAttributes struct {
	// Size is the size of the object in bytes.
	Size int64

	// LastModified is the time the object was last modified.
	LastModified time.Time
}

type Option

type Option interface {
	// contains filtered or unexported methods
}

func AllowOverwrite

func AllowOverwrite() Option

AllowOverwrite allows files to be overwritten when they already exist at a given location.

func Compression

func Compression(compressionType string) Option

Compression defines which kind of compression to use when creating the store instance.

Valid `compressionType` values:

  • <empty>: no compression
  • zstd: use ZSTD compression
  • gzip: use GZIP compression
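
An Option interface with only unexported methods is typically a functional-options pattern. The sketch below shows how such options could be applied and validated. The storeConfig type, the apply method, and the validation step are assumptions made for illustration; only the AllowOverwrite and Compression names and the three valid compression values come from the documentation above.

```go
package main

import "fmt"

// storeConfig is a hypothetical holder for the settings options can change.
type storeConfig struct {
	overwrite       bool
	compressionType string
}

// Option has only an unexported method, so it cannot be implemented
// outside this package.
type Option interface {
	apply(*storeConfig)
}

type optionFunc func(*storeConfig)

func (f optionFunc) apply(c *storeConfig) { f(c) }

// AllowOverwrite lets existing files be overwritten.
func AllowOverwrite() Option {
	return optionFunc(func(c *storeConfig) { c.overwrite = true })
}

// Compression selects the compression type: "", "zstd" or "gzip".
func Compression(compressionType string) Option {
	return optionFunc(func(c *storeConfig) { c.compressionType = compressionType })
}

// newStoreConfig applies the options in order and rejects unknown
// compression types (the validation is an assumption of this sketch).
func newStoreConfig(opts ...Option) (storeConfig, error) {
	var c storeConfig
	for _, opt := range opts {
		opt.apply(&c)
	}
	switch c.compressionType {
	case "", "zstd", "gzip":
		return c, nil
	default:
		return storeConfig{}, fmt.Errorf("unsupported compression type %q", c.compressionType)
	}
}

func main() {
	cfg, err := newStoreConfig(AllowOverwrite(), Compression("zstd"))
	fmt.Printf("%+v %v\n", cfg, err)
}
```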

type S3Store

type S3Store struct {
	// contains filtered or unexported fields
}

func NewS3Store

func NewS3Store(baseURL *url.URL, extension, compressionType string, overwrite bool) (*S3Store, error)

func (*S3Store) BaseURL

func (s *S3Store) BaseURL() *url.URL

func (*S3Store) Clone

func (s *S3Store) Clone(ctx context.Context) (Store, error)

func (*S3Store) CopyObject

func (s *S3Store) CopyObject(ctx context.Context, src, dest string) error

func (*S3Store) DeleteObject

func (s *S3Store) DeleteObject(ctx context.Context, base string) error

func (*S3Store) FileExists

func (s *S3Store) FileExists(ctx context.Context, base string) (bool, error)

func (*S3Store) ListFiles

func (s *S3Store) ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

func (*S3Store) ObjectAttributes

func (s *S3Store) ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

func (*S3Store) ObjectPath

func (s *S3Store) ObjectPath(name string) string

func (*S3Store) ObjectURL

func (s *S3Store) ObjectURL(name string) string

func (*S3Store) OpenObject

func (s *S3Store) OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)

func (S3Store) Overwrite

func (c S3Store) Overwrite() bool

func (*S3Store) PushLocalFile

func (s *S3Store) PushLocalFile(ctx context.Context, localFile, toBaseName string) error

func (S3Store) SetMeter

func (c S3Store) SetMeter(m Meter)

func (S3Store) SetOverwrite

func (c S3Store) SetOverwrite(in bool)

func (*S3Store) SubStore

func (s *S3Store) SubStore(subFolder string) (Store, error)

func (*S3Store) Walk

func (s *S3Store) Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error

func (*S3Store) WalkFrom

func (s *S3Store) WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

func (*S3Store) WriteObject

func (s *S3Store) WriteObject(ctx context.Context, base string, f io.Reader) (err error)

type Store

type Store interface {
	OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)
	FileExists(ctx context.Context, base string) (bool, error)

	ObjectPath(base string) string
	ObjectURL(base string) string
	ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

	WriteObject(ctx context.Context, base string, f io.Reader) (err error)
	PushLocalFile(ctx context.Context, localFile, toBaseName string) (err error)

	CopyObject(ctx context.Context, src, dest string) error
	Overwrite() bool
	SetOverwrite(enabled bool)

	WalkFrom(ctx context.Context, prefix, startingPoint string, f func(filename string) (err error)) error

	// Walk recursively visits all files starting with the given prefix within this store. The `f` callback is invoked
	// for each file found.
	//
	// If you return `dstore.StopIteration` from your callback, iteration stops right away and `nil` is
	// returned by the `Walk` function. If your callback returns any other error, iteration stops right away and
	// the error returned by the callback is returned by the `Walk` function.
	Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error
	ListFiles(ctx context.Context, prefix string, max int) ([]string, error)

	DeleteObject(ctx context.Context, base string) error

	// Used to retrieve original query parameters, allowing further
	// configurability of the consumers of this store.
	BaseURL() *url.URL
	SubStore(subFolder string) (Store, error)

	SetMeter(meter Meter)
}
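
The Walk contract described in the interface comments (StopIteration ends the walk with a nil result, any other error is returned as-is) can be sketched over an in-memory list of file names. The walk and firstN functions and the errCallback value are hypothetical and exist only for this example; a real store would iterate over its backend instead of a slice.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// StopIteration mirrors the sentinel documented above.
var StopIteration = errors.New("stop iteration")

// errCallback is a hypothetical error used to show error propagation.
var errCallback = errors.New("callback failed")

// walk invokes f for every name that starts with prefix. StopIteration
// ends the walk with a nil result; any other error is returned unchanged.
func walk(files []string, prefix string, f func(filename string) error) error {
	for _, name := range files {
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		if err := f(name); err != nil {
			if errors.Is(err, StopIteration) {
				return nil
			}
			return err
		}
	}
	return nil
}

// firstN collects at most n matching names, stopping early via StopIteration.
func firstN(files []string, prefix string, n int) ([]string, error) {
	var out []string
	err := walk(files, prefix, func(filename string) error {
		out = append(out, filename)
		if len(out) >= n {
			return StopIteration
		}
		return nil
	})
	return out, err
}

func main() {
	files := []string{"a/1", "a/2", "a/3", "b/1"}

	got, err := firstN(files, "a/", 2)
	fmt.Println(got, err)

	err = walk(files, "a/", func(string) error { return errCallback })
	fmt.Println(err)
}
```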

func NewDBinStore

func NewDBinStore(baseURL string) (Store, error)

func NewJSONLStore

func NewJSONLStore(baseURL string) (Store, error)

func NewSimpleStore

func NewSimpleStore(baseURL string) (Store, error)

func NewStore

func NewStore(baseURL, extension, compressionType string, overwrite bool) (Store, error)

NewStore creates a new Store instance. The baseURL is always a directory, and does not end with a `/`.

func NewStoreFromFileURL

func NewStoreFromFileURL(fileURL string, opts ...Option) (store Store, filename string, err error)

NewStoreFromFileURL works against a full file URL to derive the store from it as well as the filename it points to. Use this function if and only if the input points directly to a file; if your input is meant to build a store, use NewStore instead.

This is a shortcut helper function that makes it simpler to get a store from a single file URL.
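
To make the "store plus filename" split concrete, here is a minimal sketch of how a file URL could be divided into a directory-style base URL and a file name using only the standard library. The splitFileURL function is hypothetical, and the real NewStoreFromFileURL may treat extensions, query parameters, or edge cases differently.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// splitFileURL separates a file URL into the URL of its parent directory
// (usable as a store base URL) and the file name it points to.
func splitFileURL(fileURL string) (baseURL, filename string, err error) {
	u, err := url.Parse(fileURL)
	if err != nil {
		return "", "", fmt.Errorf("parsing %q: %w", fileURL, err)
	}
	filename = path.Base(u.Path)
	if filename == "." || filename == "/" {
		return "", "", fmt.Errorf("%q does not point to a file", fileURL)
	}
	// Keep scheme, host and query parameters; only the path is trimmed.
	u.Path = path.Dir(u.Path)
	u.RawPath = ""
	return u.String(), filename, nil
}

func main() {
	base, name, err := splitFileURL("gs://my-bucket/blocks/0000000100.dbin?project=demo")
	fmt.Println(base, name, err)
}
```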

func OpenObject

func OpenObject(ctx context.Context, fileURL string, opts ...Option) (out io.ReadCloser, store Store, filename string, err error)

OpenObject opens the object at the given file URL directly: it parses the URL, extracts the path and the filename from it, creates the store, opens the object, and returns all of these.

This is a shortcut helper function that makes it simpler to get a store from a single file URL.
