afs

package module
v1.25.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2023 License: Apache-2.0 Imports: 25 Imported by: 85

README

afs - abstract file storage

GoReportCard GoDoc goversion-image gopherbadger-tag-do-not-edit

Please refer to CHANGELOG.md if you encounter breaking changes.

Motivation

When dealing with various storage systems, like cloud storage, SCP, container or local file system, using a shared API for typical storage operations provides an excellent simplification. What's more, the ability to simulate storage-related errors like Auth or EOF allows you to test an app's error handling.

Introduction

This library uses a storage manager abstraction to provide an implementation for a specific storage system with the following:

  • CRUD Operation:
List(ctx context.Context, URL string, options ...Option) ([]Object, error)

Walk(ctx context.Context, URL string, handler OnVisit, options ...Option) error

Open(ctx context.Context, object Object, options ...Option) (io.ReadCloser, error)

OpenURL(ctx context.Context, URL string, options ...Option) (io.ReadCloser, error)


Upload(ctx context.Context, URL string, mode os.FileMode, reader io.Reader, options ...Option) error

Create(ctx context.Context, URL string, mode os.FileMode, isDir bool, options ...Option) error

Delete(ctx context.Context, URL string, options ...Option) error
  • Batch uploader:
type Upload func(ctx context.Context, parent string, info os.FileInfo, reader io.Reader) error
 
Uploader(ctx context.Context, URL string, options ...Option) (Upload, io.Closer, error)
  • Utilities:

Copy(ctx context.Context, sourceURL, destURL string, options ...Option) error

Move(ctx context.Context, sourceURL, destURL string, options ...Option) error

NewWriter(ctx context.Context, URL string, mode os.FileMode, options ...storage.Option) (io.WriteCloser, error)

DownloadWithURL(ctx context.Context, URL string, options ...Option) ([]byte, error)

Download(ctx context.Context, object Object, options ...Option) ([]byte, error)

The URL scheme is used to identify the storage system; alternatively, a relative/absolute path can be used for local file storage. By default, all operations using the same baseURL share the same corresponding storage manager instance. For example, instead of supplying SCP auth details for all operations, the auth option can be supplied only once.


func main() {

    ctx := context.Background()
    {
        //auth with first call 
        fs := afs.New()
        defer fs.Close()
        keyAuth, err := scp.LocalhostKeyAuth("")
        if err != nil {
           log.Fatal(err)
        }
        reader1, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
        if err != nil {
               log.Fatal(err)
        }
        ...
        reader2, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
    }
    
    {
        //auth per baseURL 
        fs := afs.New()
        err = fs.Init(ctx, "scp://host1:22/", keyAuth)
        if err != nil {
            log.Fatal(err)
        }
        defer fs.Destroy("scp://host1:22/")
        reader, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt")
     }
}

Usage

Downloading location content
// main lists /tmp/folder and prints the name, URL and content of every
// non-directory object found there.
func main() {
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder")
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
		if object.IsDir() {
			continue
		}
		reader, err := fs.Open(ctx, object)
		if err != nil {
			log.Fatal(err)
		}
		data, err := ioutil.ReadAll(reader)
		// Close immediately rather than with defer: a defer inside the loop
		// would keep every reader open until main returns.
		_ = reader.Close()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", data)
	}
}
Uploading Content
// main uploads a small text asset over SCP, reports whether it exists, and
// removes it again.
func main() {
	service := afs.New()
	ctx := context.Background()

	auth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	if err = service.Init(ctx, "scp://127.0.0.1:22/", auth); err != nil {
		log.Fatal(err)
	}

	if err = service.Upload(ctx, "scp://127.0.0.1:22/folder/asset.txt", 0644, strings.NewReader("test me")); err != nil {
		log.Fatal(err)
	}

	exists, err := service.Exists(ctx, "scp://127.0.0.1:22/folder/asset.txt")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("has file: %v\n", exists)

	// Best-effort cleanup; the result is intentionally ignored.
	_ = service.Delete(ctx, "scp://127.0.0.1:22/folder/asset.txt")
}
Uploading Content With Writer
func main() {
	
    fs := afs.New()
    ctx := context.Background()
    keyAuth, err := scp.LocalhostKeyAuth("")
    if err != nil {
        log.Fatal(err)
    }
    err  = fs.Init(ctx, "scp://127.0.0.1:22/", keyAuth)
    if err != nil {
        log.Fatal(err)
    }	
    writer = fs.NewWriter(ctx, "scp://127.0.0.1:22/folder/asset.txt", 0644)
    _, err := writer.Write([]byte("test me")))
    if err != nil {
        log.Fatal(err)
    }
    err = writer.Close()
    if err != nil {
        log.Fatal(err)
    }
    ok, err := fs.Exists(ctx, "scp://127.0.0.1:22/folder/asset.txt")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("has file: %v\n", ok)
    _ = fs.Delete(ctx, "scp://127.0.0.1:22/folder/asset.txt")
}
Data Copy
// main copies an S3 folder to a local SCP destination, passing the SCP key
// auth as a destination-only option.
func main() {
	service := afs.New()
	ctx := context.Background()

	auth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}

	source := option.NewSource()
	dest := option.NewDest(auth)
	if err = service.Copy(ctx, "s3://mybucket/myfolder", "scp://127.0.0.1/tmp", source, dest); err != nil {
		log.Fatal(err)
	}
}
Archiving content

// main archives a local folder into a zip file stored in a Google Storage
// bucket, authenticating with a JWT config loaded from a local secret file.
func main() {
	secretPath := path.Join(os.Getenv("HOME"), ".secret", "gcp-e2e.json")
	auth, err := gs.NewJwtConfig(option.NewLocation(secretPath))
	if err != nil {
		// Report the failure instead of returning silently, as every other
		// example does.
		log.Fatal(err)
	}
	sourceURL := "mylocalPath/"
	destURL := "gs:mybucket/test.zip/zip://localhost/dir1"
	fs := afs.New()
	ctx := context.Background()
	err = fs.Copy(ctx, sourceURL, destURL, option.NewDest(auth))
	if err != nil {
		log.Fatal(err)
	}
}
Archive Walker

Walker can be created for tar or zip archive.

func main() {
	
    ctx := context.Background()
	fs := afs.New()
	walker := tar.NewWalker(s3afs.New())
	err := fs.Copy(ctx, "/tmp/test.tar", "s3:///dest/folder/test", walker)
	if err != nil {
		log.Fatal(err)
	}
Archive Uploader

Uploader can be created for tar or zip archive.

// main copies a Google Storage folder into a local zip archive, using a zip
// batch uploader passed to Copy as an option.
func main() {
	ctx := context.Background()
	service := afs.New()
	batchUploader := zip.NewBatchUploader(gsafs.New())
	if err := service.Copy(ctx, "gs:///tmp/test/data", "/tmp/data.zip", batchUploader); err != nil {
		log.Fatal(err)
	}
}
Data Move
// main moves a local folder to an SCP destination, passing the SCP key auth
// as a destination-only option.
func main() {
	service := afs.New()
	ctx := context.Background()

	auth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}

	source := option.NewSource()
	dest := option.NewDest(auth)
	if err = service.Move(ctx, "/tmp/transient/app", "scp://127.0.0.1/tmp", source, dest); err != nil {
		log.Fatal(err)
	}
}
Batch Upload
// main uploads a fixed set of files and folders to /tmp/clone through a
// batch upload function obtained from Uploader.
func main() {
	service := afs.New()
	ctx := context.Background()

	upload, closer, err := service.Uploader(ctx, "/tmp/clone")
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	resources := []*asset.Resource{
		asset.NewFile("asset1.txt", []byte("test 1"), 0644),
		asset.NewFile("asset2.txt", []byte("test 2"), 0644),
		asset.NewDir("folder1", file.DefaultDirOsMode),
		asset.NewFile("folder1/asset1.txt", []byte("test 3"), 0644),
		asset.NewFile("folder1/asset2.txt", []byte("test 4"), 0644),
	}

	for _, resource := range resources {
		// The parent is the directory part of the name; it stays empty for
		// top-level resources.
		var parent string
		if strings.Contains(resource.Name, "/") {
			parent, _ = path.Split(resource.Name)
		}

		// Directories are uploaded with a nil reader.
		var content io.Reader
		if !resource.Dir {
			content = bytes.NewReader(resource.Data)
		}

		if err = upload(ctx, parent, resource.Info(), content); err != nil {
			log.Fatal(err)
		}
	}
}

Matchers

To filter source content you can use Matcher option. The following have been implemented.

Basic Matcher

func main() {
	
    matcher, err := NewBasic("/data", ".avro", nil)
    fs := afs.New()
    ctx := context.Background()
    err := fs.Copy(ctx, "/tmp/data", "s3://mybucket/data/", matcher.Match)
    if err != nil {
        log.Fatal(err)
    }
}

Exclusion

// main copies /tmp/data to S3, skipping every resource whose location
// matches the exclusion expression.
func main() {
	exclusion := matcher.Basic{Exclusion: ".+/data/perf/\\d+/.+"}
	service := afs.New()
	ctx := context.Background()
	if err := service.Copy(ctx, "/tmp/data", "s3://mybucket/data/", exclusion.Match); err != nil {
		log.Fatal(err)
	}
}

Filepath matcher

OS style filepath match, with the following terms:

  • '*' matches any sequence of non-Separator characters
  • '?' matches any single non-Separator character
  • '[' [ '^' ] { character-range } ']'

// main copies /tmp/data to Google Storage, filtering the source with an OS
// style "*.avro" file path pattern.
func main() {
	avroOnly := matcher.Filepath("*.avro")
	service := afs.New()
	ctx := context.Background()
	if err := service.Copy(ctx, "/tmp/data", "gs://mybucket/data/", avroOnly); err != nil {
		log.Fatal(err)
	}
}
		

Ignore Matcher

Ignore matcher represents a matcher that matches files that are not in the ignore rules. The syntax of ignore borrows heavily from that of .gitignore; see https://git-scm.com/docs/gitignore or man gitignore for a full reference.

// main lists /tmp/folder, filtering the result with an ignore matcher built
// from in-line rules, and prints the name and URL of every listed object.
func main() {
	ignoreMatcher, err := matcher.NewIgnore([]string{"*.txt", ".ignore"})
	//or matcher.NewIgnore(option.NewLocation(".cloudignore"))
	if err != nil {
		log.Fatal(err)
	}
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder", ignoreMatcher.Match)
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
	}
}

Modification Time Matcher

Modification Time Matcher represents a matcher that matches files that were modified either before or after a specified time.

// main lists /tmp/folder, filtering the result with a modification time
// matcher built from a "2 days ago" time, and prints the name and URL of
// every listed object.
func main() {
	before, err := toolbox.TimeAt("2 days ago in UTC")
	if err != nil {
		log.Fatal(err)
	}
	modTimeMatcher, err := matcher.NewModification(before, nil)
	if err != nil {
		log.Fatal(err)
	}
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder", modTimeMatcher.Match)
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
	}
}

Content modifiers

To modify resource content on the fly you can use Modifier option.

// main rewrites a properties file inside a war archive, replacing the
// ${changeMe} placeholder with the current user name during the copy.
func main() {
	service := afs.New()
	ctx := context.Background()

	sourceURL := "file:/tmp/app.war/zip://localhost/WEB-INF/classes/config.properties"
	destURL := "file:/tmp/app.war/zip://localhost/"
	replacements := map[string]string{
		"${changeMe}": os.Getenv("USER"),
	}

	if err := service.Copy(ctx, sourceURL, destURL, modifier.Replace(replacements)); err != nil {
		log.Fatal(err)
	}
}
package main

import (
	"context"
	"log"
	"github.com/viant/afs"
	"io"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
)

// modifyContent expands the first "$os.User" placeholder in files whose name
// ends with ".info", using the USER environment variable. Any other file is
// returned untouched.
//
// For ".info" files the supplied reader is fully consumed and closed, and a
// new in-memory reader holding the expanded content is returned. A read
// failure is returned with a nil reader.
func modifyContent(info os.FileInfo, reader io.ReadCloser) (closer io.ReadCloser, e error) {
	if !strings.HasSuffix(info.Name(), ".info") {
		return reader, nil
	}
	payload, readErr := ioutil.ReadAll(reader)
	if readErr != nil {
		return nil, readErr
	}
	// The content is fully buffered, so the source is no longer needed.
	_ = reader.Close()
	userName := os.Getenv("USER")
	rewritten := strings.Replace(string(payload), "$os.User", userName, 1)
	return ioutil.NopCloser(strings.NewReader(rewritten)), nil
}

// main opens an S3 asset with modifyContent supplied as an option and prints
// the resulting content.
func main() {
	service := afs.New()

	source, err := service.OpenURL(context.Background(), "s3://mybucket/meta.info", modifyContent)
	if err != nil {
		log.Fatal(err)
	}
	defer source.Close()

	body, err := ioutil.ReadAll(source)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("content: %s\n", body)
}
Streaming data

Streaming data allows data reading and uploading in chunks with small memory footprint.


    // Fragment (meant to live inside a function): opens a Google Storage
    // asset with a stream option and uploads it to S3 with checksum
    // computation skipped.
    jwtConfig, err := gs.NewJwtConfig()
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    fs := afs.New()
    sourceURL := "gs://myBucket/path/myasset.gz"
    reader, err := fs.OpenURL(ctx, sourceURL, jwtConfig, option.NewStream(64*1024*1024, 0))
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    _ = os.Setenv("AWS_SDK_LOAD_CONFIG", "true")
    destURL := "s3://myBucket/path/myasset.gz"
    err = fs.Upload(ctx, destURL, 0644, reader, &option.Checksum{Skip: true})
    if err != nil {
        log.Fatal(err)
    }

    // or, as an alternative to fs.Upload above, copy through a writer
    // (NewWriter returns both a writer and an error)
    writer, err := fs.NewWriter(ctx, destURL, 0644, &option.Checksum{Skip: true})
    if err != nil {
        log.Fatal(err)
    }
    _, err = io.Copy(writer, reader)
    if err != nil {
        log.Fatal(err)
    }
    err = writer.Close()
    if err != nil {
        log.Fatal(err)
    }

Options

To control the number and position of listed resources you can use the page option.

Provider specific timeout.

Provides user/password auth.

  • Source & Dest Options

Groups options by source or destination. These options work with Copy or Move operations.


// main copies data from Google Storage to S3, grouping each provider's
// credentials with NewSource and NewDest so they apply to the matching side
// of the copy.
func main() {
	fs := afs.New()
	ctx := context.Background()
	secretPath := path.Join(os.Getenv("HOME"), ".secret", "gcp.json")
	jwtConfig, err := gs.NewJwtConfig(option.NewLocation(secretPath))
	if err != nil {
		log.Fatal(err)
	}
	sourceOptions := option.NewSource(jwtConfig)
	authConfig, err := s3.NewAuthConfig(option.NewLocation("aws.json"))
	if err != nil {
		log.Fatal(err)
	}
	destOptions := option.NewDest(authConfig)
	err = fs.Copy(ctx, "gs://mybucket/data", "s3://mybucket/data", sourceOptions, destOptions)
	if err != nil {
		log.Fatal(err)
	}
}

  • option.Checksum: skips computing the checksum if Skip is set; this option allows streaming upload in chunks
  • option.Stream: download reader reads data with specified stream PartSize

Check out storage manager for additional options.

Storage Implementations

Testing fs

To unit test all storage operations entirely in memory, you can use the faker fs.

In addition you can use error options to test exception handling.

  • DownloadError

// main uploads to the in-memory faker service with an upload error option
// set to io.EOF, and logs the error reported by Upload.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewUploadError(io.EOF))
	if err != nil {
		log.Fatalf("expect upload error: %v", err)
	}
}
  • ReaderError

// main uploads to the in-memory faker service with a download error option
// set to io.EOF, then opens the same URL and logs the error reported by
// OpenURL.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewDownloadError(io.EOF))
	if err != nil {
		log.Fatal(err)
	}
	_, err = fs.OpenURL(ctx, "gs://myBucket/folder/asset.txt")
	if err != nil {
		log.Fatalf("expect download error: %v", err)
	}
}
  • UploadError

// main uploads to the in-memory faker service with an upload error option
// set to io.EOF, and logs the error reported by Upload.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewUploadError(io.EOF))
	if err != nil {
		log.Fatalf("expect upload error: %v", err)
	}
}
Code generation for static or in memory go file

Generate with mem storage

package main

import (
    "log"
    "github.com/viant/afs/parrot
)

func mian() {
  ctx := context.Background()
  err := parrot.GenerateWithMem(ctx, "pathToBinaryAsset", "gen.go", false)
  if err != nil {
    log.Fatal(err)
  }
}

Generate static data files

package main

import (
    "log"
    "github.com/viant/afs/parrot
)

func mian() {
  ctx := context.Background()
  err := parrot.Generate(ctx, "pathToBinaryAsset", "data/", false)
  if err != nil {
    log.Fatal(err)
  }
}

Test setup utilities

Package asset defines basic utilities to quickly manage asset related unit tests.


// Test_XXX is a table-driven test template: for each use case it creates the
// listed assets at the location, runs the application logic, loads the
// assets back and compares them with the expectation, then cleans up.
func Test_XXX(t *testing.T) {

	var useCases = []struct {
		description string
		location    string
		options     []storage.Option
		assets      []*asset.Resource
	}{}

	ctx := context.Background()
	for _, useCase := range useCases {
		fs := afs.New()
		mgr, err := afs.Manager(useCase.location, useCase.options...)
		if err != nil {
			// t.Fatalf fails only this test; log.Fatal would exit the whole
			// test binary without reporting which test failed.
			t.Fatalf("%v: %v", useCase.description, err)
		}
		err = asset.Create(mgr, useCase.location, useCase.assets)
		if err != nil {
			t.Fatalf("%v: %v", useCase.description, err)
		}

		//... actual app logic (uses ctx and fs)

		actuals, err := asset.Load(mgr, useCase.location)
		if err != nil {
			t.Fatalf("%v: %v", useCase.description, err)
		}
		for _, expect := range useCase.assets {
			actual, ok := actuals[expect.Name]
			if !assert.True(t, ok, useCase.description+": "+expect.Name+fmt.Sprintf(" - actuals: %v", actuals)) {
				continue
			}
			assert.EqualValues(t, expect.Name, actual.Name, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Mode, actual.Mode, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Dir, actual.Dir, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Data, actual.Data, useCase.description+" "+expect.Name)
		}

		_ = asset.Cleanup(mgr, useCase.location)

	}
}

GoCover

GoCover

License

The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.

Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.

Credits and Acknowledgements

Library Author: Adrian Witas

Documentation

Overview

Package afs (abstract file storage) defines interface and abstraction for storage systems

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Manager

func Manager(URL string, options ...storage.Option) (storage.Manager, error)

Manager returns a manager for the supplied URL

Types

type Provider

type Provider func(options ...storage.Option) (storage.Manager, error)

Provider represents manager provider

type Registry

// Registry maps URL schemes to storage manager providers.
type Registry interface {
	// Register registers a provider for the supplied URL scheme.
	Register(uRLScheme string, provider Provider)

	// Get returns the provider registered for the supplied URL scheme,
	// or an error.
	Get(uRLScheme string) (Provider, error)
}

Registry represents abstract file system service provider registry

func GetRegistry

func GetRegistry() Registry

GetRegistry returns the singleton registry

type Service

// Service is the storage service API: it embeds the storage package
// interfaces listed below and adds existence checks, byte downloads, upload
// writers, and manager lifecycle methods.
type Service interface {
	storage.Lister
	storage.Opener
	storage.Uploader
	storage.BatchUploader
	storage.Deleter
	storage.Creator
	storage.Walker
	storage.Getter
	// Exists returns true if the resource at URL exists.
	Exists(ctx context.Context, URL string, options ...storage.Option) (bool, error)

	// Download returns the content of the supplied object as bytes.
	Download(ctx context.Context, object storage.Object, options ...storage.Option) ([]byte, error)

	// DownloadWithURL returns the content of the resource at URL as bytes.
	DownloadWithURL(ctx context.Context, URL string, options ...storage.Option) ([]byte, error)

	storage.Copier
	storage.Mover

	// Init initialises a manager for baseURL with the supplied storage
	// options (e.g. auth).
	Init(ctx context.Context, baseURL string, options ...storage.Option) error

	// NewWriter creates an upload writer for URL.
	NewWriter(ctx context.Context, URL string, mode os.FileMode, options ...storage.Option) (io.WriteCloser, error)

	// CloseAll closes all active managers.
	CloseAll() error
	// Close closes the active manager matched by baseURL.
	Close(baseURL string) error

	// ErrorCode returns an error code or zero.
	ErrorCode(scheme string, err error) int
}

Service represents a storage service

func New

func New() Service

New returns an abstract storage service

func NewFaker

func NewFaker() Service

NewFaker returns a new faker service. All operations use an in-memory service

Directories

Path Synopsis
adapter
Package asset define asset testing utility
Package asset define asset testing utility
Package base define base manager
Package base define base manager
Package cache define cache afs.Service to cache read operation for specified URL
Package cache define cache afs.Service to cache read operation for specified URL
Package file defines a file system storage
Package file defines a file system storage
Package http defines simple http based storage operation
Package http defines simple http based storage operation
Package matcher define common resource matcher
Package matcher define common resource matcher
Package mem implements in memory file system
Package mem implements in memory file system
Package object provide storage object/link implementation
Package object provide storage object/link implementation
Package option define storage options
Package option define storage options
Package parrot provide storage to go memory or static go file mapping code generation
Package parrot provide storage to go memory or static go file mapping code generation
Package scp implements SSH scp storager and abstract file manager
Package scp implements SSH scp storager and abstract file manager
Package storage defines Storage API
Package storage defines Storage API
Package sync define atomic file system base counter
Package sync define atomic file system base counter
Package tar provides support for operating on TAR archives
Package tar provides support for operating on TAR archives
Package url define URL utilities
Package url define URL utilities
Package walker define storager based walker
Package walker define storager based walker
Package zip provides support for operating on ZIP archives
Package zip provides support for operating on ZIP archives

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL