kafka

package v0.0.0-...-72d088b

Published: Jun 20, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Errorf

func Errorf(code int, format string, a ...interface{}) error

Errorf returns an error object for the given code, formatted message, and error info.
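For illustration, a hedged sketch of how Errorf might be called; the code 1001 and the wrapped io.EOF are arbitrary examples, not values defined by this package:

// Wrap a lower-level failure with a package error code.
// 1001 is an arbitrary example code.
err := Errorf(1001, "consume topic %q failed: %v", "orders", io.EOF)
if err != nil {
	log.Println(err)
}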

func InitLogger

func InitLogger(log *logger.Logger)

Types

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value []byte
	MessageID  string
	Topic      string
	Partition  int32
	Offset     int64

	Headers        []*RecordHeader // only set if Kafka is version 0.11+
	CreateAt       time.Time
	Timestamp      time.Time // only set if Kafka is version 0.10+; inner message timestamp
	BlockTimestamp time.Time // only set if Kafka is version 0.10+; outer (compressed) block timestamp
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type Error

type Error struct {
	Code    int
	Message string
	Err     error // original error
}

func ClientClose

func ClientClose(message string) *Error

ClientClose returns an *Error for a client close failure with the given message.

func New

func New(code int, message string) *Error

New returns an error object for the given code and message.

func Newf

func Newf(code int, format string, a ...interface{}) *Error

Newf is equivalent to New(code, fmt.Sprintf(format, a...)).

func (*Error) Error

func (e *Error) Error() string

func (*Error) Unwrap

func (e *Error) Unwrap() error
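Because *Error exposes its fields and implements Unwrap, it composes with the standard errors package. A minimal sketch, assuming the standard context, errors, and log imports (the code 1001 is an arbitrary example):

var err error = &Error{Code: 1001, Message: "send failed", Err: context.DeadlineExceeded}

var kerr *Error
if errors.As(err, &kerr) {
	log.Printf("code=%d message=%s", kerr.Code, kerr.Message)
}
// Unwrap exposes the wrapped error to errors.Is.
if errors.Is(err, context.DeadlineExceeded) {
	// handle the timeout specifically
}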

type Handler

type Handler func(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error
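A Handler receives the group session alongside each message. A minimal sketch of one that logs and delegates, where processOrder is a hypothetical application function:

var handler Handler = func(ctx context.Context, session sarama.ConsumerGroupSession, m *ConsumerMessage) error {
	log.Printf("topic=%s partition=%d offset=%d key=%s",
		m.Topic, m.Partition, m.Offset, m.Key)
	// processOrder is hypothetical; a non-nil return is assumed to signal
	// the Reader that processing failed.
	return processOrder(ctx, m.Value)
}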

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata holds metadata.

type ProducerError

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
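With an asynchronous Writer (see Writer below), delivery failures presumably arrive as *ProducerError values on the Errors() channel. A sketch of draining it, where w is a Writer created elsewhere:

go func() {
	for perr := range w.Errors() {
		// perr.Msg is the original message; perr.Err is the delivery error.
		log.Printf("deliver to topic %s failed: %v", perr.Msg.Topic, perr.Err)
	}
}()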

type ProducerMessage

type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message.
	Key string
	// The actual message payload to store in Kafka.
	Value []byte

	// Metadata holds arbitrary pass-through data to include with the message
	// so that it is available when receiving on the Successes and Errors
	// channels. Sarama ignores this field entirely.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	// MessageID
	MessageID string
}

type Reader

type Reader interface {
	FetchMessage(ctx context.Context, handler Handler) error
	CommitMessage(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error
	Close() error
}

func NewReader

func NewReader(brokers []string, topic, group string, opts ...ReaderOpt) Reader
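A hedged sketch of constructing a Reader and consuming with it; the broker address, topic, and group are placeholders, ReaderCommitInterval's unit is assumed to be seconds, and FetchMessage is assumed to block while dispatching to the handler:

r := NewReader(
	[]string{"127.0.0.1:9092"}, // placeholder broker
	"orders",                   // topic
	"orders-group",             // consumer group
	ReaderStartOffset(OffsetOldest), // start from the oldest offset when no commit exists
	ReaderCommitInterval(1),         // unit assumed to be seconds
)
defer r.Close()

err := r.FetchMessage(context.Background(),
	func(ctx context.Context, session sarama.ConsumerGroupSession, m *ConsumerMessage) error {
		log.Printf("offset=%d value=%s", m.Offset, m.Value)
		return r.CommitMessage(ctx, session, m)
	})
if err != nil {
	log.Fatal(err)
}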

type ReaderOpt

type ReaderOpt func(o *ReaderOpts)

func ReaderCommitInterval

func ReaderCommitInterval(commitInterval int) ReaderOpt

func ReaderConsumeFrom

func ReaderConsumeFrom(consumeFrom string) ReaderOpt

func ReaderServiceName

func ReaderServiceName(serviceName string) ReaderOpt

func ReaderStartOffset

func ReaderStartOffset(offset StartOffset) ReaderOpt

type ReaderOpts

type ReaderOpts struct {
	ServiceName string
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The topic to read messages from.
	Topic string

	// GroupID holds the optional consumer group ID. If GroupID is set, a
	// specific partition (e.g. 0) should NOT be specified.
	GroupID string

	// StartOffset determines where the consumer group should begin consuming
	// when it finds a partition without a committed offset. If non-zero, it
	// must be set to one of OffsetNewest or OffsetOldest.
	//
	// Default: OffsetOldest
	//
	// Only used when GroupID is set.
	StartOffset StartOffset

	ConsumeFrom string

	CommitInterval int
}

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader stores key and value for a record header

type RequiredAck

type RequiredAck int16
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAck = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAck = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAck = -1
)

type StartOffset

type StartOffset int
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest StartOffset = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest StartOffset = -2
)

type Writer

type Writer interface {
	SendMessage(ctx context.Context, message *ProducerMessage) error
	Errors() <-chan *ProducerError
	Messages() <-chan *ProducerMessage
	Close() (err error)
}

func NewWriter

func NewWriter(brokers []string, opts ...WriterOpt) Writer

NewWriter initializes a Writer.
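A minimal sketch of producing one message synchronously; the broker address, topic, key, and payload are placeholders:

w := NewWriter(
	[]string{"127.0.0.1:9092"},    // placeholder broker
	WriterRequiredAck(WaitForAll), // wait for all in-sync replicas
	WriterMaxAttempts(3),
	WriterAsync(false), // synchronous delivery for this sketch
)
defer w.Close()

err := w.SendMessage(context.Background(), &ProducerMessage{
	Topic: "orders",
	Key:   "order-123", // partitioning key
	Value: []byte(`{"id":123,"status":"created"}`),
})
if err != nil {
	log.Fatal(err)
}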

type WriterOpt

type WriterOpt func(o *WriterOpts)

func WriterAsync

func WriterAsync(async bool) WriterOpt

func WriterMaxAttempts

func WriterMaxAttempts(num int) WriterOpt

func WriterProducer

func WriterProducer(producer string) WriterOpt

func WriterReadTimeout

func WriterReadTimeout(readTimeout int) WriterOpt

func WriterRequiredAck

func WriterRequiredAck(requiredAck RequiredAck) WriterOpt

func WriterServiceName

func WriterServiceName(serviceName string) WriterOpt

type WriterOpts

type WriterOpts struct {
	ServiceName string
	ProducerTo  string
	Brokers     []string
	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// RequiredAck is the number of acknowledgements from partition replicas
	// required before a response to a produce request is returned. The
	// default is -1 (WaitForAll), which waits for all in-sync replicas; a
	// value above 0 specifies how many replicas must acknowledge a message
	// for it to be considered successful.
	RequiredAck RequiredAck

	ReadTimeout int

	Async bool
}
