filesys

package
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: GPL-3.0 Imports: 42 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Delete

func Delete(fs FileSysClient, uri string) (err error)

Delete deletes the provided path, with safeguards so as not to accidentally delete a root path.

func GetDataflow

func GetDataflow(fs FileSysClient, nodes dbio.FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error)

GetDataflow returns a dataflow from specified paths in specified FileSysClient

func GetDeepestParent added in v1.1.15

func GetDeepestParent(path string) string

func MakeDatastream

func MakeDatastream(reader io.Reader, cfg map[string]string) (ds *iop.Datastream, err error)

MakeDatastream creates a datastream from a reader.

func MergeReaders

func MergeReaders(fs FileSysClient, fileType FileType, nodes dbio.FileNodes, limit int) (ds *iop.Datastream, err error)

func NormalizeURI added in v1.1.15

func NormalizeURI(fs FileSysClient, uri string) string

func ParseURL

func ParseURL(uri string) (host, path string, err error)

ParseURL parses a URL

func ProcessStreamViaTempFile

func ProcessStreamViaTempFile(ds *iop.Datastream) (nDs *iop.Datastream, err error)

func TestFsPermissions

func TestFsPermissions(fs FileSysClient, pathURL string) (err error)

TestFsPermissions tests read/write permissions.

func Write

func Write(reader io.Reader, writer io.Writer) (bw int64, err error)

Write writes data from a reader to a writer.

func WriteDatastream

func WriteDatastream(writer io.Writer, ds *iop.Datastream) (bw int64, err error)

WriteDatastream writes a datastream to a writer. Alternatively, use fs.Write(path, ds.NewCsvReader(0)).

Types

type AzureFileSysClient

type AzureFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

AzureFileSysClient is a file system client to write files to Microsoft's Azure file system.

func (*AzureFileSysClient) Buckets

func (fs *AzureFileSysClient) Buckets() (paths []string, err error)

Buckets returns the containers found in the project

func (*AzureFileSysClient) Connect

func (fs *AzureFileSysClient) Connect() (err error)

Connect initiates the fs client connection

func (*AzureFileSysClient) GetPath added in v1.1.15

func (fs *AzureFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*AzureFileSysClient) GetReader

func (fs *AzureFileSysClient) GetReader(uri string) (reader io.Reader, err error)

GetReader returns an Azure FS reader

func (*AzureFileSysClient) Init

func (fs *AzureFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*AzureFileSysClient) List

func (fs *AzureFileSysClient) List(uri string) (nodes dbio.FileNodes, err error)

List lists objects in the path.

func (*AzureFileSysClient) ListRecursive

func (fs *AzureFileSysClient) ListRecursive(uri string) (nodes dbio.FileNodes, err error)

ListRecursive lists objects in the path recursively.

func (*AzureFileSysClient) Prefix added in v1.1.15

func (fs *AzureFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*AzureFileSysClient) Write

func (fs *AzureFileSysClient) Write(uri string, reader io.Reader) (bw int64, err error)

type BaseFileSysClient

type BaseFileSysClient struct {
	FileSysClient
	// contains filtered or unexported fields
}

BaseFileSysClient is the base file system type.

func (*BaseFileSysClient) Buckets

func (fs *BaseFileSysClient) Buckets() (paths []string, err error)

Buckets returns the buckets found in the account

func (*BaseFileSysClient) Client

func (fs *BaseFileSysClient) Client() *BaseFileSysClient

Client provides a pointer to itself

func (*BaseFileSysClient) Context

func (fs *BaseFileSysClient) Context() (context *g.Context)

Context provides a pointer to context

func (*BaseFileSysClient) FsType

func (fs *BaseFileSysClient) FsType() dbio.Type

FsType returns the type of the client.

func (*BaseFileSysClient) GetDatastream

func (fs *BaseFileSysClient) GetDatastream(urlStr string) (ds *iop.Datastream, err error)

GetDatastream returns a datastream for the given path.

func (*BaseFileSysClient) GetProp

func (fs *BaseFileSysClient) GetProp(key string, keys ...string) string

GetProp returns the value of a property

func (*BaseFileSysClient) GetReaders

func (fs *BaseFileSysClient) GetReaders(paths ...string) (readers []io.Reader, err error)

GetReaders returns one or more readers from specified paths in specified FileSysClient

func (*BaseFileSysClient) GetRefTs

func (fs *BaseFileSysClient) GetRefTs() time.Time

func (*BaseFileSysClient) Prefix added in v1.1.15

func (fs *BaseFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*BaseFileSysClient) Props

func (fs *BaseFileSysClient) Props() map[string]string

Props returns a copy of the properties map

func (*BaseFileSysClient) ReadDataflow

func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...FileStreamConfig) (df *iop.Dataflow, err error)

ReadDataflow reads the given URL into a dataflow.

func (*BaseFileSysClient) Self

func (fs *BaseFileSysClient) Self() FileSysClient

Self returns the respective connection instance. This is useful to refer back to a subclass method from the superclass level (aka overloading).

func (*BaseFileSysClient) SetProp

func (fs *BaseFileSysClient) SetProp(key string, val string)

SetProp sets the value of a property

func (*BaseFileSysClient) WriteDataflow

func (fs *BaseFileSysClient) WriteDataflow(df *iop.Dataflow, url string) (bw int64, err error)

WriteDataflow writes a dataflow to a file sys.

func (*BaseFileSysClient) WriteDataflowReady

func (fs *BaseFileSysClient) WriteDataflowReady(df *iop.Dataflow, url string, fileReadyChn chan FileReady, sc *iop.StreamConfig) (bw int64, err error)

WriteDataflowReady writes to a file sys and notifies the fileReady chan.

type FileReady

type FileReady struct {
	Columns iop.Columns
	Node    dbio.FileNode
	BytesW  int64
	BatchID string
}

type FileStreamConfig

type FileStreamConfig struct {
	Limit  int
	Select []string
}

type FileSysClient

type FileSysClient interface {
	Self() FileSysClient
	Init(ctx context.Context) (err error)
	Client() *BaseFileSysClient
	Context() (context *g.Context)
	FsType() dbio.Type
	GetReader(path string) (reader io.Reader, err error)
	GetReaders(paths ...string) (readers []io.Reader, err error)
	GetDatastream(path string) (ds *iop.Datastream, err error)
	GetWriter(path string) (writer io.Writer, err error)
	Buckets() (paths []string, err error)
	List(path string) (paths dbio.FileNodes, err error)
	ListRecursive(path string) (paths dbio.FileNodes, err error)
	Write(path string, reader io.Reader) (bw int64, err error)
	Prefix(suffix ...string) string
	ReadDataflow(url string, cfg ...FileStreamConfig) (df *iop.Dataflow, err error)
	WriteDataflow(df *iop.Dataflow, url string) (bw int64, err error)
	WriteDataflowReady(df *iop.Dataflow, url string, fileReadyChn chan FileReady, sc *iop.StreamConfig) (bw int64, err error)
	GetProp(key string, keys ...string) (val string)
	SetProp(key string, val string)
	MkdirAll(path string) (err error)
	GetPath(uri string) (path string, err error)
	// contains filtered or unexported methods
}

FileSysClient is a client to a file system such as local, S3, HDFS, Azure Storage, or Google Cloud Storage.

func NewFileSysClient

func NewFileSysClient(fst dbio.Type, props ...string) (fsClient FileSysClient, err error)

NewFileSysClient creates a file system client such as local, S3, Azure Storage, or Google Cloud Storage. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`.

func NewFileSysClientContext

func NewFileSysClientContext(ctx context.Context, fst dbio.Type, props ...string) (fsClient FileSysClient, err error)

NewFileSysClientContext creates a file system client with context, such as local, S3, Azure Storage, or Google Cloud Storage. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`.

func NewFileSysClientFromURL

func NewFileSysClientFromURL(url string, props ...string) (fsClient FileSysClient, err error)

NewFileSysClientFromURL returns the proper fs client for the given path. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`.

func NewFileSysClientFromURLContext

func NewFileSysClientFromURLContext(ctx context.Context, url string, props ...string) (fsClient FileSysClient, err error)

NewFileSysClientFromURLContext returns the proper fs client for the given path, with context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`.

type FileType

type FileType string
const FileTypeAvro FileType = "avro"
const FileTypeCsv FileType = "csv"
const FileTypeExcel FileType = "xlsx"
const FileTypeJson FileType = "json"
const FileTypeJsonLines FileType = "jsonlines"
const FileTypeNone FileType = ""
const FileTypeParquet FileType = "parquet"
const FileTypeSAS FileType = "sas7bdat"
const FileTypeXml FileType = "xml"

func InferFileFormat

func InferFileFormat(path string) FileType

func PeekFileType

func PeekFileType(reader io.Reader) (ft FileType, reader2 io.Reader, err error)

PeekFileType peeks into the file to try to determine the file type. CSV is the default.

func (FileType) Ext

func (ft FileType) Ext() string

func (FileType) IsJson

func (ft FileType) IsJson() bool

type FtpFileSysClient added in v1.1.12

type FtpFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

FtpFileSysClient is for FTP file systems

func (*FtpFileSysClient) Close added in v1.1.12

func (fs *FtpFileSysClient) Close() error

func (*FtpFileSysClient) Connect added in v1.1.12

func (fs *FtpFileSysClient) Connect() (err error)

Connect initiates the FTP client.

func (*FtpFileSysClient) GetPath added in v1.1.15

func (fs *FtpFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*FtpFileSysClient) GetReader added in v1.1.12

func (fs *FtpFileSysClient) GetReader(urlStr string) (reader io.Reader, err error)

GetReader returns a reader for the given path.

func (*FtpFileSysClient) Init added in v1.1.12

func (fs *FtpFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*FtpFileSysClient) List added in v1.1.12

func (fs *FtpFileSysClient) List(url string) (paths dbio.FileNodes, err error)

List lists objects in the path.

func (*FtpFileSysClient) ListRecursive added in v1.1.12

func (fs *FtpFileSysClient) ListRecursive(url string) (nodes dbio.FileNodes, err error)

ListRecursive lists objects in the path recursively.

func (*FtpFileSysClient) MkdirAll added in v1.1.12

func (fs *FtpFileSysClient) MkdirAll(path string) (err error)

MkdirAll creates child directories

func (*FtpFileSysClient) Prefix added in v1.1.15

func (fs *FtpFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*FtpFileSysClient) Write added in v1.1.12

func (fs *FtpFileSysClient) Write(urlStr string, reader io.Reader) (bw int64, err error)

type GoogleFileSysClient

type GoogleFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

GoogleFileSysClient is a file system client to write files to Google Cloud Storage.

func (*GoogleFileSysClient) Buckets

func (fs *GoogleFileSysClient) Buckets() (paths []string, err error)

Buckets returns the buckets found in the project

func (*GoogleFileSysClient) Connect

func (fs *GoogleFileSysClient) Connect() (err error)

Connect initiates the Google Cloud Storage client

func (*GoogleFileSysClient) GetPath added in v1.1.15

func (fs *GoogleFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*GoogleFileSysClient) GetReader

func (fs *GoogleFileSysClient) GetReader(path string) (reader io.Reader, err error)

GetReader returns the reader for the given path

func (*GoogleFileSysClient) Init

func (fs *GoogleFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*GoogleFileSysClient) List

func (fs *GoogleFileSysClient) List(uri string) (nodes dbio.FileNodes, err error)

List returns the list of objects

func (*GoogleFileSysClient) ListRecursive

func (fs *GoogleFileSysClient) ListRecursive(uri string) (nodes dbio.FileNodes, err error)

ListRecursive returns the list of objects recursively

func (*GoogleFileSysClient) Prefix added in v1.1.15

func (fs *GoogleFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*GoogleFileSysClient) Write

func (fs *GoogleFileSysClient) Write(path string, reader io.Reader) (bw int64, err error)

type HTTPFileSysClient

type HTTPFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

HTTPFileSysClient is for HTTP files

func (*HTTPFileSysClient) Connect

func (fs *HTTPFileSysClient) Connect() (err error)

Connect initiates the http client

func (*HTTPFileSysClient) GetPath added in v1.1.15

func (fs *HTTPFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*HTTPFileSysClient) GetReader

func (fs *HTTPFileSysClient) GetReader(url string) (reader io.Reader, err error)

GetReader gets a reader for an HTTP resource (download)

func (*HTTPFileSysClient) Init

func (fs *HTTPFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*HTTPFileSysClient) List

func (fs *HTTPFileSysClient) List(url string) (nodes dbio.FileNodes, err error)

List lists all urls on the page

func (*HTTPFileSysClient) ListRecursive

func (fs *HTTPFileSysClient) ListRecursive(url string) (nodes dbio.FileNodes, err error)

ListRecursive lists all urls on the page

func (*HTTPFileSysClient) Prefix added in v1.1.15

func (fs *HTTPFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*HTTPFileSysClient) Write

func (fs *HTTPFileSysClient) Write(urlStr string, reader io.Reader) (bw int64, err error)

Write uploads an HTTP file

type LocalFileSysClient

type LocalFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

LocalFileSysClient is a file system client to write files to the local file system.

func (*LocalFileSysClient) GetDatastream

func (fs *LocalFileSysClient) GetDatastream(uri string) (ds *iop.Datastream, err error)

GetDatastream returns a datastream for the given path.

func (*LocalFileSysClient) GetPath added in v1.1.15

func (fs *LocalFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*LocalFileSysClient) GetReader

func (fs *LocalFileSysClient) GetReader(uri string) (reader io.Reader, err error)

GetReader returns a reader for the given path.

func (*LocalFileSysClient) GetWriter

func (fs *LocalFileSysClient) GetWriter(uri string) (writer io.Writer, err error)

GetWriter creates the file if non-existent and returns a writer.

func (*LocalFileSysClient) Init

func (fs *LocalFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*LocalFileSysClient) List

func (fs *LocalFileSysClient) List(uri string) (nodes dbio.FileNodes, err error)

List lists the files in the given directory path.

func (*LocalFileSysClient) ListRecursive

func (fs *LocalFileSysClient) ListRecursive(uri string) (nodes dbio.FileNodes, err error)

ListRecursive lists the files in the given directory path recursively.

func (*LocalFileSysClient) MkdirAll

func (fs *LocalFileSysClient) MkdirAll(uri string) (err error)

MkdirAll creates child directories

func (*LocalFileSysClient) Prefix added in v1.1.15

func (fs *LocalFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*LocalFileSysClient) Write

func (fs *LocalFileSysClient) Write(uri string, reader io.Reader) (bw int64, err error)

Write creates the file if non-existent and writes from the reader

type S3FileSysClient

type S3FileSysClient struct {
	BaseFileSysClient

	RegionMap map[string]string
	// contains filtered or unexported fields
}

S3FileSysClient is a file system client to write files to Amazon's S3 file system.

func (*S3FileSysClient) Buckets

func (fs *S3FileSysClient) Buckets() (paths []string, err error)

Buckets returns the buckets found in the account

func (*S3FileSysClient) Connect

func (fs *S3FileSysClient) Connect() (err error)

Connect initiates the S3 client.

func (*S3FileSysClient) GenerateS3PreSignedURL

func (fs *S3FileSysClient) GenerateS3PreSignedURL(s3URL string, dur time.Duration) (httpURL string, err error)

func (*S3FileSysClient) GetPath added in v1.1.15

func (fs *S3FileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*S3FileSysClient) GetReader

func (fs *S3FileSysClient) GetReader(uri string) (reader io.Reader, err error)

GetReader returns a reader for the given path. The path should specify the full path with scheme: `s3://my_bucket/key/to/file.txt` or `s3://my_bucket/key/to/directory`.

func (*S3FileSysClient) GetWriter

func (fs *S3FileSysClient) GetWriter(uri string) (writer io.Writer, err error)

GetWriter creates the file if non-existent and returns a writer. The path should specify the full path with scheme: `s3://my_bucket/key/to/file.txt`.

func (*S3FileSysClient) Init

func (fs *S3FileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*S3FileSysClient) List

func (fs *S3FileSysClient) List(uri string) (nodes dbio.FileNodes, err error)

List lists the files in the given directory path.

func (*S3FileSysClient) ListRecursive

func (fs *S3FileSysClient) ListRecursive(uri string) (nodes dbio.FileNodes, err error)

ListRecursive lists the files in the given directory path recursively. The path should specify the full path with scheme: `s3://my_bucket/key/to/directory`.

func (*S3FileSysClient) Prefix added in v1.1.15

func (fs *S3FileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*S3FileSysClient) Write

func (fs *S3FileSysClient) Write(uri string, reader io.Reader) (bw int64, err error)

type SftpFileSysClient

type SftpFileSysClient struct {
	BaseFileSysClient
	// contains filtered or unexported fields
}

SftpFileSysClient is for SFTP / SSH file ops

func (*SftpFileSysClient) Connect

func (fs *SftpFileSysClient) Connect() (err error)

Connect initiates the SFTP client.

func (*SftpFileSysClient) GetPath added in v1.1.15

func (fs *SftpFileSysClient) GetPath(uri string) (path string, err error)

GetPath returns the path of url

func (*SftpFileSysClient) GetReader

func (fs *SftpFileSysClient) GetReader(urlStr string) (reader io.Reader, err error)

GetReader returns a reader for the given path.

func (*SftpFileSysClient) GetWriter

func (fs *SftpFileSysClient) GetWriter(urlStr string) (writer io.Writer, err error)

GetWriter creates the file if non-existent and returns a writer.

func (*SftpFileSysClient) Init

func (fs *SftpFileSysClient) Init(ctx context.Context) (err error)

Init initializes the fs client

func (*SftpFileSysClient) List

func (fs *SftpFileSysClient) List(url string) (nodes dbio.FileNodes, err error)

List lists objects in the path.

func (*SftpFileSysClient) ListRecursive

func (fs *SftpFileSysClient) ListRecursive(uri string) (nodes dbio.FileNodes, err error)

ListRecursive lists objects in the path recursively.

func (*SftpFileSysClient) MkdirAll

func (fs *SftpFileSysClient) MkdirAll(path string) (err error)

MkdirAll creates child directories

func (*SftpFileSysClient) Prefix added in v1.1.15

func (fs *SftpFileSysClient) Prefix(suffix ...string) string

Prefix returns the url prefix

func (*SftpFileSysClient) Write

func (fs *SftpFileSysClient) Write(urlStr string, reader io.Reader) (bw int64, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL