kafka

package
v0.0.0-...-61829c1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2019 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PartitionerConstructor

func PartitionerConstructor(topic string) sarama.Partitioner

func SendNotify

func SendNotify(client sarama.Client, topic, key string, notify interface{}) error

Types

type ConsumerConfig

type ConsumerConfig struct {
	Retry             RetryConfig
	Fetch             FetchConfig
	MaxWaitTime       int64
	MaxProcessingTime int64
	Return            ReturnConfig
	Offsets           OffsetsConfig
}

func (ConsumerConfig) MarshalDefaults

func (consumerConfig ConsumerConfig) MarshalDefaults(v interface{})

type ConsumerKafkaConfig

type ConsumerKafkaConfig struct {
	ConsumerKafka     KafkaConfig
	Zookeeper         ZookeeperConfig
	Topics            []string
	ConsumerGroupName string
	// contains filtered or unexported fields
}

func (*ConsumerKafkaConfig) GetCG

func (consumerKafkaConfig *ConsumerKafkaConfig) GetCG() *consumergroup.ConsumerGroup

func (*ConsumerKafkaConfig) GetKafkaClient

func (consumerKafkaConfig *ConsumerKafkaConfig) GetKafkaClient() sarama.Client

func (*ConsumerKafkaConfig) Init

func (consumerKafkaConfig *ConsumerKafkaConfig) Init()

func (ConsumerKafkaConfig) MarshalDefaults

func (consumerKafkaConfig ConsumerKafkaConfig) MarshalDefaults(v interface{})

func (ConsumerKafkaConfig) NewKafkaConsumer

func (consumerKafkaConfig ConsumerKafkaConfig) NewKafkaConsumer() (cg *consumergroup.ConsumerGroup, err error)

type FetchConfig

type FetchConfig struct {
	Min     int32
	Default int32
	Max     int32
}

func (FetchConfig) MarshalDefaults

func (fetchConfig FetchConfig) MarshalDefaults(v interface{})

type FlushConfig

type FlushConfig struct {
	MaxMessages int
}

func (FlushConfig) MarshalDefaults

func (flushConfig FlushConfig) MarshalDefaults(v interface{})

type KafkaConfig

type KafkaConfig struct {
	Name     string
	Addrs    []string `conf:"env"`
	Net      NetConfig
	MetaData MetaDataConfig
	Producer ProducerConfig
	Consumer ConsumerConfig
	Version  string
}

func (KafkaConfig) DockerDefaults

func (kafkaConfig KafkaConfig) DockerDefaults() conf.DockerDefaults

func (KafkaConfig) MarshalDefaults

func (kafkaConfig KafkaConfig) MarshalDefaults(v interface{})

func (KafkaConfig) NewKafkaClient

func (kafkaConfig KafkaConfig) NewKafkaClient() (kafkaClient sarama.Client, err error)

type MetaDataConfig

type MetaDataConfig struct {
	Retry            RetryConfig
	RefreshFrequency int64
}

func (MetaDataConfig) MarshalDefaults

func (metaDataConfig MetaDataConfig) MarshalDefaults(v interface{})

type NetConfig

type NetConfig struct {
	MaxOpenRequests int
	DialTimeout     int64
	ReadTimeout     int64
	WriteTimeout    int64
	KeepAlive       int64
	SASL            SASLConfig
}

func (NetConfig) MarshalDefaults

func (netConfig NetConfig) MarshalDefaults(v interface{})

type OffsetsConfig

type OffsetsConfig struct {
	CommitInterval int64
	Initial        int64
	Retention      int64
}

func (OffsetsConfig) MarshalDefaults

func (offsetsConfig OffsetsConfig) MarshalDefaults(v interface{})

type ProducerConfig

type ProducerConfig struct {
	MaxMessageBytes int
	RequiredAcks    int
	Timeout         int64
	Compression     int8
	Return          ReturnConfig
	Flush           FlushConfig
	Retry           RetryConfig
}

func (ProducerConfig) MarshalDefaults

func (producerConfig ProducerConfig) MarshalDefaults(v interface{})

type ProducerKafkaConfig

type ProducerKafkaConfig struct {
	ProducerKafka KafkaConfig
	// contains filtered or unexported fields
}

func (*ProducerKafkaConfig) GetKafkaClient

func (producerKafkaConfig *ProducerKafkaConfig) GetKafkaClient() sarama.Client

func (*ProducerKafkaConfig) Init

func (producerKafkaConfig *ProducerKafkaConfig) Init()

func (ProducerKafkaConfig) MarshalDefaults

func (producerKafkaConfig ProducerKafkaConfig) MarshalDefaults(v interface{})

type RetryConfig

type RetryConfig struct {
	Max     int
	Backoff int64
}

func (RetryConfig) MarshalDefaults

func (retryConfig RetryConfig) MarshalDefaults(v interface{})

type ReturnConfig

type ReturnConfig struct {
	Successes bool
	Errors    bool
}

func (ReturnConfig) MarshalDefaults

func (returnConfig ReturnConfig) MarshalDefaults(v interface{})

type SASLConfig

type SASLConfig struct {
	Enable    bool
	Handshake bool
	User      string
	Password  string
}

type ZookeeperConfig

type ZookeeperConfig struct {
	Name    string
	Addrs   []string `conf:"env"`
	Timeout int64
	Chroot  string
}

func (ZookeeperConfig) DockerDefaults

func (zookeeperConfig ZookeeperConfig) DockerDefaults() conf.DockerDefaults

func (ZookeeperConfig) MarshalDefaults

func (zookeeperConfig ZookeeperConfig) MarshalDefaults(v interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL