infrakafka

package
v1.3.1
Warning

This package is not in the latest version of its module.

Published: Mar 19, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectionConfig

type ConnectionConfig struct {
	// Address is the broker address: a comma-separated list of "host:port" entries.
	Address     string      `mapstructure:"address"`
	StartOffset StartOffset `mapstructure:"start_offset"`
}

func (*ConnectionConfig) Validate

func (c *ConnectionConfig) Validate() error
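The page does not say which rules Validate enforces. As a rough illustration only, the sketch below checks the two documented fields: that Address is a comma-separated list of "host:port" entries and that StartOffset is one of the two declared constants. The function validateConfig and its exact rules (for example, rejecting an empty StartOffset) are assumptions, not the package's actual behavior; the types are local copies so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strings"
)

// Local copies of the documented types, so the sketch is self-contained.
type StartOffset string

const (
	StartOffsetFirst StartOffset = "first"
	StartOffsetLast  StartOffset = "last"
)

type ConnectionConfig struct {
	Address     string
	StartOffset StartOffset
}

// validateConfig is a hypothetical stand-in for ConnectionConfig.Validate.
// The real method's rules are not documented; these are assumed.
func validateConfig(c *ConnectionConfig) error {
	if c == nil {
		return errors.New("connection config is nil")
	}
	if strings.TrimSpace(c.Address) == "" {
		return errors.New("address is empty")
	}
	// Every comma-separated entry must parse as "host:port".
	for _, broker := range strings.Split(c.Address, ",") {
		host, port, err := net.SplitHostPort(strings.TrimSpace(broker))
		if err != nil {
			return fmt.Errorf("broker %q: %w", broker, err)
		}
		if host == "" || port == "" {
			return fmt.Errorf("broker %q: host and port are both required", broker)
		}
	}
	// Assumption: only the two declared constants are accepted.
	switch c.StartOffset {
	case StartOffsetFirst, StartOffsetLast:
		return nil
	default:
		return fmt.Errorf("unknown start offset %q", c.StartOffset)
	}
}

func main() {
	good := &ConnectionConfig{Address: "kafka-1:9092,kafka-2:9092", StartOffset: StartOffsetFirst}
	bad := &ConnectionConfig{Address: "kafka-1", StartOffset: StartOffsetLast}
	fmt.Println("good config error:", validateConfig(good))
	fmt.Println("bad config error: ", validateConfig(bad))
}
```

Calling Validate before AddConnection is one way a caller might surface a malformed address early, though whether AddConnection already validates its argument is not stated here.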

type ConnectionsConfig

type ConnectionsConfig map[string]*ConnectionConfig

func (*ConnectionsConfig) Validate

func (c *ConnectionsConfig) Validate() error
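Since ConnectionsConfig is a map from connection name to *ConnectionConfig, one plausible reading is that its Validate walks the map and validates each entry. The sketch below shows that pattern under stated assumptions: validateAll and validateOne are hypothetical helpers, the per-entry check is a placeholder, and reporting only the first failure (in sorted name order, for deterministic output) is a choice of this example rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Local copies of the documented types, reduced to what the sketch needs.
type ConnectionConfig struct {
	Address string
}

type ConnectionsConfig map[string]*ConnectionConfig

// validateOne is a placeholder for the per-connection check (hypothetical).
func validateOne(c *ConnectionConfig) error {
	if c == nil {
		return errors.New("config is nil")
	}
	if c.Address == "" {
		return errors.New("address is empty")
	}
	return nil
}

// validateAll is a hypothetical stand-in for ConnectionsConfig.Validate.
// It visits names in sorted order so the reported error is deterministic.
func validateAll(cs ConnectionsConfig) error {
	names := make([]string, 0, len(cs))
	for name := range cs {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := validateOne(cs[name]); err != nil {
			// Wrap with the connection name so the caller can locate the bad entry.
			return fmt.Errorf("connection %q: %w", name, err)
		}
	}
	return nil
}

func main() {
	cs := ConnectionsConfig{
		"orders": {Address: "kafka-1:9092"},
		"audit":  nil, // a nil entry is possible because the map holds pointers
	}
	fmt.Println(validateAll(cs))
}
```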

type Container

type Container struct {
	// contains filtered or unexported fields
}

Container is a simple container that holds named Kafka connections.

func NewContainer

func NewContainer() *Container

func (*Container) AddConnection

func (cont *Container) AddConnection(name string, cfg *ConnectionConfig) error

AddConnection adds a named connection to the container. A consumer or producer can be created on that connection later with CreateConsumer or CreateProducer.

func (*Container) CreateConsumer

func (cont *Container) CreateConsumer(
	connectionName string,
	consumerGroup string,
	topics []string,
) (*kafka.Reader, error)

CreateConsumer creates a new Kafka consumer on the named connection, in the given consumer group, and subscribes it to the given topics.

func (*Container) CreateProducer

func (cont *Container) CreateProducer(connectionName string) (*kafka.Writer, error)

CreateProducer creates a new Kafka producer on the named connection.

func (*Container) Exists added in v1.3.1

func (cont *Container) Exists(name string) bool

Exists checks if a connection with the given name exists in the container.
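Container's fields are unexported, so its internals are not visible here. To illustrate the register-then-look-up flow that AddConnection and Exists describe, the following sketch uses a hypothetical registry type backed by a mutex-guarded map. The names registry, newRegistry, and lookup, the rejection of duplicate names, and the use of a mutex are all assumptions of this example, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Reduced local copy of the documented config type.
type ConnectionConfig struct {
	Address string
}

// registry is a hypothetical stand-in for Container.
type registry struct {
	mu    sync.RWMutex
	conns map[string]*ConnectionConfig
}

func newRegistry() *registry {
	return &registry{conns: make(map[string]*ConnectionConfig)}
}

// AddConnection stores cfg under name. Rejecting duplicates is an assumption.
func (r *registry) AddConnection(name string, cfg *ConnectionConfig) error {
	if cfg == nil {
		return fmt.Errorf("connection %q: config is nil", name)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.conns[name]; dup {
		return fmt.Errorf("connection %q already exists", name)
	}
	r.conns[name] = cfg
	return nil
}

// Exists reports whether a connection was registered under name.
func (r *registry) Exists(name string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	_, ok := r.conns[name]
	return ok
}

// lookup is the step a CreateConsumer or CreateProducer analogue would
// perform first: resolve the connection name or fail with a clear error.
func (r *registry) lookup(name string) (*ConnectionConfig, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	cfg, ok := r.conns[name]
	if !ok {
		return nil, fmt.Errorf("connection %q is not registered", name)
	}
	return cfg, nil
}

func main() {
	reg := newRegistry()
	if err := reg.AddConnection("orders", &ConnectionConfig{Address: "kafka-1:9092"}); err != nil {
		fmt.Println("add failed:", err)
		return
	}
	fmt.Println(reg.Exists("orders"), reg.Exists("audit"))
	_, err := reg.lookup("audit")
	fmt.Println(err)
}
```

With the real package, a caller would presumably guard CreateConsumer or CreateProducer with Exists when the connection name comes from user input, since both methods take the name as a plain string.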

type StartOffset

type StartOffset string

const (
	StartOffsetFirst StartOffset = "first"
	StartOffsetLast  StartOffset = "last"
)
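The signatures above return *kafka.Reader and *kafka.Writer, which is consistent with the github.com/segmentio/kafka-go client, although this page does not name the library. Assuming that is the client in use, a StartOffset value would need to be translated into the reader's numeric sentinel offsets, where kafka-go defines FirstOffset as -2 and LastOffset as -1. The sketch below shows one way such a translation could look; toReaderOffset is a hypothetical helper, and the sentinel values are copied locally so the example has no external dependency.

```go
package main

import "fmt"

// Local copy of the documented type and constants.
type StartOffset string

const (
	StartOffsetFirst StartOffset = "first"
	StartOffsetLast  StartOffset = "last"
)

// Local copies of the sentinel offsets used by segmentio/kafka-go
// (kafka.FirstOffset and kafka.LastOffset). That this package uses
// kafka-go at all is an assumption.
const (
	firstOffset int64 = -2 // oldest available message
	lastOffset  int64 = -1 // next message to be produced
)

// toReaderOffset is a hypothetical helper that maps the string setting
// to the numeric offset a reader configuration would expect.
func toReaderOffset(o StartOffset) (int64, error) {
	switch o {
	case StartOffsetFirst:
		return firstOffset, nil
	case StartOffsetLast:
		return lastOffset, nil
	default:
		return 0, fmt.Errorf("unknown start offset %q", o)
	}
}

func main() {
	for _, o := range []StartOffset{StartOffsetFirst, StartOffsetLast, "middle"} {
		n, err := toReaderOffset(o)
		fmt.Println(o, n, err)
	}
}
```

In kafka-go, the start offset only applies to a consumer group that has no committed offset yet, so under this assumption "first" would replay a topic for new groups and "last" would deliver only new messages.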
