kafka

package
v0.0.0-...-fcd0a94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2022 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OldestOffset = int64(-2)
)

Variables

This section is empty.

Functions

func CloseProducer

func CloseProducer(log common.Logger, syncProducer sarama.SyncProducer)

func DefaultConsumerConfig

func DefaultConsumerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config

func DefaultProducerConfig

func DefaultProducerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config

func EnableSasl

func EnableSasl(
	log common.Logger,
	conf *sarama.Config,
	username string,
	password string,
	algorithm string,
	useTLS bool,
	verifySSL bool) (*sarama.Config, error)

func NewKafkaClient

func NewKafkaClient(log common.Logger, brokers []string, config *sarama.Config) sarama.Client

func NewProducer

func NewProducer(brokers []string, config *sarama.Config) (sarama.SyncProducer, error)

func TimeBasedPartitionCount

func TimeBasedPartitionCount(client sarama.Client, topic string) ([]int, []int, []int)

Types

type GeneralTopicInfo

type GeneralTopicInfo struct {
	Name              string
	NumberMessages    int64
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]*string
	NumberPartitions  int32
}

type JokkConsumer

type JokkConsumer struct {
	MsgChannel chan sarama.ConsumerMessage
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(log common.Logger, host string, conf *sarama.Config) (JokkConsumer, error)

func (*JokkConsumer) Cleanup

func (jc *JokkConsumer) Cleanup(session sarama.ConsumerGroupSession) error

func (*JokkConsumer) Close

func (jc *JokkConsumer) Close() error

func (*JokkConsumer) ConsumeClaim

func (jc *JokkConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*JokkConsumer) Setup

func (jc *JokkConsumer) Setup(session sarama.ConsumerGroupSession) (err error)

func (*JokkConsumer) StartReceivingMessages

func (jc *JokkConsumer) StartReceivingMessages(topic string)

type PartitionChannelInfo

type PartitionChannelInfo struct {
	// contains filtered or unexported fields
}

type PartitionCountInfo

type PartitionCountInfo struct {
	TotalMessageCount int64
	Partitions        []PartitionInfo
}

func PartitionMessageCount

func PartitionMessageCount(client sarama.Client, topic string, timestamp int64) PartitionCountInfo

* Set 'time' to OldestOffset to use the default time range when calculating messages/partitions. * Set time to get the most recent available offset at the given time (in milliseconds.)

type PartitionDetailCountInfo

type PartitionDetailCountInfo struct {
	TotalMessageCount int64
	Partitions        []PartitionDetailInfo
}

func DetailedPartitionInfo

func DetailedPartitionInfo(admin sarama.ClusterAdmin, client sarama.Client, topic string) PartitionDetailCountInfo

type PartitionDetailInfo

type PartitionDetailInfo struct {
	PartitionInfo   PartitionInfo
	Leader          int32
	Replicas        []int32
	Isr             []int32
	OfflineReplicas []int32
}

type PartitionInfo

type PartitionInfo struct {
	Id                int
	OldOffset         int
	NewOffset         int
	PartitionMsgCount int
}

type TopicDetailInfo

type TopicDetailInfo struct {
	GeneralTopicInfo    GeneralTopicInfo
	PartionDetailedInfo []PartitionDetailInfo
}

type TopicInfo

type TopicInfo struct {
	GeneralTopicInfo GeneralTopicInfo
	PartitionsInfo   []PartitionInfo
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL