kafka

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level constants: sentinel errors for the consumer, producer and
// admin components, the default timeout, and the authentication type names.
const (
	// Consumer errors: missing broker address, topic name or group, and an
	// attempt to change connection properties while already connected.
	ErrMissingConsumerBroker    = utils.Error("Missing Consumer broker address")
	ErrMissingConsumerTopic     = utils.Error("Missing Consumer Topic name")
	ErrMissingConsumerGroup     = utils.Error("Missing Consumer Group")
	ErrConsumerAlreadyConnected = utils.Error("Cannot change connection properties; already connected")
	// DefaultTimeout is 30 seconds.
	// NOTE(review): which operations use this timeout is not visible here — confirm.
	DefaultTimeout              = time.Second * 30

	// Producer errors: missing broker address or topic name, use of an
	// already-closed producer, and an invalid authentication type.
	ErrMissingProducerBroker = utils.Error("Missing Producer broker address")
	ErrMissingProducerTopic  = utils.Error("Missing Producer Topic name")
	ErrProducerClosed        = utils.Error("Producer is already closed")
	ErrInvalidAuthType       = utils.Error("Invalid authentication type")

	// Admin error: missing broker address.
	ErrMissingAdminBroker = utils.Error("Missing Admin broker address")

	// Authentication type names.
	// NOTE(review): presumably the values accepted by the AuthType config
	// fields — confirm against the Validate implementations.
	AuthTypeNone     = "none"
	AuthTypePlain    = "plain"
	AuthTypeScram256 = "scram256"
	AuthTypeScram512 = "scram512"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AdminConfig

// AdminConfig is the configuration passed to NewAdmin. All named fields are
// strings with JSON tags; TLS client settings come from the embedded
// tlsProvider.ClientConfig.
type AdminConfig struct {
	// Brokers is the broker address setting (JSON key "brokers").
	// NOTE(review): the expected format (single vs. list of addresses) is not
	// visible here — confirm.
	Brokers  string `json:"brokers"`
	// AuthType is the authentication type name (JSON key "authType").
	AuthType string `json:"authType"`
	// Username and Password are the credentials (JSON keys "username", "password").
	Username string `json:"username"`
	Password string `json:"password"`
	tlsProvider.ClientConfig
}

func (AdminConfig) Validate

func (c AdminConfig) Validate() error

type ConsumerConfig

// ConsumerConfig is the configuration passed to NewConsumer. All named fields
// are strings with JSON tags; TLS client settings come from the embedded
// tlsProvider.ClientConfig.
type ConsumerConfig struct {
	// Brokers is the broker address setting (JSON key "brokers").
	// NOTE(review): the expected format (single vs. list of addresses) is not
	// visible here — confirm.
	Brokers  string `json:"brokers"`
	// Topic is the topic name (JSON key "topic").
	Topic    string `json:"topic"`
	// Group is the consumer group (JSON key "group").
	Group    string `json:"group"`
	// AuthType is the authentication type name (JSON key "authType").
	AuthType string `json:"authType"`
	// Username and Password are the credentials (JSON keys "username", "password").
	Username string `json:"username"`
	Password string `json:"password"`
	tlsProvider.ClientConfig
}

func (ConsumerConfig) Validate

func (c ConsumerConfig) Validate() error

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, message kafka.Message) error

ConsumerFunc is the Reader handler type: a callback that receives a context and a Kafka message and returns an error.

type KafkaAdmin

// KafkaAdmin is the admin handle returned by NewAdmin.
type KafkaAdmin struct {
	// Conn is the exported *kafka.Conn held by this admin handle.
	// NOTE(review): presumably set by Connect — confirm.
	Conn *kafka.Conn
	// contains filtered or unexported fields
}

func NewAdmin

func NewAdmin(ctx context.Context, cfg AdminConfig) (*KafkaAdmin, error)

func (*KafkaAdmin) Connect

func (c *KafkaAdmin) Connect() error

func (*KafkaAdmin) CreateTopic

func (c *KafkaAdmin) CreateTopic(topic string, numPartitions int, replicationFactor int) error

CreateTopic creates a new Topic.

func (*KafkaAdmin) DeleteTopic

func (c *KafkaAdmin) DeleteTopic(topic string) error

DeleteTopic removes a Topic

func (*KafkaAdmin) Disconnect

func (c *KafkaAdmin) Disconnect()

func (*KafkaAdmin) GetTopics

func (c *KafkaAdmin) GetTopics(topics ...string) ([]kafka.Partition, error)

func (*KafkaAdmin) IsConnected

func (c *KafkaAdmin) IsConnected() bool

func (*KafkaAdmin) ListTopics

func (c *KafkaAdmin) ListTopics() ([]string, error)

ListTopics lists the existing Kafka topics.

func (*KafkaAdmin) TopicExists

func (c *KafkaAdmin) TopicExists(topic string) (bool, error)

TopicExists returns true if Topic exists

type KafkaConsumer

// KafkaConsumer is the consumer handle returned by NewConsumer.
type KafkaConsumer struct {
	// Brokers, Group and Topic are exported string fields.
	// NOTE(review): presumably copied from ConsumerConfig — confirm.
	Brokers string
	Group   string
	Topic   string

	// Reader is the exported *kafka.Reader held by this consumer.
	Reader *kafka.Reader
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(ctx context.Context, cfg ConsumerConfig) (*KafkaConsumer, error)

func (*KafkaConsumer) ChannelSubscribe

func (c *KafkaConsumer) ChannelSubscribe(ch chan kafka.Message) error

ChannelSubscribe subscribes to a reader handler by channel. Note: this function is blocking.

func (*KafkaConsumer) Connect

func (c *KafkaConsumer) Connect()

Connect connects to the Kafka broker.

func (*KafkaConsumer) Disconnect

func (c *KafkaConsumer) Disconnect()

Disconnect disconnects from Kafka.

func (*KafkaConsumer) GetConfig

func (c *KafkaConsumer) GetConfig() *kafka.ReaderConfig

GetConfig returns the initial config object. Useful for setting other options before connecting.

func (*KafkaConsumer) IsConnected

func (c *KafkaConsumer) IsConnected() bool

IsConnected returns true if the Reader was initialized.

func (*KafkaConsumer) ReadMessage

func (c *KafkaConsumer) ReadMessage() (kafka.Message, error)

ReadMessage reads a single message from Kafka. It returns the Kafka message and an error. If there is no message available, it blocks until one is available. If an error occurs, it is returned.

func (*KafkaConsumer) Rewind

func (c *KafkaConsumer) Rewind() error

Rewind reads messages from the beginning.

func (*KafkaConsumer) Subscribe

func (c *KafkaConsumer) Subscribe(handler ConsumerFunc) error

Subscribe consumes messages from a topic using a handler. Note: this function is blocking.

func (*KafkaConsumer) SubscribeWithOffsets

func (c *KafkaConsumer) SubscribeWithOffsets(handler ConsumerFunc) error

SubscribeWithOffsets manages a reader handler that explicitly commits offsets. Note: this function is blocking.

type KafkaProducer

// KafkaProducer is the producer handle returned by NewProducer.
type KafkaProducer struct {
	// Brokers and Topic are exported string fields.
	// NOTE(review): presumably copied from ProducerConfig — confirm.
	Brokers string
	Topic   string
	// Writer is the exported *kafka.Writer held by this producer.
	Writer  *kafka.Writer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(ctx context.Context, cfg ProducerConfig) (*KafkaProducer, error)

func (*KafkaProducer) Disconnect

func (c *KafkaProducer) Disconnect()

Disconnect disconnects from the Writer

func (*KafkaProducer) IsConnected

func (p *KafkaProducer) IsConnected() bool

IsConnected returns true if the Writer is connected.

func (*KafkaProducer) Write

func (p *KafkaProducer) Write(value []byte, key ...[]byte) error

Write writes a single message to topic

func (*KafkaProducer) WriteJson

func (p *KafkaProducer) WriteJson(data interface{}, key ...[]byte) error

WriteJson writes a struct to a Topic as a JSON message.

func (*KafkaProducer) WriteMulti

func (p *KafkaProducer) WriteMulti(values ...[]byte) error

WriteMulti writes multiple messages to the Topic.

func (*KafkaProducer) WriteMultiJson

func (p *KafkaProducer) WriteMultiJson(values ...interface{}) error

WriteMultiJson writes a slice of structs to a Topic as a JSON message.

type ProducerConfig

// ProducerConfig is the configuration passed to NewProducer. All named fields
// are strings with JSON tags; TLS client settings come from the embedded
// tlsProvider.ClientConfig.
type ProducerConfig struct {
	// Brokers is the broker address setting (JSON key "brokers").
	// NOTE(review): the expected format (single vs. list of addresses) is not
	// visible here — confirm.
	Brokers  string `json:"brokers"`
	// Topic is the topic name (JSON key "topic").
	Topic    string `json:"topic"`
	// AuthType is the authentication type name (JSON key "authType").
	AuthType string `json:"authType"`
	// Username and Password are the credentials (JSON keys "username", "password").
	Username string `json:"username"`
	Password string `json:"password"`
	tlsProvider.ClientConfig
}

func (ProducerConfig) Validate

func (c ProducerConfig) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL