digger

package
v0.0.2
Warning

This package is not in the latest version of its module.

Published: Jan 22, 2022 License: MIT Imports: 24 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	Run(ctx context.Context, messageChan chan message) error
}

Consumer is an interface for types that consume messages from a source and feed them into a channel for downstream processing.

type Digger

type Digger struct {
	SourceConsumer Consumer
	Processors     []Processor
}

Digger digs through JSON- or proto-formatted message streams. It reads messages from SourceConsumer and passes them to each of its Processors.

func (*Digger) Run

func (d *Digger) Run(ctx context.Context) error

Run runs the digger with the provided context. The function returns when all data has been consumed, a fatal error is encountered, or the context is cancelled.

type FileConsumer

type FileConsumer struct {
	Paths     []string
	Recursive bool
}

FileConsumer is a Consumer implementation that reads from local files.

func (*FileConsumer) Run

func (f *FileConsumer) Run(
	ctx context.Context,
	messageChan chan message,
) error

Run starts the file consumer. Messages are sent on the messageChan argument.

type KafkaConsumer

type KafkaConsumer struct {
	Address    string
	Topic      string
	Partitions []kafka.Partition
	Offset     int64
	Since      time.Time
	Until      time.Time

	MinBytes int
	MaxBytes int
}

KafkaConsumer is a Consumer implementation that reads messages from a Kafka topic.

func (*KafkaConsumer) Run

func (k *KafkaConsumer) Run(
	ctx context.Context,
	messageChan chan message,
) error

Run starts the Kafka consumer. Messages are sent on the messageChan argument.

type LiveStats

type LiveStats struct {
	// contains filtered or unexported fields
}

LiveStats is a processor that calculates and displays stats based on a structured message stream.

func NewLiveStats

func NewLiveStats(config LiveStatsConfig) (*LiveStats, error)

NewLiveStats creates a new LiveStats instance and starts the main progress printing loop.

func (*LiveStats) Process

func (l *LiveStats) Process(ctx context.Context, messageObj message) error

Process updates the stats in this LiveStats for a single message.

func (*LiveStats) Stop

func (l *LiveStats) Stop() error

Stop stops this LiveStats instance.

func (*LiveStats) Summary

func (l *LiveStats) Summary() string

Summary returns a pretty-printed table summarizing the stats calculated by this LiveStats instance.

type LiveStatsConfig

type LiveStatsConfig struct {
	Filter       string
	K            int
	Numeric      bool
	PathsStr     string
	PrintMissing bool
	ProtoTypes   []string
	Raw          bool
	RawExtended  bool
	SortByName   bool
}

LiveStatsConfig stores the inputs for a LiveStats processor.

type Processor

type Processor interface {
	Process(context.Context, message) error
	Stop() error
	Summary() string
}

Processor is an interface for types that process and summarize messages.

type S3Consumer

type S3Consumer struct {
	S3Client   *s3.S3
	Bucket     string
	Prefixes   []string
	NumWorkers int
}

S3Consumer is a Consumer implementation that reads from one or more prefixes in an S3 bucket.

func (*S3Consumer) Run

func (s *S3Consumer) Run(
	ctx context.Context,
	messageChan chan message,
) error

Run starts the S3 consumer. Messages are sent on the messageChan argument.
