kafka

package
v0.39.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2024 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NoopCommit added in v0.35.0

func NoopCommit()

NoopCommit is a no operation commit function.

Types

type BrokerAcknowledgment added in v0.35.0

type BrokerAcknowledgment struct {
	// contains filtered or unexported fields
}

BrokerAcknowledgment for kafka broker. Naks are not supported on kafka side. Committing the message is the only way to handling the message. Proper errorhandling needs to be done by the subscriber.

func (BrokerAcknowledgment) AckMessage added in v0.35.0

func (k BrokerAcknowledgment) AckMessage()

AckMessage acknowledges the message.

func (BrokerAcknowledgment) NakMessage added in v0.35.0

func (k BrokerAcknowledgment) NakMessage()

NakMessage negatively acknowledges the message.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is the Kafka implementation for asyncapi-codegen.

func NewController

func NewController(hosts []string, options ...ControllerOption) (*Controller, error)

NewController creates a new KafkaController that fulfill the BrokerLinker interface.

func (*Controller) Publish

func (c *Controller) Publish(ctx context.Context, channel string, um extensions.BrokerMessage) error

Publish a message to the broker.

func (*Controller) Subscribe

Subscribe to messages from the broker.

type ControllerOption

type ControllerOption func(controller *Controller)

ControllerOption is a function that can be used to configure a Kafka controller Examples: WithGroupID(), WithPartition(), WithMaxBytes(), WithLogger().

func WithAutoCommit added in v0.35.0

func WithAutoCommit(enabled bool) ControllerOption

WithAutoCommit set if a AutoCommitMessagesHandler or ManualCommitMessagesHandler should be used for processing the messages.

func WithConnectionTest added in v0.36.0

func WithConnectionTest(enabled bool) ControllerOption

WithConnectionTest set the connectionTest feature toggle to configure if NewController should validate the connection on creation.

func WithGroupID

func WithGroupID(groupID string) ControllerOption

WithGroupID set a custom group ID for channel subscription.

func WithLogger

func WithLogger(logger extensions.Logger) ControllerOption

WithLogger set a custom logger that will log operations on broker controller.

func WithMaxBytes

func WithMaxBytes(maxBytes int) ControllerOption

WithMaxBytes set the maximum size of a message.

func WithPartition

func WithPartition(partition int) ControllerOption

WithPartition set the partition to use for the topic.

func WithSasl added in v0.36.0

func WithSasl(sasl sasl.Mechanism) ControllerOption

WithSasl set the sasl.Mechanism that will be used for kafka.Dial, kafka.Reader and kafka.Writer.

func WithTLS added in v0.36.0

func WithTLS(tls *tls.Config) ControllerOption

WithTLS set the tls.Config that will be used for kafka.Dial, kafka.Reader and kafka.Writer.

type MessagesHandler added in v0.35.0

type MessagesHandler func(
	ctx context.Context,
	r *kafka.Reader,
	sub extensions.BrokerChannelSubscription,
)

MessagesHandler is a function that can be used to process messages from the broker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL