gf: github.com/johng-cn/gf/g/database/gkafka Index | Files

package gkafka

import "github.com/johng-cn/gf/g/database/gkafka"

Package gkafka provides producer and consumer client for kafka server.

Kafka客户端.

Index

Package Files

gkafka.go gkafka_message.go

type Client Uses

type Client struct {
    Config *Config
    // contains filtered or unexported fields
}

Kafka Client(Consumer/SyncProducer/AsyncProducer)

func NewClient Uses

func NewClient(config *Config) *Client

New a kafka client.

func (*Client) AsyncSend Uses

func (client *Client) AsyncSend(message *Message) error

Send data to kafka in asynchronized way(concurrent safe).

func (*Client) Close Uses

func (client *Client) Close()

Close client.

func (*Client) MarkOffset Uses

func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error

标记指定topic 分区开始读取位置

func (*Client) Receive Uses

func (client *Client) Receive() (*Message, error)

Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.

func (*Client) SyncSend Uses

func (client *Client) SyncSend(message *Message) error

Send data to kafka in synchronized way.

func (*Client) Topics Uses

func (client *Client) Topics() ([]string, error)

Get all topics from kafka server. 这里创建独立的消费客户端获取topics,获取完之后销毁该客户端对象。

type Config Uses

type Config struct {
    GroupId        string // group id for consumer.
    Servers        string // server list, multiple servers joined by ','.
    Topics         string // topic list, multiple topics joined by ','.
    AutoMarkOffset bool   // auto mark message read after consumer message from server
    sarama.Config
}

kafka Client based on sarama.Config

func NewConfig Uses

func NewConfig() *Config

New a default configuration object.

type Message Uses

type Message struct {
    Value     []byte
    Key       []byte
    Topic     string
    Partition int
    Offset    int
    // contains filtered or unexported fields
}

Kafka Message.

func (*Message) MarkOffset Uses

func (msg *Message) MarkOffset()

自动标记已读取

Package gkafka imports 5 packages (graph). Updated 2019-01-16. Refresh now. Tools for package owners.