kafka

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReaderError errors.Code = errors.ErrInternal + 1
	ConsumerError
)

Variables

View Source
var ErrMissingBrokers = errors.New("kafka: missing broker(s) address(es)")

Functions

func NewTransport

func NewTransport(auth *Authentication) (*kafka.Transport, error)

NewTransport creates a new transport using the scram sha512 mechanism.

Types

type Authentication

type Authentication struct {
	Username string
	Password string
}

Authentication contains the user and pwd used in the connection dial. It is used by the SCRAM SHA512 SASL mechanism. Keep in mind that this authentication method requires Kafka with a version >= 0.10.2.0.

type Consumer

type Consumer struct {
	*kafka.Reader
	// contains filtered or unexported fields
}

Consumer wraps the Kafka reader.

func NewConsumer

func NewConsumer(opt *ConsumerOptions) (*Consumer, error)

NewConsumer creates a new message reader. Returns ErrMissingBrokers if len(opt.Brokers) == 0. The remaining parameters aren't checked. It is assumed that the topic exists.

func (*Consumer) StartConsumption added in v0.6.0

func (c *Consumer) StartConsumption(fn MessageConsumer) *sub.Controller

StartConsumption initializes a new reader loop that will run in background. The loop terminates only if *Controller.Shutdown is called.

type ConsumerOptions

type ConsumerOptions struct {
	// Brokers contains a list of addresses
	// in the form <ip/domain>:<port>.
	Brokers []string

	Topic   string
	GroupID string

	*Authentication

	// Timeout is the max time spent trying
	// to complete the connection process.
	Timeout time.Duration

	// QueueCapacity defines the internal queue size.
	QueueCapacity int

	// ReadLagInterval is time between the
	// lastest message in a Kafka topic and
	// a message that the consumer has processed.
	ReadLagInterval time.Duration

	// QueuedErrors contains the ammount of
	// errors that the Controller.ErrCh can hold.
	QueuedErrors int

	ErrLimiterOpts *sub.ErrorLimiterOptions
}

ConsumerOptions contains a subset of kafka reader options.

type MessageConsumer added in v0.6.0

type MessageConsumer func(m *kafka.Message) error

MessageConsumer is responsable for processing the given kafka message.

type Publisher

type Publisher struct {
	*kafka.Writer
}

Publisher wraps the Kafka writer.

func NewPublisher

func NewPublisher(
	opt *PublisherOptions,
	tr *kafka.Transport,
) (*Publisher, error)

NewPublisher creates a new message writer. Returns ErrMissingBrokers if len(opt.Brokers) == 0. The remaining parameters aren't checked. You must give a non-nil transport. It is assumed that the topic exists.

type PublisherOptions

type PublisherOptions struct {
	// Brokers contains a list of addresses in the form <ip/domain>:<port>.
	Brokers []string

	Topic string

	*Authentication
}

ConsumerOptions contains a subset of kafka writer options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL