Documentation ¶
Overview ¶
================================================================ *Copyright (C) 2020 BGBiao Ltd. All rights reserved. * *FileName:main.go *Author:BGBiao *Date:2020年05月07日 *Description: * ================================================================
================================================================ *Copyright (C) 2020 BGBiao Ltd. All rights reserved. * *FileName:const.go *Author:BGBiao *Date:2020年05月07日 *Description: * ================================================================
Index ¶
- type AdminApi
- func (adminApi *AdminApi) AddPartitions(topic string, count int32, assignment [][]int32, validateOnly bool) (bool, error)
- func (adminApi *AdminApi) AlterPartitionsReassignments(topic string, assignment [][]int32) (bool, error)
- func (adminApi *AdminApi) Close()
- func (adminApi *AdminApi) CreateCustomTopic(name string, partNum int32, replicaFactor int16, config map[string]string) (bool, error)
- func (adminApi *AdminApi) CreateCustomTopicWithReplicaAssign(name string, config map[string]string, replicasAssign map[int32][]int32) (bool, error)
- func (adminApi *AdminApi) CreateTopic(name string) (bool, error)
- func (adminApi *AdminApi) DeleteTopic(name string) (bool, error)
- func (adminApi *AdminApi) DescribeCluster() ([]*sarama.Broker, int32, error)
- func (adminApi *AdminApi) DescribeConsumerGroup(groups []string) (consumergroupmembers []*ConsumerGroupMember, err error)
- func (adminApi *AdminApi) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)
- func (adminApi *AdminApi) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
- func (adminApi *AdminApi) DescribeTopics(topiclist []string) ([]TopicInfo, error)
- func (adminApi *AdminApi) GetBrokerIdList() (controllerId int32, brokerIds []int32, brokerInfo []BrokerAddr)
- func (adminApi *AdminApi) GetLogFromBrokers(ids []int32)
- func (adminApi *AdminApi) GetLogFromTopic(topic string) []BrokerLogInfo
- func (adminApi *AdminApi) GetLogFromTopics(topics []string) []TopicsBrokerLogInfo
- func (adminApi *AdminApi) ListConsumerGroup() ([]string, error)
- func (adminApi *AdminApi) ListConsumerGroupOffSet(group, topic string) ([]TopicPartOffSet, error)
- func (adminApi *AdminApi) ListConsumerGroupOffSets(group string, topicPartitions map[string][]int32) ([]TopicPartOffSet, error)
- func (adminApi *AdminApi) ListConsumerGroupOffsets(group string, topicPart map[string][]int32) (*sarama.OffsetFetchResponse, error)
- func (adminApi *AdminApi) ListConsumerGroups() (map[string]string, error)
- func (adminApi *AdminApi) ListTopic() (map[string]sarama.TopicDetail, error)
- func (adminApi *AdminApi) ListTopicsInfo(topiclist []string) ([]TopicInfo, error)
- func (adminApi *AdminApi) TopicIsExist(name string) bool
- func (adminApi *AdminApi) UpdateTopicConfig(name string, config map[string]string, validateOnly bool) (bool, error)
- type Api
- type BrokerAddr
- type BrokerLogInfo
- type ConsumerGroupMember
- type ConsumerMemberInfo
- type LogDirResponseData
- type LogdirTopicLoginfo
- type ProducerApi
- type TopicInfo
- type TopicLogInfo
- type TopicPartOffSet
- type TopicsBrokerLogInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminApi ¶
type AdminApi struct {
Admin sarama.ClusterAdmin
}
func NewClusterAdminWithSASLPlainText ¶ added in v0.0.6
func (*AdminApi) AddPartitions ¶ added in v0.0.5
func (adminApi *AdminApi) AddPartitions(topic string, count int32, assignment [][]int32, validateOnly bool) (bool, error)
add the partitions for a topic
func (*AdminApi) AlterPartitionsReassignments ¶ added in v0.0.5
func (adminApi *AdminApi) AlterPartitionsReassignments(topic string, assignment [][]int32) (bool, error)
Alter the partition assignment for a topic. Notice: AlterPartitionReassignmentsRequest contains a version with {TimeoutMs int32, Version int16(0)}, so the following message may occur: kafka server: The version of API is not supported.
func (*AdminApi) CreateCustomTopic ¶
func (adminApi *AdminApi) CreateCustomTopic(name string, partNum int32, replicaFactor int16, config map[string]string) (bool, error)
Create a custom topic with the given partNum, replicaFactor, and config.
func (*AdminApi) CreateCustomTopicWithReplicaAssign ¶ added in v0.0.4
func (adminApi *AdminApi) CreateCustomTopicWithReplicaAssign(name string, config map[string]string, replicasAssign map[int32][]int32) (bool, error)
Create a custom topic with configs and replicasAssignments. Notice: the topic partitions, replications, and replicasAssignments cannot be used at the same time.
func (*AdminApi) CreateTopic ¶
Create a topic with default (partition:3,replicationFactor:3)
func (*AdminApi) DeleteTopic ¶
Delete a topic
func (*AdminApi) DescribeCluster ¶
Describe the cluster. Returns: broker_list, controllerId, error.
func (*AdminApi) DescribeConsumerGroup ¶
func (adminApi *AdminApi) DescribeConsumerGroup(groups []string) (consumergroupmembers []*ConsumerGroupMember, err error)
Describe the ConsumerGroups
func (*AdminApi) DescribeConsumerGroups ¶
func (adminApi *AdminApi) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)
Describe the given consumer groups
func (*AdminApi) DescribeLogDirs ¶
func (adminApi *AdminApi) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
Describe logdir
func (*AdminApi) DescribeTopics ¶
describe topic metainfo
func (*AdminApi) GetBrokerIdList ¶
func (adminApi *AdminApi) GetBrokerIdList() (controllerId int32, brokerIds []int32, brokerInfo []BrokerAddr)
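Get the broker ID list. Returns the controller ID, the list of broker IDs, and the mapping between broker IDs and broker addresses. Note: if a variable is already declared with var inside a pointer-receiver method, the result does not need to be named in the function signature (a named result is effectively equivalent to `var name type`). For this method, the names could be removed from the results and the commented-out var declarations in the method body restored instead.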
func (*AdminApi) GetLogFromBrokers ¶
Get log info from the given broker IDs.
func (*AdminApi) GetLogFromTopic ¶
func (adminApi *AdminApi) GetLogFromTopic(topic string) []BrokerLogInfo
Get log info for a topic.
func (*AdminApi) GetLogFromTopics ¶
func (adminApi *AdminApi) GetLogFromTopics(topics []string) []TopicsBrokerLogInfo
Get log info for a list of topics.
func (*AdminApi) ListConsumerGroup ¶
List consumer groups: shows the list of live consumers that are currently consuming.
func (*AdminApi) ListConsumerGroupOffSet ¶
func (adminApi *AdminApi) ListConsumerGroupOffSet(group, topic string) ([]TopicPartOffSet, error)
func (*AdminApi) ListConsumerGroupOffSets ¶
func (adminApi *AdminApi) ListConsumerGroupOffSets(group string, topicPartitions map[string][]int32) ([]TopicPartOffSet, error)
获取topic的分区列表
func (*AdminApi) ListConsumerGroupOffsets ¶
func (adminApi *AdminApi) ListConsumerGroupOffsets(group string, topicPart map[string][]int32) (*sarama.OffsetFetchResponse, error)
List the consumer group offset available
func (*AdminApi) ListConsumerGroups ¶
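List the available consumer groups. In the returned map[string]string, the key is the consumer group and the value is the consumer state; "consumer" means the group is currently consuming.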
func (*AdminApi) ListTopic ¶
func (adminApi *AdminApi) ListTopic() (map[string]sarama.TopicDetail, error)
ListTopics() (map[string]TopicDetail, error)
func (*AdminApi) ListTopicsInfo ¶
List topic info for the given topic list; when the topic list is empty, all topics are listed.
func (*AdminApi) TopicIsExist ¶ added in v0.0.4
Check whether the topic exists. sarama.ErrTopicAlreadyExists
func (*AdminApi) UpdateTopicConfig ¶ added in v0.0.4
func (adminApi *AdminApi) UpdateTopicConfig(name string, config map[string]string, validateOnly bool) (bool, error)
alter config AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
https://pkg.go.dev/github.com/Shopify/sarama#ConfigResourceType type ConfigResourceType int8
type Api ¶
type Api struct {
ConsumerApi sarama.ConsumerGroup
}
func NewConsumerApi ¶
init a consumer api
func NewConsumerApiWithSASLPlainText ¶ added in v0.0.6
func NewConsumerApiWithSASLPlainText(brokers []string, groupName, consumerOffset, username, password string) *Api
init the kafka consumer api with sasl/plaintext auth
func (*Api) ConsumeClaim ¶
func (c *Api) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*Api) ConsumerMsgFromTopics ¶
Consume some info from the topics. Notice: it cannot detect an increase in the number of partitions.
func (*Api) Setup ¶
func (c *Api) Setup(sarama.ConsumerGroupSession) error
consumerGroupHandler https://pkg.go.dev/github.com/Shopify/sarama?tab=doc#Handler Handler is an interface containing the Setup, Cleanup, and ConsumeClaim methods.
type BrokerAddr ¶
type BrokerLogInfo ¶
type BrokerLogInfo struct { BrokerId int32 `json:"brokerId"` BrokerIp string `json:"brokerIp"` LogDatas []LogdirTopicLoginfo `json:"logDatas"` }
type ConsumerGroupMember ¶
type ConsumerGroupMember struct { GroupID string `json:"groupID"` State string `json:"state"` ClientInfo []ConsumerMemberInfo }
ConsumerGroup and members info
type ConsumerMemberInfo ¶
type LogDirResponseData ¶
type LogDirResponseData struct {
sarama.DescribeLogDirsResponseDirMetadata
}
modify the sarama.DescribeLogDirsResponseDirMetadata
func (LogDirResponseData) GetLogSize ¶
func (logdir LogDirResponseData) GetLogSize(topic string) LogdirTopicLoginfo
Get the topic-partition size from a logdir. logdir.Path https://pkg.go.dev/github.com/Shopify/sarama?tab=doc#DescribeLogDirsResponseDirMetadata
type LogdirTopicLoginfo ¶
type LogdirTopicLoginfo struct { Path string `json:"path"` LogInfo []TopicLogInfo `json:"logInfo"` }
type ProducerApi ¶
type ProducerApi struct {
ProducerSyncApi sarama.SyncProducer
}
func NewProducerApiWithSASLPlainText ¶ added in v0.0.6
func NewProducerApiWithSASLPlainText(brokers []string, username, password string) *ProducerApi
Init a producer client with SASL/PLAINTEXT auth.
func (*ProducerApi) Close ¶
func (p *ProducerApi) Close()
func (*ProducerApi) PutFromString ¶
func (p *ProducerApi) PutFromString(topic, msg string) bool
Send a message.
type TopicInfo ¶
type TopicInfo struct { // topic base info Name string `json:"name"` PartitionNum int32 `json:"partitionNum"` Replication int16 `json:"replication"` ReplicaAssignment map[int32][]int32 `json:"replicaAssignment"` ConfigEntries map[string]*string `json:"configEntries"` // topic metadata state info PartId int32 `json:"partId"` PartLeader int32 `json:"partLeader"` PartReplicas []int32 `json:"partReplicas"` PartIsr []int32 `json:"partIsr"` PartOfflineReplicas []int32 `json:"partOfflineReplicas"` }
topic info struct (map[string]sarama.TopicDetail)
type TopicLogInfo ¶
type TopicLogInfo struct { TopicPart string `json:"topicPart"` LogSize int64 `json:"logSize"` OffsetLag int64 `json:"offsetLag"` }
The log info of a topic partition on a given broker within a logdir.
type TopicPartOffSet ¶
Offset information for a topic partition.
type TopicsBrokerLogInfo ¶
type TopicsBrokerLogInfo struct { Name string `json:"name"` LogData []BrokerLogInfo }