file

package
v1.14.0-dev.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	GZIP = "gzip"
	ZSTD = "zstd"
)
View Source
const (
	TupleError int = iota // display error in tuple
)

Variables

This section is empty.

Functions

func File

func File() api.Sink

Types

type CsvReader

type CsvReader struct {
	// contains filtered or unexported fields
}

func (*CsvReader) Close

func (r *CsvReader) Close() error

func (*CsvReader) Read

func (r *CsvReader) Read() (map[string]interface{}, error)

type FileSource

type FileSource struct {
	// contains filtered or unexported fields
}

FileSource The BATCH to load data from file at once

func (*FileSource) Close

func (fs *FileSource) Close(ctx api.StreamContext) error

func (*FileSource) Configure

func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error

func (*FileSource) Load

func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error

func (*FileSource) Open

func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)

type FileSourceConfig

type FileSourceConfig struct {
	FileType         FileType `json:"fileType"`
	Path             string   `json:"path"`
	Interval         int      `json:"interval"`
	IsTable          bool     `json:"isTable"`
	Parallel         bool     `json:"parallel"`
	SendInterval     int      `json:"sendInterval"`
	ActionAfterRead  int      `json:"actionAfterRead"`
	MoveTo           string   `json:"moveTo"`
	HasHeader        bool     `json:"hasHeader"`
	Columns          []string `json:"columns"`
	IgnoreStartLines int      `json:"ignoreStartLines"`
	IgnoreEndLines   int      `json:"ignoreEndLines"`
	Delimiter        string   `json:"delimiter"`
	Decompression    string   `json:"decompression"`
}

type FileType

type FileType string
const (
	JSON_TYPE  FileType = "json"
	CSV_TYPE   FileType = "csv"
	LINES_TYPE FileType = "lines"
)

type FormatReader

type FormatReader interface {
	Read() (map[string]interface{}, error) // Reads the next record. Returns EOF when the input has reached its end.
	Close() error
}

func CreateCsvReader

func CreateCsvReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)

func CreateJsonReader

func CreateJsonReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)

func CreateLineReader

func CreateLineReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)

func GetReader

func GetReader(fileType FileType, fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)

type JsonReader

type JsonReader struct {
	// contains filtered or unexported fields
}

func (*JsonReader) Close

func (r *JsonReader) Close() error

func (*JsonReader) Read

func (r *JsonReader) Read() (map[string]interface{}, error)

type LineReader

type LineReader struct {
	// contains filtered or unexported fields
}

func (*LineReader) Close

func (r *LineReader) Close() error

func (*LineReader) Read

func (r *LineReader) Read() (map[string]interface{}, error)

type ReaderError

type ReaderError struct {
	Code    int
	Message string
}

func BuildError

func BuildError(code int, msg string) *ReaderError

func (ReaderError) Error

func (e ReaderError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL