Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultReceiveFn = func(consumeWorkerIdx int, receiveWorkerIdx int, topic string, body []byte, msg *kafka.Message) (err error) { seelog.Infof("[KAFKA CONSUMER] receive message: %s#%d|%d, tm:%s, key:%s, body:%s", msg.TopicPartition.String(), consumeWorkerIdx, receiveWorkerIdx, msg.Timestamp.Format("2006-01-02 15:04:05"), string(msg.Key), string(body)) return }
Functions ¶
func StartDefaultConsumer ¶ added in v0.2.8
func StartReceive ¶ added in v0.2.8
func StopDefaultConsumer ¶ added in v0.2.8
func StopDefaultConsumer() (err error)
func StopReceive ¶ added in v0.2.8
Types ¶
type Cfg ¶
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func Consumer ¶
func Consumer(name string) (r *KafkaConsumer)
func DefaultConsumer ¶ added in v0.2.8
func DefaultConsumer() (r *KafkaConsumer)
func NewKafkaConsumer ¶
func NewKafkaConsumer(cfg *Cfg) (r *KafkaConsumer, err error)
func SafeConsumer ¶
func SafeConsumer(name string) (r *KafkaConsumer, err error)
func (*KafkaConsumer) Close ¶
func (this *KafkaConsumer) Close()
func (*KafkaConsumer) SetReceiveSelector ¶ added in v0.1.8
Click to show internal directories.
Click to hide internal directories.