kafka_static

package
v3.35.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2023 License: Apache-2.0, Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Main

type Main struct {
	idk.Main           `flag:"!embed"`
	KafkaHosts         []string      `help:"Comma separated list of host:port pairs for Kafka."`
	Group              string        `help:"Kafka group."`
	Topics             []string      `help:"Kafka topics to read from."`
	Timeout            time.Duration `help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`
	SkipOld            bool          `short:"" help:"Skip to the most recent Kafka message rather than starting at the beginning."`
	Header             string        `` /* 175-byte string literal not displayed */
	S3Region           string        `help:"S3 Region, optionally used when header is specified as an S3 URI. Alternatively, use environment variable AWS_REGION."`
	AllowMissingFields bool          `help:"Will proceed with ingest even if a field is missing from a record but specified in the JSON config file. Default false"`

	KafkaTLS idk.TLSConfig
}

func NewMain

func NewMain() *Main

type Record

type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Commit

func (r *Record) Commit(ctx context.Context) error

func (*Record) Data

func (r *Record) Data() []interface{}

func (*Record) Schema added in v3.34.0

func (r *Record) Schema() interface{}

func (*Record) StreamOffset

func (r *Record) StreamOffset() (string, uint64)

type Source

type Source struct {
	Hosts   []string
	Topics  []string
	Group   string
	TLS     idk.TLSConfig
	Log     logger.Logger
	Timeout time.Duration
	SkipOld bool

	// Header is a file or url referencing a file containing JSON header
	// configuration.
	Header string

	// HeaderFields can be provided instead of Header. It is a slice of
	// RawFields which will be marshalled and parsed the same way a JSON object
	// in Header would be. It is used only if a Header is not provided.
	HeaderFields []idk.RawField

	S3Region string

	AllowMissingFields bool
	// contains filtered or unexported fields
}

Source implements the idk.Source interface using Kafka as a data source. It is not thread-safe! Because of the way Kafka clients work, create multiple Sources to achieve concurrency.

func NewSource

func NewSource() *Source

NewSource returns a new Source.

func (*Source) Close

func (s *Source) Close() error

Close closes the underlying Kafka consumer.

func (*Source) Open

func (s *Source) Open() error

Open initializes the Kafka source.

func (*Source) Record

func (s *Source) Record() (idk.Record, error)

Record returns the value of the next Kafka message. The same Record object may be reused by successive calls to Record, so it should not be retained.

func (*Source) Schema

func (s *Source) Schema() []idk.Field

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL