ekafka

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2021 License: MIT Imports: 21 Imported by: 2

README

ekafka 组件使用指南

Table of contents

注意事项

如果生产、消费kafka比较慢,并不一定是kafka慢,而是你的配置不正确,请认真查看文档:https://github.com/segmentio/kafka-go/issues/417

  • producer比较慢,可能是用的默认配置batchTimeout,1s发送

以下列举kafka producer客户端比较关键的默认配置

  • BatchTimeout 批量发送消息的周期,默认1s
  • BatchSize 批量发送的消息数量,默认100条
  • BatchBytes 批量发送的消息大小,默认1MB
  • Async 设置成true时会导致WriteMessages非阻塞,会导致调用WriteMessages方法获取不到error
  • RequiredAcks ACK配置
    • RequireNone (0) fire-and-forget,producer不等待来自broker同步完成的确认后,就可以发送下一批消息
    • RequireOne (1) producer在leader已成功收到的数据并得到确认后,才发送下一批消息
    • RequireAll (-1) producer在所有follower副本确认接收到数据后,才发送下一批消息

更改以上默认配置,可以让速度变快。当然默认配置是可以让你的服务性能损耗耕地,或者可靠性更高,所以你自己需要权衡业务需求,更改配置。

基本组件

kafka-go 进行了轻量封装,并提供了以下功能:

  • 规范了标准配置格式,提供了统一的 Load().Build() 方法。
  • 支持自定义拦截器
  • 提供了默认的 Debug 拦截器,开启 Debug 后可输出 Request、Response 至终端。
快速上手

生产者消费者使用样例可参考 example

ConsumerGroup

相对于 Consumer(对 kafka-go 的封装)来说,ConsumerGroup 则提供了更加易用的 API。这是一个简单的例子:

首先添加所需配置:

[kafka]
brokers = ["127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"]

[kafka.client]
timeout = "3s"

[kafka.consumerGroups.cg1]
JoinGroupBackoff = "1s"
groupID = "group-1"
topic = "my-topic"
package main

import "github.com/gotomicro/ego-component/ekafka"

func main() {
	cmp := ekafka.Load("kafka").Build()
	// 获取实例(第一次调用时初始化,再次获取时会复用)
	cg := cmp.ConsumerGroup("cg1")

	for {
		pollCtx, _ := context.WithTimeout(ctx, 1*time.Minute)
		// 拉取事件,可能是消息、Rebalancing 事件或者错误等
		event, err := consumerGroup.Poll(pollCtx)
		if err != nil {
			elog.Panic("poll error")
			return
		}
		switch e := event.(type) {
		case ekafka.Message:
			// 按需处理消息
		case ekafka.AssignedPartitions:
			// 在 Kafka 完成分区分配时触发一次
		case ekafka.RevokedPartitions:
			// 在当前 Generation 结束时触发一次
		case error:
			// 错误处理

			// 结束
			if err := cg.Close(); err != nil {
				elog.Panic("关闭ConsumerGroup失败")
			}
		default:
			// ...
		}
	}
}
配置说明
[kafka.consumerGroups.cg1]
# GroupID is the name of the consumer group.
groupID = "group-1"
# The topic to read messages from.
topic = "my-topic"
# HeartbeatInterval sets the optional frequency at which the reader sends the consumer
# group heartbeat update.
#
# Default: 3s
heartbeatInterval = "3s"
# PartitionWatchInterval indicates how often a reader checks for partition changes.
# If a reader sees a partition change (such as a partition add) it will rebalance the group
# picking up new partitions.
#
# Default: 5s
partitionWatchInterval = "5s"
# WatchForPartitionChanges is used to inform kafka-go that a consumer group should be
# polling the brokers and rebalancing if any partition changes happen to the topic.
watchPartitionChanges = false
# SessionTimeout optionally sets the length of time that may pass without a heartbeat
# before the coordinator considers the consumer dead and initiates a rebalance.
#
# Default: 30s
sessionTimeout = "30s"
# RebalanceTimeout optionally sets the length of time the coordinator will wait
# for members to join as part of a rebalance.  For kafka servers under higher
# load, it may be useful to set this value higher.
#
# Default: 30s
rebalanceTimeout = "30s"
# JoinGroupBackoff optionally sets the length of time to wait before re-joining
# the consumer group after an error.
#
# Default: 5s
joinGroupBackoff = "5s"
# StartOffset determines from whence the consumer group should begin
# consuming when it finds a partition without a committed offset.  If
# non-zero, it must be set to one of FirstOffset or LastOffset.
#
# Default: `-2` (FirstOffset)
startOffset = "-2"
# RetentionTime optionally sets the length of time the consumer group will
# be saved by the broker.  -1 will disable the setting and leave the
# retention up to the broker's offsets.retention.minutes property.  By
# default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
# 2.0.
#
# Default: -1
retentionTime = "-1"
# MinBytes indicates to the broker the minimum batch size that the consumer
# will accept. Setting a high minimum when consuming from a low-volume topic
# may result in delayed delivery when the broker does not have enough data to
# satisfy the defined minimum.
#
# Default: 1
minBytes = 1
# MaxBytes indicates to the broker the maximum batch size that the consumer
# will accept. The broker will truncate a message to satisfy this maximum, so
# choose a value that is high enough for your largest message size.
#
# Default: 1MB
maxBytes = 1048576
# Maximum amount of time to wait for new data to come when fetching batches
# of messages from kafka.
#
# Default: 10s
maxWait = "10s"
# ReadLagInterval sets the frequency at which the reader lag is updated.
# Setting this field to a negative value disables lag reporting.
#
# Default: 60s
readLagInterval = "60s"
# CommitInterval indicates the interval at which offsets are committed to
# the broker.  If 0, commits will be handled synchronously.
#
# Default: 0
commitInterval = "0"
# BackoffDelayMin optionally sets the smallest amount of time the reader will wait before
# polling for new messages
#
# Default: 100ms
readBackoffMin = "100ms"
# BackoffDelayMax optionally sets the maximum amount of time the reader will wait before
# polling for new messages
#
# Default: 1s
readBackoffMax = "1s"

Consumer Server 组件

必须配合 ConsumerGroup 使用。

基本组件通常是搭配其他服务模块(如 HTTP 服务)一起使用的,如果只想使用 Ego 做单纯的 Kafka 消费应用,可以使用 Consumer Server 组件

Consumer Server 组件依赖于基本组件提供 Kafka 消费者实例,同时实现了 ego.Server 接口以达到常驻运行的目的,并且和 Ego 框架共享生命周期。

Consumer Server 支持两种消费模式:

  • 逐条消费:会在处理消息的回调执行完成之后立即 commit。但实际 commit 时机依赖于 kafka-go 的配置,kafka-go 支持设置定时提交来提高性能
  • 手动消费:Consumer Server 提供一个生命周期的 context 和 consumer 对象,开发者自行决定如何消费
配置说明
[kafka]
debug=true
brokers=["localhost:9094"]

[kafka.client]
timeout="3s"

[kafka.consumers.c1]
topic="sre-infra-test"
groupID="group-1"
# 指定 Consumer Group 未曾提交过 offset 时从何处开始消费
startOffset = -1
# 默认为同步提交,可以配置自动批量提交间隔来提高性能
commitInterval = "1s"

[kafka.consumerGroups.cg1]
topic="sre-infra-test"
groupID="group-1"
# 指定 Consumer Group 未曾提交过 offset 时从何处开始消费
startOffset = -1
# 默认为同步提交,可以配置自动批量提交间隔来提高性能
commitInterval = "1s"

[kafkaConsumerServers.s1]
debug=true
# 使用 ekafka 中注册的哪一个 Consumer,对应 `kafka.consumers.[name]` 配置项
consumerName="c1"
# 也可以配合 ConsumerGroup 使用
consumerGroupName="cg1"

StartOffset

Ref

配置值 说明
-1 LastOffset 从最新位置
-2 FirstOffset 从最旧位置 (default)
示例

逐条消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/gotomicro/ego-component/ekafka"
	"github.com/gotomicro/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
	"github.com/gotomicro/ego/server/egovernor"
	"github.com/segmentio/kafka-go"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 Consumer Server
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka consumer
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 用来接收、处理 `kafka-go` 和处理消息的回调产生的错误
			consumptionErrors := make(chan error)

			// 注册处理消息的回调函数
			cs.OnEachMessage(consumptionErrors, func(ctx context.Context, message kafka.Message) error {
				elog.Infof("got a message: %s\n", string(message.Value))
				// 如果返回错误则会被转发给 `consumptionErrors`,默认出现任何错误都会导致消费终止、
				// ConsumerGroup 退出;但可以将错误标记为 Retryable 以实现重试,ConsumerGroup 最多重试 3 次
				// 如:
				// return fmt.Errorf("%w 写入数据库时发生错误", consumerserver.ErrRecoverableError)

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

获取 Consumer 实例后手动消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/gotomicro/ego-component/ekafka"
	"github.com/gotomicro/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 ConsumerServer
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka Consumer
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 注册处理消息的回调函数
			cs.OnStart(func(ctx context.Context, consumer *ekafka.Consumer) error {
				// 编写自己的消费逻辑...

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

获取 ConsumerGroup 实例后手动消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/gotomicro/ego-component/ekafka"
	"github.com/gotomicro/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 ConsumerServer
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka ConsumerGroup
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 注册处理消息的回调函数
			cs.OnConsumerGroupStart(func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error {
				// 编写自己的消费逻辑...

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

Producer

以下是简单使用 ekafka.Producer 的例子。

所需配置

[kafka]
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="test_topic"        # 指定生产消息的 topic
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/BurntSushi/toml"
	"github.com/gotomicro/ego-component/ekafka"
	"github.com/gotomicro/ego/core/econf"
)

// produce 生产消息
func main() {
	// 假设你配置的toml如下所示
	conf := `
[kafka]
	debug=true
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="sre-infra-test"  # 指定生产消息的 topic
`
	// 加载配置文件
	err := econf.LoadFromReader(strings.NewReader(conf), toml.Unmarshal)
	if err != nil {
		panic("LoadFromReader fail," + err.Error())
	}

	// 初始化 ekafka 组件
	cmp := ekafka.Load("kafka").Build()

	// 使用 p1 生产者生产消息
	producerClient := cmp.Producer("p1")

	// 生产3条消息
	err = producerClient.WriteMessages(
		context.Background(),
		ekafka.Message{Value: []byte("Hello World!")},
		ekafka.Message{Value: []byte("One!")},
		ekafka.Message{Value: []byte("Two!")},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := producerClient.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println(`produce message success --------------->`)
}

测试

E2E 测试

运行 E2E 测试需要准备 Kafka 环境,推荐 3 个 broker、每 topic 3 个 partition,否则有些测试会报错。

首先将 test/e2e/config/example.toml 复制为 test/e2e/config/e2e.toml 并按实际情况修改,该文件即是运行 E2E 测试的配置文件。

ekafka/ 目录下执行命令运行测试:

$ make test-e2e

Documentation

Index

Constants

View Source
const PackageName = "component.ekafka"

Variables

This section is empty.

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig 返回默认配置

Types

type AssignedPartitions added in v0.2.0

type AssignedPartitions struct {
	Partitions []TopicPartition
}

type Balancer

type Balancer = kafka.Balancer

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) CreateTopics

func (wc *Client) CreateTopics(ctx context.Context, req *kafka.CreateTopicsRequest) (res *kafka.CreateTopicsResponse, err error)

func (*Client) DeleteTopics

func (wc *Client) DeleteTopics(ctx context.Context, req *kafka.DeleteTopicsRequest) (res *kafka.DeleteTopicsResponse, err error)

func (*Client) ListOffsets

func (wc *Client) ListOffsets(ctx context.Context, req *kafka.ListOffsetsRequest) (res *kafka.ListOffsetsResponse, err error)

func (*Client) Metadata

func (wc *Client) Metadata(ctx context.Context, req *kafka.MetadataRequest) (res *kafka.MetadataResponse, err error)

func (*Client) OffsetFetch

func (wc *Client) OffsetFetch(ctx context.Context, req *kafka.OffsetFetchRequest) (res *kafka.OffsetFetchResponse, err error)

type ClientInterceptor added in v0.3.0

type ClientInterceptor func(oldProcessFn clientProcessFn) (newProcessFn clientProcessFn)

func InterceptorClientChain added in v0.3.0

func InterceptorClientChain(interceptors ...ClientInterceptor) ClientInterceptor

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component kafka 组件,包含Client、Producers、Consumers

func (*Component) Client

func (cmp *Component) Client() *Client

Client 返回kafka Client

func (*Component) Consumer

func (cmp *Component) Consumer(name string) *Consumer

Consumer 返回指定名称的kafka Consumer

func (*Component) ConsumerGroup added in v0.2.0

func (cmp *Component) ConsumerGroup(name string) *ConsumerGroup

ConsumerGroup 返回指定名称的 ConsumerGroup

func (*Component) GetCompName added in v0.4.0

func (cmp *Component) GetCompName() string

func (*Component) Producer

func (cmp *Component) Producer(name string) *Producer

Producer 返回指定名称的kafka Producer

type Consumer

type Consumer struct {
	Config  consumerConfig
	Brokers []string `json:"brokers" toml:"brokers"`
	// contains filtered or unexported fields
}

Consumer 消费者/消费者组,

func (*Consumer) Close

func (r *Consumer) Close() error

func (*Consumer) CommitMessages

func (r *Consumer) CommitMessages(ctx context.Context, msgs ...*Message) (err error)

func (*Consumer) FetchMessage

func (r *Consumer) FetchMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)

func (*Consumer) Lag

func (r *Consumer) Lag() int64

func (*Consumer) Offset

func (r *Consumer) Offset() int64

func (*Consumer) ReadLag

func (r *Consumer) ReadLag(ctx context.Context) (lag int64, err error)

func (*Consumer) ReadMessage

func (r *Consumer) ReadMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)

func (*Consumer) SetOffset

func (r *Consumer) SetOffset(offset int64) (err error)

func (*Consumer) SetOffsetAt

func (r *Consumer) SetOffsetAt(ctx context.Context, t time.Time) (err error)

type ConsumerGroup added in v0.2.0

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup added in v0.2.0

func NewConsumerGroup(options ConsumerGroupOptions) (*ConsumerGroup, error)

func (*ConsumerGroup) Close added in v0.2.0

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) CommitMessages added in v0.2.0

func (cg *ConsumerGroup) CommitMessages(ctx context.Context, messages ...Message) error

func (*ConsumerGroup) Poll added in v0.2.0

func (cg *ConsumerGroup) Poll(ctx context.Context) (msg interface{}, err error)

type ConsumerGroupOptions added in v0.2.0

type ConsumerGroupOptions struct {
	Logger                 *elog.Component
	Brokers                []string
	GroupID                string
	Topic                  string
	HeartbeatInterval      time.Duration
	PartitionWatchInterval time.Duration
	WatchPartitionChanges  bool
	SessionTimeout         time.Duration
	RebalanceTimeout       time.Duration
	JoinGroupBackoff       time.Duration
	StartOffset            int64
	RetentionTime          time.Duration
	Reader                 readerOptions
	// contains filtered or unexported fields
}

type Container

type Container struct {
	// contains filtered or unexported fields
}

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer 返回默认Container

func Load

func Load(key string) *Container

Load 载入配置,初始化Container

func (*Container) Build

func (c *Container) Build(options ...Option) *Component

Build 构建Container

type Message

type Message = kafka.Message

type Messages added in v0.3.0

type Messages []*Message

func (Messages) ToLog added in v0.3.0

func (m Messages) ToLog() []logMessage

func (Messages) ToNoPointer added in v0.3.0

func (m Messages) ToNoPointer() []Message

type Option

type Option func(c *Container)

func WithBrokers

func WithBrokers(brokers ...string) Option

WithBrokers 注入brokers配置

func WithClientInterceptor added in v0.3.0

func WithClientInterceptor(interceptors ...ClientInterceptor) Option

WithClientInterceptor 注入拦截器

func WithDebug

func WithDebug(debug bool) Option

WithDebug 注入Debug配置

func WithRegisterBalancer

func WithRegisterBalancer(balancerName string, balancer Balancer) Option

WithRegisterBalancer 注册名字为<balancerName>的balancer 注册之后可通过在producer配置文件中可通过<balancerName>来指定使用此balancer

func WithServerInterceptor added in v0.3.0

func WithServerInterceptor(interceptors ...ServerInterceptor) Option

WithServerInterceptor 注入拦截器

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) WriteMessages

func (p *Producer) WriteMessages(ctx context.Context, msgs ...*Message) error

type RevokedPartitions added in v0.2.0

type RevokedPartitions struct {
	Partitions []TopicPartition
}

type ServerInterceptor added in v0.3.0

type ServerInterceptor func(oldProcessFn serverProcessFn) (newProcessFn serverProcessFn)

func InterceptorServerChain added in v0.3.0

func InterceptorServerChain(interceptors ...ServerInterceptor) ServerInterceptor

type TopicPartition added in v0.2.0

type TopicPartition struct {
	Topic     string
	Partition int
	Offset    int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL