kafka

package
v0.0.0-...-11052b5
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Constants

const (
	// AuthDSNTemplate is the template for the auth DSN: SASL type, login, and password separated by colons.
	AuthDSNTemplate = "SASLType:login:password"
)

Functions

func NewConfig

func NewConfig(appName string, authDSN string) (*sarama.Config, error)

Types

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(brokers []string, group string, config *sarama.Config) (*ConsumerGroup, error)

NewConsumerGroup creates a new ConsumerGroup.

func (*ConsumerGroup) Close

func (c *ConsumerGroup) Close() error

Close closes the consumer group.

func (*ConsumerGroup) Consume

func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

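Consume starts a non-blocking consumer loop for the given handler on the provided list of topics. According to the original comment, the returned channel is closed as soon as the Setup step has run for the first handler call, or as soon as an error occurs, whichever comes first. The signature above, however, returns an error rather than a channel.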
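The "signal once Setup has run, or on the first error" behaviour described above can be sketched with the standard library alone. This is a minimal illustration of the pattern, not the package's implementation: `session`, `startLoop`, `healthySession`, `failingSession`, and `demo` are hypothetical names, and the `session` function stands in for one blocking consumer-group session.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for one consumer-group session:
// it calls onSetup once its setup step is done, then blocks until ctx
// is cancelled or the session fails.
type session func(ctx context.Context, onSetup func()) error

// startLoop runs sessions in a goroutine and returns immediately.
// ready is closed when setup has run or when a session fails first;
// the failing session's error is delivered on errs.
func startLoop(ctx context.Context, run session) (<-chan struct{}, <-chan error) {
	ready := make(chan struct{})
	errs := make(chan error, 1)
	var once sync.Once
	markReady := func() { once.Do(func() { close(ready) }) }

	go func() {
		defer close(errs)
		defer markReady() // never leave a waiter blocked on ready
		for ctx.Err() == nil {
			if err := run(ctx, markReady); err != nil {
				errs <- err
				return
			}
		}
	}()
	return ready, errs
}

// healthySession reports setup and then waits for cancellation.
func healthySession(ctx context.Context, onSetup func()) error {
	onSetup()
	<-ctx.Done()
	return nil
}

// failingSession fails before setup ever runs.
func failingSession(ctx context.Context, onSetup func()) error {
	return errors.New("boom")
}

// demo starts the loop, waits for the ready signal, cancels,
// and returns every error the loop reported.
func demo(run session) []error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ready, errs := startLoop(ctx, run)
	<-ready // setup has run, or the session failed first
	cancel()
	var collected []error
	for err := range errs {
		collected = append(collected, err)
	}
	return collected
}

func main() {
	fmt.Println("healthy session errors:", len(demo(healthySession)))
	fmt.Println("failing session errors:", demo(failingSession))
}
```

Closing the ready channel through `sync.Once` keeps the signal safe when the session is restarted and setup runs more than once.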

func (*ConsumerGroup) Errors

func (c *ConsumerGroup) Errors() <-chan error

Errors returns the channel on which consumer errors are delivered.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a Kafka producer.

func NewProducer

func NewProducer(brokers []string, config *sarama.Config) (*Producer, error)

NewProducer creates a new Producer using the given broker addresses and configuration.

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage) error

SendMessage produces the given message and returns only once the message has either been produced or has failed. It returns nil on success, or an error if the message failed to produce.
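The documentation does not say how SendMessage uses its ctx argument. One common way to make a blocking send respect a context is sketched below; `sendWithContext` and `demo` are hypothetical names, and the `send` callback stands in for a blocking produce call. Treat this as an assumption about the general technique, not a description of this package.

```go
package main

import (
	"context"
	"fmt"
)

// sendWithContext runs a blocking send in a goroutine and returns as soon
// as either the send finishes or ctx is done. Note that the send itself
// keeps running in the background after an early return.
func sendWithContext(ctx context.Context, send func() error) error {
	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- send() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo exercises both paths: a send that completes, and a send that is
// still blocked when its context has already been cancelled.
func demo() (fastErr, cancelledErr error) {
	fastErr = sendWithContext(context.Background(), func() error { return nil })

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	release := make(chan struct{})
	defer close(release) // lets the blocked send goroutine finish
	cancelledErr = sendWithContext(ctx, func() error {
		<-release
		return nil
	})
	return fastErr, cancelledErr
}

func main() {
	fastErr, cancelledErr := demo()
	fmt.Println("fast send:", fastErr)
	fmt.Println("cancelled send:", cancelledErr)
}
```

With this approach a cancelled caller stops waiting, but the message may still be delivered afterwards, so callers should not treat a context error as proof that nothing was produced.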

type RecordHeaders

type RecordHeaders []sarama.RecordHeader

func NewRecordHeadersFromPointers

func NewRecordHeadersFromPointers(headers []*sarama.RecordHeader) RecordHeaders

NewRecordHeadersFromPointers converts a []*sarama.RecordHeader into RecordHeaders, that is, a []sarama.RecordHeader.
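A pointer-to-value conversion of this kind can be sketched as follows. To keep the example free of external dependencies, `recordHeader` is a local stand-in that mirrors the `Key` and `Value` byte-slice fields of `sarama.RecordHeader`, and `fromPointers` is a hypothetical name. Skipping nil entries is an assumption; the actual function may handle them differently.

```go
package main

import "fmt"

// recordHeader mirrors the Key/Value fields of sarama.RecordHeader so the
// sketch compiles without the sarama dependency.
type recordHeader struct {
	Key   []byte
	Value []byte
}

// fromPointers copies every non-nil header into a slice of values.
// Skipping nil pointers is an assumption made for this sketch.
func fromPointers(headers []*recordHeader) []recordHeader {
	out := make([]recordHeader, 0, len(headers))
	for _, h := range headers {
		if h == nil {
			continue
		}
		out = append(out, *h)
	}
	return out
}

func main() {
	in := []*recordHeader{
		{Key: []byte("trace-id"), Value: []byte("abc123")},
		nil,
		{Key: []byte("source"), Value: []byte("billing")},
	}
	for _, h := range fromPointers(in) {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```

The copy is shallow: each value still shares its `Key` and `Value` backing arrays with the original header.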

func (RecordHeaders) String

func (p RecordHeaders) String() string

type SASL

type SASL struct {
	Mechanism sarama.SASLMechanism
	User      string
	Password  string
}

func NewSASL

func NewSASL() *SASL

func (*SASL) Parse

func (s *SASL) Parse(dsn string) error

Parse parses the DSN and fills the SASL struct.
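Given the AuthDSNTemplate shape "SASLType:login:password", a parser along these lines would be one plausible reading. This is only a sketch under that assumption: `saslFields` and `parseAuthDSN` are hypothetical names, the mechanism is kept as a plain string instead of `sarama.SASLMechanism`, and allowing colons inside the password is a design choice of the sketch, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// saslFields is a hypothetical stand-in for the package's SASL struct.
type saslFields struct {
	Mechanism string
	User      string
	Password  string
}

// parseAuthDSN splits a DSN shaped like "SASLType:login:password".
// SplitN with a limit of 3 keeps any further colons inside the password.
func parseAuthDSN(dsn string) (saslFields, error) {
	parts := strings.SplitN(dsn, ":", 3)
	if len(parts) != 3 {
		return saslFields{}, errors.New("auth DSN must look like SASLType:login:password")
	}
	return saslFields{
		Mechanism: parts[0],
		User:      parts[1],
		Password:  parts[2],
	}, nil
}

func main() {
	s, err := parseAuthDSN("SCRAM-SHA-512:alice:s3cr:et")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("mechanism=%s user=%s password=%s\n", s.Mechanism, s.User, s.Password)
}
```

Limiting the split to three parts means a login cannot contain a colon, while a password can.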

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.ClientConversation
	HashGeneratorFcn scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(user, password, authzID string) error

Begin prepares the client for the SCRAM exchange with the server, using the given user name and password.
