kafka

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2019 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaInput

// KafkaInput is a Kafka input. Its tagged fields map to json/yaml keys, and
// consumed data is exposed through the Message channel.
type KafkaInput struct {
	// NOTE(review): the yaml key here is "addr" while the json key is "addrs"
	// (Producer uses "addrs" for both) — confirm whether this is intentional.
	Addrs  []string `json:"addrs" yaml:"addr"` // If Group is defined, Addrs are ZooKeeper addresses (port 2181); otherwise they are Kafka addresses (port 9092).
	Topics []string `json:"topics" yaml:"topics"`
	Group  string   `json:"group" yaml:"group"`
	// NOTE(review): presumably a sarama offset; the package docs list
	// sarama.OffsetNewest = -1 and sarama.OffsetOldest = -2 — confirm.
	Offset int64    `json:"offset" yaml:"offset"`

	Message chan []byte // Read data from this channel.
}

KafkaInput is a Kafka input. For reference: sarama.OffsetNewest int64 = -1, sarama.OffsetOldest int64 = -2.

func NewKafkaInput

func NewKafkaInput() *KafkaInput

func (*KafkaInput) Close

func (in *KafkaInput) Close() error

func (*KafkaInput) Init

func (in *KafkaInput) Init(v interface{}) error

func (*KafkaInput) Read

func (in *KafkaInput) Read(p []byte) (cnt int, err error)

func (*KafkaInput) ReadFromTopic

func (in *KafkaInput) ReadFromTopic(topic string)

A simple Kafka consumer.

func (*KafkaInput) ReadWithGroup

func (in *KafkaInput) ReadWithGroup() error

Consumption with multiple consumer groups.

func (*KafkaInput) Start

func (in *KafkaInput) Start() error

func (*KafkaInput) Version

func (in *KafkaInput) Version() string

type Producer

// Producer carries the addresses, target topic and write-concurrency setting
// for writing to a topic; data to be written is fed in through the Message
// channel.
type Producer struct {
	Addrs    []string    `json:"addrs" yaml:"addrs"`
	Topic    string      `json:"topic" yaml:"topic"`
	// NOTE(review): the original comment says "maxThreads = 1"; presumably it
	// means this MaxProcs field — confirm.
	MaxProcs int         `json:"max_procs" yaml:"max_procs"` // Maximum number of concurrent writer goroutines. Because the topic is written to concurrently, write order is not guaranteed; for strict ordering, set maxThreads = 1.
	Message  chan []byte `json:"-" yaml:"-"`                 // Write data into this channel.
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer() *Producer

func (*Producer) ChanInfo

func (p *Producer) ChanInfo() string

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Init

func (p *Producer) Init(v interface{}) error

func (*Producer) LoadConfig

func (p *Producer) LoadConfig(f string) error

func (*Producer) Start

func (p *Producer) Start(ctx context.Context) error

func (*Producer) Version

func (p *Producer) Version() string

func (*Producer) Write

func (p *Producer) Write(msg []byte) (int, error)

func (*Producer) WriteToTopic

func (p *Producer) WriteToTopic(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL