Documentation ¶
Overview ¶
Package kafka receives CockroachDB CDC changefeed events that are routed via a Kafka cluster.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var Set = wire.NewSet(
	ProvideConn,
	ProvideEagerConfig,
)
Set is used by Wire.
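As a sketch of how this provider set is consumed, a Wire injector would include Set alongside the other provider sets the constructors depend on. The injector name and the additional sets below are hypothetical assumptions, not part of this package:

```go
//go:build wireinject

// newConn is a hypothetical injector sketch; wire.Build is Wire's real
// API, but the exact companion provider sets are assumptions here.
func newConn(ctx *stopper.Context, cfg *kafka.Config) (*kafka.Conn, error) {
	panic(wire.Build(
		kafka.Set,
		// ...plus the sets providing apply.Acceptor, switcher.Switcher,
		// chaos.Chaos, types.Memo, the staging/target pools, etc.
	))
}
```

Running the wire code generator against such an injector produces the concrete constructor wiring.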
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	DLQ          dlq.Config
	Script       script.Config
	Sequencer    sequencer.Config
	Staging      sinkprod.StagingConfig
	Target       sinkprod.TargetConfig
	TargetSchema ident.Schema
	TLS          secure.Config

	BatchSize        int           // How many messages to accumulate before committing to the target
	Brokers          []string      // The addresses of the Kafka brokers
	Group            string        // The Kafka consumer group id
	MaxTimestamp     string        // Only accept messages at or older than this timestamp
	MinTimestamp     string        // Only accept messages at or newer than this timestamp
	ResolvedInterval time.Duration // Minimal duration between resolved timestamps
	Strategy         string        // Kafka consumer group rebalance strategy
	Topics           []string      // The list of topics that the consumer should use
	// contains filtered or unexported fields
}
Config contains the configuration necessary for creating a replication connection.
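As an illustrative sketch, a minimal Config could be populated like this. The field names come from the struct above; the values (and the "sticky" strategy name) are placeholder assumptions, since in practice these fields are typically bound from CLI flags:

```go
// Illustrative config fragment only; values are assumptions.
cfg := &kafka.Config{
	Brokers:          []string{"broker-1:9092", "broker-2:9092"},
	Group:            "cdc-sink",                  // consumer group id
	Topics:           []string{"orders", "users"}, // changefeed topics
	BatchSize:        100,                         // messages per target commit
	ResolvedInterval: time.Second,                 // min gap between resolved timestamps
	Strategy:         "sticky",                    // assumed rebalance strategy name
}
```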
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn encapsulates all wire-connection behavior. It is responsible for receiving replication messages and replying with status updates.
TODO (silvano): support transactional mode. https://github.com/cockroachdb/cdc-sink/issues/777 Note: we receive resolved timestamps on all the partitions, so we should be able to leverage that.
TODO (silvano): support Avro format, schema registry. https://github.com/cockroachdb/cdc-sink/issues/776
TODO (silvano): add metrics. https://github.com/cockroachdb/cdc-sink/issues/778
func ProvideConn ¶
func ProvideConn(
	ctx *stopper.Context,
	acc *apply.Acceptor,
	sw *switcher.Switcher,
	chaos *chaos.Chaos,
	config *Config,
	memo types.Memo,
	stagingPool *types.StagingPool,
	targetPool *types.TargetPool,
	watchers types.Watchers,
) (*Conn, error)
ProvideConn is called by Wire to construct this package's logical.Dialect implementation. There's a fake dependency on the script loader so that flags can be evaluated first.
type EagerConfig ¶
type EagerConfig Config
EagerConfig is a hack to get Wire to move userscript evaluation to the beginning of the injector. This allows CLI flags to be set by the script.
func ProvideEagerConfig ¶
func ProvideEagerConfig(cfg *Config, _ *script.Loader) *EagerConfig
ProvideEagerConfig is a hack to move up the evaluation of the user script so that the options callbacks can set any non-script-related CLI flags.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents a Sarama consumer group consumer.
func (*Handler) Cleanup ¶
func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Handler) ConsumeClaim ¶
func (c *Handler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) (err error)
ConsumeClaim processes new messages for the topic/partition specified in the claim.
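Since Handler satisfies sarama.ConsumerGroupHandler, it is driven by Sarama's standard consumer-group loop. A sketch of that generic pattern follows; the loop below is Sarama's public API usage, not this package's internals, and the variables (brokers, groupID, topics, handler) are assumed to be in scope:

```go
// Sketch of the standard Sarama consumer-group loop that drives a
// ConsumerGroupHandler such as Handler. Error handling is minimal.
group, err := sarama.NewConsumerGroup(brokers, groupID, saramaConfig)
if err != nil {
	return err
}
defer group.Close()
for {
	// Consume blocks for the life of a session. ConsumeClaim is called
	// once per assigned topic/partition, and Cleanup runs when the
	// session ends (for example, on a group rebalance).
	if err := group.Consume(ctx, topics, handler); err != nil {
		return err
	}
	if ctx.Err() != nil {
		return ctx.Err()
	}
}
```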
type Kafka ¶
type Kafka struct {
	Conn        *Conn
	Diagnostics *diag.Diagnostics
}
Kafka is a kafka logical replication loop.
func (*Kafka) GetDiagnostics ¶
func (k *Kafka) GetDiagnostics() *diag.Diagnostics
GetDiagnostics implements stdlogical.HasDiagnostics.
type OffsetSeeker ¶
type OffsetSeeker interface {
	// GetOffsets finds the most recent offsets for resolved timestamp messages
	// that are before the given time, and in the given topics.
	GetOffsets([]string, hlc.Time) ([]*partitionState, error)

	// Close shuts down the connection with the Kafka broker.
	Close() error
}
OffsetSeeker finds offsets within Kafka topics.
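A hedged sketch of how the interface above might be used to seek starting offsets before resuming consumption. Only the documented methods are used; the minTimestamp value and surrounding error handling are assumptions:

```go
// Sketch: find the most recent resolved-timestamp offsets before a
// given hlc.Time, then release the broker connection.
seeker, err := kafka.NewOffsetSeeker(cfg)
if err != nil {
	return err
}
defer seeker.Close()
offsets, err := seeker.GetOffsets(cfg.Topics, minTimestamp)
if err != nil {
	return err
}
```

Note that GetOffsets returns a slice of the unexported partitionState type, so callers outside this package can pass the result around but cannot name the type directly.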
func NewOffsetSeeker ¶
func NewOffsetSeeker(config *Config) (OffsetSeeker, error)
NewOffsetSeeker instantiates an offsetManager.