kafka

package
v1.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

Kafka

Broker: Kafka

You can run examples by replace configs use your kafka instance configs.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer consumer struct

func NewConsumer

func NewConsumer(config *ConsumerConfig) *Consumer

NewConsumer returns a consumer instance

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (c *Consumer) Close() error

Close close consumer

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*Consumer) Start

func (c *Consumer) Start()

Start start consumer

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(handler ConsumerHandler)

Subscribe subscribe to topic

type ConsumerConfig

type ConsumerConfig struct {
	Addr  []string
	Topic []string
	Gid   string

	EnableSASLAuth bool
	SASLMechanism  string
	SASLUser       string
	SASLPassword   string
	SASLHandshake  bool

	DialTimeout time.Duration

	ConsumeOldest     bool
	EnableReturnError bool

	ClientID string
}

ConsumerConfig kafka consumer config

type ConsumerHandler

type ConsumerHandler func(context.Context, *sarama.ConsumerMessage) error

type ProducerConfig

type ProducerConfig struct {
	Addr  []string
	Topic []string

	EnableSASLAuth bool
	SASLMechanism  string
	SASLUser       string
	SASLPassword   string
	SASLHandshake  bool

	DialTimeout      time.Duration
	SlowSendDuration time.Duration

	EnableReturnSuccess bool

	ClientID string
}

ProducerConfig kafka producer config

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer send message sync

func NewSyncProducer

func NewSyncProducer(config *ProducerConfig) *SyncProducer

NewSyncProducer returns a SyncProducer instance

func (*SyncProducer) Close

func (sp *SyncProducer) Close() error

Close close producer

func (*SyncProducer) SendSyncMsg

func (sp *SyncProducer) SendSyncMsg(ctx context.Context, content string) error

SendSyncMsg send message sync

type TraceInterceptor

type TraceInterceptor struct {
	TraceID string
}

func (*TraceInterceptor) OnSend

func (ti *TraceInterceptor) OnSend(message *sarama.ProducerMessage)

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL