Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleSignals ¶
func HandleSignals(tp *kasper.TopicProcessor)
HandleSignals - closes topic processor on sigint/sigterm
func LogRate ¶
func LogRate(f *FrameProcessor, frequency time.Duration)
LogRate - Goroutine that writes the counter content every so often.
Types ¶
type FrameProcessor ¶
type FrameProcessor struct { Counter *counter.RateCounter DecodeJSON int }
FrameProcessor - Kafka message processor that messures "frames-per-(duration)"
func (*FrameProcessor) Process ¶
func (f *FrameProcessor) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error
Process - Callback
func (*FrameProcessor) ProcessMessage ¶
func (f *FrameProcessor) ProcessMessage(msg *sarama.ConsumerMessage)
ProcessMessage - Processes Kafka messages from topic and increments the counter
type ProcessorStdout ¶
type ProcessorStdout struct { }
ProcessorStdout - Kafka message processor that shows how to read messages from Kafka topic
func (*ProcessorStdout) Process ¶
func (cs *ProcessorStdout) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error
Process - Callback
func (*ProcessorStdout) ProcessMessage ¶
func (*ProcessorStdout) ProcessMessage(msg *sarama.ConsumerMessage)
ProcessMessage - Processes Kafka messages from topic prints info to stdout
type UserProcessor ¶
UserProcessor - Kafka message processor that parses the json data and forwards the time and user identitifer info to a counter structure.
func (*UserProcessor) Process ¶
func (up *UserProcessor) Process(msgs []*sarama.ConsumerMessage, sender kasper.Sender) error
Process - Callback
func (*UserProcessor) ProcessMessage ¶
func (up *UserProcessor) ProcessMessage(msg *sarama.ConsumerMessage)
ProcessMessage - Processes Kafka messages from topic prints info to stdout