qlbridge: github.com/araddon/qlbridge/datasource/files

package files

import "github.com/araddon/qlbridge/datasource/files"

Package files is a cloud (gcs, s3) and local file datasource that translates json and csv files into the appropriate interface for a qlbridge DataSource so we can run queries against them. It provides the FileHandler interface to allow custom file type handling.

Package files implements Cloud Files logic for getting, reading, and converting files into databases. It reads cloud (or local) files, gets lists of tables, and can scan through them using a distributed query engine.


Package Files

file.go filehandler.go filepager.go filesource.go filestore.go scanner_csv.go scanner_json.go storesource.go


const (
    // SourceType is the registered Source name in the qlbridge source registry
    SourceType = "cloudstore"
)

var (
    // FileBufferSize is the default file queue size buffered by the pager
    FileBufferSize = 5
)

var (
    // FileColumns are the default columns for the "file" table
    FileColumns = []string{"file", "table", "path", "partialpath", "size", "partition", "updated", "deleted", "filetype"}
)

func FileStoreLoader

func FileStoreLoader(ss *schema.Schema) (cloudstorage.StoreReader, error)

FileStoreLoader returns the cloudstorage.StoreReader used for loading files for the given schema

func RegisterFileHandler

func RegisterFileHandler(scannerType string, fh FileHandler)

RegisterFileHandler registers a FileHandler, making it available under the provided scannerType

func RegisterFileStore

func RegisterFileStore(storeType string, fs FileStoreCreator)

RegisterFileStore adds a FileStore factory (a FileStoreCreator) to the global registry under the provided storeType
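As an illustration of the registry idea only, here is a minimal sketch of a global factory registry keyed by store type. Store, StoreCreator, Register, Create and localStore are hypothetical stand-ins (the real factory takes a *schema.Schema, not a string), and the locking is an assumption of this sketch rather than a statement about the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// Store and StoreCreator are stand-ins for FileStore and FileStoreCreator.
// The real FileStoreCreator receives a *schema.Schema; a string is used here
// to keep the sketch self-contained.
type Store interface{ Name() string }
type StoreCreator func(conf string) (Store, error)

var (
	mu       sync.RWMutex
	creators = map[string]StoreCreator{}
)

// Register stores a factory under storeType (hypothetical analogue of RegisterFileStore).
func Register(storeType string, c StoreCreator) {
	mu.Lock()
	defer mu.Unlock()
	creators[storeType] = c
}

// Create looks up the factory registered for storeType and builds a Store.
func Create(storeType, conf string) (Store, error) {
	mu.RLock()
	c, ok := creators[storeType]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("no store registered for %q", storeType)
	}
	return c(conf)
}

// localStore is a toy Store rooted at a directory.
type localStore struct{ root string }

func (l localStore) Name() string { return "local:" + l.root }

func main() {
	Register("localfs", func(conf string) (Store, error) { return localStore{conf}, nil })
	s, err := Create("localfs", "/tmp/tables")
	fmt.Println(s.Name(), err)
}
```

Registering factories rather than instances lets each schema get its own store when it is set up.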

func SipPartitioner

func SipPartitioner(partitionCt uint64, fi *FileInfo) int

func TableFromFileAndPath

func TableFromFileAndPath(path, fileIn string) string

TableFromFileAndPath finds the table name from the full path of a file and the path of tables.

There are different "table" naming conventions we use to find table names.

1) Support multiple partitioned files in a folder whose name is the table name

2) Support the table name as the name of a file inside a folder
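The two conventions above can be sketched with the standard library alone. The helper tableName is hypothetical and is not the package's TableFromFileAndPath; it assumes the first folder below the root names the table, and otherwise falls back to the file name without its extension.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// tableName is a hypothetical helper illustrating the two naming conventions.
// root is the path under which tables live; file is the full object name.
func tableName(root, file string) string {
	rel := strings.Trim(strings.TrimPrefix(file, root), "/")
	parts := strings.Split(rel, "/")
	if len(parts) > 1 {
		// Convention 1: the folder directly below root names the table,
		// and every file inside it is one partition of that table.
		return strings.ToLower(parts[0])
	}
	// Convention 2: the file name, minus its extension, names the table.
	base := parts[0]
	return strings.ToLower(strings.TrimSuffix(base, path.Ext(base)))
}

func main() {
	fmt.Println(tableName("tables/", "tables/appearances/appearances1.csv"))
	fmt.Println(tableName("tables/", "tables/users.csv"))
}
```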


type FileHandler

type FileHandler interface {
    Init(store FileStore, ss *schema.Schema) error
    // File Each time the underlying FileStore finds a new file it hands it off
    // to the FileHandler to determine if it is a File or not (directory?), and to extract any
    // metadata such as partition, and parse out fields that may exist in File/Folder path
    File(path string, obj cloudstorage.Object) *FileInfo
    // Scanner creates a scanner for a particular file
    Scanner(store cloudstorage.StoreReader, fr *FileReader) (schema.ConnScanner, error)
    // FileAppendColumns lets this file-handler provide additional
    // columns to the files list table, ie we are going to extract column info from the
    // folder paths, file-names.
    // For example: `tables/appearances/2017/appearances.csv` may extract the "2017" as "year"
    // and append that as column to all rows.
    // Optional:  may be nil
    FileAppendColumns() []string
}

FileHandler defines an interface for developers to build new File processing to allow these custom file-types to be queried with SQL.

Features of FileHandler:

- File() converts a cloudstorage object to a FileInfo that describes the File.
- Scanner() creates a file-scanner, allowing the scanner to implement any custom file-type reading (csv, protobuf, json, encryption).

The File Reading, Opening, Listing is a separate layer, see FileSource for the Cloudstorage layer.

So it is a factory to create Scanners for a specific format type such as csv or json.
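To make the FileAppendColumns example concrete, here is a small sketch of pulling a "year" column out of a folder path. The functions appendColumns and yearFromPath are hypothetical and only mimic what a custom handler might do inside File(); the rule "first four-digit folder is the year" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// appendColumns plays the role of FileAppendColumns: the extra column names
// this hypothetical handler adds to every row.
func appendColumns() []string { return []string{"year"} }

// yearFromPath returns the first four-digit folder segment of name.
func yearFromPath(name string) (string, bool) {
	parts := strings.Split(name, "/")
	// Skip the last segment: it is the file name, not a folder.
	for _, p := range parts[:len(parts)-1] {
		if len(p) == 4 && strings.Trim(p, "0123456789") == "" {
			return p, true
		}
	}
	return "", false
}

func main() {
	year, ok := yearFromPath("tables/appearances/2017/appearances.csv")
	fmt.Println(appendColumns(), year, ok)
}
```

A handler along these lines would place the extracted value in FileInfo.AppendCols so that it is appended to every row scanned from that file.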

func NewJsonHandler

func NewJsonHandler(lh datasource.FileLineHandler) FileHandler

NewJsonHandler creates a json file handler for paging new-line delimited rows of json file

func NewJsonHandlerTables

func NewJsonHandlerTables(lh datasource.FileLineHandler, tables []string) FileHandler

NewJsonHandlerTables creates a json file handler for paging new-line delimited rows of json file

type FileHandlerSchema

type FileHandlerSchema interface {

FileHandlerSchema - file handlers may optionally provide info about tables contained

type FileHandlerTables

type FileHandlerTables interface {
    Tables() []*FileTable
}

FileHandlerTables - file handlers may optionally provide info about tables contained in store

type FileInfo

type FileInfo struct {
    Path        string         // Root path
    Name        string         // Name/Path of file
    PartialPath string         // non-file-name part of path
    Table       string         // Table name this file participates in
    FileType    string         // csv, json, etc
    Partition   int            // which partition
    Size        int            // Content-Length size in bytes
    AppendCols  []driver.Value // Additional Column info extracted from file name/folder path
    // contains filtered or unexported fields
}

FileInfo describes a single file. Say a folder of "./tables" is the "root path" specified, and it has folders for "table names" underneath a redundant "tables" folder (./tables/). A file there is then described as:

    Name        = "tables/appearances/appearances1.csv"
    Table       = "appearances"
    PartialPath = tables/appearances

func FileInfoFromCloudObject

func FileInfoFromCloudObject(path string, obj cloudstorage.Object) *FileInfo

FileInfoFromCloudObject converts a cloudstorage object to a FileInfo, interpreting the table name from the given full file path.

func (*FileInfo) String

func (m *FileInfo) String() string

func (*FileInfo) Values

func (m *FileInfo) Values() []driver.Value

Values returns a slice: a row of values describing this file, for use in a sql listing of files

type FilePager

type FilePager struct {
    Limit int

    // contains filtered or unexported fields
}

FilePager acts like a partitioned Data Source Conn, wrapping the underlying FileSource, paging through the list of files and only scanning those that match this pager's partition. By default the partition count is -1, which means no partitioning.
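The partition filter described above can be sketched as follows. FileInfo here is a cut-down stand-in, fnvPartitioner is a hypothetical function with the same shape as the package's Partitioner type, and FNV-1a from the standard library is swapped in for whatever hash SipPartitioner uses. Treating any partition count below 1 as "no partitioning" is an assumption based on the -1 default.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// FileInfo is a stand-in carrying only the field this sketch needs.
type FileInfo struct{ Name string }

// fnvPartitioner has the same shape as Partitioner: func(uint64, *FileInfo) int.
// It maps a file to a partition id in [0, partitionCt).
func fnvPartitioner(partitionCt uint64, fi *FileInfo) int {
	h := fnv.New64a()
	h.Write([]byte(fi.Name))
	return int(h.Sum64() % partitionCt)
}

// filesFor keeps only the files owned by partition id.
// A partitionCt below 1 is treated as "no partitioning" (assumption).
func filesFor(files []*FileInfo, partitionCt, id int) []*FileInfo {
	if partitionCt < 1 {
		return files
	}
	var out []*FileInfo
	for _, fi := range files {
		if fnvPartitioner(uint64(partitionCt), fi) == id {
			out = append(out, fi)
		}
	}
	return out
}

func main() {
	files := []*FileInfo{{Name: "a.csv"}, {Name: "b.csv"}, {Name: "c.csv"}, {Name: "d.csv"}}
	for id := 0; id < 3; id++ {
		fmt.Printf("partition %d scans %d file(s)\n", id, len(filesFor(files, 3, id)))
	}
}
```

Because the partition id depends only on the file name, pagers running on different nodes can agree on which files each one scans without coordinating.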

func NewFilePager

func NewFilePager(tableName string, fs *FileSource) *FilePager

NewFilePager creates a new FilePager with default settings

func (*FilePager) Close

func (m *FilePager) Close() error

Close this connection/pager

func (*FilePager) Columns

func (m *FilePager) Columns() []string

Columns is part of the Conn interface, providing the columns for this table/conn

func (*FilePager) Next

func (m *FilePager) Next() schema.Message

Next returns the next message; it wraps the file Scanner and NextFile abstractions

func (*FilePager) NextFile

func (m *FilePager) NextFile() (*FileReader, error)

NextFile gets next file

func (*FilePager) NextScanner

func (m *FilePager) NextScanner() (schema.ConnScanner, error)

NextScanner provides the next scanner, assuming that each scanner represents a different file and that a single source has multiple files

func (*FilePager) RunFetcher

func (m *FilePager) RunFetcher()

func (*FilePager) WalkExecSource

func (m *FilePager) WalkExecSource(p *plan.Source) (exec.Task, error)

WalkExecSource provides the ability to implement a source plan for execution

type FileReader

type FileReader struct {
    F    io.ReadCloser // Actual file reader
    Exit chan bool     // exit channel to shutdown reader
}

FileReader holds file info and access to the file, to supply to ScannerMakers

type FileReaderIterator

type FileReaderIterator interface {
    // NextFile returns io.EOF on last file
    NextFile() (*FileReader, error)
}

FileReaderIterator defines a file source that can page through files getting next file from partition

type FileSource

type FileSource struct {
    Partitioner string // random, ??  (date, keyed?)
    // contains filtered or unexported fields
}

FileSource is a Source for reading files and scanning them, allowing the contents to be treated as a database, like doing a full table scan in mysql. But, you can partition across files.

- readers: gcs, local-fs
- tablesource: translate lists of files into tables. Normally we would have multiple files per table (ie partitioned, per-day, etc)
- scanners: responsible for file-specific
- files table: a "table" of all the files from this cloud source

func NewFileSource

func NewFileSource() *FileSource

NewFileSource provides a singleton manager for a particular Source Schema, and File-Handler to read/manage all files from a source such as gcs folder x, s3 folder y

func (*FileSource) Close

func (m *FileSource) Close() error

Close this File Source manager

func (*FileSource) File

func (m *FileSource) File(o cloudstorage.Object) *FileInfo

func (*FileSource) Init

func (m *FileSource) Init()

func (*FileSource) Open

func (m *FileSource) Open(tableName string) (schema.Conn, error)

Open a connection to the given table, part of the Source interface

func (*FileSource) Setup

func (m *FileSource) Setup(ss *schema.Schema) error

Setup the filesource with schema info

func (*FileSource) Table

func (m *FileSource) Table(tableName string) (*schema.Table, error)

Table satisfies the SourceSchema interface to get the table schema for the given table

func (*FileSource) Tables

func (m *FileSource) Tables() []string

Tables for this file-source

type FileStore

type FileStore interface {

FileStore defines the handler for reading Files, understanding folders, and how to create scanners/formatters for files. Created by FileStoreCreator

FileStoreCreator(schema) -> FileStore

FileStore.Objects() -> File
         FileHandler(File) -> FileScanner
              FileScanner.Next() ->  Row
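The flow in the diagram above (store lists files, a handler turns each file into a scanner, the scanner yields rows) can be mimicked with stand-in types. Everything in this sketch is hypothetical: store is an in-memory map rather than a FileStore, csvHandler is not the package's csv scanner, and rows are plain string slices.

```go
package main

import (
	"bufio"
	"fmt"
	"sort"
	"strings"
)

// store maps object names to contents; a stand-in for a FileStore.
type store map[string]string

// scanner yields one row per line; a stand-in for a file scanner.
type scanner struct{ sc *bufio.Scanner }

// Next returns the next row, or nil when the file is exhausted.
func (s *scanner) Next() []string {
	if !s.sc.Scan() {
		return nil
	}
	return strings.Split(s.sc.Text(), ",")
}

// csvHandler plays the FileHandler role: file contents in, scanner out.
func csvHandler(content string) *scanner {
	return &scanner{sc: bufio.NewScanner(strings.NewReader(content))}
}

// rows walks every file in name order and collects all rows.
func rows(st store) [][]string {
	names := make([]string, 0, len(st))
	for n := range st {
		names = append(names, n)
	}
	sort.Strings(names) // map order is random, so sort for a stable scan
	var out [][]string
	for _, n := range names {
		sc := csvHandler(st[n])
		for row := sc.Next(); row != nil; row = sc.Next() {
			out = append(out, row)
		}
	}
	return out
}

func main() {
	st := store{
		"users/1.csv": "1,ann\n2,bob\n",
		"users/2.csv": "3,cy\n",
	}
	for _, r := range rows(st) {
		fmt.Println(r)
	}
}
```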

type FileStoreCreator

type FileStoreCreator func(*schema.Schema) (FileStore, error)

FileStoreCreator defines a Factory type for creating FileStore

type FileTable

type FileTable struct {
    Table       string
    PartialPath string
    FileCount   int
}

type Partitioner

type Partitioner func(uint64, *FileInfo) int

Package files imports 20 packages and is imported by 2 packages. Updated 2019-12-06.