input

package
v0.1.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2021 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

Package input provides Baker input components.

Index

Constants

This section is empty.

Variables

All is the list of all baker inputs.

View Source
// KCLDesc describes the KCL input to Baker: its name ("KCL"), its constructor
// (NewKCL), its configuration type (KCLConfig) and the help text shown to users.
var KCLDesc = baker.InputDesc{
	Name: "KCL",
	Help: "This input fetches records from Kinesis with KCL. It consumes a specified stream, and\n" +
		"processes all shards in that stream. It never exits.\n" +
		"Multiple baker instances can consume the same stream, in that case the KCL will take care of\n" +
		"balancing the shards between workers. Careful (shard stealing is not implemented yet).\n" +
		"Resharding on the producer side is automatically handled by the KCL that will distribute\n" +
		"the shards among KCL workers.",
	Config: &KCLConfig{},
	New:    NewKCL,
}

KCLDesc describes the KCL input.

View Source
// KinesisDesc describes the Kinesis input to Baker: its name ("Kinesis"), its
// constructor (NewKinesis), its configuration type (KinesisConfig) and the
// help text shown to users.
var KinesisDesc = baker.InputDesc{
	Name: "Kinesis",
	Help: "This input fetches log lines from Kinesis. It listens on a specified stream, and\n" +
		"processes all the shards in that stream. It never exits.\n",
	Config: &KinesisConfig{},
	New:    NewKinesis,
}
View Source
// ListDesc describes the List input to Baker: its name ("List"), its
// constructor (NewList), its configuration type (ListConfig) and the help
// text, which documents every accepted "file specifier" form of the Files
// option and the metadata attached to produced records.
var ListDesc = baker.InputDesc{
	Name:   "List",
	New:    NewList,
	Config: &ListConfig{},
	Help: "This input fetches logs from a predefined list of local or remote sources. The \"Files\"\n" +
		"configuration variable is a list of \"file specifiers\". Each \"file specifier\" can be:\n\n" +
		"  * A local file path on the filesystem: the log file at that path will be processed\n" +
		"  * An HTTP/HTTPS URL: the log file at that URL will be downloaded and processed\n" +
		"  * An S3 URL: the log file at that URL will be downloaded and processed\n" +
		"  * \"@\" followed by a local path pointing to a file: the file is expected to be a text file\n" +
		"    and each line will be read and parsed as a \"file specifier\"\n" +
		"  * \"@\" followed by an HTTP/HTTPS URL: the text file pointed to by the URL will be downloaded,\n" +
		"    and each line will be read and parsed as a \"file specifier\"\n" +
		"  * \"@\" followed by an S3 URL pointing to a file: the text file pointed to by the URL will be\n" +
		"    downloaded, and each line will be read and parsed as a \"file specifier\"\n" +
		"  * \"@\" followed by a local path pointing to a directory (must end with a slash): the directory\n" +
		"    will be recursively walked, and all files matching the \"MatchPath\" option regexp will be\n" +
		"    processed as logfiles\n" +
		"  * \"@\" followed by an S3 URL pointing to a directory: the directory on S3 will be recursively\n" +
		"    walked, and all files matching the \"MatchPath\" option regexp will be processed as logfiles\n" +
		"  * \"-\": the contents of a log file will be read from stdin and processed\n" +
		"  * \"@-\": each line read from stdin will be parsed as a \"file specifier\"\n\n" +
		"All records produced by this input contain 2 metadata values:\n" +
		"  * url: the file that originally contained the record\n" +
		"  * last_modified: the last modification datetime of the above file\n",
}
View Source
// SQSDesc describes the SQS input to Baker: its name ("SQS"), its constructor
// (NewSQS), its configuration type (SQSConfig) and the help text shown to
// users.
var SQSDesc = baker.InputDesc{
	Name:   "SQS",
	New:    NewSQS,
	Config: &SQSConfig{},
	Help: "This input listens on multiple SQS queues for new incoming log files\n" +
		"on S3; it is meant to be used with SQS queues populated by SNS.\n" +
		"It never exits.\n",
}
View Source
// TCPDesc describes the TCP input to Baker: its name ("TCP"), its constructor
// (NewTCP), its configuration type (TCPConfig) and the help text shown to
// users. The help text refers to TCPConfig's Listener option, whose own help
// describes it as the Host:Port to bind to.
var TCPDesc = baker.InputDesc{
	Name:   "TCP",
	New:    NewTCP,
	Config: &TCPConfig{},
	Help: "This input relies on a TCP connection to receive records in the usual format.\n" +
		"Configure it with the host:port to bind to (\"Listener\" option).\n" +
		"By default it listens on port 6000 for any connection.\n" +
		"It never exits.\n",
}

Functions

func NewKCL

func NewKCL(cfg baker.InputParams) (baker.Input, error)

NewKCL creates a new KCL.

func NewKinesis

func NewKinesis(cfg baker.InputParams) (baker.Input, error)

NewKinesis creates a Kinesis tail and immediately performs a first connection to get the current shard list.

func NewList

func NewList(cfg baker.InputParams) (baker.Input, error)

func NewSQS

func NewSQS(cfg baker.InputParams) (baker.Input, error)

func NewTCP

func NewTCP(cfg baker.InputParams) (baker.Input, error)

Types

type KCL

// KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client
// Library). It is created by NewKCL; its Run, Stop, Stats and FreeMem methods
// implement baker.Input, and CreateProcessor implements
// interfaces.IRecordProcessorFactory.
type KCL struct {
	// contains filtered or unexported fields
}

KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library).

func (*KCL) CreateProcessor

func (k *KCL) CreateProcessor() interfaces.IRecordProcessor

CreateProcessor implements interfaces.IRecordProcessorFactory.

func (*KCL) FreeMem

func (k *KCL) FreeMem(data *baker.Data)

FreeMem implements baker.Input.

func (*KCL) Run

func (k *KCL) Run(inch chan<- *baker.Data) error

Run implements baker.Input.

func (*KCL) Stats

func (k *KCL) Stats() baker.InputStats

Stats implements baker.Input.

func (*KCL) Stop

func (k *KCL) Stop()

Stop implements baker.Input.

type KCLConfig

// KCLConfig is the configuration for the KCL input.
//
// Each option is declared through struct tags: "help" holds its description,
// "default" its default value and "required" marks it as mandatory
// (presumably interpreted by Baker's configuration loader — confirm).
// InitialPosition accepts LATEST or TRIM_HORIZON, per its help text.
type KCLConfig struct {
	AwsRegion       string        `help:"AWS region to connect to" default:"us-west-2"`
	Stream          string        `help:"Name of Kinesis stream" required:"true"`
	AppName         string        `help:"Used by KCL to allow multiple apps to consume the same stream." required:"true"`
	MaxShards       int           `help:"Max shards this Worker can handle at a time" default:"32767"`
	ShardSync       time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"`
	InitialPosition string        `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"`
	// contains filtered or unexported fields
}

KCLConfig is the configuration for the KCL input.

type Kinesis

// Kinesis is the Baker input described by KinesisDesc; it is created by
// NewKinesis.
//
// Cfg holds the input configuration. Data is a send-only channel of
// *baker.Data. NOTE(review): it is presumably the channel handed to Run
// (which has the same type) — confirm.
type Kinesis struct {
	Cfg  *KinesisConfig
	Data chan<- *baker.Data
	// contains filtered or unexported fields
}

func (*Kinesis) FreeMem

func (s *Kinesis) FreeMem(data *baker.Data)

func (*Kinesis) ProcessRecords

func (s *Kinesis) ProcessRecords(shard *kinesis.Shard) error

func (*Kinesis) Run

func (s *Kinesis) Run(data chan<- *baker.Data) error

func (*Kinesis) Stats

func (s *Kinesis) Stats() baker.InputStats

func (*Kinesis) Stop

func (s *Kinesis) Stop()

type KinesisConfig

// KinesisConfig is the configuration for the Kinesis input.
//
//   - AwsRegion: AWS region to connect to (default "us-west-2").
//   - Stream: name of the Kinesis stream; tagged required.
//   - IdleTime: time between polls of each shard (default 100ms).
//
// The help/default/required struct tags are presumably read by Baker's
// configuration loader — confirm.
type KinesisConfig struct {
	AwsRegion string        `help:"AWS region to connect to" default:"us-west-2"`
	Stream    string        `help:"Stream name on Kinesis" required:"true"`
	IdleTime  time.Duration `help:"Time between polls of each shard" default:"100ms"`
}

type List

// List is the Baker input described by ListDesc; it is created by NewList.
// Cfg holds the input configuration.
//
// Besides the Run/Stop/Stats/FreeMem methods, List exposes ProcessDirectory,
// which takes a directory and a *regexp.Regexp. NOTE(review): presumably it
// walks the directory and processes files matching the regexp (the MatchPath
// option) — confirm.
type List struct {
	Cfg *ListConfig
	// contains filtered or unexported fields
}

func (*List) FreeMem

func (s *List) FreeMem(data *baker.Data)

func (*List) ProcessDirectory

func (s *List) ProcessDirectory(dir string, matchPath *regexp.Regexp) error

func (*List) Run

func (s *List) Run(inch chan<- *baker.Data) error

func (*List) Stats

func (s *List) Stats() baker.InputStats

func (*List) Stop

func (s *List) Stop()

type ListConfig

// ListConfig is the configuration for the List input.
//
//   - Files: "file specifiers" to process (log files, directories and/or
//     list files). The default tag is ["-"]; per ListDesc's help, "-" means
//     a log file is read from stdin.
//   - MatchPath: regexp used to filter files found in the specified
//     directories (default `.*\.log\.gz` once the tag value is unquoted).
//   - Region: AWS region used when fetching from S3 (default "us-west-2").
type ListConfig struct {
	Files     []string `help:"List of log-files, directories and/or list-files to process" default:"[\"-\"]"`
	MatchPath string   `help:"regexp to filter files in specified directories" default:".*\\.log\\.gz"`
	Region    string   `help:"AWS Region for fetching from S3" default:"us-west-2"`
}

type SQS

// SQS is the Baker input described by SQSDesc; it is created by NewSQS.
//
// Cfg holds the input configuration. NOTE(review): FilePathRegexp is
// presumably the compiled form of Cfg.FilePathFilter — confirm.
type SQS struct {
	Cfg            *SQSConfig
	FilePathRegexp *regexp.Regexp
	// contains filtered or unexported fields
}

func (*SQS) FreeMem

func (s *SQS) FreeMem(data *baker.Data)

func (*SQS) Run

func (s *SQS) Run(inch chan<- *baker.Data) error

func (*SQS) Stats

func (s *SQS) Stats() baker.InputStats

func (*SQS) Stop

func (s *SQS) Stop()

type SQSConfig

// SQSConfig is the configuration for the SQS input.
//
//   - AwsRegion: AWS region to connect to (default "us-west-2").
//   - Bucket: S3 bucket to use for processing (empty default).
//   - QueuePrefixes: prefixes of the names of the SQS queues to monitor;
//     tagged required.
//   - MessageFormat: its struct tag is not displayed in this excerpt, so its
//     help text and default are not documented here.
//   - FilePathFilter: optional filter restricting which S3 files are used.
//     NOTE(review): SQS has a FilePathRegexp *regexp.Regexp field, which
//     suggests this value is a regexp rather than a literal path, as the
//     help text implies — confirm and align the help text.
type SQSConfig struct {
	AwsRegion      string   `help:"AWS region to connect to" default:"us-west-2"`
	Bucket         string   `help:"S3 Bucket to use for processing" default:""`
	QueuePrefixes  []string `help:"Prefixes of the names of the SQS queues to monitor" required:"true"`
	MessageFormat  string   `` /* 189-byte string literal not displayed */
	FilePathFilter string   `help:"If provided, will only use S3 files with the given path."`
}

type TCP

// TCP is the Baker input described by TCPDesc; it is created by NewTCP.
// Cfg holds the input configuration.
type TCP struct {
	Cfg *TCPConfig
	// contains filtered or unexported fields
}

func (*TCP) FreeMem

func (s *TCP) FreeMem(data *baker.Data)

func (*TCP) Run

func (s *TCP) Run(inch chan<- *baker.Data) error

func (*TCP) Stats

func (s *TCP) Stats() baker.InputStats

func (*TCP) Stop

func (s *TCP) Stop()

type TCPConfig

// TCPConfig is the configuration for the TCP input.
//
// Listener is the Host:Port address the input binds to. It has no default
// tag, while TCPDesc's help says the input listens on port 6000 by default.
// NOTE(review): that default is presumably applied in NewTCP — confirm.
type TCPConfig struct {
	Listener string `help:"Host:Port to bind to"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL