kafka

package
v0.0.0-...-7bbab05 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2024 License: Apache-2.0 Imports: 8 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaConf

type KafkaConf struct {
	ConsumerGroupName string `json:"name"`
	ZkPath            string `json:"zk_path"`
	Topic             string `json:"topic"`
	ForceRestart      bool   `json:"force_restart"`
	ReadNewest        bool   `json:"read_newest"`
	KafkaVersion      int    `json:"kafka_version_major"`
	SASLEnabled       bool   `json:"sasl_enabled"`
	SASLUsername      string `json:"username"`
	SASLPasswordKey   string `json:"passwordKey"`
}

KafkaConf holds configuration options for KafkaSource

type KafkaMsg

type KafkaMsg interface {
	MarkDone()
	GetRawMsg() *sarama.ConsumerMessage
	IsProcessed() bool
}

type KafkaMsgFactory

type KafkaMsgFactory interface {
	//Create call to wrap consumer message inside KafkaMsg
	Create(msg *sarama.ConsumerMessage) KafkaMsg
}

type KafkaOffsetTracker

type KafkaOffsetTracker struct {
	// contains filtered or unexported fields
}

KafkaOffsetTracker is implementation of OffsetTracker to track offsets for KafkaSource, KafkaMessage

func (*KafkaOffsetTracker) TrackMe

func (k *KafkaOffsetTracker) TrackMe(kmsg KafkaMsg)

TrackMe method ensures messages to track are enqued for tracking

type KafkaSource

type KafkaSource struct {
	// contains filtered or unexported fields
}

KafkaSource is Source implementation which reads from Kafka. This implementation uses sarama lib and wvanbergen implementation of HA Kafka Consumer using zookeeper

func GetKafkaSource

func GetKafkaSource(conf KafkaConf, factory KafkaMsgFactory, offMonitor offset_monitor.OffMonitor) *KafkaSource

GetKafkaSource method is used to get instance of KafkaSource.

func (*KafkaSource) CommitOffsets

func (k *KafkaSource) CommitOffsets(data KafkaMsg) (bool, error)

CommitOffsets enables cliento explicity commit the Offset that is processed.

func (*KafkaSource) Generate

func (k *KafkaSource) Generate(out chan<- interface{})

Generate is Source method implementation, which connect to Kafka and pushes KafkaMessage into the channel

func (*KafkaSource) GetKey

func (k *KafkaSource) GetKey(msg interface{}) []byte

func (*KafkaSource) GetOffset

func (k *KafkaSource) GetOffset(msg interface{}) int64

func (*KafkaSource) GetPartition

func (k *KafkaSource) GetPartition(msg interface{}) int32

func (*KafkaSource) GetValue

func (k *KafkaSource) GetValue(msg interface{}) []byte

func (*KafkaSource) RegisterHook

func (k *KafkaSource) RegisterHook(hook KafkaSourceHook)

RegisterHook used to registerHook with KafkSource

func (*KafkaSource) Stop

func (k *KafkaSource) Stop()

Stop method implements Source interface stop method, to Stop the KafkaConsumer

type KafkaSourceHook

type KafkaSourceHook interface {
	//Pre called before passing the message to DMux
	Pre(k KafkaMsg)
}

KafkaSourceHook to track messages coming out of the source in order

type OffsetTracker

type OffsetTracker interface {
	TrackMe(kmsg KafkaMsg)
}

OffsetTracker is interface which defines methods to track Messages which have been queued for processing

func GetKafkaOffsetTracker

func GetKafkaOffsetTracker(size int, source *KafkaSource) OffsetTracker

GetKafkaOffsetTracker is Global function to get instance of KafkaOffsetTracker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL