kafka

package
v0.0.2
Published: Sep 17, 2017 License: Apache-2.0 Imports: 7 Imported by: 0

README

[inputs] plugin kafka

{
    "kafka": {
        "addrs": [
            "192.168.0.1:9092",
            "192.168.0.2:9092",
            "192.168.0.3:9092"
        ],
        "topics": [
            "test"
        ],
        "group":"transport",
        "offset": -1
    }
}

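| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| addrs  | list   | Y | If group is empty, addrs holds the Kafka broker addresses; otherwise it holds the ZooKeeper addresses |
| topics | list   | Y | Kafka topic list |
| group  | string | N | Kafka consumer group |
| offset | int    | Y | -1: newest, -2: oldest |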
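The parameters above map naturally onto a Go struct. The following is a minimal sketch, assuming the configuration is strict JSON (the standard library decoder does not accept comments) nested under a `kafka` key. The names `kafkaConfig`, `parseConfig`, and `usesZooKeeper` are hypothetical and not part of this package, and rejecting offsets other than -1 and -2 is an assumption drawn from the table, not confirmed package behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// kafkaConfig mirrors the documented parameters.
// Hypothetical type for illustration; the package's own type is KafkaInput.
type kafkaConfig struct {
	Addrs  []string `json:"addrs"`
	Topics []string `json:"topics"`
	Group  string   `json:"group"`
	Offset int64    `json:"offset"`
}

// parseConfig decodes the "kafka" object and checks the fields marked as required.
func parseConfig(data []byte) (*kafkaConfig, error) {
	var wrapper struct {
		Kafka *kafkaConfig `json:"kafka"`
	}
	if err := json.Unmarshal(data, &wrapper); err != nil {
		return nil, err
	}
	if wrapper.Kafka == nil {
		return nil, errors.New(`missing "kafka" section`)
	}
	cfg := wrapper.Kafka
	if len(cfg.Addrs) == 0 {
		return nil, errors.New("addrs is required")
	}
	if len(cfg.Topics) == 0 {
		return nil, errors.New("topics is required")
	}
	// Assumption: only the two documented sentinel values are accepted.
	if cfg.Offset != -1 && cfg.Offset != -2 {
		return nil, fmt.Errorf("offset must be -1 (newest) or -2 (oldest), got %d", cfg.Offset)
	}
	return cfg, nil
}

// usesZooKeeper reports whether addrs should be read as ZooKeeper addresses,
// which the table says is the case whenever group is non-empty.
func (c *kafkaConfig) usesZooKeeper() bool { return c.Group != "" }

func main() {
	raw := []byte(`{"kafka": {"addrs": ["192.168.0.1:9092"], "topics": ["test"], "group": "", "offset": -1}}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(cfg.Topics, cfg.usesZooKeeper())
}
```

With an empty `group`, the addresses are treated as brokers; setting `group` switches the interpretation of `addrs` to ZooKeeper addresses.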

Documentation

Constants

const (
	VERSION = "0.0.1"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaInput

type KafkaInput struct {
	Addrs  []string `json:"addrs"` // if group is set, addrs holds ZooKeeper addresses (port 2181); otherwise Kafka broker addresses (port 9092)
	Topics []string `json:"topics"`
	Group  string   `json:"group"`
	Offset int64    `json:"offset"`

	Message chan []byte // read consumed data from this channel
}
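Since consumed data is read from the `Message` channel, a caller would typically drain it in a loop. The sketch below uses a plain `chan []byte` as a stand-in for `KafkaInput.Message`; the `drain` helper is hypothetical, and whether the package ever closes the channel is an assumption made here so that the loop terminates.

```go
package main

import "fmt"

// drain reads every message from the channel until it is closed
// and returns the payloads as strings. Hypothetical helper.
func drain(messages <-chan []byte) []string {
	var out []string
	for msg := range messages {
		out = append(out, string(msg))
	}
	return out
}

func main() {
	messages := make(chan []byte) // stands in for KafkaInput.Message

	// A goroutine plays the role of the consumer that feeds the channel.
	go func() {
		defer close(messages)
		for _, s := range []string{"a", "b", "c"} {
			messages <- []byte(s)
		}
	}()

	fmt.Println(drain(messages))
}
```

If the real channel is never closed, a `select` with a shutdown signal would be needed in place of the bare `range` loop.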

Usage notes: sarama.OffsetNewest int64 = -1, sarama.OffsetOldest int64 = -2
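To make the two sentinel values concrete, here is a small sketch that names them. The constants are local copies of the values quoted above so the example runs without the sarama dependency; `describeOffset` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local copies of the two sentinel values quoted in the usage notes.
const (
	offsetNewest int64 = -1 // start from the newest message
	offsetOldest int64 = -2 // start from the oldest retained message
)

// describeOffset returns a readable name for a configured offset value.
func describeOffset(offset int64) string {
	switch offset {
	case offsetNewest:
		return "newest"
	case offsetOldest:
		return "oldest"
	default:
		return fmt.Sprintf("unknown (%d)", offset)
	}
}

func main() {
	for _, o := range []int64{-1, -2, 7} {
		fmt.Println(o, describeOffset(o))
	}
}
```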

func NewKafkaInput

func NewKafkaInput() *KafkaInput

func (*KafkaInput) Close

func (in *KafkaInput) Close() error

func (*KafkaInput) Init

func (in *KafkaInput) Init(config transport.Configer) error

func (*KafkaInput) Read

func (in *KafkaInput) Read(p []byte) (cnt int, err error)

func (*KafkaInput) ReadFromTopic

func (in *KafkaInput) ReadFromTopic(topic string)

A simple Kafka consumer.

func (*KafkaInput) ReadWithGroup

func (in *KafkaInput) ReadWithGroup() error

Multiple consumer groups.

func (*KafkaInput) Start

func (in *KafkaInput) Start() error

func (*KafkaInput) Version

func (in *KafkaInput) Version() string
