impl

package
Version: v1.2.7 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2023 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Exporter

// Exporter is an opaque type: all of its fields are unexported.
// Instances are obtained from NewExporter and driven through Run.
type Exporter struct {
	// contains filtered or unexported fields
}

func NewExporter

// NewExporter builds an Exporter from a Kafka consumer, a list of topic
// names, a Writer and an Options value. It returns the Exporter together
// with an error.
//
// NOTE(review): options is a pointer; whether nil is accepted cannot be
// determined from this listing — confirm against the implementation.
func NewExporter(consumer *kafka.Consumer, topics []string, writer Writer, options *Options) (*Exporter, error)

func (*Exporter) Run

// Run executes the export and returns a count (exportedCount) and an error.
//
// NOTE(review): exportedCount is presumably the number of messages handed
// to the Writer — confirm. Its type is uint64, whereas Streamer.Run reports
// its count as int64.
func (e *Exporter) Run() (exportedCount uint64, err error)

type Importer

// Importer is an opaque type: all of its fields are unexported.
// Instances are obtained from NewImporter and driven through Run.
type Importer struct {
	// contains filtered or unexported fields
}

func NewImporter

// NewImporter builds an Importer from a Kafka producer, a channel of
// kafka.Event values (deliveryChan) and a Reader. Unlike NewExporter it
// has no error return.
//
// NOTE(review): deliveryChan is presumably where the producer's delivery
// reports arrive — confirm against the implementation.
func NewImporter(producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer

func (*Importer) Run

// Run executes the import and returns an error; no message count is
// reported.
func (i *Importer) Run() error

type Options

// Options is the configuration value accepted by NewExporter.
type Options struct {
	// Limit is an unsigned 64-bit bound.
	// NOTE(review): presumably the maximum number of messages to export,
	// with the meaning of 0 unspecified here — confirm.
	Limit                       uint64
	// MaxWaitingTimeForNewMessage is an optional duration (pointer, so it
	// may be nil).
	// NOTE(review): presumably how long to wait for a further message
	// before stopping — confirm, including the behavior when nil.
	MaxWaitingTimeForNewMessage *time.Duration
}

type ParquetMessage

// ParquetMessage is the row schema used for Parquet files, as declared by
// the `parquet` struct tags. Every column is a PLAIN-encoded UTF8
// BYTE_ARRAY except Partition, which is an INT32.
type ParquetMessage struct {
	// Value maps to the "value" column (UTF8 string).
	Value         string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// Topic maps to the "topic" column (UTF8 string).
	Topic         string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// Partition maps to the "partition" column; the only numeric column.
	Partition     int32  `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
	// Offset maps to the "offset" column; stored as a string, not an integer.
	Offset        string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// Key maps to the "key" column (UTF8 string).
	Key           string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// Headers maps to the "headers" column as a single string.
	// NOTE(review): the serialization used for the header list is not
	// visible here — confirm.
	Headers       string `parquet:"name=headers, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// Timestamp maps to the "timestamp" column; stored as a string.
	// NOTE(review): the textual time format is not visible here — confirm.
	Timestamp     string `parquet:"name=timestamp, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	// TimestampType maps to the "timestamptype" column (UTF8 string).
	TimestampType string `parquet:"name=timestamptype, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
}

type ParquetReader

// ParquetReader is an opaque type: all of its fields are unexported.
// Instances are obtained from NewParquetReader. Its Read method has the
// signature required by the Reader interface.
type ParquetReader struct {
	// contains filtered or unexported fields
}

func NewParquetReader

// NewParquetReader builds a ParquetReader for the file at filePath and
// returns it together with an error.
//
// NOTE(review): includePartitionAndOffset presumably controls whether the
// stored partition and offset are carried into the messages produced by
// Read — confirm against the implementation.
func NewParquetReader(filePath string, includePartitionAndOffset bool) (*ParquetReader, error)

func (*ParquetReader) GetNumberOfRows

// GetNumberOfRows returns a row count as an int64.
// NOTE(review): presumably the number of rows in the underlying Parquet
// file — confirm.
func (p *ParquetReader) GetNumberOfRows() int64

func (*ParquetReader) Read

// Read returns a channel of kafka.Message values. The signature matches
// the single method of the Reader interface.
//
// NOTE(review): whether and when the channel is closed, and how read
// errors are reported, is not visible here — confirm before ranging over it.
func (p *ParquetReader) Read() chan kafka.Message

type ParquetWriter

// ParquetWriter is an opaque type: all of its fields are unexported.
// Instances are obtained from NewParquetWriter. Its Write and Flush
// methods have the signatures required by the Writer interface.
type ParquetWriter struct {
	// contains filtered or unexported fields
}

func NewParquetWriter

// NewParquetWriter builds a ParquetWriter on top of the given
// source.ParquetFile and returns it together with an error.
func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error)

func (*ParquetWriter) Flush

// Flush returns an error. The signature matches Writer.Flush.
// NOTE(review): presumably finalizes pending rows in the underlying file;
// whether it may be called more than once is not visible here — confirm.
func (f *ParquetWriter) Flush() error

func (*ParquetWriter) Write

// Write accepts one kafka.Message by value and returns an error. The
// signature matches Writer.Write.
func (f *ParquetWriter) Write(msg kafka.Message) (err error)

type Reader

// Reader is the message source accepted by NewImporter.
type Reader interface {
	// Read returns a channel of kafka.Message values.
	// NOTE(review): close and error semantics of the channel are not
	// specified by the interface — confirm per implementation.
	Read() chan kafka.Message
}

type Streamer

// Streamer is an opaque type: all of its fields are unexported.
// Instances are obtained from NewStreamer and driven through Run.
type Streamer struct {
	// contains filtered or unexported fields
}

func NewStreamer

// NewStreamer builds a Streamer from a Kafka consumer, a Kafka producer,
// a source topic name (topicFrom), a destination topic name (topicTo), a
// channel of kafka.Event values and a StreamerOptions value. Options are
// passed by value here, whereas NewExporter takes *Options. There is no
// error return.
func NewStreamer(consumer *kafka.Consumer, producer *kafka.Producer, topicFrom string, topicTo string, deliveryChan chan kafka.Event, options StreamerOptions) *Streamer

func (*Streamer) Run

// Run executes the transfer and returns a count (transferredCount) and an
// error.
//
// NOTE(review): transferredCount is presumably the number of messages
// moved from topicFrom to topicTo — confirm. Its type is int64, whereas
// Exporter.Run reports its count as uint64.
func (s *Streamer) Run() (transferredCount int64, err error)

type StreamerOptions

// StreamerOptions is the configuration value accepted by NewStreamer.
type StreamerOptions struct {
	// MaxWaitingTimeForNewMessage is an optional duration (pointer, so it
	// may be nil).
	// NOTE(review): presumably how long to wait for a further message
	// before stopping — confirm, including the behavior when nil.
	MaxWaitingTimeForNewMessage *time.Duration
}

type Writer

// Writer is the message sink accepted by NewExporter.
type Writer interface {
	// Write accepts one kafka.Message by value and returns an error.
	Write(msg kafka.Message) error
	// Flush returns an error.
	// NOTE(review): presumably called to persist anything buffered by
	// Write — confirm per implementation.
	Flush() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL