configs

package
Version: v1.2.84
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterSASL

func RegisterSASL(mechanism Mechanism)

Types

type Ack

// Ack is a string-typed configuration value; its Config method maps it to a
// kgo.Acks.
// NOTE(review): the accepted string values and the fallback for unknown input
// are not visible here — confirm against the Config implementation.
type Ack string

func (Ack) Config

func (ack Ack) Config() kgo.Acks

type Compression

// Compression is the JSON form of a single compression setting: a name and a
// level. Its Config method converts it into a kgo.CompressionCodec.
// NOTE(review): the accepted Name values and how Level is interpreted are not
// visible here — confirm against the Config implementation.
type Compression struct {
	Name  string `json:"name"`
	Level int    `json:"level"`
}

func (*Compression) Config

func (compression *Compression) Config() (v kgo.CompressionCodec)

type Config

// Config is the aggregate configuration: it embeds Generic and adds a single
// ProducerConfig under "producers" and a map of ConsumerConfig under
// "consumers".
type Config struct {
	Generic
	Producers ProducerConfig            `json:"producers"`
	// Keyed by string; presumably the consumer's name — TODO confirm with callers.
	Consumers map[string]ConsumerConfig `json:"consumers"`
}

type ConsumerConfig

// ConsumerConfig holds the JSON-decoded settings for one consumer. Its Options
// method turns them into a []kgo.Opt, or reports an error.
//
// NOTE(review): MinBytes, MaxBytes and MaxPartitionBytes are strings rather
// than integers — presumably size expressions parsed by Options; confirm the
// accepted format there.
// NOTE(review): under plain encoding/json a time.Duration decodes from an
// integer number of nanoseconds — confirm which decoder feeds this struct.
type ConsumerConfig struct {
	MaxPollRecords           int             `json:"maxPollRecords"`
	PartitionBuffer          int             `json:"partitionBuffer"`
	MaxWait                  time.Duration   `json:"maxWait"`
	MinBytes                 string          `json:"minBytes"`
	MaxBytes                 string          `json:"maxBytes"`
	MaxPartitionBytes        string          `json:"maxPartitionBytes"`
	Isolation                IsolationLevel  `json:"isolation"`
	KeepControl              bool            `json:"keepControl"`
	Rack                     string          `json:"rack"`
	MaxConcurrentFetches     int             `json:"maxConcurrentFetches"`
	KeepRetryableFetchErrors bool            `json:"keepRetryableFetchErrors"`
	Group                    string          `json:"group"` // group we are in
	Topics                   []string        `json:"topics"`
	Balancers                []GroupBalancer `json:"balancers"` // balancers we can use
	SessionTimeout           time.Duration   `json:"sessionTimeout"`
	RebalanceTimeout         time.Duration   `json:"rebalanceTimeout"`
	HeartbeatInterval        time.Duration   `json:"heartbeatInterval"`
	RequireStable            bool            `json:"requireStable"`
}

func (*ConsumerConfig) Options

func (config *ConsumerConfig) Options() (opts []kgo.Opt, err error)

type Generic

// Generic holds the client-wide settings that Config embeds: broker list,
// timeouts, retry limits, broker read/write size limits, and the nested
// metadata (Meta), SASL and SSL sections.
//
// Its Options method takes an id, a versions.Version and a logs.Logger and
// returns a []kgo.Opt or an error; NewClient takes the same arguments and
// returns a *kgo.Client or an error.
//
// NOTE(review): MaxBrokerWriteBytes and MaxBrokerReadBytes are strings —
// presumably size expressions parsed by Options; confirm the accepted format.
type Generic struct {
	Brokers                []string      `json:"brokers"`
	DialTimeout            time.Duration `json:"dialTimeout"`
	RequestTimeoutOverhead time.Duration `json:"requestTimeoutOverhead"`
	ConnIdleTimeout        time.Duration `json:"connIdleTimeout"`
	Retries                int           `json:"retries"`
	RetryTimeout           time.Duration `json:"retryTimeout"`
	MaxBrokerWriteBytes    string        `json:"maxBrokerWriteBytes"`
	MaxBrokerReadBytes     string        `json:"maxBrokerReadBytes"`
	AllowAutoTopicCreation bool          `json:"allowAutoTopicCreation"`
	Meta                   MetaConfig    `json:"meta"`
	SASL                   SASLConfig    `json:"sasl"`
	SSL                    SSLConfig     `json:"ssl"`
}

func (*Generic) NewClient

func (config *Generic) NewClient(id string, version versions.Version, log logs.Logger) (v *kgo.Client, err error)

func (*Generic) Options

func (config *Generic) Options(id string, version versions.Version, log logs.Logger) (v []kgo.Opt, err error)

type GroupBalancer

// GroupBalancer is a string-typed configuration value used in
// ConsumerConfig.Balancers; its Config method maps it to a kgo.GroupBalancer.
// NOTE(review): the accepted string values are not visible here — confirm
// against the Config implementation.
type GroupBalancer string

func (GroupBalancer) Config

func (config GroupBalancer) Config() kgo.GroupBalancer

type IsolationLevel

// IsolationLevel is a string-typed configuration value used in
// ConsumerConfig.Isolation; its Config method maps it to a kgo.IsolationLevel.
// NOTE(review): the accepted string values are not visible here — confirm
// against the Config implementation.
type IsolationLevel string

func (IsolationLevel) Config

func (config IsolationLevel) Config() kgo.IsolationLevel

type Logger

// Logger exposes Level() kgo.LogLevel and Log(level, msg, keyvals...) on its
// pointer receiver.
// NOTE(review): this method set looks like the kgo.Logger interface, so the
// type presumably adapts a project logger for the Kafka client — confirm.
type Logger struct {
	// contains filtered or unexported fields
}

func (*Logger) Level

func (log *Logger) Level() kgo.LogLevel

func (*Logger) Log

func (log *Logger) Log(level kgo.LogLevel, msg string, keyvals ...any)

type Mechanism

// Mechanism is the interface accepted by RegisterSASL and returned by
// SASLConfig.Config. In this package *PlainSASL and *ScramSASL declare the
// same four methods.
type Mechanism interface {
	// Name returns the mechanism's name as a string.
	Name() string
	// Construct receives the MechanismOptions (logger and config) and may fail.
	Construct(options MechanismOptions) (err error)
	// Authenticate starts an exchange for the given host and returns a Session
	// together with a byte slice.
	// NOTE(review): the byte slice is presumably the first client message —
	// confirm against the implementations.
	Authenticate(ctx context.Context, host string) (Session, []byte, error)
	// Shutdown takes no arguments and returns nothing; what it releases is not
	// visible here.
	Shutdown()
}

type MechanismOptions

// MechanismOptions is the argument passed to Mechanism.Construct: a logger
// and a configures.Config.
// NOTE(review): Config presumably carries the mechanism-specific options
// (for example SASLConfig.Options) — confirm against SASLConfig.Config.
type MechanismOptions struct {
	Log    logs.Logger
	Config configures.Config
}

type MetaConfig

// MetaConfig is the "meta" section of Generic: two durations, MaxAge and
// MinAge.
// NOTE(review): these presumably bound the client's metadata refresh interval
// — confirm how Generic.Options consumes them.
type MetaConfig struct {
	MaxAge time.Duration `json:"maxAge"`
	MinAge time.Duration `json:"minAge"`
}

type OffsetManagerConfig

// OffsetManagerConfig pairs a name with a raw JSON options payload. Options is
// a json.RawMessage, so it stays undecoded when this struct is unmarshalled.
// NOTE(review): no consumer of this type is visible in this listing — confirm
// where Name is resolved and who decodes Options.
type OffsetManagerConfig struct {
	Name    string          `json:"name"`
	Options json.RawMessage `json:"options"`
}

type Partitioner

// Partitioner pairs a name with a raw JSON options payload. Its Config method
// returns a kgo.Partitioner or an error. Options is a json.RawMessage, so it
// stays undecoded when this struct is unmarshalled.
// NOTE(review): the accepted Name values and the schema of Options are not
// visible here — confirm against the Config implementation.
type Partitioner struct {
	Name    string          `json:"name"`
	Options json.RawMessage `json:"options"`
}

func (Partitioner) Config

func (partitioner Partitioner) Config() (v kgo.Partitioner, err error)

type PlainSASL

// PlainSASL declares Name, Construct, Authenticate and Shutdown on its pointer
// receiver, matching the Mechanism method set.
// NOTE(review): presumably the SASL/PLAIN mechanism configured through
// PlainSASLConfig — confirm against the implementation.
type PlainSASL struct {
	// contains filtered or unexported fields
}

func (*PlainSASL) Authenticate

func (s *PlainSASL) Authenticate(ctx context.Context, host string) (Session, []byte, error)

func (*PlainSASL) Construct

func (s *PlainSASL) Construct(options MechanismOptions) (err error)

func (*PlainSASL) Name

func (s *PlainSASL) Name() string

func (*PlainSASL) Shutdown

func (s *PlainSASL) Shutdown()

type PlainSASLConfig

// PlainSASLConfig is the JSON form of three string credentials: zid, username
// and password.
// NOTE(review): Zid is presumably the SASL authorization identity — confirm.
// NOTE(review): Password is a plain string field; take care not to log this
// struct.
type PlainSASLConfig struct {
	Zid      string `json:"zid"`
	Username string `json:"username"`
	Password string `json:"password"`
}

type ProducerConfig

// ProducerConfig holds the JSON-decoded producer settings. Its Options method
// turns them into a []kgo.Opt, or reports an error.
//
// NOTE(review): MaxRecordBatchBytes and MaxBufferedBytes are strings —
// presumably size expressions parsed by Options; confirm the accepted format.
// NOTE(review): the meaning of Enable and Num is not visible here (presumably
// an on/off switch and a producer count) — confirm with callers.
type ProducerConfig struct {
	Enable                bool          `json:"enable"`
	Num                   int           `json:"num"`
	Ack                   Ack           `json:"ack"`
	DisableIdempotency    bool          `json:"disableIdempotency"`
	MaxInflight           int           `json:"maxInflight"` // if idempotency is disabled, we allow a configurable max inflight
	Compressions          []Compression `json:"compressions"`
	MaxRecordBatchBytes   string        `json:"maxRecordBatchBytes"`
	MaxBufferedRecords    int           `json:"maxBufferedRecords"`
	MaxBufferedBytes      string        `json:"maxBufferedBytes"`
	Timeout               time.Duration `json:"timeout"`
	RecordRetries         int           `json:"recordRetries"`
	MaxUnknownFailures    int           `json:"maxUnknownFailures"`
	Linger                time.Duration `json:"linger"`
	RecordDeliveryTimeout time.Duration `json:"recordDeliveryTimeout"`
	// Pointer type: stays nil when "partitioner" is absent from the JSON.
	Partitioner           *Partitioner  `json:"partitioner"`
}

func (*ProducerConfig) Options

func (config *ProducerConfig) Options() (opts []kgo.Opt, err error)

type SASLConfig

// SASLConfig is the "sasl" section of Generic: a name plus a raw JSON options
// payload. Its Config method takes a logs.Logger and returns a Mechanism or an
// error.
// NOTE(review): Name presumably selects a mechanism registered through
// RegisterSASL, with Options passed to its Construct — confirm against the
// Config implementation.
type SASLConfig struct {
	Name    string          `json:"name"`
	Options json.RawMessage `json:"options"`
}

func (*SASLConfig) Config

func (config *SASLConfig) Config(log logs.Logger) (v Mechanism, err error)

type SSLConfig

// SSLConfig is the "ssl" section of Generic: an enable flag plus an embedded
// configs.Client.
// NOTE(review): configs.Client comes from an imported package that shares this
// package's name; its fields are not visible here. It presumably holds the
// client TLS settings — confirm.
type SSLConfig struct {
	Enable bool `json:"enable"`
	configs.Client
}

type ScramSASL

// ScramSASL declares Name, Construct, Authenticate and Shutdown on its pointer
// receiver, matching the Mechanism method set.
// NOTE(review): presumably the SASL/SCRAM mechanism configured through
// ScramSASLConfig — confirm against the implementation.
type ScramSASL struct {
	// contains filtered or unexported fields
}

func (*ScramSASL) Authenticate

func (s *ScramSASL) Authenticate(ctx context.Context, host string) (Session, []byte, error)

func (*ScramSASL) Construct

func (s *ScramSASL) Construct(options MechanismOptions) (err error)

func (*ScramSASL) Name

func (s *ScramSASL) Name() string

func (*ScramSASL) Shutdown

func (s *ScramSASL) Shutdown()

type ScramSASLConfig

// ScramSASLConfig is the JSON form of the same three string credentials as
// PlainSASLConfig (zid, username, password) plus an algo string.
// NOTE(review): Algo presumably selects the SCRAM hash variant; the accepted
// values are not visible here — confirm against ScramSASL.Construct.
// NOTE(review): Password is a plain string field; take care not to log this
// struct.
type ScramSASLConfig struct {
	Zid      string `json:"zid"`
	Username string `json:"username"`
	Password string `json:"password"`
	Algo     string `json:"algo"`
}

type Session

// Session is the value returned by Mechanism.Authenticate. It has a single
// method, Challenge, which takes a byte slice and returns a bool, a byte slice
// and an error.
// NOTE(review): this shape matches franz-go's sasl.Session, where the bool
// reports that authentication is complete and the bytes are the next client
// response — confirm that this package follows the same contract.
type Session interface {
	Challenge([]byte) (bool, []byte, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL