go-kafka

module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2020 License: Apache-2.0

README

kafka

Build Status Go Report Card LICENSE Godoc

使用CircleCI进程CI Pipeline校验,Go Report Card进行Golang项目代码分析

  1. CircleCI: https://circleci.com/gh/chenguolin/go-kafka/tree/master
  2. Go Report Card: https://goreportcard.com/report/github.com/chenguolin/go-kafka

golang kafka基础库, 目前业界用的最多的Go kafka库是sarama 但是目前sarama存在一些问题,所以go-kafka基于sarama和sarama-cluster封装了更优化、功能更强大的API

  1. sarama存在几个问题
    • 对consumer group支持不是很友好,没有提供优化的api调用
    • consumer group没有及时commit offset,导致程序重启的时候会从最新的消息开始消费,导致数据丢失
    • 不支持rebalance机制
    • sarama-cluster实现了上面提到的几个问题,提供优化的api调用,支持自动commit offset、支持rebalance机制
  2. go-kafka 基于sarama、sarama-cluster封装了一层更通用的API调用,支持以下功能
    • 支持sync_producer和async_producer,默认使用设置Ack=all,保证数据不会丢失
    • producer 每60s会定期refresh一次所有topic的metadata,当topic 扩partition的时候无须重启,可以保证数据能够写入新的partition
    • 支持consumer group, 提供友好的api调用
    • 支持自动commit offset
    • consumer group支持rebalance机制,当topic 扩partition的时候无须重启,支持自动添加新的consumer 实例消费新的partition

代码依赖的 sarama 和 sarama-cluster之所以单独抽出来放在pkg目录下,是因为这边改了相关源码,不适合用vendor做依赖,具体改动的点如下

  1. 修复CI Workflow Check 失败问题,主要是go代码质量检测失败问题
  2. 修改代码,consumer group支持rebalance机制,当topic 扩partition的时候无须重启,支持自动添加新的consumer 实例消费新的partition

kafka简介

  1. kafka的队列模型

    • 每个 topic 可以有多个 partition,每个 partition 才是严格意义上的消息队列(消息先进先出),而不同 partition 之间是互不影响的。 举个例子来说,有消息 A 和消息 B,如果要保证 A 一定要比 B 先被消费,就必须要保证 A 一定要先被投递到某个 partition,且 B 再被投递到同一个 partition。 如果 B 被投递到了不一样 partition,那么 B 是有可能先于 A 被消费的。
    • “一个 topic 的一个 partition” 是保证无重复消息的最小消费单元,换句话说,如果有两个消费程序消费同一个 topic 的同一个 partition,那它们消费的消息事实上是彼此重复的。 所以为保证所有 partition 均被消费,且不会被同一个业务(属于同一个 group)的多个消费程序重复消费,是需要一个分配策略来决定每个消费程序应当消费哪几个或哪一个 partition,又或者应当做作为候补(当消费程序数量大于 partition 数量时发生),而 kafka 是希望用户程序自行实现这个分配策略的。
  2. kafka应用场景举例

    • 消息没有严格有序性要求,任意消息可以被任意消费程序消费,且消费各消息耗时相近 可以将消息投递到任意 partition,partition 任意均等分配到各个消费程序;
    • 所有消息要求严格有序,但消息量不大 可以配置仅一个 partition,一个消费程序负责消费,其他消费程序作为替补;
    • 同组消息要求有序,不同组消息不要求有序,例如同一个用户的消费要求有序,不同用户的消息不要求有序 可以将属于同一组的消息投递到同一个 partition,比如拿 UID 对 partition 数量取模;
    • 特定组的消息仅可以被特定消费程序消费 可以在将该特定组的消息投递到特定 partition,配置时指定到特定消费程序;
    • 某些消息消费耗时长且要求有序,有些消息消费耗时短且不要求有序 可以将分为两组 partition,一组实行针对有序消息的策略,且多一些 partition、多一些消费程序增大处理能力,另一组实行针对无序消息的的策略,且少一些 partition、少一些消费程序节省资源。

examples

consumer

// Package main kafka consumer example
// Created by chenguoin 2019-04-20
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"runtime/debug"
	"sync"
	"syscall"

	"gitlab.local.com/golang/go-kafka"
	"gitlab.local.com/golang/go-log"
)

func main() {
	fmt.Println("Consumer start ...")

	// new consumer
	brokers := "localhost:9092,localhost:9092"
	topic := "k8s-log-test-output-stdout"
	groupID := "consumer_example"
	defaultOffset := kafka.OffsetNewset

	// new consumer
	consumer, err := kafka.NewConsumer(brokers, topic, groupID, defaultOffset)
	if err != nil {
		fmt.Println("kafka.NewConsumer error: ", err.Error())
		os.Exit(1)
	}
	defer consumer.Close()

	// goroutine receive message
	wg := &sync.WaitGroup{}
	wg.Add(1)
	stopChan := make(chan struct{})
	go consume(consumer, stopChan, wg)

	// wait signal
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// blocking unit receive signal
	<-sigchan
	close(stopChan)
	wg.Wait()

	fmt.Println("Consumer stop successfuly ~")
}

func consume(consumer *kafka.Consumer, stopChan chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			golog.Error("consume handle panic",
				golog.Object("error", err))
			debug.PrintStack()
		}
		// goroutine done
		wg.Done()
	}()

	// get message, error channel
	msgChan := consumer.Messages()
	errChan := consumer.Errors()

	for {
		select {
		case msg := <-msgChan:
			bytes, _ := json.Marshal(msg)
			fmt.Println(string(bytes))
			// commit offset 2 zk
			consumer.CommitOffset(msg)
		case err := <-errChan:
			fmt.Println("receive error: ", err.Error())
		case <-stopChan:
			fmt.Println("closing consume ...")
			return
		}
	}
}

sync producer

// Package main kafka sync producer example
// Created by chenguoin 2019-04-20
package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"runtime/debug"
	"syscall"
	"time"

	"gitlab.local.com/golang/go-kafka"
	"gitlab.local.com/golang/go-log"
)

func main() {
	fmt.Println("Producer start ...")
	// new sync producer
	brokers := "localhost:9092,localhost:9092"

	producer, err := kafka.NewSyncProducer(brokers)
	if err != nil {
		fmt.Println("kafka.NewSyncProducer error: ", err.Error())
		os.Exit(1)
	}

	// sync produce message
	go syncProduce(producer)

	// wait signal
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	// Await a sigterm signal before safely closing the consumer
	<-sigchan

	fmt.Println("Producer stop successfuly ~")
}

func syncProduce(producer *kafka.SyncProducer) {
	defer func() {
		if err := recover(); err != nil {
			golog.Error("syncProduce handle panic",
				golog.Object("error", err))
			debug.PrintStack()
		}
	}()

	topic := "k8s-log-test-output-stdout"
	for {
		// rand key
		randKey := make([]byte, 16)
		for i := 0; i < 16; i++ {
			randKey[i] = byte(rand.Intn(26) + 65)
		}

		// rand value
		randValue := make([]byte, 64)
		for i := 0; i < 64; i++ {
			randValue[i] = byte(rand.Intn(26) + 65)
		}

		partition, offset, err := producer.Send(topic, string(randKey), string(randValue))
		if err != nil {
			golog.Error("producer.Send error", golog.Object("error", err))
			continue
		}

		golog.Info("producer.Send success", golog.Int32("partition", partition),
			golog.Int64("offset", offset))
		time.Sleep(time.Second)
	}
}

async producer

// Package main kafka async producer example
// Created by chenguoin 2019-04-20
package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"runtime/debug"
	"syscall"
	"time"

	"gitlab.local.com/golang/go-kafka"
	"gitlab.local.com/golang/go-log"
)

func main() {
	fmt.Println("Producer start ...")
	// new async producer
	brokers := "localhost:9092,localhost:9092"

	producer, err := kafka.NewAsyncProducer(brokers)
	if err != nil {
		fmt.Println("kafka.NewAsyncProducer error: ", err.Error())
		os.Exit(1)
	}

	// async produce message
	go asyncProduce(producer)

	// wait signal
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Await a sigterm signal before safely closing the producer
	<-sigchan

	fmt.Println("Producer stop successfuly ~")
}

func asyncProduce(producer *kafka.AsyncProducer) {
	defer func() {
		if err := recover(); err != nil {
			golog.Error("asyncProduce handle panic",
				golog.Object("error", err))
			debug.PrintStack()
		}
	}()

	topic := "k8s-log-test-output-stdout"
	for {
		// rand key
		randKey := make([]byte, 16)
		for i := 0; i < 16; i++ {
			randKey[i] = byte(rand.Intn(26) + 65)
		}

		// rand value
		randValue := make([]byte, 64)
		for i := 0; i < 64; i++ {
			randValue[i] = byte(rand.Intn(26) + 65)
		}

		producer.Send(topic, string(randKey), string(randValue))

		time.Sleep(time.Second)
	}
}

kafkactl

kafkactl is a console util tool to access kafka cluster.

Usage:
  kafakctl <command> ...

Available Commands:
  help		help about any command.
  list		list topics, consumer groups.
  describe	describe topic, consumer group.
  create	create topic, partition.
  delete	delete topic, messages.
  consume	consume from kafka topic.
  producer	write message 2 kafka topic.


Use "kafkactl <command> --help" for more information about a given command.
kafakctl list
Usage:
  kafakctl list <sub-command> ...

Available Sub Commands:
  topics		list all topics.
  consumer-groups	list all consumer groups.

Options:
  -brokers		kafka brokers address.

Examples:
  # List all topics.
  kafkactl list topics -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # List all consumer group.
  kafkactl list consumer-groups -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl list --help" for more information about a given command.
kafkactl describe
Usage:
  kafakctl describe <sub-command> ...

Available Sub Commands:
  topic		describe a topic.
  consumer-group	describe a consumer group.

Options:
  -topic		topic name.
  -group		consumer group name.
  -brokers		kafka brokers address.

Examples:
  # Describe a topic.
  kafkactl describe topic -topic kafka_topic_test -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Describe a consumer group.
  kafkactl describe consumer-group -group consumer_example -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl describe --help" for more information about a given command.
kafkactl create
Usage:
  kafakctl create <sub-command> ...

Available Sub Commands:
  topic		create new topic.
  partition	cincrease topic partition.

Options:
  -topic		topic name.
  -partition		topic partition count, default 1 partition.
  -replica		topic replica count, default 1 replica.
  -retentionTime        topic message retention time (second), default 86400 second.
  -totalPartition	topic total partition count.
  -brokers		kafka brokers address.

Examples:
  # Create new topic, default 1 partition and 1 replica.
  kafkactl create topic -topic kafka_topic_test -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Create new topic, 3 partition and default 1 replica.
  kafkactl create topic -topic kafka_topic_test -partition 3 -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Create new topic, 3 partition and 2 replica.
  kafkactl create topic -topic kafka_topic_test -partition 3 -replica 2 -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Create new topic, 3 partition and 2 replica retention 3 day.
  kafkactl create topic -topic kafka_topic_test -partition 3 -replica 2 -retentionTime 259200 -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Create new partition.
  kafkactl create partition -topic kafka_topic_test -totalPartition 6 -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl create --help" for more information about a given command.
kafkactl delete
Usage:
  kafakctl create <sub-command> ...

Available Sub Commands:
  topic		delete a topic.
  message	delete topic partition messages.

Options:
  -topic		topic name.
  -partition		topic partition ID.
  -endOffset		topic partition end offset, whose offset is smaller than the given offset of the corresponding partition will be delete.
  -brokers		kafka brokers address.

Examples:
  # Delete a topic.
  kafkactl delete topic -topic kafka_topic_test -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Delete topic partition messages.
  kafkactl delete message -topic kafka_topic_test -partition 3 -endOffset 100000 -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl delete --help" for more information about a given command.
kafkactl consume
Usage:
  kafakctl consume ...

Available Options:
  -topic		consume topic name.
  -group		consumer group name, default generate random name.
  -partition		consume topic partition ID, default use partition 0.
  -start		start from newest or oldest, default newest read from lateset.
  -brokers		kafka brokers.

Examples:
  # Consume topics, random generate consumer group and default use partition 0 and consume from newest.
  kafkactl consume -topic kafka_topic_test -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Consume topics, default use partition 0 and consume from newest.
  kafkactl consume -topic kafka_topic_test -group kafka_topic_test_group -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Consume topics, default consume from newest.
  kafkactl consume -topic kafka_topic_test -group kafka_topic_test_group -partition 1 -brokers 127.0.0.1:9092,127.0.0.2:9092.

  # Consume topics.
  kafkactl consume -topic kafka_topic_test -group kafka_topic_test_group -partition 1 -start oldest -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl consume --help" for more information about a given command.
kafkactl producer
Usage:
  kafakctl producer ...

Available Options:
  -topic	topic name.
  -key		message key.
  -message	message body.
  -brokers	kafka brokers.

Examples:
  # Write meesage 2 topics.
  kafkactl producer -topic kafka_topic_test -key key -message message -brokers 127.0.0.1:9092,127.0.0.2:9092.

Use "kafkactl producer --help" for more information about a given command.

CI Workflow Check

  1. gocyclo: 校验代码复杂度
  2. gofmt: 校验代码是否都已经格式化
  3. golint: 校验代码风格规范是否按照指定标准
  4. gosimple: 校验代码是否可以简化
  5. govet: 代码静态校验
  6. misspell: 校验是否有英文单词拼写错误
  7. unused: 校验是否有未使用变量、常量、函数、结构体等
  8. gotest: 单元测试校验

Gitlab CI

如果使用Gitlab,也可以通过.gitlab-ci.yml运行Gitlab CI Pipeline,详细介绍请参考下面2篇文章

  1. Gitlab 安装使用
  2. Gitlab CI和CD配置

Directories

Path Synopsis
example
pkg
cluster
Package cluster provides cluster extensions for Sarama, enabing users to consume topics across from multiple, balanced nodes.
Package cluster provides cluster extensions for Sarama, enabing users to consume topics across from multiple, balanced nodes.
sarama
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later).
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL