input

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2023 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TypeKafkaGo     = "kafka-go"
	TypeKafkaSarama = "sarama"
	TypeKafkaFranz  = "franz"
)
View Source
const (
	Krb5KeytabAuth = 2
	CommitRetries  = 6
	RetryBackoff   = 5 * time.Second
)

Variables

View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

Functions

func GetFranzConfig

func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error)

func GetSaramaConfig

func GetSaramaConfig(kfkCfg *config.KafkaConfig) (sarCfg *sarama.Config, err error)

Types

type Inputer

type Inputer interface {
	Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) error
	Run()
	Stop() error
	CommitMessages(message *model.InputMessage) error
}

func NewInputer

func NewInputer(typ string) Inputer

type KafkaFranz

type KafkaFranz struct {
	// contains filtered or unexported fields
}

KafkaFranz implements input.Inputer refers to examples/group_consuming/main.go

func NewKafkaFranz

func NewKafkaFranz() *KafkaFranz

NewKafkaFranz get instance of kafka reader

func (*KafkaFranz) CommitMessages

func (k *KafkaFranz) CommitMessages(msg *model.InputMessage) error

func (*KafkaFranz) Description

func (k *KafkaFranz) Description() string

Description of this kafka consumer, which topic it reads from

func (*KafkaFranz) Init

func (k *KafkaFranz) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)

Init Initialise the kafka instance with configuration

func (*KafkaFranz) Run

func (k *KafkaFranz) Run()

kafka main loop

func (*KafkaFranz) Stop

func (k *KafkaFranz) Stop() error

Stop kafka consumer and close all connections

type KafkaGo

type KafkaGo struct {
	// contains filtered or unexported fields
}

KafkaGo implements input.Inputer

func NewKafkaGo

func NewKafkaGo() *KafkaGo

NewKafkaGo get instance of kafka reader

func (*KafkaGo) CommitMessages

func (k *KafkaGo) CommitMessages(msg *model.InputMessage) (err error)

func (*KafkaGo) Description

func (k *KafkaGo) Description() string

Description of this kafka consumer, which topic it reads from

func (*KafkaGo) Init

func (k *KafkaGo) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)

Init Initialise the kafka instance with configuration

func (*KafkaGo) Run

func (k *KafkaGo) Run()

kafka main loop

func (*KafkaGo) Stop

func (k *KafkaGo) Stop() error

Stop kafka consumer and close all connections

type KafkaGoLogger

type KafkaGoLogger struct {
	// contains filtered or unexported fields
}

func (*KafkaGoLogger) Printf

func (kgl *KafkaGoLogger) Printf(template string, args ...interface{})

type KafkaSarama

type KafkaSarama struct {
	// contains filtered or unexported fields
}

KafkaSarama implements input.Inputer

func NewKafkaSarama

func NewKafkaSarama() *KafkaSarama

NewKafkaSarama get instance of kafka reader

func (*KafkaSarama) CommitMessages

func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error

func (*KafkaSarama) Description

func (k *KafkaSarama) Description() string

Description of this kafka consumer, which topic it reads from

func (*KafkaSarama) Init

func (k *KafkaSarama) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func(msg *model.InputMessage), cleanupFn func()) (err error)

Init Initialise the kafka instance with configuration

func (*KafkaSarama) Run

func (k *KafkaSarama) Run()

kafka main loop

func (*KafkaSarama) Stop

func (k *KafkaSarama) Stop() error

Stop kafka consumer and close all connections

type MyConsumerGroupHandler

type MyConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

func (MyConsumerGroupHandler) Cleanup

func (MyConsumerGroupHandler) ConsumeClaim

func (MyConsumerGroupHandler) Setup

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL