kafka

package
v0.0.0-...-f47fe49 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: Apache-2.0 Imports: 14 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// package level sharable kafka variables, safe to use concurrently.
	Dialer    *kafka.Dialer
	Transport *kafka.Transport

	TlsConfig     *tls.Config
	SaslMechanism sasl.Mechanism
)

Functions

func CloseReader

func CloseReader(reader *Reader, context string)

CloseReader attempts to close the provided reader and logs the error if it fails.

func CloseWriter

func CloseWriter(writer *Writer, context string)

CloseWriter attempts to close the provided writer and logs the error if it fails.

func Consume

func Consume(reader *Reader, consumerHandler func(Message))

Consume consumes a message from the reader with the provided handler function.

func CreateDialer

func CreateDialer(config *clowder.BrokerConfig) (*kafka.Dialer, error)

CreateDialer returns a Kafka dialer for the Kafka Go library, which includes the TLS configuration and the Sasl mechanism to connect to the managed Kafka.

func CreateSaslMechanism

func CreateSaslMechanism(saslConfig *clowder.KafkaSASLConfig) (sasl.Mechanism, error)

CreateSaslMechanism returns a Sasl mechanism that Kafka Go requires for setting up the connection. Currently, we support plain, scram-sha-256 and scram-sha-512 mechanisms.

func CreateTLSConfig

func CreateTLSConfig(caContents *string) *tls.Config

CreateTLSConfig returns a TLS configuration. The minimum TLS version is set to 1.2 and if the "caContents" are not empty the provided certificate is included as "trusted" for the TLS configuration.

func CreateTransport

func CreateTransport(sasl sasl.Mechanism, tls *tls.Config) *kafka.Transport

CreateTransport returns a kafka transport that is memoized since it can be used concurrently

func Produce

func Produce(writer *Writer, message *Message) error

Produce produces a message with the writer.

Types

type Header kafka.Header

type Logger

type Logger interface {
	Debugf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

wrapper around the logger methods we need

type Message

type Message kafka.Message

func (*Message) AddHeaders

func (message *Message) AddHeaders(headers []Header)

func (*Message) AddValue

func (message *Message) AddValue(record []byte)

func (*Message) AddValueAsJSON

func (message *Message) AddValueAsJSON(record interface{}) error

func (*Message) GetHeader

func (message *Message) GetHeader(name string) string

func (*Message) ParseTo

func (message *Message) ParseTo(output interface{}) error

func (*Message) SetKeyFromHeaders

func (message *Message) SetKeyFromHeaders()

Set the key on the kafka message from the headers, using this precedence: 1. OrgID, every req _should_ have one of these. 2. EBS Account Number, fallback 3. XRHID, if we have neither...hopefully there is a x-rh-identity we can use!

func (*Message) TranslateHeaders

func (message *Message) TranslateHeaders() []Header

translate a kafka message's headers from segmentio/kafka -> our kafka

type Options

type Options struct {
	// REQUIRED FIELDS
	BrokerConfig []clowder.BrokerConfig
	Topic        string

	// only used for reader, optional.
	GroupID *string

	// logger to pass along
	Logger Logger
}

Options is a struct for creating a reader/writer

type Reader

type Reader struct {
	*kafka.Reader

	Options *Options
}

wrapping the reader/writer types we're using so we can change them under the hood in the future if necessary

func GetReader

func GetReader(conf *Options) (*Reader, error)

GetReader returns a Kafka reader configured with the specified settings.

type Writer

type Writer struct {
	*kafka.Writer

	Options *Options
}

func GetWriter

func GetWriter(conf *Options) (*Writer, error)

GetWriter returns a Kafka writer configured with the specified settings.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL