kafka

package
v3.35.0
Published: Mar 30, 2023 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateKafkaTopic

func CreateKafkaTopic(ctx context.Context, topic string, p *confluent.Producer, numPartitions int, replicationFactor int) error
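A minimal sketch of calling CreateKafkaTopic with a confluent-kafka-go producer. The broker address, topic name, and this package's import path are assumptions for illustration:

package main

import (
	"context"
	"log"

	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
	kafka "github.com/featurebasedb/featurebase/v3/idk/kafka" // import path assumed
)

func main() {
	// Build a producer pointed at a local broker (address is illustrative).
	p, err := confluent.NewProducer(&confluent.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// Create an "events" topic with 4 partitions and replication factor 1.
	if err := kafka.CreateKafkaTopic(context.Background(), "events", p, 4, 1); err != nil {
		log.Fatal(err)
	}
}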

Types

type Main

type Main struct {
	idk.Main             `flag:"!embed"`
	idk.ConfluentCommand `flag:"!embed"`
	Group                string        `help:"Kafka group."`
	Topics               []string      `help:"Kafka topics to read from."`
	Timeout              time.Duration `help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`
	SkipOld              bool          `short:"" help:"False sets kafka consumer configuration auto.offset.reset to earliest, True sets it to latest."`
	ConsumerCloseTimeout int           `help:"The amount of time in seconds to wait for the consumer to close properly."`
}

func NewMain

func NewMain() (*Main, error)
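A hedged sketch of configuring the consumer returned by NewMain; the group, topic, and timeout values are illustrative, and Run is assumed to come from the embedded idk.Main:

m, err := kafka.NewMain()
if err != nil {
	log.Fatal(err)
}
m.Group = "my-group"            // Kafka consumer group (illustrative)
m.Topics = []string{"events"}   // topics to read from (illustrative)
m.Timeout = time.Second         // flush a batch after one second without records
if err := m.Run(); err != nil { // Run assumed from the embedded idk.Main
	log.Fatal(err)
}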

type PutCmd

type PutCmd struct {
	idk.ConfluentCommand `flag:"!embed"`
	Topic                string `help:"Kafka topic to post to."`

	Schema     string
	SchemaFile string
	Subject    string

	Data string

	DryRun bool `help:"Dry run - just flag parsing."`

	NumPartitions     int
	ReplicationFactor int
	// contains filtered or unexported fields
}

func NewPutCmd

func NewPutCmd() (*PutCmd, error)

func (*PutCmd) Log

func (p *PutCmd) Log() logger.Logger

func (*PutCmd) Run

func (p *PutCmd) Run() (err error)
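A hedged sketch of driving PutCmd programmatically; the topic, schema, subject, and data values are all illustrative:

cmd, err := kafka.NewPutCmd()
if err != nil {
	log.Fatal(err)
}
cmd.Topic = "events"              // topic to post to (illustrative)
cmd.Schema = `{"type": "string"}` // inline Avro schema (illustrative)
cmd.Subject = "events-value"      // schema registry subject (illustrative)
cmd.Data = `"hello"`              // a record matching the schema
if err := cmd.Run(); err != nil {
	log.Fatal(err)
}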

type PutSource

type PutSource struct {
	idk.ConfluentCommand
	Topic     string `help:"Kafka topic to post to."`
	Subject   string `help:"Kafka schema subject."`
	BatchSize int    `help:"Size of record batches to submit to Kafka."`

	// FB specific config for setting partition key efficiently
	FBPrimaryKeyFields []string
	FBIndexName        string
	FBIDField          string

	Concurrency int `help:"Number of concurrent sources and indexing routines to launch."`

	TrackProgress bool `help:"Periodically print status updates on how many records have been sourced." short:""`

	ReplicationFactor int `help:"Set the replication factor for the Kafka topic."`
	NumPartitions     int `help:"Set the number of partitions for the Kafka topic."`
	// NewSource must be set by the user of Main before calling
	// Main.Run. Main.Run will call this function "Concurrency" times. It
	// is the job of this function to ensure that the concurrent
	// sources which are started partition work appropriately. This is
	// typically set up (by convention) in the Source's package in
	// cmd.go
	NewSource func() (idk.Source, error) `flag:"-"`

	Log logger.Logger

	ConfigMap *confluent.ConfigMap `flag:"-"`
	Target    string
	// contains filtered or unexported fields
}

PutSource represents a Kafka put process that operates on one or more idk.Source instances.

func NewPutSource

func NewPutSource() (*PutSource, error)

NewPutSource returns a new instance of PutSource.

func (*PutSource) Run

func (p *PutSource) Run() error

Run sends the source records to Kafka, using the configured number of concurrent sources.
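Per the NewSource field comment above, a hedged sketch of wiring a source factory into PutSource before calling Run; the stub factory and field values are illustrative:

ps, err := kafka.NewPutSource()
if err != nil {
	log.Fatal(err)
}
ps.Topic = "events" // illustrative
ps.Concurrency = 2  // Run will call NewSource this many times
ps.NewSource = func() (idk.Source, error) {
	// Return a concrete idk.Source here; the factory must ensure the
	// concurrent sources partition the work appropriately.
	return nil, errors.New("wire up a real idk.Source")
}
if err := ps.Run(); err != nil {
	log.Fatal(err)
}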

type Record

type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Commit

func (r *Record) Commit(ctx context.Context) error

func (*Record) Data

func (r *Record) Data() []interface{}

func (*Record) Schema added in v3.34.0

func (r *Record) Schema() interface{}

func (*Record) StreamOffset

func (r *Record) StreamOffset() (string, uint64)

type Schema

type Schema struct {
	Schema  string `json:"schema"`  // The actual AVRO schema
	Subject string `json:"subject"` // Subject under which the schema is registered
	Version int    `json:"version"` // Version within this subject
	ID      int    `json:"id"`      // Registry's unique id
}

The Schema type is an object produced by the schema registry.
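A hedged example of decoding a schema registry response into Schema; the JSON payload is illustrative:

raw := []byte(`{"schema": "{\"type\": \"string\"}", "subject": "events-value", "version": 1, "id": 42}`)
var s kafka.Schema
if err := json.Unmarshal(raw, &s); err != nil {
	log.Fatal(err)
}
fmt.Println(s.Subject, s.Version, s.ID) // events-value 1 42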

type Source

type Source struct {
	idk.ConfluentCommand
	Topics  []string
	Group   string
	Log     logger.Logger
	Timeout time.Duration
	SkipOld bool
	Verbose bool

	TLS idk.TLSConfig

	ConfigMap *confluent.ConfigMap
	// contains filtered or unexported fields
}

Source implements the idk.Source interface using Kafka as a data source. It is not thread-safe! Due to the way Kafka clients work, to achieve concurrency, create multiple Sources.

func NewSource

func NewSource() *Source

NewSource returns a new Source.

func (*Source) Close

func (s *Source) Close() error

Close closes the underlying Kafka consumer.

func (*Source) CommitMessages

func (s *Source) CommitMessages(recs []confluent.TopicPartition) ([]confluent.TopicPartition, error)

func (*Source) Open

func (s *Source) Open() error

Open initializes the Kafka source (i.e., creating and configuring a consumer). The configuration options for the confluentinc/confluent-kafka-go/kafka library are documented at: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
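A hedged example of supplying librdkafka options through the exported ConfigMap field before Open; the option names follow the CONFIGURATION.md reference above, and the broker address is illustrative:

src := kafka.NewSource()
src.Topics = []string{"events"} // illustrative
src.Group = "my-group"          // illustrative
src.ConfigMap = &confluent.ConfigMap{
	"bootstrap.servers":  "localhost:9092", // illustrative broker
	"session.timeout.ms": 10000,
}
if err := src.Open(); err != nil {
	log.Fatal(err)
}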

func (*Source) Record

func (s *Source) Record() (idk.Record, error)

Record returns the value of the next Kafka message. The same Record object may be used by successive calls to Record, so it should not be retained.
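Continuing the Open sketch above, a hedged consume loop; because the returned Record may be reused, its data is consumed before the next call:

for {
	rec, err := src.Record()
	if err != nil {
		// Real code should distinguish recoverable conditions defined
		// by the idk.Source contract from fatal errors.
		log.Fatal(err)
	}
	fmt.Println(rec.Data()) // consume before the next Record call
	if err := rec.Commit(context.Background()); err != nil {
		log.Fatal(err)
	}
}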

func (*Source) Schema

func (s *Source) Schema() []idk.Field

func (*Source) SchemaID

func (s *Source) SchemaID() int

func (*Source) SchemaMetadata

func (s *Source) SchemaMetadata() string

func (*Source) SchemaSchema

func (s *Source) SchemaSchema() string

func (*Source) SchemaSubject

func (s *Source) SchemaSubject() string

func (*Source) SchemaVersion

func (s *Source) SchemaVersion() int
