kafka

package
v0.0.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2023 | License: Apache-2.0 | Imports: 17 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitLogger

func InitLogger(log logger.Logger)

func ToProducerMessage

func ToProducerMessage(message *ProducerMessage) (msg *sarama.ProducerMessage)

Types

type ConsumerMessage

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	// Key and Value are the message's key and payload as raw bytes.
	Key, Value []byte
	// Topic, Partition and Offset identify where the message was read from.
	Topic      string
	Partition  int32
	Offset     int64

	Headers        []*RecordHeader // Only set if the Kafka version is 0.11+.
	Timestamp      time.Time       // Only set if the Kafka version is 0.10+; inner message timestamp.
	BlockTimestamp time.Time       // Only set if the Kafka version is 0.10+; outer (compressed) block timestamp.
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type Handler

// Handler is the function type used to process a consumed message. It
// receives the caller's context, the sarama consumer-group session the
// message belongs to, and the message itself, and reports failure through
// its error result.
//
// NOTE(review): Reader.FetchMessage takes a Handler; presumably it is invoked
// once per fetched message — confirm against the implementation.
type Handler func(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error

type ProducerError

// ProducerError is the type of error generated when the producer fails to
// deliver a message.
type ProducerError struct {
	// Msg is the original ProducerMessage that could not be delivered.
	Msg *ProducerMessage
	// Err is the actual error value describing the failure.
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

type ProducerMessage

// ProducerMessage is the message type handed to Writer.SendMessage. Unlike
// sarama.ProducerMessage, Key and Value are plain string / []byte values
// rather than Encoders; ToProducerMessage converts a ProducerMessage into a
// *sarama.ProducerMessage.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message, as a plain string.
	Key string
	// The actual message payload to store in Kafka, as raw bytes.
	Value []byte

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when the message is received back from the producer.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	// NOTE(review): the Writer interface exposes Messages() and Errors();
	// presumably those are the channels on which it is returned — confirm.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	// MessageID is an identifier string for this message.
	// NOTE(review): how it is populated and whether it is transmitted to the
	// broker (e.g. as a record header) is not visible here — confirm in
	// ToProducerMessage.
	MessageID string
}

type Reader

// Reader is the consumer-side interface of this package. InitReader and
// NewReader return values of this type.
type Reader interface {
	// FetchMessage takes a Handler to run against consumed messages and
	// returns an error on failure.
	// NOTE(review): presumably blocks and calls handler for each message until
	// ctx is done or an error occurs — confirm against the implementation.
	FetchMessage(ctx context.Context, handler Handler) error
	// CommitMessage takes the consumer-group session and the message to
	// commit, and returns an error on failure.
	// NOTE(review): presumably marks the message's offset as consumed on the
	// session — confirm against the implementation.
	CommitMessage(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error
	// Close shuts the reader down and returns any error encountered.
	Close() error
}

func InitReader

func InitReader(brokers []string, topic, group string) (Reader, error)

func NewReader

func NewReader(brokers []string, topic, group string, opts ...ReaderOpt) (Reader, error)

type ReaderOpt

// ReaderOpt is a functional option for a Reader: a function that receives a
// *ReaderOpts to modify. NewReader accepts a variadic list of ReaderOpt
// values; ReaderCommitInterval, ReaderLogger, ReaderServiceName and
// ReaderStartOffset each return one.
type ReaderOpt func(o *ReaderOpts)

func ReaderCommitInterval

func ReaderCommitInterval(commitInterval int) ReaderOpt

func ReaderLogger

func ReaderLogger(logger logger.Logger) ReaderOpt

func ReaderServiceName

func ReaderServiceName(serviceName string) ReaderOpt

func ReaderStartOffset

func ReaderStartOffset(offset StartOffset) ReaderOpt

type ReaderOpts

// ReaderOpts holds the configuration of a Reader. Values of type ReaderOpt
// receive a pointer to it.
type ReaderOpts struct {
	// ServiceName is a name for the service that owns the reader.
	// NOTE(review): what it is used for (logging, tracing, client id) is not
	// visible here — confirm.
	ServiceName string
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The topic to read messages from.
	Topic string

	// GroupID holds the consumer group id.
	// NOTE(review): the previous comment mentioned a Partition setting, but
	// ReaderOpts has no such field; it appears to have been carried over from
	// another library.
	GroupID string

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of OffsetOldest or OffsetNewest.
	//
	// NOTE(review): the previous comment gave the default as "FirstOffset", a
	// name this package does not define; presumably OffsetOldest — TODO confirm
	// in NewReader.
	//
	// Only used when GroupID is set
	StartOffset StartOffset

	// CommitInterval configures how often offsets are committed.
	// NOTE(review): the unit of this int (seconds? milliseconds?) is not
	// visible here — confirm before relying on it.
	CommitInterval int

	// Logger is the logger.Logger associated with the reader.
	Logger logger.Logger
}

type RecordHeader

// RecordHeader stores key and value for a record header.
// ConsumerMessage.Headers is a slice of pointers to this type.
type RecordHeader struct {
	Key   []byte // header key, as raw bytes
	Value []byte // header value, as raw bytes
}

RecordHeader stores key and value for a record header

type RequiredAck

// RequiredAck is the acknowledgement level a Writer requests for a produce
// request. It is set through WriterOpts.RequiredAck / WriterRequiredAck.
// NOTE(review): the names and numeric values appear to mirror sarama's
// RequiredAcks constants — confirm they are passed through unchanged.
type RequiredAck int16
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAck = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAck = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAck = -1
)

type StartOffset

// StartOffset selects where consumption begins for a partition without a
// committed offset. It is set through ReaderOpts.StartOffset /
// ReaderStartOffset and takes one of the constants below.
// NOTE(review): the GetOffset and ConsumePartition methods mentioned in the
// constant comments are not part of this package; they look like sarama APIs.
type StartOffset int
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest StartOffset = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest StartOffset = -2
)

type Writer

// Writer is the producer-side interface of this package. NewWriter returns a
// value of this type.
type Writer interface {
	// SendMessage hands message to the producer and returns an error on
	// failure.
	// NOTE(review): when WriterOpts.Async is enabled, delivery failures are
	// presumably reported on Errors() rather than through this return value —
	// confirm against the implementation.
	SendMessage(ctx context.Context, message *ProducerMessage) error
	// Errors returns a receive-only channel of *ProducerError values.
	Errors() <-chan *ProducerError
	// Messages returns a receive-only channel of *ProducerMessage values.
	// NOTE(review): presumably the successfully delivered messages — confirm.
	Messages() <-chan *ProducerMessage
	// Close shuts the writer down and returns any error encountered.
	Close() (err error)
}

func NewWriter

func NewWriter(brokers []string, opts ...WriterOpt) (Writer, error)

NewWriter initializes and returns a Writer for the given brokers, configured by opts.

type WriterOpt

// WriterOpt is a functional option for a Writer: a function that receives a
// *WriterOpts to modify. NewWriter accepts a variadic list of WriterOpt
// values; WriterAsync, WriterLogger, WriterMaxAttempts, WriterReadTimeout,
// WriterRequiredAck and WriterServiceName each return one.
type WriterOpt func(o *WriterOpts)

func WriterAsync

func WriterAsync(async bool) WriterOpt

func WriterLogger

func WriterLogger(logger logger.Logger) WriterOpt

func WriterMaxAttempts

func WriterMaxAttempts(num int) WriterOpt

func WriterReadTimeout

func WriterReadTimeout(readTimeout int) WriterOpt

func WriterRequiredAck

func WriterRequiredAck(requiredAck RequiredAck) WriterOpt

func WriterServiceName

func WriterServiceName(serviceName string) WriterOpt

type WriterOpts

// WriterOpts holds the configuration of a Writer. Values of type WriterOpt
// receive a pointer to it.
type WriterOpts struct {
	// ServiceName is a name for the service that owns the writer.
	// NOTE(review): what it is used for is not visible here — confirm.
	ServiceName string
	// Brokers is the list of broker addresses.
	Brokers     []string
	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	// NOTE(review): the default of 10 is not visible in this view — confirm in
	// NewWriter.
	MaxAttempts int

	// RequiredAck is the acknowledgement level required from partition
	// replicas before a produce request is answered; it takes one of
	// NoResponse, WaitForLocal or WaitForAll.
	//
	// NOTE(review): the previous comment was carried over from kafka-go v0.3.
	// It stated a default of -1 (wait for all replicas) and that 0 required
	// acks is unsupported, yet this package defines NoResponse = 0. Confirm the
	// actual default and whether NoResponse is honoured.
	RequiredAck RequiredAck

	// ReadTimeout is a timeout setting for the writer.
	// NOTE(review): the unit of this int and the operation it bounds are not
	// visible here — confirm.
	ReadTimeout int

	// Async toggles asynchronous operation of the writer.
	// NOTE(review): presumably selects an asynchronous producer whose results
	// arrive on Writer.Messages() and Writer.Errors() — confirm.
	Async bool

	// Logger is the logger.Logger associated with the writer.
	Logger logger.Logger
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL