kafka

package
v0.0.0-...-0c66be6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0 Imports: 15 Imported by: 5

Documentation

Index

Constants

View Source
// FirstOffset re-exports kafka.FirstOffset under this package's name.
// Config.StartOffset is documented as defaulting to this value.
const FirstOffset = kafka.FirstOffset
View Source
// LastOffset re-exports kafka.LastOffset under this package's name.
// NOTE(review): presumably intended as an alternative value for
// Config.StartOffset — confirm against the consumer implementation.
const LastOffset = kafka.LastOffset

Variables

This section is empty.

Functions

func InitTopic

func InitTopic(bootstrapUrl string, topics ...string) (err error)

func InitTopicWithPartitionNumber

func InitTopicWithPartitionNumber(bootstrapUrl string, partitionNumber int, topics ...string) (err error)

func NewConsumer

func NewConsumer(ctx context.Context, config Config, topic string, listener func(delivery []byte) error) error

func NewMultiConsumer

func NewMultiConsumer(ctx context.Context, config Config, topics []string, listener func(delivery Message) error) (err error)

Types

type Config

// Config is the settings value accepted by every constructor in this package:
// NewConsumer, NewMultiConsumer, NewProducer, NewProducerWithBalancer and
// NewProducerWithKeySeparationBalancer all take a Config parameter.
// NOTE(review): which fields are read by consumers versus producers is not
// visible from the declarations alone — confirm before relying on a field.
type Config struct {
	KafkaUrl               string           // presumably the address of the Kafka broker to connect to; TODO confirm expected format
	ConsumerGroup          string           // optional
	StartOffset            int64            // defaults to FirstOffset
	Debug                  bool             // defaults to false
	TimeNow                func() time.Time // defaults to time.Now
	Wg                     *sync.WaitGroup  // optional
	PartitionWatchInterval time.Duration    // defaults to time.Minute
	OnError                func(error)      // defaults to log.Fatal
}

type KeySeparationBalancer

// KeySeparationBalancer is a balancer type: its Balance method takes a
// kafka.Message plus the candidate partitions and returns the chosen partition.
// NOTE(review): judging by the field names, it presumably splits the message
// key at Seperator and delegates the decision to SubBalancer — confirm against
// the implementation.
type KeySeparationBalancer struct {
	SubBalancer kafka.Balancer // wrapped balancer; presumably receives the delegated Balance call — TODO confirm
	Seperator   string         // (sic) the exported name is misspelled; renaming it would break existing importers
}

func (*KeySeparationBalancer) Balance

func (this *KeySeparationBalancer) Balance(msg kafka.Message, partitions ...int) (partition int)

type Message

// Message is the value handed to the listener callback of NewMultiConsumer.
// NOTE(review): the fields presumably mirror the same-named fields of the
// underlying kafka.Message — confirm.
type Message struct {
	Topic     string
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Time      time.Time
}

type Producer

// Producer is the handle returned by NewProducer, NewProducerWithBalancer and
// NewProducerWithKeySeparationBalancer. Its Produce method takes a string key
// and a message payload and returns an error.
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(ctx context.Context, config Config, topic string) (*Producer, error)

func NewProducerWithBalancer

func NewProducerWithBalancer(ctx context.Context, config Config, topic string, balancer kafka.Balancer) (*Producer, error)

func NewProducerWithKeySeparationBalancer

func NewProducerWithKeySeparationBalancer(ctx context.Context, config Config, topic string) (*Producer, error)

func (*Producer) Produce

func (this *Producer) Produce(key string, message []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL