kafka

package
v0.0.0-...-fbb851a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}

type ConsumerGroupCreator

type ConsumerGroupCreator interface {
	Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)
}

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	MaxMessageLen int
	TopicTag      string
	// contains filtered or unexported fields
}

ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.

func NewConsumerGroupHandler

func NewConsumerGroupHandler(edge edge.Edge, log logger.Logger) *ConsumerGroupHandler

func (*ConsumerGroupHandler) Cleanup

Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.

func (*ConsumerGroupHandler) ConsumeClaim

ConsumeClaim is called once each claim in a goroutine and must be thread-safe. Should run until the claim is closed.

func (*ConsumerGroupHandler) Handle

Handle processes a message and if successful saves it to be acknowledged after delivery.

func (*ConsumerGroupHandler) Setup

Setup is called once when a new session is opened. It setups up the handler and begins processing delivered messages.

type Input

type Input struct {
	Brokers                []string      `toml:"brokers"`
	ConsumerGroup          string        `toml:"consumer_group"`
	MaxMessageLen          int           `toml:"max_message_len"`
	MaxUndeliveredMessages int           `toml:"max_undelivered_messages"`
	MaxProcessingTime      time.Duration `toml:"max_processing_time"`
	Offset                 string        `toml:"offset"`
	BalanceStrategy        string        `toml:"balance_strategy"`
	Topics                 []string      `toml:"topics"`
	TopicTag               string        `toml:"topic_tag"`
	ConsumerFetchDefault   int64         `toml:"consumer_fetch_default"`
	ConnectionStrategy     string        `toml:"connection_strategy"`

	kafka.ReadConfig

	kafka.Logger
	Log logger.Logger `toml:"-"`

	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}

func (*Input) Init

func (k *Input) Init() error

func (*Input) Name

func (k *Input) Name() string

func (*Input) Start

func (k *Input) Start(_ edge.Edge, out edge.Edge) error

func (*Input) Stop

func (k *Input) Stop() error

type SaramaCreator

type SaramaCreator struct{}

func (*SaramaCreator) Create

func (*SaramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL