kaf-relay

kaf-relay is an opinionated, high-performance program for keeping Kafka clusters in sync by replicating topics. It is specifically designed for high availability, with background healthchecks, offset tracking, and topic lag checks.

Features

  • Topic Forwarding: Relay consumes messages from topics in one Kafka cluster and forwards them to topics in another Kafka cluster.
  • Authentication: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
  • Topic Remapping: Relay allows you to specify topic remappings, enabling you to map a topic from the source cluster to a different topic in the destination cluster.
  • Consumer group failover: Given N identical upstream Kafka nodes, instantly switch over to the next node in round-robin fashion when the current node fails. Offset tracking between source and target nodes allows de-duplication without an external store (see the sketch after this list).
  • Topic lag failover: Monitors offsets among N identical nodes to detect lag and instantly switch the upstream consumer node.
  • Stop at end: The --stop-at-end flag makes the program stop once it reaches the end of the consumer topic offsets recorded at boot.
  • Filter messages using Go plugins: The --filter flag lets the program filter messages based on the logic in plugin code.
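
A minimal sketch of the offset-based de-duplication idea behind the failover feature above (illustrative only; the names here are hypothetical and not kaf-relay's internals):

package main

import "fmt"

// partition identifies a topic partition on the destination cluster.
type partition struct {
	topic string
	part  int32
}

// dedup remembers the highest offset already forwarded per partition, so
// that after switching to another identical upstream node, records that
// were already relayed can be skipped without an external store.
type dedup struct {
	lastForwarded map[partition]int64
}

func newDedup() *dedup {
	return &dedup{lastForwarded: make(map[partition]int64)}
}

// shouldForward reports whether a record at the given offset still needs
// to be relayed, and marks it as forwarded if so.
func (d *dedup) shouldForward(topic string, part int32, offset int64) bool {
	key := partition{topic, part}
	last, seen := d.lastForwarded[key]
	if seen && offset <= last {
		return false // already relayed before the switch-over
	}
	d.lastForwarded[key] = offset
	return true
}

func main() {
	d := newDedup()
	fmt.Println(d.shouldForward("topicA", 0, 41)) // true
	fmt.Println(d.shouldForward("topicA", 0, 41)) // false: duplicate after failover
	fmt.Println(d.shouldForward("topicA", 0, 42)) // true
}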
[Diagram: kaf-relay in different modes]

Prerequisites

  • Go installed.
  • Access to the source and destination Kafka clusters.

Installation

git clone https://github.com/zerodha/kaf-relay.git
cd kaf-relay
make dist

Usage

To run Relay:

Create a configuration file named config.toml with the necessary settings; the provided config.example.toml can be used as a template. Then start Relay in the desired mode:

./kaf-relay.bin --config config.toml --mode <single/failover>

Filter plugins

Build your own filter plugins by implementing the filter.Provider interface.
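
Judging from the sample plugin below, the provider contract presumably looks roughly like the following sketch (for orientation only; see the filter package for the authoritative filter.Provider definition):

// Provider is a rough sketch of what a filter plugin exposes, inferred
// from the sample below.
type Provider interface {
	// ID returns a unique name for the filter.
	ID() string

	// IsAllowed reports whether the given raw message should be relayed
	// to the destination cluster.
	IsAllowed(msg []byte) bool
}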

Sample

package main

import (
	"encoding/json"
)

// TestFilter is a sample filter that rejects every message.
type TestFilter struct {
}

// Config holds the plugin's configuration, decoded from the raw bytes
// passed to New.
type Config struct {
}

// New constructs the filter from its raw JSON configuration.
func New(b []byte) (interface{}, error) {
	var cfg Config
	if err := json.Unmarshal(b, &cfg); err != nil {
		return nil, err
	}

	return &TestFilter{}, nil
}

// ID returns the unique name of this filter.
func (f *TestFilter) ID() string {
	return "testfilter"
}

// IsAllowed reports whether a message should be relayed.
// This sample filters out all messages.
func (f *TestFilter) IsAllowed(msg []byte) bool {
	return false
}
  • Copy this plugin code to a directory: mkdir testfilter && cp sample.go testfilter
  • Build the plugin: CGO_ENABLED=1 go build -a -ldflags="-s -w" -buildmode=plugin -o testfilter.filter sample.go
  • Change config.toml to add the filter provider config.
  • Run kaf-relay with the filter plugin: ./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter
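
For context, plugins built this way are regular Go plugins loaded via the standard library plugin package. A minimal, hypothetical loader (illustrative only, not kaf-relay's actual loading code) might look like this:

package main

import (
	"fmt"
	"log"
	"plugin"
)

func main() {
	// Open the compiled plugin (path from the build steps above).
	p, err := plugin.Open("./testfilter/testfilter.filter")
	if err != nil {
		log.Fatal(err)
	}

	// Look up the exported New constructor defined in sample.go.
	sym, err := p.Lookup("New")
	if err != nil {
		log.Fatal(err)
	}

	newFn, ok := sym.(func([]byte) (interface{}, error))
	if !ok {
		log.Fatal("unexpected signature for New")
	}

	f, err := newFn([]byte(`{}`))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("loaded filter: %#v\n", f)
}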

Metrics

Replication metrics are exposed through an HTTP server.

$ curl localhost:7081/metrics
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="0"} 44
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="1"} 100
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="2"} 100
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="3"} 44
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="4"} 44
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="5"} 100
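
These counters can also be checked programmatically; a minimal sketch, assuming the metrics server listens on localhost:7081 as in the example above:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Fetch the metrics exposed by the relay's HTTP server.
	resp, err := http.Get("http://localhost:7081/metrics")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Print only the per-partition message counters.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "kafka_relay_msg_count") {
			fmt.Println(line)
		}
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}
}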
