kafka

package
v0.0.4-vb
Warning: this package is not in the latest version of its module.
Published: Apr 10, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NoopCommit

func NoopCommit()

NoopCommit is a no operation commit function.

Types

type BrokerAcknowledgment

type BrokerAcknowledgment struct {
	// contains filtered or unexported fields
}

BrokerAcknowledgment is the acknowledgment type for the Kafka broker. Kafka does not support negative acknowledgments (naks): committing the message is the only way to handle it. Proper error handling must therefore be done by the subscriber.

func (BrokerAcknowledgment) AckMessage

func (k BrokerAcknowledgment) AckMessage()

AckMessage acknowledges the message.

func (BrokerAcknowledgment) NakMessage

func (k BrokerAcknowledgment) NakMessage()

NakMessage negatively acknowledges the message.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is the Kafka implementation for asyncapi-codegen.

func NewController

func NewController(hosts []string, options ...ControllerOption) (*Controller, error)

NewController creates a new KafkaController that fulfills the BrokerLinker interface.

func (*Controller) Publish

func (c *Controller) Publish(ctx context.Context, channel string, um extensions.BrokerMessage) error

Publish a message to the broker.

func (*Controller) Subscribe

Subscribe to messages from the broker.

type ControllerOption

type ControllerOption func(controller *Controller)

ControllerOption is a function that can be used to configure a Kafka controller. Examples: WithGroupID(), WithPartition(), WithMaxBytes(), WithLogger().

func WithAutoCommit

func WithAutoCommit(enabled bool) ControllerOption

WithAutoCommit sets whether an AutoCommitMessagesHandler or a ManualCommitMessagesHandler should be used for processing the messages.

func WithGroupID

func WithGroupID(groupID string) ControllerOption

WithGroupID sets a custom group ID for channel subscription.

func WithLogger

func WithLogger(logger extensions.Logger) ControllerOption

WithLogger sets a custom logger that will log operations on the broker controller.

func WithMaxBytes

func WithMaxBytes(maxBytes int) ControllerOption

WithMaxBytes sets the maximum size of a message.

func WithPartition

func WithPartition(partition int) ControllerOption

WithPartition sets the partition to use for the topic.
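
The options above follow Go's functional options pattern: each one returns a function that mutates the controller while it is being constructed. The sketch below illustrates the pattern with local stand-ins so that it runs on its own. sketchController, sketchOption, the lowercase option names, and the default values are all hypothetical and do not reflect the package's actual fields or defaults.

```go
package main

import (
	"errors"
	"fmt"
)

// sketchController is a hypothetical stand-in for Controller.
type sketchController struct {
	hosts     []string
	groupID   string
	partition int
	maxBytes  int
}

// sketchOption mirrors the shape of ControllerOption.
type sketchOption func(c *sketchController)

func withGroupID(id string) sketchOption {
	return func(c *sketchController) { c.groupID = id }
}

func withPartition(p int) sketchOption {
	return func(c *sketchController) { c.partition = p }
}

func withMaxBytes(n int) sketchOption {
	return func(c *sketchController) { c.maxBytes = n }
}

// newSketchController sets defaults first, then applies options in order,
// so a later option overrides an earlier one.
func newSketchController(hosts []string, opts ...sketchOption) (*sketchController, error) {
	if len(hosts) == 0 {
		return nil, errors.New("at least one host is required")
	}
	// Default values are arbitrary and chosen for illustration only.
	c := &sketchController{hosts: hosts, groupID: "default-group", maxBytes: 1 << 20}
	for _, opt := range opts {
		opt(c)
	}
	return c, nil
}

func main() {
	c, err := newSketchController(
		[]string{"localhost:9092"},
		withGroupID("orders"),
		withPartition(2),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.groupID, c.partition, c.maxBytes)
}
```

With the real package, the signatures documented above suggest a call of the form kafka.NewController([]string{"localhost:9092"}, kafka.WithGroupID("orders"), kafka.WithAutoCommit(false)), where the host address and group ID are placeholders.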

type MessagesHandler

type MessagesHandler func(
	ctx context.Context,
	r *kafka.Reader,
	sub extensions.BrokerChannelSubscription,
)

MessagesHandler is a function that can be used to process messages from the broker.
