kafka

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2019 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (*Consumer, error)

func (*Consumer) ErrorChannel

func (c *Consumer) ErrorChannel() <-chan *MessageError

func (*Consumer) MessageChannel

func (c *Consumer) MessageChannel() chan *Message

func (*Consumer) Start

func (c *Consumer) Start(topic string) error

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerConfig

// ConsumerConfig is the configuration value passed to NewConsumer.
//
// NOTE(review): only field names and types are visible in this listing.
// The meanings, units and defaults noted below are assumptions to confirm
// against the implementation.
type ConsumerConfig struct {
	// ClientID is presumably the client identifier reported to the brokers — confirm.
	ClientID         string
	// BrokerList is a single string; presumably a delimited list of broker
	// addresses. The separator is not shown here — confirm.
	BrokerList       string
	// BufferSize is presumably the capacity of the message channel — confirm.
	BufferSize       int
	// ConsumerNum is presumably the number of concurrent consumers — confirm.
	ConsumerNum      int
	// FlushMessages, FlushFrequency and FlushMaxMessages have the same names
	// and types as the fields of ProducerConfig. Whether and how a consumer
	// uses them is not visible here. FlushFrequency's time unit is unspecified.
	FlushMessages    int
	FlushFrequency   int
	FlushMaxMessages int
	// Timeout is a plain int; its unit (seconds, milliseconds, ...) is not
	// visible here — TODO confirm.
	Timeout          int
	// ReturnErrors presumably controls whether errors are delivered on the
	// channel returned by ErrorChannel — confirm.
	ReturnErrors     bool
}

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(cfg *ConsumerGroupConfig) (*ConsumerGroup, error)

func (*ConsumerGroup) ErrorChannel

func (c *ConsumerGroup) ErrorChannel() <-chan *MessageError

func (*ConsumerGroup) MarkOffset

func (c *ConsumerGroup) MarkOffset(msg *Message)

func (*ConsumerGroup) MessageChannel

func (c *ConsumerGroup) MessageChannel() chan *Message

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start(topic string) error

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop()

type ConsumerGroupConfig

// ConsumerGroupConfig is the configuration value passed to NewConsumerGroup.
//
// NOTE(review): only field names and types are visible in this listing.
// The meanings noted below are assumptions to confirm against the
// implementation.
type ConsumerGroupConfig struct {
	// BrokerList is a single string; presumably a delimited list of broker
	// addresses. The separator is not shown here — confirm.
	BrokerList   string
	// BufferSize is presumably the capacity of the message channel — confirm.
	BufferSize   int
	// ClientID is presumably the client identifier reported to the brokers — confirm.
	ClientID     string
	// GroupID is presumably the Kafka consumer-group identifier — confirm.
	GroupID      string
	// ReturnErrors presumably controls whether errors are delivered on the
	// channel returned by ErrorChannel — confirm.
	ReturnErrors bool
}

type Message

// Message is the element type of the channels returned by the
// MessageChannel methods of Consumer, ConsumerGroup and Producer, and the
// argument type of ConsumerGroup.MarkOffset.
type Message struct {
	// Topic, Partition and Offset are presumably the Kafka coordinates of the
	// record — confirm against the implementation.
	Topic     string
	Partition int32
	Offset    int64
	// Key is presumably the record key. Note that Producer.Send accepts only
	// a topic and data, so how Key is populated on the produce path is not
	// visible here.
	Key       []byte
	// Data is presumably the record payload — confirm.
	Data      []byte
}

type MessageError

// MessageError is the element type of the receive-only channels returned
// by the ErrorChannel methods of Consumer, ConsumerGroup and Producer.
//
// Because the struct has a field named Error, it cannot also declare an
// Error() method (Go requires field and method names on a struct type to be
// distinct), so *MessageError does not satisfy the built-in error interface.
// Read the Error field to obtain the underlying error.
type MessageError struct {
	// Error is the underlying error value.
	Error     error
	// Topic, Partition and Offset presumably identify the record the error
	// relates to — confirm against the implementation.
	Topic     string
	// Timestamp: whether this is the record's timestamp or the time the error
	// occurred is not visible here — TODO confirm.
	Timestamp time.Time
	Partition int32
	Offset    int64
	// Metadata is an untyped value; what is stored in it is not visible here.
	Metadata  interface{}
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg *ProducerConfig) (*Producer, error)

func (*Producer) ErrorChannel

func (p *Producer) ErrorChannel() <-chan *MessageError

func (*Producer) MessageChannel

func (p *Producer) MessageChannel() chan *Message

func (*Producer) Send

func (p *Producer) Send(topic string, data []byte)

func (*Producer) Start

func (p *Producer) Start()

func (*Producer) Stop

func (p *Producer) Stop()

type ProducerConfig

// ProducerConfig is the configuration value passed to NewProducer.
//
// NOTE(review): only field names and types are visible in this listing.
// The meanings, units and defaults noted below are assumptions to confirm
// against the implementation.
type ProducerConfig struct {
	// BrokerList is a single string; presumably a delimited list of broker
	// addresses. The separator is not shown here — confirm.
	BrokerList       string
	// BufferSize is presumably the capacity of the message channel — confirm.
	BufferSize       int
	// ProducerNum is presumably the number of concurrent producers — confirm.
	ProducerNum      int
	// FlushMessages, FlushFrequency and FlushMaxMessages presumably tune
	// producer batching. FlushFrequency's time unit is not visible here —
	// TODO confirm.
	FlushMessages    int
	FlushFrequency   int
	FlushMaxMessages int
	// Timeout is a plain int; its unit (seconds, milliseconds, ...) is not
	// visible here — TODO confirm.
	Timeout          int
	// ReturnErrors presumably controls whether errors are delivered on the
	// channel returned by ErrorChannel — confirm.
	ReturnErrors     bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL