kafka

package
v0.0.0-...-3fb6fb6 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 4, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaConf

type KafkaConf struct {
	ConsumerGroupName string `json:"name"`
	ZkPath            string `json:"zk_path"`
	Topic             string `json:"topic"`
	ForceRestart      bool   `json:"force_restart"`
	ReadNewest        bool   `json:"read_newest"`
	KafkaVersion      int    `json:"kafka_version_major"`
	SASLEnabled       bool   `json:"sasl_enabled"`
	SASLUsername      string `json:"username"`
	SASLPasswordKey   string `json:"passwordKey"`
}

KafkaConf holds the configuration options for KafkaSource.
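Because every field carries a `json` tag, a KafkaConf can be populated from a JSON document. The following is a minimal sketch, assuming the configuration arrives as raw JSON bytes. The struct is copied locally so the example builds on its own, `parseKafkaConf` is a hypothetical helper, and all values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KafkaConf mirrors the documented struct so that this sketch compiles on its
// own; real code would import the type from the kafka package instead.
type KafkaConf struct {
	ConsumerGroupName string `json:"name"`
	ZkPath            string `json:"zk_path"`
	Topic             string `json:"topic"`
	ForceRestart      bool   `json:"force_restart"`
	ReadNewest        bool   `json:"read_newest"`
	KafkaVersion      int    `json:"kafka_version_major"`
	SASLEnabled       bool   `json:"sasl_enabled"`
	SASLUsername      string `json:"username"`
	SASLPasswordKey   string `json:"passwordKey"`
}

// parseKafkaConf is a hypothetical helper (not part of the package) that
// decodes a JSON document into a KafkaConf.
func parseKafkaConf(raw []byte) (KafkaConf, error) {
	var conf KafkaConf
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// Illustrative values only; none of them come from the package docs.
	raw := []byte(`{
		"name": "orders-consumer",
		"zk_path": "localhost:2181",
		"topic": "orders",
		"force_restart": false,
		"read_newest": true,
		"kafka_version_major": 2,
		"sasl_enabled": false
	}`)

	conf, err := parseKafkaConf(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", conf)
}
```

Note that the JSON keys do not always match the field names: ConsumerGroupName is read from `name`, and SASLPasswordKey uses the camel-case key `passwordKey` while the other keys are snake case.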

type KafkaMsg

type KafkaMsg interface {
	MarkDone()
	GetRawMsg() *sarama.ConsumerMessage
	IsProcessed() bool
}

type KafkaMsgFactory

type KafkaMsgFactory interface {
	// Create wraps a consumer message inside a KafkaMsg.
	Create(msg *sarama.ConsumerMessage) KafkaMsg
}
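A caller supplies its own KafkaMsg and KafkaMsgFactory implementations. The sketch below shows one possible pair, under stated assumptions: `ConsumerMessage` is a local stand-in for `sarama.ConsumerMessage` (reduced to a few of its fields so the example builds without the dependency), and `simpleMsg` and `simpleFactory` are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ConsumerMessage is a local stand-in for sarama.ConsumerMessage, trimmed to
// a few fields so the sketch builds without the sarama dependency.
type ConsumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// KafkaMsg and KafkaMsgFactory repeat the documented interfaces, with the
// stand-in type substituted for *sarama.ConsumerMessage.
type KafkaMsg interface {
	MarkDone()
	GetRawMsg() *ConsumerMessage
	IsProcessed() bool
}

type KafkaMsgFactory interface {
	Create(msg *ConsumerMessage) KafkaMsg
}

// simpleMsg is a hypothetical KafkaMsg. The flag is accessed atomically
// because the goroutine that marks a message done may differ from the one
// that checks it (an assumption; the docs do not state the threading model).
type simpleMsg struct {
	raw  *ConsumerMessage
	done int32 // 1 once MarkDone has been called
}

func (m *simpleMsg) MarkDone()                   { atomic.StoreInt32(&m.done, 1) }
func (m *simpleMsg) GetRawMsg() *ConsumerMessage { return m.raw }
func (m *simpleMsg) IsProcessed() bool           { return atomic.LoadInt32(&m.done) == 1 }

// simpleFactory is a hypothetical KafkaMsgFactory that wraps each raw
// message in a simpleMsg.
type simpleFactory struct{}

func (simpleFactory) Create(msg *ConsumerMessage) KafkaMsg {
	return &simpleMsg{raw: msg}
}

func main() {
	var factory KafkaMsgFactory = simpleFactory{}
	msg := factory.Create(&ConsumerMessage{Topic: "orders", Offset: 42})

	fmt.Println("processed before MarkDone:", msg.IsProcessed())
	msg.MarkDone()
	fmt.Println("processed after MarkDone:", msg.IsProcessed())
}
```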

type KafkaOffsetTracker

type KafkaOffsetTracker struct {
	// contains filtered or unexported fields
}

KafkaOffsetTracker is an implementation of OffsetTracker that tracks offsets for KafkaSource and KafkaMessage.

func (*KafkaOffsetTracker) TrackMe

func (k *KafkaOffsetTracker) TrackMe(kmsg KafkaMsg)

TrackMe ensures that messages to be tracked are enqueued for tracking.

type KafkaSource

type KafkaSource struct {
	// contains filtered or unexported fields
}

KafkaSource is a Source implementation that reads from Kafka. It uses the sarama library and wvanbergen's high-availability Kafka consumer, which relies on ZooKeeper.

func GetKafkaSource

func GetKafkaSource(conf KafkaConf, factory KafkaMsgFactory) *KafkaSource

GetKafkaSource returns an instance of KafkaSource.

func (*KafkaSource) CommitOffsets

func (k *KafkaSource) CommitOffsets(data KafkaMsg) error

CommitOffsets lets the client explicitly commit the offset of a processed message.

func (*KafkaSource) Generate

func (k *KafkaSource) Generate(out chan<- interface{})

Generate implements the Source method of the same name: it connects to Kafka and pushes each KafkaMessage into the channel.
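Since the output channel is typed `chan<- interface{}`, a consumer has to type-assert each value back to KafkaMsg. The following is a rough sketch of that consumer side, not the package's own code. `fakeSource`, `flagMsg`, and `drain` are hypothetical, KafkaMsg is trimmed to the two methods needed here, and closing the channel when the source finishes is an assumption that the documentation does not confirm.

```go
package main

import "fmt"

// KafkaMsg is trimmed to the methods this sketch needs; the documented
// interface also has GetRawMsg() *sarama.ConsumerMessage.
type KafkaMsg interface {
	MarkDone()
	IsProcessed() bool
}

// flagMsg is a throwaway KafkaMsg used only to exercise the consumer loop.
type flagMsg struct{ done bool }

func (m *flagMsg) MarkDone()         { m.done = true }
func (m *flagMsg) IsProcessed() bool { return m.done }

// fakeSource is a hypothetical stand-in for KafkaSource. It pushes n
// messages and then closes the channel (closing is an assumption made so
// that the example terminates).
type fakeSource struct{ n int }

func (s fakeSource) Generate(out chan<- interface{}) {
	for i := 0; i < s.n; i++ {
		out <- &flagMsg{}
	}
	close(out)
}

// drain runs the source in its own goroutine, marks every KafkaMsg it
// receives as done, and returns how many messages were handled.
func drain(s fakeSource) int {
	out := make(chan interface{})
	go s.Generate(out)

	handled := 0
	for v := range out {
		msg, ok := v.(KafkaMsg)
		if !ok {
			continue // ignore anything that is not a KafkaMsg
		}
		msg.MarkDone()
		handled++
	}
	return handled
}

func main() {
	fmt.Println("handled:", drain(fakeSource{n: 3}))
}
```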

func (*KafkaSource) RegisterHook

func (k *KafkaSource) RegisterHook(hook KafkaSourceHook)

RegisterHook registers a hook with KafkaSource.

func (*KafkaSource) Stop

func (k *KafkaSource) Stop()

Stop implements the Stop method of the Source interface and stops the Kafka consumer.

type KafkaSourceHook

type KafkaSourceHook interface {
	// Pre is called before the message is passed to DMux.
	Pre(k KafkaMsg)
}

KafkaSourceHook is used to track messages coming out of the source, in order.
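As an illustration, a hook can record each message in the order it leaves the source. This is a minimal sketch with hypothetical names (`orderHook`, `flagMsg`), using a trimmed KafkaMsg. It assumes Pre is called from a single goroutine; if that does not hold, the slice would need a mutex.

```go
package main

import "fmt"

// KafkaMsg is trimmed to the methods this sketch needs; the documented
// interface also has GetRawMsg() *sarama.ConsumerMessage.
type KafkaMsg interface {
	MarkDone()
	IsProcessed() bool
}

// KafkaSourceHook repeats the documented interface.
type KafkaSourceHook interface {
	Pre(k KafkaMsg)
}

// orderHook is a hypothetical hook that remembers messages in arrival order.
// It is not safe for concurrent use.
type orderHook struct {
	seen []KafkaMsg
}

// Pre records the message before it would be handed on for processing.
func (h *orderHook) Pre(k KafkaMsg) { h.seen = append(h.seen, k) }

// pending counts the recorded messages that have not been marked done yet.
func (h *orderHook) pending() int {
	n := 0
	for _, m := range h.seen {
		if !m.IsProcessed() {
			n++
		}
	}
	return n
}

// flagMsg is a throwaway KafkaMsg used only to exercise the hook.
type flagMsg struct{ done bool }

func (m *flagMsg) MarkDone()         { m.done = true }
func (m *flagMsg) IsProcessed() bool { return m.done }

func main() {
	hook := &orderHook{}
	var _ KafkaSourceHook = hook // compile-time interface check

	a, b := &flagMsg{}, &flagMsg{}
	hook.Pre(a)
	hook.Pre(b)
	a.MarkDone()

	fmt.Println("pending:", hook.pending())
}
```

With the real package, such a hook would be passed to RegisterHook on the KafkaSource.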

type OffsetTracker

type OffsetTracker interface {
	TrackMe(kmsg KafkaMsg)
}

OffsetTracker is an interface that defines methods for tracking messages that have been queued for processing.

func GetKafkaOffsetTracker

func GetKafkaOffsetTracker(size int, source *KafkaSource) OffsetTracker

GetKafkaOffsetTracker is a package-level function that returns an instance of KafkaOffsetTracker.
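The internals of KafkaOffsetTracker are unexported, so the following is only a guess at the general technique, not the package's implementation. It assumes a bounded queue of the given size and commits messages strictly in enqueue order, even when they finish out of order. `queueTracker`, `offsetMsg`, `commitOrder`, and the commit callback are hypothetical; in the real package the tracker is given a `*KafkaSource` rather than a callback.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// KafkaMsg is trimmed to the methods this sketch needs.
type KafkaMsg interface {
	MarkDone()
	IsProcessed() bool
}

// OffsetTracker repeats the documented interface.
type OffsetTracker interface {
	TrackMe(kmsg KafkaMsg)
}

// queueTracker is a hypothetical OffsetTracker backed by a buffered channel.
type queueTracker struct {
	queue  chan KafkaMsg
	commit func(KafkaMsg) error // stand-in for committing via the source
}

func newQueueTracker(size int, commit func(KafkaMsg) error) *queueTracker {
	return &queueTracker{queue: make(chan KafkaMsg, size), commit: commit}
}

// TrackMe enqueues the message; it blocks while the queue is full.
func (t *queueTracker) TrackMe(kmsg KafkaMsg) { t.queue <- kmsg }

// run commits messages in enqueue order, polling until each one is
// processed. It returns once the queue is closed and drained, or on the
// first commit error.
func (t *queueTracker) run(poll time.Duration) error {
	for m := range t.queue {
		for !m.IsProcessed() {
			time.Sleep(poll)
		}
		if err := t.commit(m); err != nil {
			return err
		}
	}
	return nil
}

// offsetMsg is a throwaway KafkaMsg carrying only an offset.
type offsetMsg struct {
	offset int64
	done   int32
}

func (m *offsetMsg) MarkDone()         { atomic.StoreInt32(&m.done, 1) }
func (m *offsetMsg) IsProcessed() bool { return atomic.LoadInt32(&m.done) == 1 }

// commitOrder tracks three messages, finishes them out of order, and
// returns the order in which their offsets were committed.
func commitOrder() []int64 {
	var committed []int64
	t := newQueueTracker(8, func(m KafkaMsg) error {
		committed = append(committed, m.(*offsetMsg).offset)
		return nil
	})
	var _ OffsetTracker = t // compile-time interface check

	finished := make(chan error)
	go func() { finished <- t.run(time.Millisecond) }()

	msgs := []*offsetMsg{{offset: 1}, {offset: 2}, {offset: 3}}
	for _, m := range msgs {
		t.TrackMe(m)
	}
	msgs[2].MarkDone()
	msgs[0].MarkDone()
	msgs[1].MarkDone()

	close(t.queue)
	if err := <-finished; err != nil {
		panic(err)
	}
	return committed
}

func main() {
	fmt.Println(commitOrder()) // prints [1 2 3]
}
```

Committing in enqueue order means an offset is never committed ahead of an earlier message that is still in flight, at the cost of a slow message holding back the commits behind it.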
