cloudstorage

v0.2.16
Published: Apr 23, 2024 License: MIT Imports: 16 Imported by: 22

README

Introduction

Cloudstorage is a library for working with cloud storage (Google, AWS, Azure), SFTP, and local files. It provides a unified API for local files, SFTP, and cloud files, which aids testing and operating across multiple cloud storage providers.


Features

  • Provides a single unified API for multiple clouds (Google, Azure, AWS) and local files.
  • Cloud upload and download are unified in the API, so you don't have to download a file locally, work with it, and then upload it yourself.
  • Buffers/caches cloud files locally, so working with them is fast.

Example usage:

Note: For brevity, these examples ignore all errors by assigning them to _.

Creating a Store object:
// This is an example of a local storage object.
// See https://github.com/lytics/cloudstorage/blob/master/google/google_test.go for a GCS example.
config := &cloudstorage.Config{
	Type:            localfs.StoreType,
	AuthMethod:      localfs.AuthFileSystem,
	LocalFS:         "/tmp/mockcloud",
	TmpDir:          "/tmp/localcache",
}
store, _ := cloudstorage.NewStore(config)
Listing Objects:

See go Iterator pattern doc for api-design: https://github.com/GoogleCloudPlatform/google-cloud-go/wiki/Iterator-Guidelines

// From a store that has been created

// Create a query
q := cloudstorage.NewQuery("list-test/")
// Create an Iterator
iter, err := store.Objects(context.Background(), q)
if err != nil {
	// handle
}

for {
	o, err := iter.Next()
	if err == iterator.Done {
		break
	}
	if err != nil {
		// handle any other listing error
		break
	}
	log.Println("found object ", o.Name())
}
Writing an object:
obj, _ := store.NewObject("prefix/test.csv")
// Open for reading and writing. f is a file handle on the local filesystem.
f, _ := obj.Open(cloudstorage.ReadWrite)
w := bufio.NewWriter(f)
_, _ = w.WriteString("Year,Make,Model\n")
_, _ = w.WriteString("1997,Ford,E350\n")
w.Flush()

// Close syncs the local file to the remote store and removes the local tmp file.
obj.Close()
Reading an existing object:
// Calling Get returns a cloudstorage Object, or the cloudstorage.ErrObjectNotFound error if the object does not exist.
obj2, _ := store.Get(context.Background(), "prefix/test.csv")
// Note: the file is not yet open.
f2, _ := obj2.Open(cloudstorage.ReadOnly)
data, _ := io.ReadAll(f2) // io.ReadAll replaces the deprecated ioutil.ReadAll (Go 1.16+)
fmt.Println(string(data)) // should print the CSV file from the block above
Transferring an existing object:
var config = &storeutils.TransferConfig{
	Type:                  google.StoreType,
	AuthMethod:            google.AuthGCEDefaultOAuthToken,
	ProjectID:             "my-project",
	DestBucket:            "my-destination-bucket",
	Src:                   storeutils.NewGcsSource("my-source-bucket"),
	IncludePrefxies:       []string{"these", "prefixes"},
}

transferer, _ := storeutils.NewTransferer(client)
resp, _ := transferer.NewTransfer(config)

See testsuite.go for more examples

Testing

Because the integration tests act against a shared cloud bucket and its objects, run the tests without parallelization.

cd $GOPATH/src/github.com/lytics/cloudstorage
go test -p 1 ./...

Documentation

Overview

Package cloudstorage gives local, Google, and S3 file storage a common interface, to aid testing locally as well as running in the cloud.

The primary goal is to create a Store, a common interface over each of the file storage systems (Google, S3, local file system, Azure, etc.). The methods (query, filter, get, put) are then common, as are the files (Objects) themselves, which makes it simple to write code that supports multiple backends.

Creating and iterating files

In this example we are going to create a local-filesystem store.

// This is an example of a local-storage (local filesystem) provider:
config := &cloudstorage.Config{
	Type:       localfs.StoreType,
	AuthMethod: localfs.AuthFileSystem,
	LocalFS:    "/tmp/mockcloud",
	TmpDir:     "/tmp/localcache",
}
store, _ := cloudstorage.NewStore(config)

// Create a query to define the search path
q := cloudstorage.NewQuery("list-test/")

// Create an Iterator to list files
iter, err := store.Objects(context.Background(), q)
if err != nil {
	// handle
}
for {
	o, err := iter.Next()
	if err == iterator.Done {
		break
	}
	if err != nil {
		// handle any other listing error
		break
	}
	log.Printf("found object %v", o.Name())
}

Index

Constants

const (
	// StoreCacheFileExt = ".cache"
	StoreCacheFileExt = ".cache"
	// ContentTypeKey
	ContentTypeKey = "content_type"
	// MaxResults default number of objects to retrieve during a list-objects request,
	// if more objects exist, then they will need to be paged
	MaxResults = 3000
)

Variables

var (
	// ErrObjectNotFound Error of not finding a file(object)
	ErrObjectNotFound = fmt.Errorf("object not found")
	// ErrObjectExists error trying to create an already existing file.
	ErrObjectExists = fmt.Errorf("object already exists in backing store (use store.Get)")
	// ErrNotImplemented this feature is not implemented for this store
	ErrNotImplemented = fmt.Errorf("Not implemented")
)
var ObjectSortFilter = func(objs Objects) Objects {
	sort.Stable(objs)
	return objs
}

Functions

func Backoff

func Backoff(try int)

Backoff sleeps a random amount so we can retry failed requests using a randomized exponential backoff: wait a random period between [0..1] seconds and retry; if that fails, wait a random period between [0..2] seconds and retry; if that fails, wait a random period between [0..4] seconds and retry, and so on, with the upper bound of the wait period capped at 16 seconds. http://play.golang.org/p/l9aUHgiR8J
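
The schedule described above can be sketched as follows. This is a minimal illustration, not the package's implementation: backoffWindow and randomWait are hypothetical names, and the sketch assumes that try is zero-based and that the window doubles until it reaches the 16-second cap.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// maxBackoff is the 16-second cap on the wait window described above.
const maxBackoff = 16 * time.Second

// backoffWindow returns the upper bound of the wait window for a
// zero-based try (assumption): 1s, 2s, 4s, 8s, then capped at 16s.
func backoffWindow(try int) time.Duration {
	if try < 0 {
		try = 0
	}
	if try >= 4 { // 2^4 seconds equals the cap; this also avoids shift overflow
		return maxBackoff
	}
	return time.Duration(1<<uint(try)) * time.Second
}

// randomWait picks a random duration in [0, backoffWindow(try)).
func randomWait(try int) time.Duration {
	return time.Duration(rand.Int63n(int64(backoffWindow(try))))
}

func main() {
	for try := 0; try < 6; try++ {
		// A real retry loop would time.Sleep(randomWait(try)) before retrying.
		fmt.Printf("try %d: window %v, sampled wait %v\n", try, backoffWindow(try), randomWait(try))
	}
}
```

Sampling from the whole window, rather than sleeping for the full window each time, spreads retries from many clients so they are less likely to hit the backend at the same moment.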

func CachePathObj

func CachePathObj(cachepath, oname, storeid string) string

CachePathObj check the cache path.

func CleanETag

func CleanETag(etag string) string

CleanETag transforms a string into the form given by the full etag spec, removing extra quote marks and whitespace from the etag.

Per the ETag spec (https://tools.ietf.org/html/rfc7232#section-2.3), the etag value (<ETAG VALUE>) may take one of these forms:

  • W/"<ETAG VALUE>"
  • "<ETAG VALUE>"
  • ""
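
As a rough illustration of the three forms, the sketch below normalizes each one to its bare value. cleanETagSketch is a hypothetical stand-in, not the package's CleanETag; in particular, dropping the W/ weak-validator prefix is an assumption about the desired result.

```go
package main

import (
	"fmt"
	"strings"
)

// cleanETagSketch is a hypothetical stand-in for CleanETag. It trims
// surrounding whitespace, drops a leading W/ marker (assumption), and
// removes the surrounding double quotes.
func cleanETagSketch(etag string) string {
	etag = strings.TrimSpace(etag)
	etag = strings.TrimPrefix(etag, "W/")
	return strings.Trim(etag, `"`)
}

func main() {
	for _, in := range []string{`W/"abc123"`, `"abc123"`, `""`, ` "abc123" `} {
		fmt.Printf("%q -> %q\n", in, cleanETagSketch(in))
	}
}
```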

func CleanupCacheFiles

func CleanupCacheFiles(maxage time.Duration, TmpDir string) (err error)

CleanupCacheFiles cleans up old store cache files. If your process crashes, all of its old cache files (the local copies of the cloud files) will be left behind. This function is a convenience to help clean up those old files.

I suggest you call this behind a package-level sync.Once, so it is only called at the startup of your application.
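
The sync.Once suggestion might look like the sketch below. To stay self-contained it does not import the package: removeStaleCacheFiles is a hypothetical stand-in for CleanupCacheFiles, and its rule (delete files ending in .cache whose modification time is older than maxAge) is an assumption about the behavior, not a description of the real function. runDemo only builds a throwaway directory to exercise it.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// cleanupOnce guards the startup cleanup so it runs at most once per process.
var cleanupOnce sync.Once

// removeStaleCacheFiles is a hypothetical stand-in for CleanupCacheFiles.
// It removes *.cache files under tmpDir that are older than maxAge.
func removeStaleCacheFiles(maxAge time.Duration, tmpDir string) (int, error) {
	cutoff := time.Now().Add(-maxAge)
	removed := 0
	err := filepath.Walk(tmpDir, func(p string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() || !strings.HasSuffix(p, ".cache") || info.ModTime().After(cutoff) {
			return nil // keep directories, non-cache files, and fresh files
		}
		if err := os.Remove(p); err != nil {
			return err
		}
		removed++
		return nil
	})
	return removed, err
}

// runDemo creates one stale and one fresh cache file in a temp dir, runs the
// cleanup with a 24h max age, and reports what happened.
func runDemo() (int, bool, error) {
	dir, err := os.MkdirTemp("", "localcache")
	if err != nil {
		return 0, false, err
	}
	defer os.RemoveAll(dir)
	stale, fresh := filepath.Join(dir, "old.cache"), filepath.Join(dir, "new.cache")
	for _, p := range []string{stale, fresh} {
		if err := os.WriteFile(p, []byte("x"), 0o600); err != nil {
			return 0, false, err
		}
	}
	old := time.Now().Add(-48 * time.Hour)
	if err := os.Chtimes(stale, old, old); err != nil {
		return 0, false, err
	}
	removed, err := removeStaleCacheFiles(24*time.Hour, dir)
	if err != nil {
		return removed, false, err
	}
	_, statErr := os.Stat(fresh)
	return removed, statErr == nil, nil
}

func main() {
	for i := 0; i < 3; i++ {
		// Only the first iteration runs the cleanup; later calls are no-ops.
		cleanupOnce.Do(func() {
			removed, freshKept, err := runDemo()
			fmt.Println("removed:", removed, "fresh file kept:", freshKept, "err:", err)
		})
	}
}
```

In an application, the function passed to cleanupOnce.Do would call the package's cleanup against the same TmpDir used in the store Config.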

func ContentType

func ContentType(name string) string

ContentType checks the content type of a file by looking at its extension (.html, .png). It uses package mime for global types; use mime.AddExtensionType to add new global types.
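
The extension lookup and the mime.AddExtensionType hook can be illustrated with the standard library alone. contentTypeSketch and registerExtension are hypothetical helpers, and the .parquet mapping is an invented example; the real ContentType may apply fallbacks that this sketch does not.

```go
package main

import (
	"fmt"
	"mime"
	"path/filepath"
)

// contentTypeSketch is a hypothetical stand-in that looks up the MIME type
// registered for the file name's extension. It returns "" when unknown.
func contentTypeSketch(name string) string {
	return mime.TypeByExtension(filepath.Ext(name))
}

// registerExtension adds a global extension-to-type mapping via package mime.
func registerExtension(ext, typ string) error {
	return mime.AddExtensionType(ext, typ)
}

func main() {
	// ".parquet" and its type are example values, not something the package defines.
	if err := registerExtension(".parquet", "application/x-parquet"); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	fmt.Println(contentTypeSketch("reports/2024/data.parquet"))
	fmt.Println(contentTypeSketch("index.html"))
}
```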

func Copy

func Copy(ctx context.Context, s Store, src, des Object) error

Copy source to destination.

func EnsureContextType

func EnsureContextType(o string, md map[string]string) string

EnsureContextType reads the content type from the metadata.

func EnsureDir

func EnsureDir(filename string) error

EnsureDir ensures the directory exists.

func Exists

func Exists(filename string) bool

Exists reports whether this file path exists on the local file system.

func Move

func Move(ctx context.Context, s Store, src, des Object) error

Move source object to destination.

func Register

func Register(storeType string, provider StoreProvider)

Register adds a store type provider.
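
A provider registry of this shape is commonly a map from type name to constructor function. The sketch below shows that pattern with local stand-in types (config, store, storeProvider, memStore are all hypothetical); whether the real Register overwrites or rejects a duplicate type is not stated here, so the overwrite behavior is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for Config, Store, and StoreProvider.
type config struct{ Type string }

type store interface{ Type() string }

type storeProvider func(*config) (store, error)

var (
	mu        sync.RWMutex
	providers = map[string]storeProvider{}
)

// register associates a store type name with its constructor.
// A later registration for the same name overwrites the earlier one (assumption).
func register(storeType string, p storeProvider) {
	mu.Lock()
	defer mu.Unlock()
	providers[storeType] = p
}

// newStore looks up the provider named by c.Type and invokes it.
func newStore(c *config) (store, error) {
	if c == nil {
		return nil, fmt.Errorf("nil config")
	}
	mu.RLock()
	p, ok := providers[c.Type]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("no provider registered for store type %q", c.Type)
	}
	return p(c)
}

// memStore is a trivial example backend.
type memStore struct{}

func (memStore) Type() string { return "mem" }

func main() {
	register("mem", func(*config) (store, error) { return memStore{}, nil })

	s, err := newStore(&config{Type: "mem"})
	fmt.Println(s.Type(), err)

	_, err = newStore(&config{Type: "unknown"})
	fmt.Println(err)
}
```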

Types

type AccessLevel

type AccessLevel int

AccessLevel is the level of permissions on files

const (
	// ReadOnly File Permissions Levels
	ReadOnly  AccessLevel = 0
	ReadWrite AccessLevel = 1
)

type AuthMethod

type AuthMethod string

AuthMethod is the source, location, or type of auth token.

type Config

type Config struct {
	// Type is StoreType [gcs,localfs,s3,azure]
	Type string
	// AuthMethod the methods of authenticating store.  Ie, where/how to
	// find auth tokens.
	AuthMethod AuthMethod
	// Cloud Bucket Project
	Project string
	// Region is the cloud region
	Region string
	// Endpoint is the api endpoint
	Endpoint string
	// Bucket is the "path" or named bucket in cloud
	Bucket string
	// the page size to use with api requests (default 1000)
	PageSize int
	// used by JWTKeySource
	JwtConf *JwtConf
	// JwtFile is the file-path to local auth-token file.
	JwtFile string `json:"jwtfile,omitempty"`
	// BaseUrl is the base-url path for customizing regions etc.  IE
	// AWS has different url paths per region on some situations.
	BaseUrl string `json:"baseurl,omitempty"`
	// Permissions scope
	Scope string `json:"scope,omitempty"`
	// LocalFS is filesystem path to use for the local files
	// for Type=localfs
	LocalFS string `json:"localfs,omitempty"`
	// The filesystem path to save locally cached files as they are
	// being read/written from cloud and need a staging area.
	TmpDir string `json:"tmpdir,omitempty"`
	// Settings are catch-all-bag to allow per-implementation over-rides
	Settings gou.JsonHelper `json:"settings,omitempty"`
	// LogPrefix Logging Prefix/Context message
	LogPrefix string
	// EnableCompression turns on transparent compression of objects
	// Reading pre-existing non-compressed objects continues to work
	EnableCompression bool `json:"enablecompression,omitempty"`
}

Config holds the cloud store configuration settings.

type Filter

type Filter func(objects Objects) Objects

Filter is the func type used for filtering objects.

type JwtConf

type JwtConf struct {
	// Unfortunately we departed from the standard jwt service account field-naming
	// for reasons we forgot.  So, during load, we convert from bad->correct format.
	PrivateKeyDeprecated string `json:"private_keybase64,omitempty"`
	KeyTypeDeprecated    string `json:"keytype,omitempty"`

	// Jwt Service Account Fields
	ProjectID    string `json:"project_id,omitempty"`
	PrivateKeyID string `json:"private_key_id,omitempty"`
	PrivateKey   string `json:"private_key,omitempty"`
	ClientEmail  string `json:"client_email,omitempty"`
	ClientID     string `json:"client_id,omitempty"`
	Type         string `json:"type,omitempty"`
	// Scopes is list of what scope to use when the token is created.
	// for example https://github.com/google/google-api-go-client/blob/0d3983fb069cb6651353fc44c5cb604e263f2a93/storage/v1/storage-gen.go#L54
	Scopes []string `json:"scopes,omitempty"`
}

JwtConf is for use with google/google_jwttransporter.go, which can be used by the Google Go SDKs. This struct is based on Google's JWT JSON files for service accounts.

func (*JwtConf) KeyBytes

func (j *JwtConf) KeyBytes() ([]byte, error)

func (*JwtConf) Validate

func (j *JwtConf) Validate() error

Validate that this is a valid jwt conf set of tokens

type Object

type Object interface {
	// Name of object/file.
	Name() string
	// String is default descriptor.
	String() string
	// Updated timestamp.
	Updated() time.Time
	// MetaData is map of arbitrary name/value pairs about object.
	MetaData() map[string]string
	// SetMetaData allows you to set key/value pairs.
	SetMetaData(meta map[string]string)
	// StorageSource is the type of store.
	StorageSource() string
	// Open copies the remote file to a local cache and opens the cached version
	// for read/writing.  Calling Close/Sync will push the copy back to the
	// backing store.
	Open(readonly AccessLevel) (*os.File, error)
	// Release will remove the locally cached copy of the file.  You must call Close
	// before releasing.  Release will call os.Remove(local_copy_file) so opened
	// filehandles need to be closed.
	Release() error
	// Implement io.ReadWriteCloser. Open must be called before using these
	// functions.
	Read(p []byte) (n int, err error)
	Write(p []byte) (n int, err error)
	Sync() error
	Close() error
	// File returns the cached/local copy of the file
	File() *os.File
	// Delete removes the object from the cloud store and local cache.
	Delete() error
}

Object is a handle to a cloud-stored file/object. Calling Open will pull the remote file onto your local filesystem for reading/writing. Calling Sync/Close will push the local copy back up to the cloud store.

type ObjectIterator

type ObjectIterator interface {
	// Next gets next object, returns google.golang.org/api/iterator iterator.Done error.
	Next() (Object, error)
	// Close this down (and or context.Close)
	Close()
}

ObjectIterator is the interface used to page through objects. See the Go iterator guidelines for examples: https://github.com/GoogleCloudPlatform/google-cloud-go/wiki/Iterator-Guidelines
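
The Next/Done loop that this interface implies can be sketched without the package. Everything below is a local stand-in: errDone plays the role of iterator.Done from google.golang.org/api/iterator, and sliceIterator is a hypothetical in-memory implementation. The point is the loop shape: stop on the sentinel, and handle every other error before touching the returned object.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone stands in for iterator.Done, the sentinel returned when the
// iterator is exhausted.
var errDone = errors.New("no more items in iterator")

// object and objectIterator mirror the relevant parts of Object and ObjectIterator.
type object interface{ Name() string }

type objectIterator interface {
	Next() (object, error)
	Close()
}

type namedObject string

func (n namedObject) Name() string { return string(n) }

// sliceIterator is a hypothetical iterator over an in-memory slice.
type sliceIterator struct {
	objs []object
	pos  int
}

func newSliceIterator(names ...string) objectIterator {
	it := &sliceIterator{}
	for _, n := range names {
		it.objs = append(it.objs, namedObject(n))
	}
	return it
}

func (it *sliceIterator) Next() (object, error) {
	if it.pos >= len(it.objs) {
		return nil, errDone
	}
	o := it.objs[it.pos]
	it.pos++
	return o, nil
}

func (it *sliceIterator) Close() {}

// collectNames drains the iterator, returning early on any non-Done error.
func collectNames(it objectIterator) ([]string, error) {
	defer it.Close()
	var names []string
	for {
		o, err := it.Next()
		if errors.Is(err, errDone) {
			return names, nil
		}
		if err != nil {
			return names, err
		}
		names = append(names, o.Name())
	}
}

func main() {
	names, err := collectNames(newSliceIterator("list-test/a.csv", "list-test/b.csv"))
	fmt.Println(names, err)
}
```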

func NewObjectPageIterator

func NewObjectPageIterator(ctx context.Context, s Store, q Query) ObjectIterator

NewObjectPageIterator create an iterator that wraps the store List interface.

type ObjectPageIterator

type ObjectPageIterator struct {
	// contains filtered or unexported fields
}

ObjectPageIterator iterator to facilitate easy paging through store.List() method to read all Objects that matched query.

func (*ObjectPageIterator) Close

func (it *ObjectPageIterator) Close()

Close the object iterator.

func (*ObjectPageIterator) Next

func (it *ObjectPageIterator) Next() (Object, error)

Next advances the iterator to the next object, or returns an error when the iterator is done.

type Objects

type Objects []Object

Objects are just a collection of Object(s). Used as the results for store.List commands.

func ObjectsAll

func ObjectsAll(iter ObjectIterator) (Objects, error)

ObjectsAll get all objects for an iterator.

func (Objects) Len

func (o Objects) Len() int

func (Objects) Less

func (o Objects) Less(i, j int) bool

func (Objects) Swap

func (o Objects) Swap(i, j int)

type ObjectsResponse

type ObjectsResponse struct {
	Objects    Objects
	NextMarker string
}

ObjectsResponse for paged object apis.
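
One way a caller might page with a marker is sketched below, using an in-memory lister in place of a real store. objectsResponse mirrors the struct above with plain names instead of Objects; listPage, listAll, and the marker semantics (the marker is the last name already returned, and an empty NextMarker means there are no further pages) are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// objectsResponse mirrors ObjectsResponse, with names standing in for Objects.
type objectsResponse struct {
	Objects    []string
	NextMarker string
}

// listPage is a hypothetical in-memory lister over a sorted slice. It returns
// up to pageSize names that sort after marker, and sets NextMarker when more remain.
func listPage(sorted []string, marker string, pageSize int) objectsResponse {
	if pageSize <= 0 {
		pageSize = 1000
	}
	start := sort.Search(len(sorted), func(i int) bool { return sorted[i] > marker })
	end := start + pageSize
	if end > len(sorted) {
		end = len(sorted)
	}
	resp := objectsResponse{Objects: sorted[start:end]}
	if end < len(sorted) {
		resp.NextMarker = sorted[end-1]
	}
	return resp
}

// listAll follows NextMarker until it is empty and reports how many pages it fetched.
func listAll(sorted []string, pageSize int) ([]string, int) {
	var names []string
	pages := 0
	marker := ""
	for {
		resp := listPage(sorted, marker, pageSize)
		pages++
		names = append(names, resp.Objects...)
		if resp.NextMarker == "" {
			return names, pages
		}
		marker = resp.NextMarker
	}
}

func main() {
	all := []string{"a.csv", "b.csv", "c.csv", "d.csv", "e.csv"}
	names, pages := listAll(all, 2)
	fmt.Println(names, "fetched in", pages, "pages")
}
```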

func NewObjectsResponse

func NewObjectsResponse() *ObjectsResponse

func ObjectResponseFromIter

func ObjectResponseFromIter(iter ObjectIterator) (*ObjectsResponse, error)

ObjectResponseFromIter get all objects for an iterator.

type Opts added in v0.2.2

type Opts struct {
	IfNotExists bool
}

type Query

type Query struct {
	Delimiter   string   // Delimiter is most likely "/"
	Prefix      string   // prefix (directory) to search for or object name if one file
	StartOffset string   // (gcs/localfs only) "bar/", Only list objects lexicographically >= "bar/"
	EndOffset   string   // (gcs/localfs only) "foo/", Only list objects lexicographically < "foo/"
	Marker      string   // Next Page Marker if provided is a start next page fetch bookmark.
	ShowHidden  bool     // Show hidden files?
	Filters     []Filter // Applied to the result sets to filter out Objects (i.e. remove objects by extension)
	PageSize    int      // PageSize defaults to global, or you can supply an override
}

Query used to query the cloud source. The primary query is a prefix query like `ls /my-csv-files/baseball/*`. This is the Request, and includes the PageSize, cursor/next token as well.
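
The field comments on Prefix, StartOffset, and EndOffset describe a half-open lexicographic range on top of the prefix match. The sketch below spells out that predicate with a local stand-in struct; query and matches are hypothetical, and treating an empty offset as "no bound" is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// query is a local stand-in carrying only the three fields discussed here.
type query struct {
	Prefix      string
	StartOffset string // inclusive lower bound; empty means no bound (assumption)
	EndOffset   string // exclusive upper bound; empty means no bound (assumption)
}

// matches reports whether name has the prefix and falls in [StartOffset, EndOffset).
func (q query) matches(name string) bool {
	if !strings.HasPrefix(name, q.Prefix) {
		return false
	}
	if q.StartOffset != "" && name < q.StartOffset {
		return false
	}
	if q.EndOffset != "" && name >= q.EndOffset {
		return false
	}
	return true
}

func main() {
	q := query{Prefix: "logs/", StartOffset: "logs/2024-02", EndOffset: "logs/2024-04"}
	for _, name := range []string{
		"data/2024-03-01.csv",
		"logs/2024-01-31.csv",
		"logs/2024-02-01.csv",
		"logs/2024-03-15.csv",
		"logs/2024-04-01.csv",
	} {
		fmt.Printf("%-22s %v\n", name, q.matches(name))
	}
}
```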

func NewQuery

func NewQuery(prefix string) Query

NewQuery create a query for finding files under given prefix.

func NewQueryAll

func NewQueryAll() Query

NewQueryAll query for all objects/files.

func NewQueryForFolders

func NewQueryForFolders(folderPath string) Query

NewQueryForFolders create a query for finding Folders under given path.

func (*Query) AddFilter

func (q *Query) AddFilter(f Filter) *Query

AddFilter adds a filter that runs after the prefix query and can be used to alter the result set from the prefix query.

func (*Query) ApplyFilters

func (q *Query) ApplyFilters(objects Objects) Objects

ApplyFilters is called as the last step in store.List() to filter out the results before they are returned.

func (*Query) Sorted

func (q *Query) Sorted() *Query

Sorted adds a sort Filter to the filter chain. If it is not the last call while building your query, then sorting is only guaranteed for the next filter in the chain.
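
The ordering caveat is easier to see with a small filter chain. The sketch below uses plain strings in place of Objects; query, addFilter, sorted, applyFilters, and csvOnly are hypothetical lower-case stand-ins, and sorting by name is an assumption about what the real sort compares.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// filter mirrors the Filter func type, over names instead of Objects.
type filter func([]string) []string

type query struct{ filters []filter }

// addFilter appends a filter and returns the query so calls can be chained.
func (q *query) addFilter(f filter) *query {
	q.filters = append(q.filters, f)
	return q
}

// sorted appends a stable sort as just another filter in the chain.
func (q *query) sorted() *query {
	return q.addFilter(func(names []string) []string {
		sort.Stable(sort.StringSlice(names))
		return names
	})
}

// applyFilters runs the filters in the order they were added.
func (q *query) applyFilters(names []string) []string {
	for _, f := range q.filters {
		names = f(names)
	}
	return names
}

// csvOnly keeps names ending in ".csv".
func csvOnly(names []string) []string {
	var out []string
	for _, n := range names {
		if strings.HasSuffix(n, ".csv") {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	q := &query{}
	q.addFilter(csvOnly).sorted() // sort last so the final result is ordered
	fmt.Println(q.applyFilters([]string{"b.csv", "notes.txt", "a.csv"}))
}
```

Because the sort is only one link in the chain, any filter added after it is free to reorder the results again, which is why calling it last is the safe choice.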

type Store

type Store interface {
	StoreReader

	// NewWriter returns a io.Writer that writes to a Cloud object
	// associated with this backing Store object.
	//
	// A new object will be created unless an object with this name already exists.
	// Otherwise any previous object with the same name will be replaced.
	// The object will not be available (and any previous object will remain)
	// until Close has been called
	NewWriter(o string, metadata map[string]string) (io.WriteCloser, error)
	// NewWriter but with context.
	NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...Opts) (io.WriteCloser, error)

	// NewObject creates a new empty object backed by the cloud store
	// This new object isn't synced/created in the backing store
	// until the object is Closed/Sync'ed.
	NewObject(o string) (Object, error)

	// Delete removes the object from the cloud store.
	Delete(ctx context.Context, o string) error
}

Store defines the storage interface abstracting the GCS, S3, and LocalFile interfaces.

func NewStore

func NewStore(conf *Config) (Store, error)

NewStore create new Store from Storage Config/Context.

type StoreCopy

type StoreCopy interface {
	// Copy from object, to object
	Copy(ctx context.Context, src, dst Object) error
}

StoreCopy is an optional interface for a fast-path copy. Many of the cloud providers don't actually copy bytes; rather, they allow a "pointer" that is a fast copy.
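
An optional interface like this is usually consumed with a type assertion and a generic fallback. The sketch below shows that pattern with local stand-ins (store, storeCopy, plainStore, fastStore, and copyObject are hypothetical and use strings instead of Objects); it is not a description of how the package's Copy function is written.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("object not found")

// store is the minimal interface every backend in this sketch implements.
type store interface {
	Read(name string) (string, error)
	Write(name, content string) error
}

// storeCopy is the optional fast-path interface.
type storeCopy interface {
	Copy(src, dst string) error
}

// plainStore supports only Read and Write.
type plainStore struct{ objects map[string]string }

func (s *plainStore) Read(name string) (string, error) {
	v, ok := s.objects[name]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}

func (s *plainStore) Write(name, content string) error {
	s.objects[name] = content
	return nil
}

// fastStore additionally implements storeCopy and counts fast-path uses.
type fastStore struct {
	plainStore
	fastCopies int
}

func (s *fastStore) Copy(src, dst string) error {
	v, err := s.Read(src)
	if err != nil {
		return err
	}
	s.fastCopies++
	return s.Write(dst, v)
}

// copyObject uses the backend's fast path when available, and otherwise
// falls back to reading the source and writing the destination.
func copyObject(s store, src, dst string) (usedFastPath bool, err error) {
	if c, ok := s.(storeCopy); ok {
		return true, c.Copy(src, dst)
	}
	v, err := s.Read(src)
	if err != nil {
		return false, err
	}
	return false, s.Write(dst, v)
}

func main() {
	p := &plainStore{objects: map[string]string{"a": "1"}}
	f := &fastStore{plainStore: plainStore{objects: map[string]string{"a": "1"}}}
	fmt.Println(copyObject(p, "a", "b"))
	fmt.Println(copyObject(f, "a", "b"))
}
```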

type StoreMove

type StoreMove interface {
	// Move from object location, to object location.
	Move(ctx context.Context, src, dst Object) error
}

StoreMove is an optional interface for a fast-path move. Many of the cloud providers don't actually copy bytes.

type StoreProvider

type StoreProvider func(*Config) (Store, error)

StoreProvider is a provider function for creating new Stores.

type StoreReader

type StoreReader interface {
	// Type is the Store Type [google, s3, azure, localfs, etc]
	Type() string
	// Client gets access to the underlying native Client for Google, S3, etc
	Client() interface{}
	// Get returns an object (file) from the cloud store. The object
	// isn't opened already, see Object.Open()
	// ErrObjectNotFound will be returned if the object is not found.
	Get(ctx context.Context, o string) (Object, error)
	// Objects returns an object Iterator to allow paging through object
	// which keeps track of page cursors.  Query defines the specific set
	// of filters to apply to request.
	Objects(ctx context.Context, q Query) (ObjectIterator, error)
	// List file/objects filter by given query.  This just wraps the object-iterator
	// returning full list of objects.
	List(ctx context.Context, q Query) (*ObjectsResponse, error)
	// Folders creates list of folders
	Folders(ctx context.Context, q Query) ([]string, error)
	// NewReader creates a new Reader to read the contents of the object.
	// ErrObjectNotFound will be returned if the object is not found.
	NewReader(o string) (io.ReadCloser, error)
	// NewReader with context (for cancelation, etc)
	NewReaderWithContext(ctx context.Context, o string) (io.ReadCloser, error)
	// String default descriptor.
	String() string
}

StoreReader defines the read side of the storage interface, abstracting the GCS, S3, LocalFile, and other interfaces.
