xkafka

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2023 License: Apache-2.0 Imports: 14 Imported by: 1

README

clients

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLogger

func NewLogger() *logger

func SetConsumer

func SetConsumer(name string, consumer *Consumer)

func SetProducer

func SetProducer(name string, producer *Producer)

Types

type Acknowledgment

type Acknowledgment struct {
	// contains filtered or unexported fields
}

func (*Acknowledgment) Acknowledge

func (a *Acknowledgment) Acknowledge()

type BatchListener

// BatchListener is the handler interface for receiving consumed messages as
// a slice instead of one at a time. An implementation is registered for a
// topic with Consumer.AddBatchListener.
type BatchListener interface {
	// Listen receives a slice of messages together with an Acknowledgment.
	// NOTE(review): presumably the handler calls Acknowledge once the batch
	// has been processed — confirm against the consumer implementation.
	Listen([]ConsumerMessage, *Acknowledgment)
	// BatchCount returns an int.
	// NOTE(review): presumably the desired number of messages per batch —
	// confirm against the consumer implementation.
	BatchCount() int
}

type Callback

// Callback is the function type accepted as the cb argument of
// Producer.Send and Producer.SendMessage. It takes a *RecordMetadata and an
// error.
// NOTE(review): presumably invoked when the asynchronous send finishes, with
// a non-nil error on failure — confirm against the producer implementation.
type Callback func(*RecordMetadata, error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 是一个group的消费者

func GetConsumer

func GetConsumer(name string) *Consumer

func NewConsumer

func NewConsumer(opt *Option, logger xlog.Logger, metrics metrics.Provider, tracer tracer.Provider) (*Consumer, error)

func (*Consumer) AddBatchListener

func (c *Consumer) AddBatchListener(topic string, listener BatchListener)

func (*Consumer) AddListener

func (c *Consumer) AddListener(topic string, listener Listener)

func (*Consumer) Options

func (c *Consumer) Options() Option

func (*Consumer) Start

func (c *Consumer) Start()

Start 启动后台消费任务

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop 停止后台消费任务

type ConsumerMessage

// ConsumerMessage is the message value passed to Listener.Listen and, as a
// slice, to BatchListener.Listen. It carries the topic, the key and value as
// strings, and the partition and offset numbers.
type ConsumerMessage struct {
	Topic     string
	Key       string
	Value     string
	Partition int32
	Offset    int64
}

type ConsumerMessageCarrier

type ConsumerMessageCarrier struct {
	*sarama.ConsumerMessage
	// contains filtered or unexported fields
}

ConsumerMessageCarrier wraps a *sarama.ConsumerMessage and provides Get, Set, and Keys methods.

func NewConsumerMessageCarrier

func NewConsumerMessageCarrier(m *sarama.ConsumerMessage) *ConsumerMessageCarrier

func (ConsumerMessageCarrier) Get

func (carrier ConsumerMessageCarrier) Get(key string) string

func (ConsumerMessageCarrier) Keys

func (carrier ConsumerMessageCarrier) Keys() []string

func (ConsumerMessageCarrier) Set

func (carrier ConsumerMessageCarrier) Set(key string, value string)

type Kafka

// Kafka groups an Option together with a Consumer and a Producer. A *Kafka
// is returned by New.
// NOTE(review): presumably New builds both clients from the same Option —
// confirm in New.
type Kafka struct {
	Opt      *Option
	Consumer *Consumer
	Producer *Producer
}

func New

func New(opt *Option, logger xlog.Logger, metrics metrics.Provider, tracer tracer.Provider) (*Kafka, error)

type Listener

// Listener is the handler interface for receiving consumed messages one at a
// time. An implementation is registered for a topic with
// Consumer.AddListener.
type Listener interface {
	// Listen receives a single message together with an Acknowledgment.
	// NOTE(review): presumably the handler calls Acknowledge once the
	// message has been processed — confirm against the consumer
	// implementation.
	Listen(ConsumerMessage, *Acknowledgment)
}

type Option

// Option is the configuration type accepted by New, NewConsumer and
// NewProducer; NewDefaultOptions returns a *Option.
//
// The top-level fields hold a name, a list of addresses (Addr), a version
// string, a request limit, and dial/read/write timeouts. The nested SASL,
// Metadata, Consumer and Producer structs hold settings for those areas.
//
// NOTE(review): many field names match fields of sarama.Config, so the
// values are presumably copied into a sarama.Config when a client is
// created — confirm against the constructors. Likewise Name is presumably
// the key used by SetConsumer/GetConsumer and SetProducer/GetProducer, and
// Addr presumably lists broker addresses — confirm.
type Option struct {
	Name            string
	Addr            []string
	Version         string
	MaxOpenRequests int
	DialTimeout     time.Duration
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	SASL            struct {
		// Whether or not to use SASL authentication when connecting to the broker
		// (defaults to false).
		Enable bool
		// SASLMechanism is the name of the enabled SASL mechanism.
		// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
		Mechanism sarama.SASLMechanism
		// Version is the SASL Protocol Version to use
		// Kafka > 1.x should use V1, except on Azure EventHub which use V0
		Version int16
		// Whether or not to send the Kafka SASL handshake first if enabled
		// (defaults to true). You should only set this to false if you're using
		// a non-Kafka SASL proxy.
		Handshake bool
		// AuthIdentity is an (optional) authorization identity (authzid) to
		// use for SASL/PLAIN authentication (if different from User) when
		// an authenticated user is permitted to act as the presented
		// alternative user. See RFC4616 for details.
		AuthIdentity string
		// User is the authentication identity (authcid) to present for
		// SASL/PLAIN or SASL/SCRAM authentication
		User string
		// Password for SASL/PLAIN authentication
		Password string
		// authz id used for SASL/SCRAM authentication
		SCRAMAuthzID string
		// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
		// client used to perform the SCRAM exchange with the server.
		SCRAMClientGeneratorFunc func() sarama.SCRAMClient
		// TokenProvider is a user-defined callback for generating
		// access tokens for SASL/OAUTHBEARER auth. See the
		// AccessTokenProvider interface docs for proper implementation
		// guidelines.
		TokenProvider sarama.AccessTokenProvider

		// GSSAPI holds a sarama.GSSAPIConfig.
		// NOTE(review): presumably only used when Mechanism selects GSSAPI
		// (Kerberos) — confirm.
		GSSAPI sarama.GSSAPIConfig
	}
	// Metadata holds a retry count and a timeout.
	// NOTE(review): presumably applied to cluster metadata requests —
	// confirm.
	Metadata struct {
		Retries int
		Timeout time.Duration
	}
	// Consumer holds consumer-side settings: the group name, auto-commit
	// switch and interval, initial offset, session timeout, fetch size
	// limits, maximum fetch wait, and a retry count.
	// NOTE(review): InitialOffset is an int64; presumably it takes
	// sarama.OffsetNewest or sarama.OffsetOldest — confirm.
	Consumer struct {
		Group              string
		EnableAutoCommit   bool
		AutoCommitInterval time.Duration
		InitialOffset      int64
		SessionTimeout     time.Duration
		MinFetchBytes      int32
		DefaultFetchBytes  int32
		MaxFetchBytes      int32
		MaxFetchWait       time.Duration
		Retries            int
	}
	// Producer holds producer-side settings: maximum message size, the
	// required acknowledgment level (sarama.RequiredAcks), a timeout, a
	// retry count, flush thresholds and frequency, and an idempotence
	// switch.
	Producer struct {
		MaxMessageBytes  int
		Acks             sarama.RequiredAcks
		Timeout          time.Duration
		Retries          int
		MaxFlushBytes    int
		MaxFlushMessages int
		FlushFrequency   time.Duration
		Idempotent       bool
	}
	// EnableMetrics and EnableTracer are boolean switches.
	// NOTE(review): presumably they gate use of the metrics.Provider and
	// tracer.Provider passed to the constructors — confirm.
	EnableMetrics bool
	EnableTracer  bool
}

func NewDefaultOptions

func NewDefaultOptions() *Option

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func GetProducer

func GetProducer(name string) *Producer

func NewProducer

func NewProducer(opt *Option, logger xlog.Logger, metrics metrics.Provider, tracer tracer.Provider) (*Producer, error)

NewProducer 创建一个异步的生产者

func (*Producer) Close

func (p *Producer) Close()

Close 关闭客户端,等待缓冲区完成读写再返回

func (*Producer) Options

func (p *Producer) Options() Option

func (*Producer) Send

func (p *Producer) Send(topic, value string, cb Callback)

Send 是异步发送接口

func (*Producer) SendMessage

func (p *Producer) SendMessage(message ProducerMessage, cb Callback)

SendMessage 是异步发送接口

func (*Producer) SyncSend

func (p *Producer) SyncSend(ctx context.Context, topic, value string) error

SyncSend 是同步发送接口。

func (*Producer) SyncSendMessage

func (p *Producer) SyncSendMessage(ctx context.Context, message ProducerMessage) error

SyncSendMessage 是同步发送接口。

type ProducerMessage

// ProducerMessage is the message value accepted by Producer.SendMessage and
// Producer.SyncSendMessage. It names the target topic and carries the key
// and value as strings.
type ProducerMessage struct {
	Topic string
	Key   string
	Value string
}

type ProducerMessageCarrier

type ProducerMessageCarrier struct {
	*sarama.ProducerMessage
	// contains filtered or unexported fields
}

ProducerMessageCarrier wraps a *sarama.ProducerMessage and provides Get, Set, and Keys methods.

func NewProducerMessageCarrier

func NewProducerMessageCarrier(m *sarama.ProducerMessage) *ProducerMessageCarrier

func (ProducerMessageCarrier) Get

func (carrier ProducerMessageCarrier) Get(key string) string

func (ProducerMessageCarrier) Keys

func (carrier ProducerMessageCarrier) Keys() []string

func (ProducerMessageCarrier) Set

func (carrier ProducerMessageCarrier) Set(key string, value string)

type RecordMetadata

// RecordMetadata is the value passed, by pointer, to a Callback. It holds a
// topic name, key and value sizes, and an offset and partition number.
// NOTE(review): presumably describes where the produced record was written,
// with KeySize and ValueSize in bytes — confirm against the producer
// implementation.
type RecordMetadata struct {
	Topic     string
	KeySize   int
	ValueSize int
	Offset    int64
	Partition int32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL