skafka

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultDialer

func DefaultDialer() *kafka.Dialer

DefaultDialer 获取默认的链接对象

func DefaultReaderConfig

func DefaultReaderConfig() *kafka.ReaderConfig

读取器配置设置 DefaultReaderConfig 默认读取器配置

func DefaultWriterConfig

func DefaultWriterConfig() *kafka.WriterConfig

写入器配置设置 DefaultWriterConfig 默认写入器配置

func GetDialer

func GetDialer(name string) (*kafka.Dialer, bool)

GetDialer 获取链接

func InitDefaultDialer

func InitDefaultDialer(opts ...KafkaDialerOptionHandler)

InitDefaultDialer 初始化默认的Dialer

func NewDialer

func NewDialer(name string, opts ...KafkaDialerOptionHandler)

NewDialer 创建新的拨号器

Types

type Closer

type Closer func() error

func NewReader

func NewReader(opts ...KafkaReaderOptionHandler) (*kafka.Reader, Closer)

NewReader 创建新的Kafka读取对象

func NewReaderChannel

func NewReaderChannel(opts ...KafkaReaderOptionHandler) (chan kafka.Message, Closer)

NewReaderChannel 创建新的读取器 并将读取内容输出到管道

func NewWriter

func NewWriter(opts ...KafkaWriterOptionHandler) (*kafka.Writer, Closer)

NewWriter 创建新的Kafka数据写入器

func NewWriter1

func NewWriter1(opts ...KafkaWriterOptionHandler) (func(context.Context, ...kafka.Message) error, Closer)

NewWriter1 创建新的Kafka写入器

type Dialers

type Dialers struct {
	// contains filtered or unexported fields
}

func (*Dialers) Get

func (d *Dialers) Get(name string) (*kafka.Dialer, bool)

Get 获取链接器

func (*Dialers) Set

func (d *Dialers) Set(name string, dialer *kafka.Dialer)

Set 存储一个链接器

type KafkaDialerOptionHandler

type KafkaDialerOptionHandler func(*kafka.Dialer)

KafkaDialerOptionHandler Kafka链接器配置选项

func D_WithCA

func D_WithCA(ca string) KafkaDialerOptionHandler

D_WithCA 设置证书

func D_WithUserNamePassword

func D_WithUserNamePassword(uname string, password string) KafkaDialerOptionHandler

D_WithUserNamePassword 设置用户名称密码

type KafkaReaderOptionHandler

type KafkaReaderOptionHandler func(*kafka.ReaderConfig)

KafkaReaderOptionHandler Kafka读取器配置选项

func R_WithBrokers

func R_WithBrokers(addrs []string) KafkaReaderOptionHandler

R_WithBrokers 读取器 配置Broker

func R_WithDialer

func R_WithDialer(dialer *kafka.Dialer) KafkaReaderOptionHandler

R_WithDialer 读取器 配置链接器

func R_WithErrorLogger added in v0.1.1

func R_WithErrorLogger(fn meta.KafkaLoggerFunc) KafkaReaderOptionHandler

R_WithErrorLogger 读取器 错误日志

func R_WithGroupID

func R_WithGroupID(group string) KafkaReaderOptionHandler

R_WithGroupID 读取器 配置消费者组

func R_WithLogger added in v0.1.1

R_WithLogger 读取器 日志

func R_WithMaxBytes

func R_WithMaxBytes(bcnt int) KafkaReaderOptionHandler

R_WithMaxBytes 读取器 配置最多读取字节数

func R_WithMinBytes

func R_WithMinBytes(bcnt int) KafkaReaderOptionHandler

R_WithMinBytes 读取器 配置最少读取字节数

func R_WithTopic

func R_WithTopic(topic string) KafkaReaderOptionHandler

R_WithTopic 读取器 配置主题

type KafkaWriterOptionHandler

type KafkaWriterOptionHandler func(*kafka.WriterConfig)

KafkaWriterOptionHandler Kafka写入器配置选项

func W_WithBatchBytes

func W_WithBatchBytes(bcnt int) KafkaWriterOptionHandler

W_WithBatchBytes 写入器 配置批量发送字节数上限

func W_WithBrokers

func W_WithBrokers(addrs []string) KafkaWriterOptionHandler

W_WithBrokers 写入器 配置Broker

func W_WithDialer

func W_WithDialer(dialer *kafka.Dialer) KafkaWriterOptionHandler

W_WithDialer 写入器 配置链接器

func W_WithErrorLogger added in v0.1.1

func W_WithErrorLogger(fn meta.KafkaLoggerFunc) KafkaWriterOptionHandler

W_WithErrorLogger 写入器 错误日志

func W_WithLogger added in v0.1.1

W_WithLogger 写入器 日志

func W_WithTopic

func W_WithTopic(topic string) KafkaWriterOptionHandler

W_WithTopic 写入器 配置主题

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL