kafclient

package module
v2.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

Kafka client Version 2

New kafclient implement lib kafka-go instead of sarama

WHY WE NEED CHANGE IT?

  • Sarama is good, so fast for subscribe message, but it need more config but so confused.
  • Connect to broker not stable. Can not reconnect with broker restart, we manual reconnect.
  • Configuration so many steps, but not have timeout connect, not have health check.
  • Lib change and breaking change many, so we implement has bugs.

then we restructure this flow and change base lib.

Installation

    go get github.com/teng231/kafclient/v2

Using

  • Start with mod consumer:
    kclient := &Client{}
    kclient.SetAddrs([]string{"localhost:9092"})
    kclient.NewPublisher()
    err := kclient.Publish(context.TODO(), "topic-1", map[string]interface{}{
            "meta":  "tester2",
            "index": 1,
            "topic": "topic-1",
        })
  • Start with mod producer
    kclient := &Client{}
	kclient.SetAddrs([]string{"localhost:9092"})
	kclient.NewConsumer("tete2", []string{"topic-1", "topic-2"})
	cmsg := make(chan *Message, 1000)
	log.Print("Listen message 1")
	kclient.Listen(context.Background(), cmsg)
	log.Print("Listen message 2")
	for msg := range cmsg {
		log.Print(string(msg.Body))
		msg.Commit()
	}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RandStringBytes added in v2.0.5

func RandStringBytes(n int) string

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Close

func (k *Client) Close() error

func (*Client) CreateTopic added in v2.0.5

func (k *Client) CreateTopic(topic string, numPart int)

func (*Client) HealthCheckBroker added in v2.0.3

func (k *Client) HealthCheckBroker()

func (*Client) IsReaderConnected added in v2.0.3

func (k *Client) IsReaderConnected() bool

func (*Client) IsWriters added in v2.0.3

func (k *Client) IsWriters() bool

func (*Client) ListTopics

func (k *Client) ListTopics()

func (*Client) ListTopics2 added in v2.0.5

func (k *Client) ListTopics2()

func (*Client) Listen added in v2.0.3

func (k *Client) Listen(ctx context.Context, cMgs chan *Message) error

Listen manual listen need call msg.Commit() when process done recommend for this process

func (*Client) ListenWithAutoCommit added in v2.0.3

func (k *Client) ListenWithAutoCommit(ctx context.Context, cMgs chan *Message) error

ListenWithAutoCommit autocommit when message delivered not recommend use this function

func (*Client) NewConsumer added in v2.0.3

func (k *Client) NewConsumer(consumerGroup string, topics []string)

func (*Client) NewPublisher added in v2.0.3

func (k *Client) NewPublisher() error

func (*Client) Publish

func (k *Client) Publish(ctx context.Context, topic string, msg interface{}) error

func (*Client) SetAddrs added in v2.0.3

func (k *Client) SetAddrs(addrs []string)

type IClient

type IClient interface {
	SetAddrs(addrs []string)
	Listen(ctx context.Context, cMgs chan *Message) error
	ListenWithAutoCommit(ctx context.Context, cMgs chan *Message) error
	NewConsumer(consumerGroup string, topics []string)
	IsWriters() bool
	Close() error

	NewPublisher() error
	Publish(ctx context.Context, topic string, msg interface{}) error
	IsReaderConnected() bool
}

type Message

type Message struct {
	Offset        int64  `json:"offset,omitempty"`
	Partition     int    `json:"partition,omitempty"`
	Topic         string `json:"topic,omitempty"`
	Key           string `json:"key,omitempty"`
	Body          []byte `json:"body,omitempty"`
	Timestamp     int64  `json:"timestamp,omitempty"`
	ConsumerGroup string `json:"consumer_group,omitempty"`
	Commit        func()
	Headers       map[string]string
}

Message define message encode/decode sarama message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL