registry

command
v3.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

README

Overview

The Registry service exposes Kafka topic and broker metadata via a gRPC & HTTP API. Resources can be queried and filtered using inherent attributes along with custom, user-defined tags.

Some example questions a user might ask by querying the Registry service:

  • Give me all broker IDs where the rack equals us-east-1a
  • Give me the configurations for all topics tagged "environment:dev"

Additionally, Registry is continuously receiving support for write operations. An example operations that's possible using tags and topic creation (see further into the README for the target_broker_tags feature):

  • Tag brokers 1001, 1002, 1003 with "pool:inbound"
  • Create a topic named "inbound" with 32 partitions, a replication factor of 2, and place all partitions on brokers tagged "pool:inbound", and tag the topic "primary:true"

Installation

  • go get github.com/DataDog/kafka-kit/cmd/registry

Binary will be found at $GOPATH/bin/registry

Tested with Kafka 0.10, 2.2-2.7, ZooKeeper 3.4, 3.5

Usage

Flags

Usage of registry:
  -bootstrap-servers string
    	Kafka bootstrap servers [REGISTRY_BOOTSTRAP_SERVERS] (default "localhost")
  -enable-locking
    	Enable distributed locking for write operations [REGISTRY_ENABLE_LOCKING]
  -enable-profiling
    	Enable Datadog continuous profiling [REGISTRY_ENABLE_PROFILING]
  -grpc-listen string
    	Server gRPC listen address [REGISTRY_GRPC_LISTEN] (default "localhost:8090")
  -http-listen string
    	Server HTTP listen address [REGISTRY_HTTP_LISTEN] (default "localhost:8080")
  -kafka-sasl-mechanism string
    	SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 [REGISTRY_KAFKA_SASL_MECHANISM]
  -kafka-sasl-password string
    	SASL password for use with the PLAIN and SASL-SCRAM-* mechanisms [REGISTRY_KAFKA_SASL_PASSWORD]
  -kafka-sasl-username string
    	SASL username for use with the PLAIN and SASL-SCRAM-* mechanisms [REGISTRY_KAFKA_SASL_USERNAME]
  -kafka-security-protocol string
    	Protocol used to communicate with brokers. Supported: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL [REGISTRY_KAFKA_SECURITY_PROTOCOL]
  -kafka-ssl-ca-location string
    	CA certificate path (.pem/.crt) for verifying broker's identity. Needed for SSL and SASL_SSL protocols. [REGISTRY_KAFKA_SSL_CA_LOCATION]
  -kafka-version string
    	Kafka release (Semantic Versioning) [REGISTRY_KAFKA_VERSION] (default "v0.10.2")
  -read-rate-limit int
    	Read request rate limit (reqs/s) [REGISTRY_READ_RATE_LIMIT] (default 5)
  -tag-allowed-staleness int
    	Minutes before tags with no associated resource are deleted [REGISTRY_TAG_ALLOWED_STALENESS] (default 60)
  -tag-cleanup-frequency int
    	Minutes between runs of tag cleanup [REGISTRY_TAG_CLEANUP_FREQUENCY] (default 20)
  -version
    	version [REGISTRY_VERSION]
  -write-rate-limit int
    	Write request rate limit (reqs/s) [REGISTRY_WRITE_RATE_LIMIT] (default 1)
  -zk-addr string
    	ZooKeeper connect string [REGISTRY_ZK_ADDR] (default "localhost:2181")
  -zk-prefix string
    	ZooKeeper prefix (if Kafka is configured with a chroot path prefix) [REGISTRY_ZK_PREFIX]
  -zk-tags-prefix string
    	Tags storage ZooKeeper prefix [REGISTRY_ZK_TAGS_PREFIX] (default "registry")

Setup

Run Registry, point it at your ZooKeeper cluster:

$ registry --zk-addr zk-test-0.service.consul:2181 --zk-prefix kafka --bootstrap-servers kafka1:9092
2019/12/10 21:48:42 Registry running
2019/12/10 21:48:42 Connected to ZooKeeper: zk-test-0.service.consul:2181
2019/12/10 21:48:42 KafkaAdmin connected to bootstrap servers: kafka1:9092
2019/12/10 21:48:42 gRPC up: 0.0.0.0:8090
2019/12/10 21:48:42 HTTP up: 0.0.0.0:8080

For multi-node setups, it's strongly advised to set --enable-locking=true; this backs write/update operations with a ZooKeeper based distributed lock.

API Examples

See the Registry proto definition for further details. The API is designed gRPC-first and provides HTTP using grpc-gateway; the mappings are described in the proto file.

List Topics

Lists topic names.

$ curl -s localhost:8080/v1/topics/list | jq
{
  "names": [
    "__consumer_offsets",
    "test0",
    "test1"
  ]
}

Get Topic

Returns full metadata for topic.

$ curl -s "localhost:8080/v1/topics?name=test0" | jq
{
  "topics": {
    "test0": {
      "name": "test0",
      "partitions": 32,
      "replication": 2
    }
  }
}

Get Topics

Returns full metadata for all topics.

$ curl -s localhost:8080/v1/topics
<...>

List/Get Topics with Tag Filtering

Works with both list and get. Any number of tags can be specified (multiple tags currently become a logical AND).

$ curl -s "localhost:8080/v1/topics/list?tag=replication:3" | jq
{
  "names": [
    "__consumer_offsets",
    "test0",
    "test1",
    "test3",
    "test4"
  ]
}
$ curl -s "localhost:8080/v1/topics/list?tag=replication:2&tag=partitions:32" | jq
{
  "names": [
    "test0"
  ]
}

List Reassigning Topics

Lists topics that are undergoing reassignment.

$ curl -s "localhost:8080/v1/topics/reassigning" | jq
{
  "names": [
    "test3",
  ]
}

List Under Replicated Topics

Lists topics that have under-replicated ISRs.

$ curl -s "localhost:8080/v1/topics/underreplicated" | jq
{
  "names": [
    "test4",
  ]
}

Create a Topic

Topics can be created through the Registry service. Additionally, all partitions can be scoped to specific brokers by tag. This call embeds topicmappr placement constraints logic to ensure safe and optimal partition placement.

$ curl -XPOST localhost:8080/v1/topics/create -d '{
  "topic": {
    "name": "test2",
    "partitions": 6,
    "replication": 2,
    "tags": {
      "team": "eng"
    }
  },
  "target_broker_tags": [
    "pool:test"
  ]
}'

Delete a Topic

Deletes the specified topic. Attempting to delete a non-existent topic will return an error.

$ curl localhost:8080/v1/topics/list
{"names":["__consumer_offsets","test1","test2"]}

$ curl -XDELETE "localhost:8080/v1/topics/test2"
{}

$ curl localhost:8080/v1/topics/list
{"names":["__consumer_offsets","test1"]}

$ curl -XDELETE "localhost:8080/v1/topics/test2"
{"error":"topic does not exist","code":2,"message":"topic does not exist"}

List Brokers

Lists broker IDs.

$ curl -s localhost:8080/v1/brokers/list | jq
{
  "ids": [
    1001,
    1002,
    1003,
    1004,
    1005
  ]
}

Get Broker

Returns full metadata for broker.

$ curl -s "localhost:8080/v1/brokers?id=1001" | jq
{
  "brokers": {
    "1001": {
      "id": 1001,
      "listenersecurityprotocolmap": {
        "PLAINTEXT": "PLAINTEXT"
      },
      "endpoints": [
        "SASL_SSL://733fad76d5be:9093"
      ],
      "rack": "europe-west3-a",
      "jmxport": 9999,
      "host": "10.14.224.198",
      "timestamp": "1548442165222",
      "port": 9092,
      "version": 4
    }
  }
}

Get Brokers

Returns full metadata for all brokers.

$ curl -s localhost:8080/v1/brokers | jq
<...>

List/Get Brokers with Tag Filtering

Works with both list and get. Any number of tags can be specified, filtering is a "match all".

$ curl -s "localhost:8080/v1/brokers?tag=rack:a" | jq
{
  "brokers": {
    "1004": {
      "id": 1004,
      "listenersecurityprotocolmap": {
        "PLAINTEXT": "PLAINTEXT"
      },
      "endpoints": [
        "SASL_SSL://776d5be33fad:9093"
      ],
      "rack": "a",
      "jmxport": 9999,
      "host": "10.0.1.103",
      "timestamp": "1545171891",
      "port": 9092,
      "version": 4
    }
  }
}
$ curl -s "localhost:8080/v1/brokers/list?tag=port:9092&tag=rack:a" | jq
{
  "ids": [
    1004
  ]
}

Get unmapped brokers

Returns brokers that host no partitions. Optionally, exclude topic names can be specified where any partitions belonging to excluded topics are not counted as to whether a broker is considered mapped.

$ curl -s "localhost:8080/v1/brokers/unmapped?exclude=test1&exclude=test2&exclude=__consumer_offsets" | jq
{
  "ids": [
    1001,
    1002,
    1003
  ]
}

Broker<->Topic Mappings

Returns brokers by topic or topics by brokers.

$ curl -s "localhost:8080/v1/mappings/topic/test0" | jq
{
  "ids": [
    1001,
    1002,
    1003
  ]
}
$ curl -s "localhost:8080/v1/mappings/broker/1001" | jq
{
  "names": [
    "__consumer_offsets",
    "test0",
    "test1"
  ]
}

Set Custom Tags

Add user-defined custom tags. Multiple tags can be set at once. These show in Get requests and can be used seamlessly alongside default tags in lookup filtering. Since the tag filtering make no distinction between inherent attribute and custom tags, setting custom tags names that conflict with default tags are rejected and indicated as such through the API.

$ curl -XPUT "localhost:8080/v1/topics/tag/test0?tag=team:eng&tag=use:testing"
{"message":"success"}

View the above tag:

$ curl -s "localhost:8080/v1/topics?name=test0" | jq
{
  "topics": {
    "test0": {
      "tags": {
        "team": "eng",
        "use": "testing"
      },
      "name": "test0",
      "partitions": 32,
      "replication": 2
    }
  }
}

Delete Custom Tags

Custom tags can be deleted, optionally many at once.

$ curl -XDELETE "localhost:8080/v1/topics/tag/test0?tag=team&tag=use"
{"message":"success"}

MirrorMaker2 Offset Translation

Reports upstream and local offsets for MirrorMaker2 replicated topics.

$ curl "localhost:8080/v1/translate-offsets/source/test4 | jq"
{
  "offsets": {
    "source.test4-1:0": {
      "upstream_offset": "3",
      "local_offset": "3"
    },
    "source.test4-2:0": {
      "upstream_offset": "5",
      "local_offset": "5"
    },
    "source.test4-2:1": {
      "upstream_offset": "3",
      "local_offset": "3"
    },
    "source.test4-2:2": {
      "upstream_offset": "7",
      "local_offset": "7"
    }
  }
}

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL