kafka

package
v0.0.0-...-9e054ec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Overview

Package kafka contains receives CockroachDB CDC changefeed events that are routed via a kafka cluster.

Index

Constants

This section is empty.

Variables

Set is used by Wire.

Functions

This section is empty.

Types

type Config

type Config struct {
	DLQ          dlq.Config
	Script       script.Config
	Sequencer    sequencer.Config
	Staging      sinkprod.StagingConfig
	Target       sinkprod.TargetConfig
	TargetSchema ident.Schema
	TLS          secure.Config

	BatchSize    int      // How many messages to accumulate before committing to the target
	Brokers      []string // The address of the Kafka brokers
	Group        string   // the Kafka consumer group id.
	MaxTimestamp string   // Only accept messages at or older than this timestamp
	MinTimestamp string   // Only accept messages at or newer than this timestamp
	Strategy     string   // Kafka consumer group re-balance strategy
	Topics       []string // The list of topics that the consumer should use.
	// contains filtered or unexported fields
}

Config contains the configuration necessary for creating a replication connection. ServerID and SourceConn are mandatory.

func (*Config) Bind

func (c *Config) Bind(f *pflag.FlagSet)

Bind adds flags to the set. It delegates to the embedded Config.Bind.

func (*Config) Preflight

func (c *Config) Preflight(ctx context.Context) error

Preflight updates the configuration with sane defaults or returns an error if there are missing options for which a default cannot be provided.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn encapsulates all wire-connection behavior. It is responsible for receiving replication messages and replying with status updates. TODO (silvano) : support transactional mode https://github.com/cockroachdb/cdc-sink/issues/777 note: we get resolved timestamps on all the partitions, so we should be able to leverage that.

TODO (silvano): support Avro format, schema registry. https://github.com/cockroachdb/cdc-sink/issues/776

TODO (silvano): add metrics. https://github.com/cockroachdb/cdc-sink/issues/778

func ProvideConn

func ProvideConn(
	ctx *stopper.Context,
	acc *apply.Acceptor,
	sw *switcher.Switcher,
	chaos *chaos.Chaos,
	config *Config,
	memo types.Memo,
	stagingPool *types.StagingPool,
	targetPool *types.TargetPool,
	watchers types.Watchers,
) (*Conn, error)

ProvideConn is called by Wire to construct this package's logical.Dialect implementation. There's a fake dependency on the script loader so that flags can be evaluated first.

func (*Conn) Start

func (c *Conn) Start(ctx *stopper.Context) (err error)

Start the replication loop. Connect to the Kafka cluster and process events from the given topics. If more that one processes is started, the partitions within the topics are allocated to each process based on the chosen rebalance strategy.

type EagerConfig

type EagerConfig Config

EagerConfig is a hack to get Wire to move userscript evaluation to the beginning of the injector. This allows CLI flags to be set by the script.

func ProvideEagerConfig

func ProvideEagerConfig(cfg *Config, _ *script.Loader) *EagerConfig

ProvideEagerConfig is a hack to move up the evaluation of the user script so that the options callbacks can set any non-script-related CLI flags.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler represents a Sarama consumer group consumer

func (*Handler) Cleanup

func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Handler) ConsumeClaim

func (c *Handler) ConsumeClaim(
	session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
) (err error)

ConsumeClaim processes new messages for the topic/partition specified in the claim.

func (*Handler) Setup

func (c *Handler) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type Kafka

type Kafka struct {
	Conn        *Conn
	Diagnostics *diag.Diagnostics
}

Kafka is a kafka logical replication loop.

func Start

func Start(ctx *stopper.Context, config *Config) (*Kafka, error)

Start creates a Kafka logical replication loop using the provided configuration.

func (*Kafka) GetDiagnostics

func (k *Kafka) GetDiagnostics() *diag.Diagnostics

GetDiagnostics implements stdlogical.HasDiagnostics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL