kafka

package
v0.0.0-...-a78daf7 Latest
Warning

This package is not in the latest version of its module.

Published: May 4, 2018 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleSignals

func HandleSignals(tp *kasper.TopicProcessor)

HandleSignals - Closes the topic processor on SIGINT/SIGTERM.

func LogRate

func LogRate(f *FrameProcessor, frequency time.Duration)

LogRate - Goroutine that writes the counter content at the interval given by frequency.

func NewKafkaConfig

func NewKafkaConfig(address string, topic string, partition int) *kasper.Config

NewKafkaConfig - Prepares the client and configuration for TopicProcessor.

Types

type FrameProcessor

type FrameProcessor struct {
	Counter    *counter.RateCounter
	DecodeJSON int
}

FrameProcessor - Kafka message processor that measures "frames-per-(duration)".

func (*FrameProcessor) Process

func (f *FrameProcessor) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error

Process - Callback that receives a batch of consumed messages together with a kasper.Sender.

func (*FrameProcessor) ProcessMessage

func (f *FrameProcessor) ProcessMessage(msg *sarama.ConsumerMessage)

ProcessMessage - Processes a Kafka message from the topic and increments the counter.
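The split between a batch callback and a per-message handler can be sketched as follows. Everything here is a stand-in: `message` replaces `*sarama.ConsumerMessage`, `frameCounter` replaces `FrameProcessor`, and the meaning of the `decodeJSON` flag (parse each payload before counting it) is an assumption about what the `DecodeJSON` field is for.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct{ Value []byte }

// frameCounter is a hypothetical stand-in for FrameProcessor.
type frameCounter struct {
	count      int64
	decodeJSON bool // assumption: when set, each payload is also parsed as JSON
}

// process handles one batch by delegating to processMessage for each entry.
func (f *frameCounter) process(msgs []*message) error {
	for _, m := range msgs {
		f.processMessage(m)
	}
	return nil
}

// processMessage optionally decodes the payload, then counts the message.
func (f *frameCounter) processMessage(m *message) {
	if f.decodeJSON {
		var v map[string]interface{}
		if err := json.Unmarshal(m.Value, &v); err != nil {
			return // skip malformed payloads instead of counting them
		}
	}
	atomic.AddInt64(&f.count, 1)
}

func main() {
	f := &frameCounter{decodeJSON: true}
	batch := []*message{
		{Value: []byte(`{"id":1}`)},
		{Value: []byte(`not json`)},
	}
	_ = f.process(batch)
	fmt.Println(atomic.LoadInt64(&f.count)) // prints 1: only the valid payload is counted
}
```

Keeping the per-message logic in its own method makes it easy to exercise without constructing a full batch.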

type ProcessorStdout

type ProcessorStdout struct {
}

ProcessorStdout - Kafka message processor that shows how to read messages from a Kafka topic.

func (*ProcessorStdout) Process

func (cs *ProcessorStdout) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error

Process - Callback that receives a batch of consumed messages together with a kasper.Sender.

func (*ProcessorStdout) ProcessMessage

func (*ProcessorStdout) ProcessMessage(msg *sarama.ConsumerMessage)

ProcessMessage - Processes a Kafka message from the topic and prints info to stdout.
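As an illustration of printing message info, the sketch below formats the metadata and payload of one message. The `message` struct is a stand-in that mirrors a few fields of `sarama.ConsumerMessage` (`Topic`, `Partition`, `Offset`, `Key`, `Value`); `describe` and the output layout are hypothetical, since the source does not say what the package actually prints.

```go
package main

import "fmt"

// message is a stand-in mirroring a few fields of sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Key       []byte
	Value     []byte
}

// describe renders one message as a single human-readable line.
// (Hypothetical helper; the layout is an assumption.)
func describe(m *message) string {
	return fmt.Sprintf("topic=%s partition=%d offset=%d key=%q value=%q",
		m.Topic, m.Partition, m.Offset, m.Key, m.Value)
}

func main() {
	m := &message{
		Topic:     "frames",
		Partition: 0,
		Offset:    42,
		Key:       []byte("k"),
		Value:     []byte("hello"),
	}
	fmt.Println(describe(m))
}
```

The `%q` verb quotes the key and value, which keeps binary or whitespace-heavy payloads readable on a terminal.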

type UserProcessor

type UserProcessor struct {
	Counter *alg.Counter
}

UserProcessor - Kafka message processor that parses the JSON data and forwards the time and user identifier info to a counter structure.

func (*UserProcessor) Process

func (up *UserProcessor) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error

Process - Callback that receives a batch of consumed messages together with a kasper.Sender.

func (*UserProcessor) ProcessMessage

func (up *UserProcessor) ProcessMessage(msg *sarama.ConsumerMessage)

ProcessMessage - Processes a Kafka message from the topic and prints info to stdout.
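The UserProcessor description (parse JSON, forward the time and user identifier to a counter) can be sketched as below. All names are assumptions: the JSON keys `ts` and `uid` are hypothetical, and `userCounter` is a simple stand-in for `alg.Counter` that tracks distinct users per minute, which may differ from what the real counter does.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event holds the two fields the sketch cares about.
// The JSON keys "ts" (Unix seconds) and "uid" are hypothetical.
type event struct {
	TS  int64  `json:"ts"`
	UID string `json:"uid"`
}

// userCounter is a hypothetical stand-in for alg.Counter.
// It records the set of user ids seen in each one-minute bucket.
type userCounter struct {
	seen map[int64]map[string]struct{}
}

func newUserCounter() *userCounter {
	return &userCounter{seen: map[int64]map[string]struct{}{}}
}

// add records uid in the minute bucket that contains ts.
func (c *userCounter) add(ts int64, uid string) {
	minute := ts / 60
	if c.seen[minute] == nil {
		c.seen[minute] = map[string]struct{}{}
	}
	c.seen[minute][uid] = struct{}{}
}

// unique returns the number of distinct users seen in the given minute.
func (c *userCounter) unique(minute int64) int { return len(c.seen[minute]) }

// handle parses one payload and forwards its time and user id to c.
func handle(c *userCounter, payload []byte) error {
	var e event
	if err := json.Unmarshal(payload, &e); err != nil {
		return err
	}
	c.add(e.TS, e.UID)
	return nil
}

func main() {
	c := newUserCounter()
	for _, p := range []string{
		`{"ts":120,"uid":"alice"}`,
		`{"ts":130,"uid":"bob"}`,
		`{"ts":150,"uid":"alice"}`,
	} {
		if err := handle(c, []byte(p)); err != nil {
			fmt.Println("skip:", err)
		}
	}
	fmt.Println(c.unique(2)) // prints 2: alice and bob both fall in minute 2
}
```

Returning the parse error from `handle` leaves the caller free to decide whether a malformed message should be skipped or should abort the batch.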
