kafka

package
v1.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CloseConsumer

func CloseConsumer()

Close the singleton instance consumer.

func CloseProducer

func CloseProducer()

Close the singleton instance producer.

func Dequeue

func Dequeue[T any](ctx context.Context, count int) ([]T, error)

Dequeue fetches count number of messages from kafka queue on the configured topic.

func Enqueue

func Enqueue[T any](ctx context.Context, items ...T) (bool, error)

Enqueue will send message to the Kafka queue of the configured topic.

func Initialize

func Initialize(config Config) error

Set Kafka brokers & topic globally.

func IsInitialized

func IsInitialized() bool

Returns true if Kafka brokers and topic are set.

func LastEnqueueSucceeded

func LastEnqueueSucceeded() bool

Returns true if it is known that last Enqueue to Kafka succeeded or not.

func Subscribe

func Subscribe(topic string, consumer sarama.Consumer, fetchCount int, dispatcher chan<- string, wg *sync.WaitGroup) error

Subscribe to a kafka topic and send messages received via a dispatcher.

Types

type Config

type Config struct {
	// Kafka Brokers.
	Brokers []string
	// Kafka topic.
	Topic string
}

Config contains the global config for this kafka package.

var DefaultConfig Config = Config{
	Brokers: []string{"localhost:9093"},
	Topic:   "sop-deleted-data",
}

DefaultConfig contains the default config for this package.

func GetConfig added in v1.6.8

func GetConfig() Config

Returns this package's global config.

type QueueConsumer

type QueueConsumer struct {
	// contains filtered or unexported fields
}

func GetConsumer

func GetConsumer(config *sarama.Config) (*QueueConsumer, error)

Returns the singleton instance consumer.

type QueueProducer

type QueueProducer struct {
	// contains filtered or unexported fields
}

QueueProducer struct contains the sarama producer instance & other necessary artifacts required to achieve our producer functionalities, e.g. - error tracking, successful send sampling...

func GetProducer

func GetProducer(config *sarama.Config) (*QueueProducer, error)

GetProducer will return the singleton instance of the producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL