kafka

package
v0.0.0-...-c6e60ff
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2017 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrNotReady       = errors.New("not ready")
	ErrNotAllowed     = errors.New("not allowed")
	ErrStopping       = errors.New("stopping")
	ErrAlreadyClosed  = errors.New("consumer already closed")
	ErrConsumerBroken = errors.New("consumer conn broken")
)
var InvalidPartitionID = int32(-1)

Functions

func ParseDSN

func ParseDSN(dsn string) (zone, cluster, topic string, partitionID int32, err error)

ParseDSN parses the kafka DSN, which has the form kafka:zone://cluster/topic#partition.

Types

type Config

type Config struct {
	Sarama *sarama.Config
	QoS    QoS
	// contains filtered or unexported fields
}

Config is the configuration for the kafka package.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig creates a default Config for an async producer.

func (*Config) Ack

func (c *Config) Ack(ack sarama.RequiredAcks) *Config

Ack sets the kafka producer's required acks parameter.

func (*Config) AsyncMode

func (c *Config) AsyncMode() *Config

AsyncMode switches the kafka producer to async mode.

func (*Config) DryrunMode

func (c *Config) DryrunMode() *Config

DryrunMode switches kafka to dryrun mode.

func (*Config) LossTolerant

func (c *Config) LossTolerant() *Config

LossTolerant sets the QoS to loss tolerant.

func (*Config) SetConsumerChanBuffer

func (c *Config) SetConsumerChanBuffer(size int) *Config

SetConsumerChanBuffer sets the channel buffer size of the consumer.

func (*Config) SyncMode

func (c *Config) SyncMode() *Config

SyncMode switches the kafka producer to sync mode.

func (*Config) ThroughputFirst

func (c *Config) ThroughputFirst() *Config

ThroughputFirst sets the QoS to throughput first.
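
Every Config setter returns *Config, so calls can be chained off DefaultConfig. The sketch below reproduces that chaining pattern with a local stand-in type, because the package's import path and unexported fields are not shown on this page. The fields async, dryrun, and chanBuf are hypothetical names, the default QoS is an assumption, and the sarama-dependent Ack setter is left out to keep the example dependency-free.

```go
package main

import "fmt"

// QoS mirrors the documented type and constants.
type QoS uint8

const (
	ThroughputFirst QoS = 1
	LossTolerant    QoS = 2
)

// Config is a local stand-in. The lowercase fields are hypothetical names
// for state the documentation hides behind "unexported fields".
type Config struct {
	QoS     QoS
	async   bool
	dryrun  bool
	chanBuf int
}

// DefaultConfig starts in async mode, as documented.
// Assumption: the default QoS is ThroughputFirst.
func DefaultConfig() *Config {
	return &Config{QoS: ThroughputFirst, async: true}
}

// Each setter mutates the receiver and returns it, which enables chaining.
func (c *Config) SyncMode() *Config        { c.async = false; return c }
func (c *Config) AsyncMode() *Config       { c.async = true; return c }
func (c *Config) DryrunMode() *Config      { c.dryrun = true; return c }
func (c *Config) LossTolerant() *Config    { c.QoS = LossTolerant; return c }
func (c *Config) ThroughputFirst() *Config { c.QoS = ThroughputFirst; return c }

func (c *Config) SetConsumerChanBuffer(size int) *Config {
	c.chanBuf = size
	return c
}

func main() {
	cf := DefaultConfig().SyncMode().LossTolerant().SetConsumerChanBuffer(256)
	fmt.Printf("%+v\n", *cf)
}
```

Because the setters share one pointer, a chained expression and a sequence of separate calls on the same Config leave it in the same state.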

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a low-level kafka consumer that can consume from multiple zone/cluster kafka clusters.

func NewConsumer

func NewConsumer(dsns []string, cf *Config) *Consumer

NewConsumer returns a kafka consumer.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

func (*Consumer) Start

func (c *Consumer) Start() error

func (*Consumer) Stop

func (c *Consumer) Stop() error

type Producer

type Producer struct {

	// Send will send a kafka message.
	Send func(*sarama.ProducerMessage) error
	// contains filtered or unexported fields
}

Producer is a uniform kafka producer that hides the difference between sync and async mode.

func NewProducer

func NewProducer(name string, brokers []string, cf *Config) *Producer

NewProducer creates a uniform kafka producer.

func (*Producer) ClientID

func (p *Producer) ClientID() string

ClientID returns the client id for the kafka connection.

func (*Producer) Close

func (p *Producer) Close() error

Close will drain and close the Producer.

func (*Producer) SetErrorHandler

func (p *Producer) SetErrorHandler(f func(err *sarama.ProducerError)) error

SetErrorHandler sets the handler for the async producer's unretriable errors, e.g. ErrInvalidPartition, ErrMessageSizeTooLarge, ErrIncompleteResponse, and ErrBreakerOpen (e.g. when updating the leader fails). It is *REQUIRED* for an async producer and not allowed for a sync producer.

func (*Producer) SetSuccessHandler

func (p *Producer) SetSuccessHandler(f func(err *sarama.ProducerMessage)) error

SetSuccessHandler sets the callback invoked for each successfully produced message of an async producer. It is *REQUIRED* for an async producer and not allowed for a sync producer.

func (*Producer) Start

func (p *Producer) Start() error

Start is REQUIRED before the producer is able to produce.

type QoS

type QoS uint8

QoS is the quality of service for the kafka producer.

const (
	ThroughputFirst QoS = 1
	LossTolerant    QoS = 2
)

Directories

Path Synopsis
A script to test kafka async and ack mechanism.
