kafka

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2020 License: MIT Imports: 9 Imported by: 2

README

go-kafka

Separated from go-utils/kafka.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommitFilter

type CommitFilter struct {
	// contains filtered or unexported fields
}

CommitFilter buffers Kafka messages so that they can be committed lazily.

func NewCommitFilter

func NewCommitFilter(ctx context.Context, kMsgPool *sync.Pool, opts ...CommitFilterOptFunc) (f *CommitFilter, err error)

NewCommitFilter creates a new CommitFilter.

func (*CommitFilter) GetAfterChan

func (f *CommitFilter) GetAfterChan() chan *KafkaMsg

GetAfterChan returns the channel on which messages come out of the CommitFilter.

func (*CommitFilter) GetBeforeChan

func (f *CommitFilter) GetBeforeChan() chan *KafkaMsg

GetBeforeChan returns the channel used to send messages into the CommitFilter.

type CommitFilterOptFunc

type CommitFilterOptFunc func(*commitCheckOption) error

CommitFilterOptFunc is an option for CommitFilter.

func WithCommitFilterCheckChanSize

func WithCommitFilterCheckChanSize(size int) CommitFilterOptFunc

WithCommitFilterCheckChanSize sets the size of the commit check channel.

func WithCommitFilterCheckInterval

func WithCommitFilterCheckInterval(interval time.Duration) CommitFilterOptFunc

WithCommitFilterCheckInterval sets the commit check interval.

func WithCommitFilterCheckNum

func WithCommitFilterCheckNum(num int) CommitFilterOptFunc

WithCommitFilterCheckNum set commit check num

type KafkaCli

type KafkaCli struct {
	*KafkaCliCfg
	// contains filtered or unexported fields
}

KafkaCli is a Kafka consumer client.

Example
// kmsgPool supplies *KafkaMsg values; it is handed to the client through
// KafkaCliCfg.KMsgPool below.
var (
	kmsgPool = &sync.Pool{
		New: func() interface{} {
			return &KafkaMsg{}
		},
	}
)
// Build a consumer client from the broker/topic/group configuration, and
// tune the commit filter with a 5-second check interval and a check num of 100.
cli, err := NewKafkaCliWithGroupID(
	context.Background(),
	&KafkaCliCfg{
		Brokers:  []string{"brokers url here"},
		Topics:   []string{"topics name here"},
		Groupid:  "group id",
		KMsgPool: kmsgPool,
	},
	WithCommitFilterCheckInterval(5*time.Second),
	WithCommitFilterCheckNum(100),
)
if err != nil {
	// The example aborts on a construction error; the error is wrapped
	// with context before panicking.
	panic(errors.Wrap(err, "try to connect to kafka got error"))
}

// Range over the channel returned by Messages, handling one message per iteration.
for kmsg := range cli.Messages(context.Background()) {
	// do something with kafka message
	fmt.Println(string(kmsg.Message))
	cli.CommitWithMsg(kmsg) // async commit
}
Output:

func NewKafkaCliWithGroupID

func NewKafkaCliWithGroupID(ctx context.Context, cfg *KafkaCliCfg, opts ...CommitFilterOptFunc) (k *KafkaCli, err error)

NewKafkaCliWithGroupID creates a new Kafka consumer.

func (*KafkaCli) Close

func (k *KafkaCli) Close()

Close closes the Kafka client.

func (*KafkaCli) CommitWithMsg

func (k *KafkaCli) CommitWithMsg(kmsg *KafkaMsg)

CommitWithMsg commits the given Kafka message.

func (*KafkaCli) ListenNotifications

func (k *KafkaCli) ListenNotifications(ctx context.Context)

ListenNotifications logs Kafka broker notifications.

func (*KafkaCli) Messages

func (k *KafkaCli) Messages(ctx context.Context) <-chan *KafkaMsg

Messages returns the channel of Kafka messages.

type KafkaCliCfg

// KafkaCliCfg is the configuration passed to NewKafkaCliWithGroupID and
// embedded in KafkaCli.
type KafkaCliCfg struct {
	// Brokers and Topics are string lists; the package example fills them
	// with broker URLs and topic names respectively.
	Brokers, Topics []string
	// Groupid is the group id string (set to "group id" in the package example).
	Groupid         string
	// KMsgPool is a sync.Pool; in the package example its New func
	// returns a *KafkaMsg.
	KMsgPool        *sync.Pool
}

KafkaCliCfg is the configuration for the Kafka consumer client.

type KafkaMsg

// KafkaMsg is a single Kafka message as delivered on the channel returned
// by KafkaCli.Messages.
type KafkaMsg struct {
	// Topic is presumably the topic the message was read from — TODO confirm.
	Topic     string
	// Message holds the message body as raw bytes; the package example
	// prints it with string(kmsg.Message).
	Message   []byte
	// Offset is presumably the message's offset within its partition — TODO confirm.
	Offset    int64
	// Partition is presumably the partition the message came from — TODO confirm.
	Partition int32
	// Timestamp is a time.Time associated with the message; its exact
	// source (producer or broker) is not shown here — TODO confirm.
	Timestamp time.Time
}

KafkaMsg is a Kafka message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL