kafka_go

package module
v1.0.29 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2023 License: AGPL-3.0 Imports: 15 Imported by: 0

README

kafka golang

基于 sarama 封装, 目的在于使用简洁及优化 运行模式为 消费组的方式

支持功能

  1. 客户端连接池,可复用连接
  2. 通过build可构建多个消息主题发送及主题消费
  3. 默认使用消费者组模式

使用

1. 消费者

通过build 可同时构建多个需要的业务消费,指定不同的参数传入

  • demo

var (
 addr    = "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092"
 topic   = "web_log"
 groupId = "testGroupId"
)

//使用阻塞模式
var consumerFactory = kafka_go.NewFactoryConsumer()
//使用非阻塞模式
// var consumerFactory = kafka_go.NewFactoryConsumerBackGround()

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
//注册消费回调监听
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	fmt.Printf("%+v\n", context.GetMessageString())
})

//注册消费构建(多个以参数区分)
consumerFactory.RegisterConsumer(demoConsumerBuilder)

//运行消费者
consumerFactory.Run()
  • build 接口说明 BuildConsumer

使用方式

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")

接口使用前注意事项

  • 消费者可创建为一消费者多协程分区消费和一消费者一协程消费
  • 一消费者多协程分区消费不保存消息可靠性,重启后存在丢失消息的情况,但吞吐量较高,且接收消息后无法调用消息确认
  • 一消费一协程保证消息可靠,但吞吐量较底
  • 此组件针对消费组进行处理,需传入消费入groupdId

案例设置:(主要针对BuildConsumer设置)

//一消费多协程

//消费构建
demo2ConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId2, Conf.Topic2)
demo2ConsumerBuilder.SetDebug(true)
demo2ConsumerBuilder.SetMultiplePartition(true) //开启多协程消费 默认为false
demo2ConsumerBuilder.SetIsAutoCommit(false)//开启多协程消费,此配置失效
demo2ConsumerBuilder.SetKafkaVersion("3.0.0")
demo2ConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
		nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
		fmt.Printf("%s xx received2 [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
		context.GetSession().Ack()
})
//一消费一协程
demoConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId3, Conf.Topic3)
demoConsumerBuilder.SetDebug(true)
demoConsumerBuilder.SetIsAutoCommit(false)// 开启手动提交
demoConsumerBuilder.SetKafkaVersion("3.0.0")
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
	fmt.Printf("%s xx received [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
	//fmt.Printf("%+v\n", context.GetMessageString())
	context.GetSession().Ack()
})
接口名称 描述
SetKafkaVersion(kafkaVersion string) 设置kakfa版本号 底层会自动处理新版本接口兼容
SetIsAutoCommit(autoCommit bool) 是否自动提交 默认为true, 当为false是需执行手动提交
通过 回调上下文session处理 ConsumerMessageContext.getSession().Ack()进行提交
SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi 是否多分区指定消费(一个消费者跟根据分区数创建子协程),此模式存在重启后消息丢失
SetBalanceType(balanceType BalanceType) BuildConsumerApi 设置分区策略类型 CONSUMER
BALANCE_STRATEGY_ROUNDROBIN:轮询(默认)
CONSUMER_BALANCE_STRATEGY_STICKY:亲和性
CONSUMER_BALANCE_STRATEGY_RANGE: 随机
SetResponseListener(responseResult ConsumerResponseListener) 拉取消息事件监听
GetAddr() string 获取broker地址
GetGroupId() string 获取消费组
GetTopic() string 获取主题
GetBalanceType() BalanceType 获取分区策略方式
ToString() string 返回构建数据字符串
  • build 拉取消息事件监听回调 说明 ConsumerResponseListener

type ConsumerResponseListener func(context *ConsumerMessageContext)

ConsumerMessageContext : 事件消息上下文

接口名称 说明
GetBuilder() BuildConsumerApi 返回当前执行回调中的build信息
GetGroupId() string 返回当前组
GetTopic() string 返回当前主题
GetPartition() int32 返回当前执行数据的分区
GetOffset() int64 返回当前数据的偏移数
GetMessage() []byte 获取消息主体 字节数组
GetMessageString() string 返回消息主体 字符串
GetTimeStamp() time.Time 返回消息时间
GetVal() *sarama.ConsumerMessage 返回原生消息内容
GetSession() *ConsumerSession 返回当前执行的session
  • ConsumerSession 接口说明

接口名称 说明
Ack() 手动确认消息
IsAutoAck() bool 是否自动提交
GetSession() sarama.ConsumerGroupSession 原生session获取
GetMessage() *sarama.ConsumerMessage 原生消息获取

1. 生产者

通过build 可同时构建多个需要的业务生产者,指定不同的参数传入,默认生使用的是同步提交
底层维护一套连接池,根据设置最大连接数进行设置

  • demo

var (
    productFactory = kafka_go.NewFactoryProduct()
)
demoBuild := kafka_go.NewBuildProduct("demo1", "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092").SetDebug(true).SetMaxConnection(4)
err := productFactory.Register(demoBuild).Connect()
if err != nil {
    log.Panicln(err)
}
num := 10
var wg = &sync.WaitGroup{}
for i := 0; i < num; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		partition, offset, err := productFactory.Push("demo1", Conf.Topic3, "生产者消息")
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Printf("发送成功  partition:%d, offset:%d", partition, offset)
		}
	}()
		//time.Sleep(time.Second)
}
wg.Wait()

  • build 接口说明 BuildProduct

使用方式

接口 说明
SetMaxConnection(maxConnection int32) BuildProductApi 设置连接池数量
SetAckType(ackType ProductAckType) BuildProductApi 设置消息确认方式
SetTransactional(isTransactional bool) BuildProductApi 是否开启事务提交
GetName() string 获取当前build name
GetAddr() string 获取当前服务地址
GetAddrSlice() []string 获取当前服务地址切片
GetMaxConnection() int32 获取当前大连接数
GetConnStrategy() ProductBalanceType 获取连接池加载类型, 默认为轮询
  • ProductAckType 消息确认类型说明

确认类型 说明
PRODUCT_ACK_TYPE_NONAL 不等待 broker 的 ack,这一操作提供了一个最低的延迟
PRODUCT_ACK_TYPE_FOLLOWER 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack
PRODUCT_ACK_TYPE_ALL 等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复, 开启事务后可避免数据重复

Documentation

Index

Constants

View Source
const (

	//默认最大连接数
	DEFAULT_MAX_CONNECTION int32 = 5

	//连接轮询
	PRODUCT_CONN_STRATEGY_BALANCE_ROUNDROBIN ProductBalanceType = 0

	//ack确认类型- 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
	PRODUCT_ACK_TYPE_NORNAL ProductAckType = 0

	//等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据
	PRODUCT_ACK_TYPE_FOLLOWER ProductAckType = 1

	//等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复
	//开启事务后可避免数据重复
	PRODUCT_ACK_TYPE_ALL ProductAckType = -1
)

Variables

View Source
var (
	//--消费者错误---
	//构建器不能为空
	KAFKA_CONSUMER_ERROR_BUILD_EMPTY = errors.New("consumer build len can not be empty")

	//生产者错误
	//构建器不能为空
	KAFKA_PRODUCT_ERROR_BUILD_EMPTY = errors.New("product build len can not be empty")
	//构建器名称不能为空
	KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY = errors.New("product build name can not be empty")
	//连接不存在
	KAFKA_PRODUCT_ERROR_BUILD_CLIENT_EMPTY = errors.New("product client empty")
	//客户端连接错误
	KAFKA_PRODUCT_ERROR_BUILD_CLIENT_NIL = errors.New("product client nil")
)

Functions

func NewFactoryConsumer

func NewFactoryConsumer() *factoryConsumer

func NewFactoryConsumerBackGround

func NewFactoryConsumerBackGround() *factoryConsumer

func NewFactoryProduct

func NewFactoryProduct() *factoryProduct

* 初始化生产者 创建同步生产者

Types

type AckLevel

type AckLevel int

type BalanceType

type BalanceType int

消费策略

const (
	//消费策略-轮询 默认 消费者连接池失败, 暂时只使用轮询
	CONSUMER_BALANCE_STRATEGY_ROUNDROBIN BalanceType = 0
	//消费策略-粘性
	CONSUMER_BALANCE_STRATEGY_STICKY BalanceType = 1
	//消费策略-随机
	CONSUMER_BALANCE_STRATEGY_RANGE BalanceType = 2
)

type BuildConsumer

type BuildConsumer struct {
	// contains filtered or unexported fields
}

消费者构建

func (*BuildConsumer) GetAddr

func (this *BuildConsumer) GetAddr() string

func (*BuildConsumer) GetBalanceType

func (this *BuildConsumer) GetBalanceType() BalanceType

func (*BuildConsumer) GetConfig

func (this *BuildConsumer) GetConfig() *sarama.Config

func (*BuildConsumer) GetGroupId

func (this *BuildConsumer) GetGroupId() string

func (*BuildConsumer) GetKafkaVersion

func (this *BuildConsumer) GetKafkaVersion() string

func (*BuildConsumer) GetResponseListener

func (this *BuildConsumer) GetResponseListener() ConsumerResponseListener

消息返回事件监听

func (*BuildConsumer) GetTopic

func (this *BuildConsumer) GetTopic() string

func (*BuildConsumer) IsAutoCommit

func (this *BuildConsumer) IsAutoCommit() bool

func (*BuildConsumer) IsDebug

func (this *BuildConsumer) IsDebug() bool

func (*BuildConsumer) IsMultiplePartition added in v1.0.21

func (this *BuildConsumer) IsMultiplePartition() bool

func (*BuildConsumer) IsOldOffset

func (this *BuildConsumer) IsOldOffset() bool

func (*BuildConsumer) SetBalanceType added in v1.0.21

func (this *BuildConsumer) SetBalanceType(balanceType BalanceType) BuildConsumerApi

分区策略类型

func (*BuildConsumer) SetConfig

func (this *BuildConsumer) SetConfig(config *sarama.Config) BuildConsumerApi

设置其它配置

func (*BuildConsumer) SetDebug

func (this *BuildConsumer) SetDebug(isDebug bool) BuildConsumerApi

func (*BuildConsumer) SetIsAutoCommit

func (this *BuildConsumer) SetIsAutoCommit(autoCommit bool) BuildConsumerApi

* 是否自动提交

func (*BuildConsumer) SetKafkaVersion

func (this *BuildConsumer) SetKafkaVersion(kafkaVersion string) BuildConsumerApi

设置kafka版本号

func (*BuildConsumer) SetMultiplePartition added in v1.0.21

func (this *BuildConsumer) SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi

设置是否加载多个分区

func (*BuildConsumer) SetResponseListener

func (this *BuildConsumer) SetResponseListener(responseListener ConsumerResponseListener) BuildConsumerApi

设置消息返回事件

func (*BuildConsumer) ToString

func (this *BuildConsumer) ToString() string

type BuildConsumerApi

type BuildConsumerApi interface {
	SetDebug(isDebug bool) BuildConsumerApi
	IsDebug() bool
	//设置kafka版本号
	SetKafkaVersion(kafkaVersion string) BuildConsumerApi
	//设置其它配置
	SetConfig(config *sarama.Config) BuildConsumerApi
	//是否分区策略类型
	SetBalanceType(balanceType BalanceType) BuildConsumerApi
	//是否多个分区加载
	IsMultiplePartition() bool
	/*
		是否多个分区加载
		消费者多个分区不保证消息稳定性,
		但可以保证吞吐量
	*/
	SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi
	//是否自动提交
	SetIsAutoCommit(autoCommit bool) BuildConsumerApi
	//设置消息返回事件
	SetResponseListener(responseResult ConsumerResponseListener) BuildConsumerApi
	//返回消息事件监听
	GetResponseListener() ConsumerResponseListener
	//获取broker地址
	GetAddr() string
	//获取组id
	GetGroupId() string
	//获取主题
	GetTopic() string
	//获取分区策略方式
	GetBalanceType() BalanceType
	//是否启用加载历史消费
	IsOldOffset() bool
	//是否自动提交
	IsAutoCommit() bool
	//获取版本
	GetKafkaVersion() string
	//获取原始配置
	GetConfig() *sarama.Config
	//返回构建数据
	ToString() string
}

* 构建消费者接口

func NewBuildConsumer

func NewBuildConsumer(addr string, groupId string, topic string) BuildConsumerApi

初始化消费者构建器

func NewBuildConsumerStrategy

func NewBuildConsumerStrategy(addr string, groupId string, topic string, balanceType BalanceType) BuildConsumerApi

* 创建消费构建者[默认拉取历史消费]

func NewNewBuildConsumerOffset

func NewNewBuildConsumerOffset(addr string, groupId string, topic string, balanceType BalanceType, oldOffset bool) BuildConsumerApi

* 创建消费构建者[判断是否拉取历史消费]

type BuildProduct

type BuildProduct struct {
	// contains filtered or unexported fields
}

* 生产者

func (*BuildProduct) GetAddr

func (this *BuildProduct) GetAddr() string

func (*BuildProduct) GetAddrSlice

func (this *BuildProduct) GetAddrSlice() []string

func (*BuildProduct) GetConfig

func (this *BuildProduct) GetConfig() *sarama.Config

func (*BuildProduct) GetConnStrategy

func (this *BuildProduct) GetConnStrategy() ProductBalanceType

func (*BuildProduct) GetHashCode

func (this *BuildProduct) GetHashCode() int64

func (*BuildProduct) GetHashCodeString

func (this *BuildProduct) GetHashCodeString() string

func (*BuildProduct) GetMaxConnection

func (this *BuildProduct) GetMaxConnection() int32

func (*BuildProduct) GetName

func (this *BuildProduct) GetName() string

func (*BuildProduct) GetTransactional added in v1.0.22

func (this *BuildProduct) GetTransactional() bool

func (*BuildProduct) SetAckType

func (this *BuildProduct) SetAckType(ackType ProductAckType) BuildProductApi

func (*BuildProduct) SetConfig

func (this *BuildProduct) SetConfig(config *sarama.Config) BuildProductApi

func (*BuildProduct) SetDebug

func (this *BuildProduct) SetDebug(isDebug bool) BuildProductApi

func (*BuildProduct) SetMaxConnection

func (this *BuildProduct) SetMaxConnection(maxConnection int32) BuildProductApi

func (*BuildProduct) SetTransactional

func (this *BuildProduct) SetTransactional(isTransactional bool) BuildProductApi

func (*BuildProduct) ToString

func (this *BuildProduct) ToString() string

type BuildProductApi

type BuildProductApi interface {
	//是否开启调试
	SetDebug(isDebug bool) BuildProductApi
	//是否开启事务提交
	SetTransactional(isTransactional bool) BuildProductApi
	GetTransactional() bool
	//设置确认方式
	SetAckType(ackType ProductAckType) BuildProductApi
	//当前buildName
	GetName() string
	//生产者连接地址
	GetAddr() string
	//获取连接地址数组
	GetAddrSlice() []string
	//设置配置
	SetConfig(config *sarama.Config) BuildProductApi
	//设置连接数
	SetMaxConnection(maxConnection int32) BuildProductApi
	//获取连接策略模式
	GetConnStrategy() ProductBalanceType
	//获取连接数
	GetMaxConnection() int32
	//获取配置
	GetConfig() *sarama.Config
	//获取build hashcode
	GetHashCode() int64
	GetHashCodeString() string
	//返回构建数据
	ToString() string
}

* 生产者构建接口

func NewBuildProduct

func NewBuildProduct(name string, addr string) BuildProductApi

*

type ConsumerMessageContext

type ConsumerMessageContext struct {
	// contains filtered or unexported fields
}

消费消息上下文 groupId string, topic string, partition int32, offset int64, message []byte, timeStamp time.Time, consumerVal *sarama.ConsumerMessage

func (*ConsumerMessageContext) GetBuilder

func (this *ConsumerMessageContext) GetBuilder() BuildConsumerApi

func (*ConsumerMessageContext) GetGroupId

func (this *ConsumerMessageContext) GetGroupId() string

func (*ConsumerMessageContext) GetMessage

func (this *ConsumerMessageContext) GetMessage() []byte

func (*ConsumerMessageContext) GetMessageString

func (this *ConsumerMessageContext) GetMessageString() string

func (*ConsumerMessageContext) GetOffset

func (this *ConsumerMessageContext) GetOffset() int64

func (*ConsumerMessageContext) GetPartition

func (this *ConsumerMessageContext) GetPartition() int32

func (*ConsumerMessageContext) GetSession

func (this *ConsumerMessageContext) GetSession() *ConsumerSession

func (*ConsumerMessageContext) GetTimeStamp

func (this *ConsumerMessageContext) GetTimeStamp() time.Time

func (*ConsumerMessageContext) GetTopic

func (this *ConsumerMessageContext) GetTopic() string

func (*ConsumerMessageContext) GetVal

type ConsumerResponseListener

type ConsumerResponseListener func(context *ConsumerMessageContext)

消费者返回数据监听

type ConsumerSession

type ConsumerSession struct {
	// contains filtered or unexported fields
}

func NewConsumerSession

func NewConsumerSession(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage, isAutoAck bool) *ConsumerSession

func (*ConsumerSession) Ack

func (this *ConsumerSession) Ack()

func (*ConsumerSession) GetMessage

func (this *ConsumerSession) GetMessage() *sarama.ConsumerMessage

func (*ConsumerSession) GetSession

func (this *ConsumerSession) GetSession() sarama.ConsumerGroupSession

func (*ConsumerSession) IsAutoAck

func (this *ConsumerSession) IsAutoAck() bool

type ProductAckType

type ProductAckType int

type ProductBalanceType

type ProductBalanceType int

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL