gkafka: github.com/gogf/gkafka

package gkafka

import "github.com/gogf/gkafka"

Package gkafka provides producer and consumer clients for kafka servers.

Create consumer.


group := "test-group"
topic := "test"
kafkaConfig := gkafka.NewConfig()
kafkaConfig.Servers = "localhost:9092"
kafkaConfig.AutoMarkOffset = false
kafkaConfig.Topics = topic
kafkaConfig.GroupId = group

client := gkafka.NewClient(kafkaConfig)
defer client.Close()

// Mark the offset to start consuming from.
client.MarkOffset(topic, 0, 6)
for {
    if msg, err := client.Receive(); err != nil {
        fmt.Println(err)
        break
    } else {
        fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
        msg.MarkOffset()
    }
}

Create producer.


topic := "test"
kafkaConfig := gkafka.NewConfig()
kafkaConfig.Servers = "localhost:9092"
kafkaConfig.AutoMarkOffset = false
kafkaConfig.Topics = topic

client := gkafka.NewClient(kafkaConfig)
defer client.Close()
for {
    s := time.Now().String()
    fmt.Println("produce:", s)
    if err := client.SyncSend(&gkafka.Message{Value: []byte(s)}); err != nil {
        fmt.Println(err)
    }
    time.Sleep(time.Second)
}

Fetch all topics from server.


config := gkafka.NewConfig()
config.Servers = "localhost:9092"

client := gkafka.NewClient(config)
defer client.Close()

fmt.Println(client.Topics())




Package Files

gkafka.go gkafka_message.go

type Client

type Client struct {
    Config *Config
    // contains filtered or unexported fields
}

Kafka Client (Consumer/SyncProducer/AsyncProducer).

func NewClient

func NewClient(config *Config) *Client

NewClient creates and returns a new kafka client.

func (*Client) AsyncSend

func (client *Client) AsyncSend(message *Message) error

AsyncSend sends data to kafka in an asynchronous way (concurrency-safe).

func (*Client) Close

func (client *Client) Close()

Close closes the client.

func (*Client) MarkOffset

func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error

MarkOffset marks the start offset for the consumer.

func (*Client) Receive

func (client *Client) Receive() (*Message, error)

Receive receives a message from kafka from the topics specified in the config, in a BLOCKING way. It handles offset tracking automatically.

func (*Client) SyncSend

func (client *Client) SyncSend(message *Message) error

SyncSend sends data to kafka in a synchronous way.

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Topics returns all topics from the kafka server.

type Config

type Config struct {
    GroupId        string // group id for the consumer.
    Servers        string // server list; multiple servers joined by ','.
    Topics         string // topic list; multiple topics joined by ','.
    AutoMarkOffset bool   // automatically mark the message as read after it is consumed from the server.
}

Kafka client config based on sarama.Config.

func NewConfig

func NewConfig() *Config

NewConfig creates and returns a default configuration object.

type Message

type Message struct {
    Value     []byte
    Key       []byte
    Topic     string
    Partition int
    Offset    int
    // contains filtered or unexported fields
}

Kafka Message.

func (*Message) MarkOffset

func (msg *Message) MarkOffset()

MarkOffset marks the current message as consumed.


Directories

third/github.com/johngcn/sarama-cluster - Package cluster provides cluster extensions for Sarama, enabling users to consume topics from multiple, balanced nodes.

Package gkafka imports 6 packages and is imported by 1 package. Updated 2019-07-02.