kafka

package
v2.2.8
Warning

This package is not in the latest version of its module.

Published: Apr 5, 2024 License: MIT Imports: 26 Imported by: 0

Documentation

Constants

const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
	SASLMechanismGSSAPI      = "GSSAPI"
	SASLMechanismOAuthBearer = "OAUTHBEARER"
)

Variables

This section is empty.

Functions

func NewKgoConfig

func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error)

NewKgoConfig creates the client options ([]kgo.Opt) for the Kafka client, as exposed by the franz-go library, from cfg. If the TLS certificates can't be read, an error is returned. logger is only used to print warnings about TLS.

Types

type Config

type Config struct {
	// General
	Brokers  []string `koanf:"brokers"`
	ClientID string   `koanf:"clientId"`
	RackID   string   `koanf:"rackId"`

	TLS  TLSConfig  `koanf:"tls"`
	SASL SASLConfig `koanf:"sasl"`

	RetryInitConnection bool `koanf:"retryInitConnection"`
}

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate() error
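The koanf struct tags on Config determine the configuration keys. The following is a minimal sketch that lists those keys by walking the tags with reflection. It uses trimmed local mirrors of the documented structs rather than importing this package, and both the "." key delimiter and the helper names koanfKeys and configKeys are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// Trimmed local mirrors of the documented structs; only the koanf tags matter here.
type TLSConfig struct {
	Enabled    bool   `koanf:"enabled"`
	CaFilepath string `koanf:"caFilepath"`
}

type SASLConfig struct {
	Enabled   bool   `koanf:"enabled"`
	Mechanism string `koanf:"mechanism"`
}

type Config struct {
	Brokers  []string `koanf:"brokers"`
	ClientID string   `koanf:"clientId"`
	RackID   string   `koanf:"rackId"`

	TLS  TLSConfig  `koanf:"tls"`
	SASL SASLConfig `koanf:"sasl"`

	RetryInitConnection bool `koanf:"retryInitConnection"`
}

// koanfKeys (hypothetical helper) walks a struct type and returns the key path
// of every leaf field. The "." delimiter is an assumption; koanf lets the
// caller choose the delimiter.
func koanfKeys(t reflect.Type, prefix string) []string {
	var keys []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag := f.Tag.Get("koanf")
		if tag == "" {
			continue // untagged fields are skipped in this sketch
		}
		key := prefix + tag
		if f.Type.Kind() == reflect.Struct {
			// Nested config structs contribute their own keys under this prefix.
			keys = append(keys, koanfKeys(f.Type, key+".")...)
			continue
		}
		keys = append(keys, key)
	}
	return keys
}

// configKeys (hypothetical helper) returns the key paths of the mirrored Config.
func configKeys() []string {
	return koanfKeys(reflect.TypeOf(Config{}), "")
}

func main() {
	for _, k := range configKeys() {
		fmt.Println(k)
	}
}
```

With the real package, the same walk over its Config type would also cover the fields that the mirrors above leave out.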

type KgoZapLogger

type KgoZapLogger struct {
	// contains filtered or unexported fields
}

func (KgoZapLogger) Level

func (k KgoZapLogger) Level() kgo.LogLevel

Level implements the kgo.Logger interface. It returns the log level to log at. We pin this to debug because the zap logger decides what is actually sent to the output stream.

func (KgoZapLogger) Log

func (k KgoZapLogger) Log(level kgo.LogLevel, msg string, keyvals ...interface{})

Log implements the kgo.Logger interface.
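The idea of pinning Level to debug and letting the underlying logger filter can be sketched as follows. This is not the package's implementation: LogLevel is a local stand-in for kgo.LogLevel, the standard library's log/slog replaces zap so the example has no external dependencies, and adapter and toSlog are hypothetical names.

```go
package main

import (
	"context"
	"log/slog"
	"os"
)

// LogLevel is a local stand-in for kgo.LogLevel (assumption, not the real type).
type LogLevel int8

const (
	LogLevelNone LogLevel = iota
	LogLevelError
	LogLevelWarn
	LogLevelInfo
	LogLevelDebug
)

// adapter forwards client log lines to an underlying structured logger.
type adapter struct {
	logger *slog.Logger
}

// Level always reports debug so the client hands over every message; the
// underlying logger's own level decides what is actually written.
func (a adapter) Level() LogLevel { return LogLevelDebug }

// Log maps the client level onto the underlying logger's level and forwards
// the message together with its key/value pairs.
func (a adapter) Log(level LogLevel, msg string, keyvals ...interface{}) {
	if level == LogLevelNone {
		return
	}
	a.logger.Log(context.Background(), toSlog(level), msg, keyvals...)
}

// toSlog translates the stand-in level into a slog level.
func toSlog(level LogLevel) slog.Level {
	switch level {
	case LogLevelError:
		return slog.LevelError
	case LogLevelWarn:
		return slog.LevelWarn
	case LogLevelInfo:
		return slog.LevelInfo
	default:
		return slog.LevelDebug
	}
}

func main() {
	// The handler only emits info and above, so the debug line is filtered here,
	// not by the adapter.
	h := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
	a := adapter{logger: slog.New(h)}
	a.Log(LogLevelDebug, "dropped by the handler")
	a.Log(LogLevelWarn, "kept by the handler", "broker", "localhost:9092")
}
```

Keeping the filtering in one place means the log level only has to be configured on the underlying logger.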

type OAuthBearerConfig added in v2.2.7

type OAuthBearerConfig struct {
	TokenEndpoint string `koanf:"tokenEndpoint"`
	ClientID      string `koanf:"clientId"`
	ClientSecret  string `koanf:"clientSecret"`
	Scope         string `koanf:"scope"`
}

func (*OAuthBearerConfig) Validate added in v2.2.7

func (c *OAuthBearerConfig) Validate() error
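The fields of OAuthBearerConfig resemble the inputs of an OAuth 2.0 client credentials request. The sketch below builds such a request body from a local mirror of the struct. Whether this package uses that grant type, and how it sends the credentials, is not stated in this documentation, so the grant type, the form field names, and the helper tokenRequestForm are all assumptions.

```go
package main

import (
	"fmt"
	"net/url"
)

// OAuthBearerConfig is a local mirror of the documented struct.
type OAuthBearerConfig struct {
	TokenEndpoint string `koanf:"tokenEndpoint"`
	ClientID      string `koanf:"clientId"`
	ClientSecret  string `koanf:"clientSecret"`
	Scope         string `koanf:"scope"`
}

// tokenRequestForm (hypothetical helper) builds the form body for a client
// credentials token request. The scope is only sent when one is configured.
func tokenRequestForm(cfg OAuthBearerConfig) url.Values {
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", cfg.ClientID)
	form.Set("client_secret", cfg.ClientSecret)
	if cfg.Scope != "" {
		form.Set("scope", cfg.Scope)
	}
	return form
}

func main() {
	cfg := OAuthBearerConfig{
		TokenEndpoint: "https://idp.example.com/oauth/token", // placeholder URL
		ClientID:      "my-client",
		ClientSecret:  "s3cret",
		Scope:         "kafka",
	}
	// The encoded form would be POSTed to cfg.TokenEndpoint.
	fmt.Println(cfg.TokenEndpoint)
	fmt.Println(tokenRequestForm(cfg).Encode())
}
```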

type SASLConfig

type SASLConfig struct {
	Enabled   bool   `koanf:"enabled"`
	Username  string `koanf:"username"`
	Password  string `koanf:"password"`
	Mechanism string `koanf:"mechanism"`

	// SASL Mechanisms that require more configuration than username & password
	GSSAPI      SASLGSSAPIConfig  `koanf:"gssapi"`
	OAuthBearer OAuthBearerConfig `koanf:"oauth"`
}

SASLConfig holds the SASL settings for the Kafka client.

func (*SASLConfig) SetDefaults

func (c *SASLConfig) SetDefaults()

SetDefaults sets the default values for the SASL config.

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate checks the SASL config input.
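As an illustration of how the Mechanism string relates to the SASLMechanism constants, here is a minimal sketch of a mechanism check. The constants are copied from this page so the example is self-contained. validateMechanism and needsExtraConfig are hypothetical helpers, and the actual rules applied by Validate are not documented here.

```go
package main

import "fmt"

// Copied from the package constants so the sketch compiles on its own.
const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
	SASLMechanismGSSAPI      = "GSSAPI"
	SASLMechanismOAuthBearer = "OAUTHBEARER"
)

// validateMechanism (hypothetical) accepts exactly the listed mechanism names.
// The comparison is case-sensitive in this sketch.
func validateMechanism(mechanism string) error {
	switch mechanism {
	case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512,
		SASLMechanismGSSAPI, SASLMechanismOAuthBearer:
		return nil
	default:
		return fmt.Errorf("unsupported SASL mechanism %q", mechanism)
	}
}

// needsExtraConfig (hypothetical) reports whether a mechanism has its own
// sub-config in SASLConfig in addition to username and password.
func needsExtraConfig(mechanism string) bool {
	return mechanism == SASLMechanismGSSAPI || mechanism == SASLMechanismOAuthBearer
}

func main() {
	for _, m := range []string{"PLAIN", "GSSAPI", "plain"} {
		fmt.Println(m, validateMechanism(m), needsExtraConfig(m))
	}
}
```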

type SASLGSSAPIConfig

type SASLGSSAPIConfig struct {
	AuthType           string `koanf:"authType"`
	KeyTabPath         string `koanf:"keyTabPath"`
	KerberosConfigPath string `koanf:"kerberosConfigPath"`
	ServiceName        string `koanf:"serviceName"`
	Username           string `koanf:"username"`
	Password           string `koanf:"password"`
	Realm              string `koanf:"realm"`

	// EnableFast enables FAST, which is a pre-authentication framework for Kerberos.
	// It includes a mechanism for tunneling pre-authentication exchanges using armoured KDC messages.
	// FAST provides increased resistance to passive password guessing attacks.
	EnableFast bool `koanf:"enableFast"`
}

SASLGSSAPIConfig represents the Kafka Kerberos config

func (*SASLGSSAPIConfig) SetDefaults added in v2.2.0

func (s *SASLGSSAPIConfig) SetDefaults()

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(cfg Config, logger *zap.Logger) *Service

func (*Service) Brokers added in v2.2.1

func (s *Service) Brokers() []string

Brokers returns the list of brokers this service connects to.

func (*Service) CreateAndTestClient added in v2.2.0

func (s *Service) CreateAndTestClient(ctx context.Context, l *zap.Logger, opts []kgo.Opt) (*kgo.Client, error)

CreateAndTestClient creates a client with the service's default settings. The logger l is used to log connections, errors, warnings about the TLS config, ...

type TLSConfig

type TLSConfig struct {
	Enabled               bool   `koanf:"enabled"`
	CaFilepath            string `koanf:"caFilepath"`
	CertFilepath          string `koanf:"certFilepath"`
	KeyFilepath           string `koanf:"keyFilepath"`
	Ca                    string `koanf:"ca"`
	Cert                  string `koanf:"cert"`
	Key                   string `koanf:"key"`
	Passphrase            string `koanf:"passphrase"`
	InsecureSkipTLSVerify bool   `koanf:"insecureSkipTlsVerify"`
}

TLSConfig to connect to Kafka via TLS

func (*TLSConfig) SetDefaults

func (c *TLSConfig) SetDefaults()

func (*TLSConfig) Validate

func (c *TLSConfig) Validate() error
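TLSConfig offers both inline PEM fields (Ca, Cert, Key) and file path fields. The sketch below shows one way such settings could be turned into a *tls.Config using only the standard library. It is not this package's implementation: the precedence of inline values over file paths is an assumption, Passphrase handling is left out, and buildTLS and pemBytes are hypothetical helpers.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// TLSConfig is a local mirror of the documented struct.
type TLSConfig struct {
	Enabled               bool
	CaFilepath            string
	CertFilepath          string
	KeyFilepath           string
	Ca                    string
	Cert                  string
	Key                   string
	Passphrase            string // not handled in this sketch
	InsecureSkipTLSVerify bool
}

// pemBytes returns the inline PEM if set, otherwise the file contents.
// It returns nil when neither is configured. Inline-first is an assumption.
func pemBytes(inline, path string) ([]byte, error) {
	if inline != "" {
		return []byte(inline), nil
	}
	if path == "" {
		return nil, nil
	}
	return os.ReadFile(path)
}

// buildTLS (hypothetical) returns nil when TLS is disabled.
func buildTLS(cfg TLSConfig) (*tls.Config, error) {
	if !cfg.Enabled {
		return nil, nil
	}
	out := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipTLSVerify}

	ca, err := pemBytes(cfg.Ca, cfg.CaFilepath)
	if err != nil {
		return nil, fmt.Errorf("read CA: %w", err)
	}
	if ca != nil {
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(ca) {
			return nil, errors.New("CA contains no valid PEM certificate")
		}
		out.RootCAs = pool
	}

	cert, err := pemBytes(cfg.Cert, cfg.CertFilepath)
	if err != nil {
		return nil, fmt.Errorf("read cert: %w", err)
	}
	key, err := pemBytes(cfg.Key, cfg.KeyFilepath)
	if err != nil {
		return nil, fmt.Errorf("read key: %w", err)
	}
	if (cert == nil) != (key == nil) {
		return nil, errors.New("cert and key must be set together")
	}
	if cert != nil {
		pair, err := tls.X509KeyPair(cert, key)
		if err != nil {
			return nil, fmt.Errorf("parse key pair: %w", err)
		}
		out.Certificates = []tls.Certificate{pair}
	}
	return out, nil
}

func main() {
	cfg, err := buildTLS(TLSConfig{Enabled: true, InsecureSkipTLSVerify: true})
	fmt.Println(cfg != nil, err)

	_, err = buildTLS(TLSConfig{Enabled: true, Ca: "not a certificate"})
	fmt.Println(err)
}
```

Returning an error for unreadable or unparsable certificates mirrors the documented behavior of NewKgoConfig, which reports an error if the TLS certificates can't be read.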
