kafka

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2022 License: MIT Imports: 8 Imported by: 0

README

kafka

说明: kafka 库主要封装了sarama库中的常用方法,来对常用的kafka管理操作和简单的生产消费进行接口模拟,用以实现一些基本的功能。不过侧重点在于整个集群或topic详情信息窥探。

在kafka中,主要分为三种客户端,分别是: 生产者客户端消费者客户端集群管理客户端

kafka client Authentication

注意: 在使用sarama库连接 kafka 时,官方仅支持如下三种方式的认证,因此在使用的时候需要特别注意。

  • 无认证: 也就是直连 broker 即可对集群进行管理,以及对 topic 进行读写操作
  • TLS认证: 基于SSL/TLS证书的认证,通过证书身份识别来真正识别每一个用户和操作相关的信息
  • SASL/PLAIN认证: 官方 kafka 其实在 SASL 支持多种方式,比如SASL/SCRAM, SASL/GSSAPI等,但是 sarama 库中仅实现了 PLAINTEXT 一种认证方式。具体查看Config结构体中的SASL的定义说明

cluster admin api

集群管理客户端对外通过ClusterAdmin接口暴露,主要包含如下常见的方法实现:

  • CreateTopic: Topic创建
  • ListTopics: 列出topic列表
  • DescribeTopics: 查看topic的详情信息
  • DeleteTopic: topic删除
  • CreatePartitions: 创建分区(增加分区时手动指定分区和分区分配策略 要求: version 1.0.0 or higher)
  • AlterPartitionReassignments: 修改分区副本分配策略(用在副本在节点间的均衡 要求: 2.4.0.0 or higher)
  • ListPartitionReassignments: 列出分区副本分配策略(正在进行分配的副本分配信息 要求: 2.4.0.0 or higher)
  • DeleteRecords: 删除偏移量小于给定偏移量的记录(要求: 0.11.0.0 or higher)
  • DescribeConfig: 查看配置文件(获取指定资源的配置实体,参数类型为ConfigResource,对于一些敏感配置信息没有开放 要求: 0.11.0.0 or higher)
  • AlterConfig: 修改配置文件(使用默认参数来更新指定资源配置)
  • CreateACL: 创建ACL访问规则
  • ListAcls: 列出ACL规则
  • DeleteACL: 删除ACL规则
  • ListConsumerGroups: 列出集群消费组
  • DescribeConsumerGroups: 查看指定消费组的详情信息
  • ListConsumerGroupOffsets: 列出指定消费组可用的offset
  • DeleteConsumerGroup: 删除消费组
  • DescribeCluster: 查看集群详情信息(broker节点,controller节点等信息)
  • DescribeLogDirs: 查看topic的各个日志信息(获取指定broker列表里日志目录信息)
  • Close: 关闭admin api,并关闭基础客户端

注意: 因为通常可能企业内部使用的集群版本各不相同,因此对于有兼容版本的问题,高版本api接口未进行封装

已封装功能

  • 创建topic (指定分区和副本数进行创建)
  • 列出集群全部Topic列表
  • 查看集群信息(broker列表,controler节点)
  • 查看topic详情(配置信息,分区信息,ISR等信息)
  • 查看集群的topic日志信息(每个节点上每个topic-part的大小)
  • 列出topic的消费组
  • 查看topic的消费组信息(指定group的topic的消费者情况)
  • 查看指定实体的配置文件(broker,topic)
  • 修改指定资源实体的配置文件

producer client api

sarama 中,对于生产者相关的接口来讲,仅提供了异步的接口,即AsyncProducer接口。

AsyncProducer接口主要实现了以下几个功能:

  • AsyncClose(): 异步客户端关闭(该方法会触发生产者的关闭,只有当错误和成功的channel都关闭时才会关闭,相当于优雅关闭,该方法在消费者侧可以很好的保证消息不会丢失)
  • Close(): 该方法会等待缓冲在buffer里的消息已经被刷新时才关闭
  • Input(): 这是一个ProducerMessage指针类型的写入channel,用来将用户希望的消息写进去
  • Successes(): 这是成功输出的channel(当Return.Successes为true时,用户必须从这里读取来判断是否写入成功,否则生产者将deadlock,建议在一个单独的select语句中发送和读取消息)
  • Errors(): 返回给用户的错误输出channel(当该channel满时,必须先读出来,否则会死锁,当然也可设置Producer.Return.Errors为false来避免返回错误信息)

已实现的封装接口

  • 同步生产(syncProducer): 从字符串直接生产
  • 同步生产(syncProducer): 从标准输入读取字符串进行

consumer client api

注意: 其实对于消费者而言,通常会分为两种消费方式,一种是消费者直接消费,另外一种是使用消费者组来消费一个指定topic的数据,后者不会影响整个topic的数据一致性,相当于一种订阅模式。

sarama中,对于消息消费的两个接口分别为ConsumerConsumerGroup两个接口,但是因为消费组中会涉及到更多的消息控制,整体会比较负责一些,通常在测试消息过程中可以直接使用Consumer接口,而在对生产topic数据进行消费或处理时需要注意使用ConsumerGroup.

Consumer 接口实现的几个主要功能:

  • Topics(): 从集群元数据中返回一组可用的topic列表信息
  • Partitions(topic string): 返回指定topic的排序分区列表,等同于Client.Partitions()
  • ConsumePartition(topic string,part int32,offset int64): 使用指定的topic/part的offset上创建一个PartitionConsumer,offset可以是字面的offset或者OffsetNewestOffsetOldest的内置变量
  • HighWaterMarks(): 返回每个topic和分区当前的高水位(分区间的一致性无法保证,因为高位水是单独标记的)
  • Close(): 关闭消费者客户端

ConsumerGroup 接口实现的几个主要功能:

  • Consume(ctx context.Context,topics []string,handler ConsumerGroupHandler) error: 为指定的topic列表加入一个消费者集群,然后通过ConsumerGroupHandler开启一个阻塞的ConsumerGroupSession,但是需要注意每个会话有一个完整的生命周期
  • Errors(): 返回一个错误类型的读channel,如果想自定义一些错误类型可以设置Consumer.Return.Errors=true 然后读取出来自定义实现
  • Close(): 停止消费组并分离正在运行的会话

consumerGroupSession声明周期

封装已实现接口

  • 消费者组消费,采用gokafka消费组
  • 指定消费者组
  • 指定消费位置earliestlatest(default)

注意事项

sarama库中,默认创建topic的方法(CreateTopic(topic,detail,validate))中是无法指定分区的规划的,也就是无法怼topic的分区进行静态分配,因此,无法做到怼集群进行虚拟集群的管理。但是可以尝试采取先创建topic,然后使用虚拟组进行topic的重排,进而实现整个topic的按照指定broker进行分配。

// 接口竟然不支持
AlterPartitionReassignments(topic string, assignment [][]int32) error
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)


Documentation

Overview

================================================================ *Copyright (C) 2020 BGBiao Ltd. All rights reserved. * *FileName:main.go *Author:BGBiao *Date:2020年05月07日 *Description: * ================================================================

================================================================ *Copyright (C) 2020 BGBiao Ltd. All rights reserved. * *FileName:const.go *Author:BGBiao *Date:2020年05月07日 *Description: * ================================================================

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdminApi

type AdminApi struct {
	Admin sarama.ClusterAdmin
}

func NewClusterAdmin

func NewClusterAdmin(brokerList []string) *AdminApi

init a clusteradmin api

func NewClusterAdminWithSASLPlainText added in v0.0.6

func NewClusterAdminWithSASLPlainText(brokerList []string, username, password string) *AdminApi

func (*AdminApi) AddPartitions added in v0.0.5

func (adminApi *AdminApi) AddPartitions(topic string, count int32, assignment [][]int32, validateOnly bool) (bool, error)

add the partitions for a topic

func (*AdminApi) AlterPartitionsReassignments added in v0.0.5

func (adminApi *AdminApi) AlterPartitionsReassignments(topic string, assignment [][]int32) (bool, error)

alter the partitions assignment for a topic notice: AlterPartitionReassignmentsRequest contain a version with {TimeoutMs int32,Version int16(0)} ,maybe occur follow message: kafka server: The version of API is not supported.

func (*AdminApi) Close

func (adminApi *AdminApi) Close()

Close the adminApi

func (*AdminApi) CreateCustomTopic

func (adminApi *AdminApi) CreateCustomTopic(name string, partNum int32, replicaFactor int16, config map[string]string) (bool, error)

Create a custom topic with the partNum,replicaFactor and one config

func (*AdminApi) CreateCustomTopicWithReplicaAssign added in v0.0.4

func (adminApi *AdminApi) CreateCustomTopicWithReplicaAssign(name string, config map[string]string, replicasAssign map[int32][]int32) (bool, error)

Create a custom topic configs , replicasAssignments notic: the topic partitions,replications and replicasAssignments cannot be used at the same time.

func (*AdminApi) CreateTopic

func (adminApi *AdminApi) CreateTopic(name string) (bool, error)

Create a topic with default (partition:3,replicationFactor:3)

func (*AdminApi) DeleteTopic

func (adminApi *AdminApi) DeleteTopic(name string) (bool, error)

Delete a topic

func (*AdminApi) DescribeCluster

func (adminApi *AdminApi) DescribeCluster() ([]*sarama.Broker, int32, error)

Describe cluster return : broker_list,controllerId,error

func (*AdminApi) DescribeConsumerGroup

func (adminApi *AdminApi) DescribeConsumerGroup(groups []string) (consumergroupmembers []*ConsumerGroupMember, err error)

Describe the ConsumerGroups

func (*AdminApi) DescribeConsumerGroups

func (adminApi *AdminApi) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)

Describe the given consumer groups

func (*AdminApi) DescribeLogDirs

func (adminApi *AdminApi) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)

Describe logdir

func (*AdminApi) DescribeTopics

func (adminApi *AdminApi) DescribeTopics(topiclist []string) ([]TopicInfo, error)

describe topic metainfo

func (*AdminApi) GetBrokerIdList

func (adminApi *AdminApi) GetBrokerIdList() (controllerId int32, brokerIds []int32, brokerInfo []BrokerAddr)

get broker id list 返回controllerid brokerid 列表 brokerid和broker地址对应关系 注意:如果结构体指针方法中使用var定义了变量之后,在函数返回中就不需要写名称了(返回数据中的名称其实就是相当于var name type). 如下方法可以将返回中的名称去掉,把方法中的var注释去掉也可以

func (*AdminApi) GetLogFromBrokers

func (adminApi *AdminApi) GetLogFromBrokers(ids []int32)

Getloginfo from the broker id

func (*AdminApi) GetLogFromTopic

func (adminApi *AdminApi) GetLogFromTopic(topic string) []BrokerLogInfo

Getloginfo from the a topic

func (*AdminApi) GetLogFromTopics

func (adminApi *AdminApi) GetLogFromTopics(topics []string) []TopicsBrokerLogInfo

Getloginfo from topic list

func (*AdminApi) ListConsumerGroup

func (adminApi *AdminApi) ListConsumerGroup() ([]string, error)

List ConsumerGroups 查看存活的在消费的消费者列表

func (*AdminApi) ListConsumerGroupOffSet

func (adminApi *AdminApi) ListConsumerGroupOffSet(group, topic string) ([]TopicPartOffSet, error)

func (*AdminApi) ListConsumerGroupOffSets

func (adminApi *AdminApi) ListConsumerGroupOffSets(group string, topicPartitions map[string][]int32) ([]TopicPartOffSet, error)

获取topic的分区列表

func (*AdminApi) ListConsumerGroupOffsets

func (adminApi *AdminApi) ListConsumerGroupOffsets(group string, topicPart map[string][]int32) (*sarama.OffsetFetchResponse, error)

List the consumer group offset available

func (*AdminApi) ListConsumerGroups

func (adminApi *AdminApi) ListConsumerGroups() (map[string]string, error)

List the consumer group avaliable map[string]string 中key为消费者组,value为消费者状态,consumer表示正在消费

func (*AdminApi) ListTopic

func (adminApi *AdminApi) ListTopic() (map[string]sarama.TopicDetail, error)

ListTopics() (map[string]TopicDetail, error)

func (*AdminApi) ListTopicsInfo

func (adminApi *AdminApi) ListTopicsInfo(topiclist []string) ([]TopicInfo, error)

ListTopicInfo by topic list while the topics is empty,will list all topic

func (*AdminApi) TopicIsExist added in v0.0.4

func (adminApi *AdminApi) TopicIsExist(name string) bool

check the topic is exist. sarama.ErrTopicAlreadyExists

func (*AdminApi) UpdateTopicConfig added in v0.0.4

func (adminApi *AdminApi) UpdateTopicConfig(name string, config map[string]string, validateOnly bool) (bool, error)

alter config AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

https://pkg.go.dev/github.com/Shopify/sarama#ConfigResourceType type ConfigResourceType int8

type Api

type Api struct {
	ConsumerApi sarama.ConsumerGroup
}

func NewConsumerApi

func NewConsumerApi(brokers []string, groupName, consumerOffset string) *Api

init a consumer api

func NewConsumerApiWithSASLPlainText added in v0.0.6

func NewConsumerApiWithSASLPlainText(brokers []string, groupName, consumerOffset, username, password string) *Api

init the kafka consumer api with sasl/plaintext auth

func (*Api) Cleanup

func (c *Api) Cleanup(sarama.ConsumerGroupSession) error

func (*Api) Close

func (c *Api) Close()

close the consumer api

func (*Api) ConsumeClaim

func (c *Api) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Api) ConsumerMsgFromTopics

func (c *Api) ConsumerMsgFromTopics(topics []string) func()

consumer topic some info notice: can'not aware the partitions increase.

func (*Api) Setup

consumerGroupHandler https://pkg.go.dev/github.com/Shopify/sarama?tab=doc#Handler Handler是一个包含Setup,Cleanup,ConsumeClaim方法的接口

type BrokerAddr

type BrokerAddr struct {
	BrokerId int32  `json:"brokerId"`
	BrokerIp string `json:"brokerIp"`
}

type BrokerLogInfo

type BrokerLogInfo struct {
	BrokerId int32                `json:"brokerId"`
	BrokerIp string               `json:"brokerIp"`
	LogDatas []LogdirTopicLoginfo `json:"logDatas"`
}

type ConsumerGroupMember

type ConsumerGroupMember struct {
	GroupID    string `json:"groupID"`
	State      string `json:"state"`
	ClientInfo []ConsumerMemberInfo
}

ConsumerGroup and members info

type ConsumerMemberInfo

type ConsumerMemberInfo struct {
	ClientID  string   `json:"clientID"`
	ClientIP  string   `json:"clientIP"`
	TopicList []string `json:"topicList"`
}

type LogDirResponseData

type LogDirResponseData struct {
	sarama.DescribeLogDirsResponseDirMetadata
}

modify the sarama.DescribeLogDirsResponseDirMetadata

func (LogDirResponseData) GetLogSize

func (logdir LogDirResponseData) GetLogSize(topic string) LogdirTopicLoginfo

get the topic-partation size from a logdir logdir.Path https://pkg.go.dev/github.com/Shopify/sarama?tab=doc#DescribeLogDirsResponseDirMetadata

type LogdirTopicLoginfo

type LogdirTopicLoginfo struct {
	Path    string         `json:"path"`
	LogInfo []TopicLogInfo `json:"logInfo"`
}

type ProducerApi

type ProducerApi struct {
	ProducerSyncApi sarama.SyncProducer
}

func NewProducerApi

func NewProducerApi(brokers []string) *ProducerApi

初始化一个生产者api

func NewProducerApiWithSASLPlainText added in v0.0.6

func NewProducerApiWithSASLPlainText(brokers []string, username, password string) *ProducerApi

init a producer client with the saslplaintext

func (*ProducerApi) Close

func (p *ProducerApi) Close()

func (*ProducerApi) PutFromString

func (p *ProducerApi) PutFromString(topic, msg string) bool

发送消息

type TopicInfo

type TopicInfo struct {
	// topic base info
	Name              string             `json:"name"`
	PartitionNum      int32              `json:"partitionNum"`
	Replication       int16              `json:"replication"`
	ReplicaAssignment map[int32][]int32  `json:"replicaAssignment"`
	ConfigEntries     map[string]*string `json:"configEntries"`
	// topic metadata state info
	PartId              int32   `json:"partId"`
	PartLeader          int32   `json:"partLeader"`
	PartReplicas        []int32 `json:"partReplicas"`
	PartIsr             []int32 `json:"partIsr"`
	PartOfflineReplicas []int32 `json:"partOfflineReplicas"`
}

topic info struct (map[string]sarama.TopicDetail)

type TopicLogInfo

type TopicLogInfo struct {
	TopicPart string `json:"topicPart"`
	LogSize   int64  `json:"logSize"`
	OffsetLag int64  `json:"offsetLag"`
}

the topic partition loginfo on someone broker with a logdir.

type TopicPartOffSet

type TopicPartOffSet struct {
	TopicPart string
	OffSet    int64
}

topic-part 的offset信息

type TopicsBrokerLogInfo

type TopicsBrokerLogInfo struct {
	Name    string `json:"name"`
	LogData []BrokerLogInfo
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL