parquet

Version: v0.0.0-...-b8bb60e
Published: Oct 2, 2021 License: Apache-2.0 Imports: 20 Imported by: 0

README

parquet

Refactoring of https://github.com/xitongsys/parquet-go (Copyright 2017 Xitong Zhang)

  • Code was linted and refactored using golangci-lint
  • Added a couple of missing error checks
  • Relevant sub packages were moved into root pkg
  • Some of the packages related to internals were made private
  • JSON marshalling removed (feels like it should be elsewhere)

Planned

  • refactor the tagging system to a more strongly typed approach and ditch the custom tag DSL
  • improve error handling
  • use io.Writer and at most io.ReadSeeker

Status

It is the same code base, but everything is imported from the root package instead of sub packages. It should work as:

import (
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/stdiopt/parquet"
)

// Fields are exported so that reflection-based marshalling can read them.
type My struct {
	Metric1 int `parquet:"name=metric1"`
	Metric2 int `parquet:"name=metric2"`
}

func main() {
	f, err := local.NewLocalFileWriter("name.parquet")
	// err check
	defer f.Close()

	pw, err := parquet.NewWriter(f, &My{}, 1)
	// err check
	defer pw.Close()

	// err is already declared above, so assign with = rather than :=
	err = pw.Write(&My{1, 2})
	// err check
}

Documentation

Overview

Package parquet is a full refactoring of github.com/xitongsys/parquet-go: linted, cleaned up, etc.

Package parquet ...

Index

Constants

const (
	JulianDayOfEpoch int64 = 2440588
	MicrosPerDay     int64 = 3600 * 24 * 1000 * 1000
)

From Spark: https://github.com/apache/spark/blob/b9f2f78de59758d1932c1573338539e485a01112/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L47

Variables

This section is empty.

Functions

func ConvertToThriftReader

func ConvertToThriftReader(file File, offset int64, size int64) (*thrift.TBufferedTransport, error)

ConvertToThriftReader converts a file reader to a Thrift reader.

func DECIMAL_BYTE_ARRAY_ToString

func DECIMAL_BYTE_ARRAY_ToString(dec []byte, _ int, scale int) string

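DECIMAL_BYTE_ARRAY_ToString converts a decimal stored as a byte array to its string representation, using the given scale.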

func DECIMAL_INT_ToString

func DECIMAL_INT_ToString(dec int64, _ int, scale int) string

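DECIMAL_INT_ToString converts a decimal stored as an integer to its string representation, using the given scale.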
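To illustrate what applying a scale to an integer-backed decimal means, here is a minimal sketch using only the standard library. The helper decimalIntToString is hypothetical and is not the package's implementation; the exact output format of DECIMAL_INT_ToString may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// decimalIntToString is a hypothetical helper: it renders the unscaled
// integer dec with scale digits after the decimal point.
func decimalIntToString(dec int64, scale int) string {
	neg := dec < 0
	u := uint64(dec)
	if neg {
		u = uint64(-dec) // two's complement keeps this correct for MinInt64
	}
	s := strconv.FormatUint(u, 10)
	if scale > 0 {
		// Left-pad with zeros so at least one digit precedes the point.
		if len(s) <= scale {
			s = strings.Repeat("0", scale-len(s)+1) + s
		}
		s = s[:len(s)-scale] + "." + s[len(s)-scale:]
	}
	if neg {
		s = "-" + s
	}
	return s
}

func main() {
	fmt.Println(decimalIntToString(12345, 2)) // prints "123.45"
	fmt.Println(decimalIntToString(-5, 3))    // prints "-0.005"
}
```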

func DeepCopy

func DeepCopy(src, dst interface{})

DeepCopy deep-copies src into dst. The author flags it as a questionable helper; it does not return an error.

TODO: {lpf} apparently this is only used to copy a Tag structure; a dedicated Tag clone method would be better.

func HeadToUpper

func HeadToUpper(str string) string

HeadToUpper converts the first letter of a string to uppercase.

func INT96ToTime

func INT96ToTime(int96 string) time.Time

INT96ToTime converts a parquet INT96 value to a time.Time.

func NewLogicalTypeFromConvertedType

func NewLogicalTypeFromConvertedType(schemaElement *parquet.SchemaElement, info *Tag) *parquet.LogicalType

NewLogicalTypeFromConvertedType returns a parquet logical type from schema element and tag.

func NewLogicalTypeFromFieldsMap

func NewLogicalTypeFromFieldsMap(mp map[string]string) (*parquet.LogicalType, error)

NewLogicalTypeFromFieldsMap returns a logical type built from a map of fields.

func NewSchemaElementFromTagMap

func NewSchemaElementFromTagMap(info *Tag) (*parquet.SchemaElement, error)

NewSchemaElementFromTagMap returns a schema element from a tag struct.

func SizeOf

func SizeOf(val reflect.Value) int64

SizeOf returns the size of a parquet value.

func StringToVariableName

func StringToVariableName(str string) string

StringToVariableName converts a string to a valid Go variable name.

func TIMESTAMP_MICROSToTime

func TIMESTAMP_MICROSToTime(micros int64, adjustedToUTC bool) time.Time

TIMESTAMP_MICROSToTime converts a parquet TIMESTAMP_MICROS value to a time.Time.

func TIMESTAMP_MILLISToTime

func TIMESTAMP_MILLISToTime(millis int64, adjustedToUTC bool) time.Time

TIMESTAMP_MILLISToTime converts a parquet TIMESTAMP_MILLIS value to a time.Time.

func TIMESTAMP_NANOSToTime

func TIMESTAMP_NANOSToTime(nanos int64, adjustedToUTC bool) time.Time

TIMESTAMP_NANOSToTime converts a parquet TIMESTAMP_NANOS value to a time.Time.

func TimeToINT96

func TimeToINT96(t time.Time) string

TimeToINT96 converts a time.Time into a parquet INT96 value.

func TimeToTIMESTAMP_MICROS

func TimeToTIMESTAMP_MICROS(t time.Time, adjustedToUTC bool) int64

TimeToTIMESTAMP_MICROS converts a time.Time to a parquet TIMESTAMP_MICROS value.

func TimeToTIMESTAMP_MILLIS

func TimeToTIMESTAMP_MILLIS(t time.Time, adjustedToUTC bool) int64

TimeToTIMESTAMP_MILLIS converts a time.Time to a parquet TIMESTAMP_MILLIS value.

func TimeToTIMESTAMP_NANOS

func TimeToTIMESTAMP_NANOS(t time.Time, adjustedToUTC bool) int64

TimeToTIMESTAMP_NANOS converts a time.Time to a parquet TIMESTAMP_NANOS value.

func TimeToTIME_MICROS

func TimeToTIME_MICROS(t time.Time, adjustedToUTC bool) int64

TimeToTIME_MICROS converts a time.Time to a parquet TIME_MICROS value.

func TimeToTIME_MILLIS

func TimeToTIME_MILLIS(t time.Time, adjustedToUTC bool) int64

TimeToTIME_MILLIS converts a time.Time to a parquet TIME_MILLIS value.

Types

type File

type File interface {
	io.Seeker
	io.Reader
	io.Writer
	io.Closer
	Open(name string) (File, error)
	Create(name string) (File, error)
}

File is the file abstraction used by the parquet reader and writer.

type Item

type Item struct {
	GoType reflect.Type
	Info   *Tag
}

Item represents a field.

func NewItem

func NewItem() *Item

NewItem returns a new item.

type JSONSchemaItemType

type JSONSchemaItemType struct {
	Tag    string                `json:"Tag"`
	Fields []*JSONSchemaItemType `json:"Fields,omitempty"`
}

JSONSchemaItemType represents a node of a JSON schema: a tag string and, optionally, its child fields.

func NewJSONSchemaItem

func NewJSONSchemaItem() *JSONSchemaItemType

NewJSONSchemaItem returns a new JSONSchemaItemType.

type Marshaler

type Marshaler interface {
	Marshal(node *marshalNode, nodeBuf *marshalNodeBufType) []*marshalNode
}

Marshaler is the interface that wraps the Marshal method.

type PathMapType

type PathMapType struct {
	Path     string
	Children map[string]*PathMapType
}

PathMapType records a path and its children; it is used by Marshal to improve performance.

func NewPathMap

func NewPathMap(path string) *PathMapType

NewPathMap returns an initialized PathMapType.

func (*PathMapType) Add

func (pmt *PathMapType) Add(path []string)

Add adds a path to the map, creating child entries as needed.
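The Path/Children structure is essentially a prefix tree over schema paths. The sketch below shows one way such a tree can be built. The type pathNode, the "." separator, and the convention that the first path element names the root are all assumptions made for illustration, not a description of PathMapType's internals.

```go
package main

import "fmt"

// pathNode is a hypothetical stand-in for a path tree node.
type pathNode struct {
	path     string
	children map[string]*pathNode
}

func newPathNode(path string) *pathNode {
	return &pathNode{path: path, children: map[string]*pathNode{}}
}

// add inserts path below n. path[0] is assumed to name n itself, so the
// first child to create or descend into is path[1].
func (n *pathNode) add(path []string) {
	if len(path) <= 1 {
		return
	}
	name := path[1]
	next, ok := n.children[name]
	if !ok {
		next = newPathNode(n.path + "." + name)
		n.children[name] = next
	}
	next.add(path[1:])
}

func main() {
	root := newPathNode("root")
	root.add([]string{"root", "a", "b"})
	root.add([]string{"root", "a", "c"})
	fmt.Println(root.children["a"].children["b"].path) // prints "root.a.b"
}
```

Shared prefixes are stored once, so looking up a column path walks one map per path element instead of comparing full path strings.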

type Reader

type Reader struct {
	SchemaHandler *SchemaHandler
	NP            int64 // parallel number
	Footer        *parquet.FileMetaData
	PFile         File

	ColumnBuffers map[string]*columnBufferType

	// A reader can only read objects of a single type
	ObjType        reflect.Type
	ObjPartialType reflect.Type
}

Reader contains methods and information to read parquet files.

func NewParquetColumnReader

func NewParquetColumnReader(pFile File, np int64) (*Reader, error)

NewParquetColumnReader creates a parquet column reader.

func NewReader

func NewReader(pFile File, obj interface{}, np int64) (*Reader, error)

NewReader creates a parquet reader. obj is an object with schema tags or a JSON schema string.

func (*Reader) Close

func (pr *Reader) Close() error

Close closes the parquet file.

func (*Reader) GetFooterSize

func (pr *Reader) GetFooterSize() (uint32, error)

GetFooterSize returns the footer size.

func (*Reader) GetNumRows

func (pr *Reader) GetNumRows() int64

GetNumRows returns the number of rows.

func (*Reader) Read

func (pr *Reader) Read(dstInterface interface{}) error

Read reads rows from the parquet file and unmarshals them into dst.

func (*Reader) ReadByNumber

func (pr *Reader) ReadByNumber(maxReadNumber int) ([]interface{}, error)

ReadByNumber reads maxReadNumber objects.

func (*Reader) ReadColumnByIndex

func (pr *Reader) ReadColumnByIndex(
	index int64,
	num int64,
) (values []interface{}, rls []int32, dls []int32, err error)

ReadColumnByIndex reads a column by index. The index of the first column is 0.

func (*Reader) ReadColumnByPath

func (pr *Reader) ReadColumnByPath(
	pathStr string,
	num int64,
) (values []interface{}, rls []int32, dls []int32, err error)

ReadColumnByPath reads a column by its path in the schema.

func (*Reader) ReadFooter

func (pr *Reader) ReadFooter() error

ReadFooter reads the footer from the parquet file.

func (*Reader) ReadPartial

func (pr *Reader) ReadPartial(dstInterface interface{}, prefixPath string) error

ReadPartial reads rows from the parquet file under prefixPath and unmarshals them into dst.

func (*Reader) ReadPartialByNumber

func (pr *Reader) ReadPartialByNumber(maxReadNumber int, prefixPath string) ([]interface{}, error)

ReadPartialByNumber reads maxReadNumber partial objects.

func (*Reader) RenameSchema

func (pr *Reader) RenameSchema()

RenameSchema renames the schema names to their internal names (InName).

func (*Reader) SetSchemaHandlerFromJSON

func (pr *Reader) SetSchemaHandlerFromJSON(jsonSchema string) error

SetSchemaHandlerFromJSON sets the schema handler from a JSON schema string.

func (*Reader) SkipRows

func (pr *Reader) SkipRows(num int64) error

SkipRows skips num rows of the parquet file.

func (*Reader) SkipRowsByIndex

func (pr *Reader) SkipRowsByIndex(index int64, num int64) error

SkipRowsByIndex skips num rows of the column at the given index.

func (*Reader) SkipRowsByPath

func (pr *Reader) SkipRowsByPath(pathStr string, num int64) error

SkipRowsByPath skips num rows of the column at the given path.

type SchemaHandler

type SchemaHandler struct {
	SchemaElements []*parquet.SchemaElement
	MapIndex       map[string]int32
	IndexMap       map[int32]string
	PathMap        *PathMapType
	Infos          []*Tag

	InPathToExPath map[string]string
	ExPathToInPath map[string]string

	ValueColumns []string
}

SchemaHandler stores the schema data.

func NewSchemaHandlerFromJSON

func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error)

NewSchemaHandlerFromJSON returns a new schema handler built from a JSON schema string.

func NewSchemaHandlerFromSchemaList

func NewSchemaHandlerFromSchemaList(schemas []*parquet.SchemaElement) *SchemaHandler

NewSchemaHandlerFromSchemaList creates a schema handler from a schema list.

func NewSchemaHandlerFromStruct

func NewSchemaHandlerFromStruct(obj interface{}) (sh *SchemaHandler, err error)

NewSchemaHandlerFromStruct creates a schema handler from an object.

func (*SchemaHandler) ConvertToInPathStr

func (sh *SchemaHandler) ConvertToInPathStr(pathStr string) (string, error)

ConvertToInPathStr converts a path to an internal path.

func (*SchemaHandler) CreateInExMap

func (sh *SchemaHandler) CreateInExMap()

CreateInExMap builds the mappings between internal (in) and external (ex) paths.

func (*SchemaHandler) GetColumnNum

func (sh *SchemaHandler) GetColumnNum() int64

GetColumnNum returns the number of columns.

func (*SchemaHandler) GetExName

func (sh *SchemaHandler) GetExName(index int) string

GetExName returns the external name at the given index.

func (*SchemaHandler) GetInName

func (sh *SchemaHandler) GetInName(index int) string

GetInName returns the internal name at the given index.

func (*SchemaHandler) GetRepetitionLevelIndex

func (sh *SchemaHandler) GetRepetitionLevelIndex(path []string, rl int32) (int32, error)

GetRepetitionLevelIndex returns the max repetition level type of a column by its schema path.

func (*SchemaHandler) GetRepetitionType

func (sh *SchemaHandler) GetRepetitionType(path []string) (parquet.FieldRepetitionType, error)

GetRepetitionType returns the repetition type of a column by its schema path.

func (*SchemaHandler) GetRootExName

func (sh *SchemaHandler) GetRootExName() string

GetRootExName returns the root external name from the schema handler.

func (*SchemaHandler) GetRootInName

func (sh *SchemaHandler) GetRootInName() string

GetRootInName returns the root internal name from the schema handler.

func (*SchemaHandler) GetType

func (sh *SchemaHandler) GetType(prefixPath string) (reflect.Type, error)

GetType returns the type at the given path, or an error if it does not exist.

func (*SchemaHandler) GetTypes

func (sh *SchemaHandler) GetTypes() []reflect.Type

GetTypes returns the object types derived from the schema via reflection.

func (*SchemaHandler) MaxDefinitionLevel

func (sh *SchemaHandler) MaxDefinitionLevel(path []string) (int32, error)

MaxDefinitionLevel returns the max definition level of a column by its schema path.

func (*SchemaHandler) MaxRepetitionLevel

func (sh *SchemaHandler) MaxRepetitionLevel(path []string) (int32, error)

MaxRepetitionLevel returns the max repetition level of a column by its schema path.
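For readers unfamiliar with these levels: in the Parquet format, a column's max definition level is the number of non-required (optional or repeated) fields along its path, and its max repetition level is the number of repeated fields. The sketch below computes both for a path given as a plain slice. The types repetition and field and the function maxLevels are hypothetical, and the root schema element is assumed to be excluded from the path.

```go
package main

import "fmt"

// repetition is a hypothetical stand-in for parquet.FieldRepetitionType.
type repetition int

const (
	required repetition = iota
	optional
	repeated
)

// field is one element of a column's schema path (root excluded).
type field struct {
	name string
	rep  repetition
}

// maxLevels returns the max definition and repetition levels for a path.
func maxLevels(path []field) (maxDef, maxRep int32) {
	for _, f := range path {
		if f.rep != required {
			maxDef++ // optional and repeated fields can each be absent
		}
		if f.rep == repeated {
			maxRep++ // each repeated field adds one nesting level
		}
	}
	return maxDef, maxRep
}

func main() {
	path := []field{{"a", optional}, {"b", repeated}, {"c", required}}
	def, rep := maxLevels(path)
	fmt.Println(def, rep) // prints "2 1"
}
```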

type Tag

type Tag struct {
	InName string
	ExName string

	Type      string
	KeyType   string
	ValueType string

	ConvertedType      string
	KeyConvertedType   string
	ValueConvertedType string

	Length      int32
	KeyLength   int32
	ValueLength int32

	Scale      int32
	KeyScale   int32
	ValueScale int32

	Precision      int32
	KeyPrecision   int32
	ValuePrecision int32

	IsAdjustedToUTC      bool
	KeyIsAdjustedToUTC   bool
	ValueIsAdjustedToUTC bool

	FieldID      int32
	KeyFieldID   int32
	ValueFieldID int32

	Encoding      parquet.Encoding
	KeyEncoding   parquet.Encoding
	ValueEncoding parquet.Encoding

	OmitStats      bool
	KeyOmitStats   bool
	ValueOmitStats bool

	RepetitionType      parquet.FieldRepetitionType
	KeyRepetitionType   parquet.FieldRepetitionType
	ValueRepetitionType parquet.FieldRepetitionType

	LogicalTypeFields      map[string]string
	KeyLogicalTypeFields   map[string]string
	ValueLogicalTypeFields map[string]string
}

Tag holds the parsed fields of a struct tag such as `parquet:"name=Name, type=FIXED_LEN_BYTE_ARRAY, length=12"`.

func GetKeyTagMap

func GetKeyTagMap(src *Tag) *Tag

GetKeyTagMap returns the tag describing the key of a map field.

func GetValueTagMap

func GetValueTagMap(src *Tag) *Tag

GetValueTagMap returns the tag describing the value of a map field.

func NewTag

func NewTag() *Tag

NewTag returns an initialized tag.

func StringToTag

func StringToTag(tag string) (*Tag, error)

StringToTag parses a tag string into a Tag.
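The tag DSL is a comma-separated list of key=value pairs, as in the example shown for the Tag type. The sketch below splits such a string into a map. parseTag is a hypothetical helper and does not reproduce StringToTag, which fills a Tag struct and may validate keys and values differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTag is a hypothetical helper that splits "k1=v1, k2=v2" into a map.
// Keys are lower-cased; surrounding whitespace is trimmed.
func parseTag(tag string) (map[string]string, error) {
	fields := make(map[string]string)
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate empty segments such as a trailing comma
		}
		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("parseTag: expected key=value, got %q", part)
		}
		key := strings.ToLower(strings.TrimSpace(kv[0]))
		fields[key] = strings.TrimSpace(kv[1])
	}
	return fields, nil
}

func main() {
	fields, err := parseTag("name=Name, type=FIXED_LEN_BYTE_ARRAY, length=12")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(fields["name"], fields["type"], fields["length"])
}
```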

type Writer

type Writer struct {
	SchemaHandler *SchemaHandler
	NP            int64 // parallel number
	Footer        *parquet.FileMetaData
	PFile         File

	PageSize        int64
	RowGroupSize    int64
	CompressionType parquet.CompressionCodec
	Offset          int64

	Objs              []interface{}
	ObjsSize          int64
	ObjSize           int64
	CheckSizeCritical int64

	PagesMapBuf map[string][]*layoutPage
	Size        int64
	NumRows     int64

	DictRecs map[string]*layoutDictRecType

	ColumnIndexes []*parquet.ColumnIndex
	OffsetIndexes []*parquet.OffsetIndex

	MarshalFunc func(src []interface{}, sh *SchemaHandler) (*map[string]*layoutTable, error)
}

Writer encodes parquet.

func NewWriter

func NewWriter(pFile File, obj interface{}, np int64) (*Writer, error)

NewWriter creates a parquet writer. obj is an object with schema tags or a JSON schema string.

func (*Writer) Close

func (pw *Writer) Close() error

Close flushes and closes the parquet file.

func (*Writer) Flush

func (pw *Writer) Flush(flag bool) error

Flush flushes the write buffer to the parquet file.

func (*Writer) RenameSchema

func (pw *Writer) RenameSchema()

RenameSchema renames the schema names to the external names (ExName) given in the tags.

func (*Writer) Write

func (pw *Writer) Write(src interface{}) error

Write writes one object to the parquet file.

Directories

Path Synopsis
internal
