kafka

package module
v0.0.0-...-74d4f35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2021 License: MIT Imports: 7 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// HeaderPartition Topic partition where Message was stored in Apache Kafka commit log
	HeaderPartition = "quark-kafka-partition"
	// HeaderOffset Topic partition offset, item number inside an specific Topic partition
	HeaderOffset = "quark-kafka-offset"
	// HeaderKey Message key on a Apache Kafka Topic, used to group multiple messages by its ID, it can be also used
	// to compact commit logs by updating and then deleting previous item versions (removes duplicates)
	HeaderKey = "quark-kafka-key"
	// HeaderValue Apache Kafka message body in binary format
	HeaderValue = "quark-kafka-value"
	// HeaderTimestamp Apache Kafka insertion time
	HeaderTimestamp = "quark-kafka-timestamp"
	// HeaderBlockTimestamp Apache Kafka producer insertion time
	HeaderBlockTimestamp = "quark-kafka-block-timestamp"
	// HeaderMemberId Member unique identifier from an Apache Kafka Consumer Group
	HeaderMemberId = "quark-kafka-member-id"
	// HeaderGenerationId Unique identifier when Topic partition list is requested when joining an Apache Kafka
	// Consumer Group
	HeaderGenerationId = "quark-kafka-generation-id"
	// HeaderHighWaterMarkOffset Last message that was successfully copied to all of the log’s replicas in an Apache
	// Kafka cluster
	HeaderHighWaterMarkOffset = "quark-kafka-high-water-mark-offset"
)

Variables

This section is empty.

Functions

func MarshalKafkaHeaders

func MarshalKafkaHeaders(msg *quark.Message) []sarama.RecordHeader

MarshalKafkaHeaders parses the given Message and its metadata into Apache Kafka's header types

func MarshalKafkaMessage

func MarshalKafkaMessage(msg *quark.Message) *sarama.ProducerMessage

MarshalKafkaMessage parses the given Message into a Apache Kafka producer message

func NewKafkaBroker

func NewKafkaBroker(cfg *sarama.Config, opts ...quark.Option) *quark.Broker

NewKafkaBroker allocates and returns a Kafka Broker

func NewKafkaHeader

func NewKafkaHeader(msg *sarama.ConsumerMessage) quark.Header

NewKafkaHeader creates a Message Header from an Apache Kafka message

func UnmarshalKafkaHeaders

func UnmarshalKafkaHeaders(headers []*sarama.RecordHeader, msg *quark.Message)

UnmarshalKafkaHeaders parses the given Apache Kafka headers into the given Quark Message

func UnmarshalKafkaMessage

func UnmarshalKafkaMessage(msgKafka *sarama.ConsumerMessage, msg *quark.Message)

UnmarshalKafkaMessage parses the given Apache Kafka message into a Message

Types

type KafkaConfiguration

type KafkaConfiguration struct {
	Config   *sarama.Config
	Consumer KafkaConsumerConfig
	Producer KafkaProducerConfig
}

KafkaConfiguration Apache Kafka specific Broker and Consumer configuration, overrides default values, contains from basic configuration to functions serving as Hooks when an action was dispatched

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	GroupHandler     sarama.ConsumerGroupHandler
	PartitionHandler KafkaPartitionConsumer
	Topic            KafkaConsumerTopicConfig
	// Hooks
	OnReceived func(context.Context, *sarama.ConsumerMessage)
}

KafkaConsumerConfig Apache Kafka consumer configuration

type KafkaConsumerTopicConfig

type KafkaConsumerTopicConfig struct {
	Partition int32
	Offset    int64
}

KafkaConsumerTopicConfig Apache Kafka configuration used to override default consuming values

type KafkaPartitionConsumer

type KafkaPartitionConsumer interface {
	Consume(context.Context, sarama.PartitionConsumer, *quark.Consumer, quark.EventWriter)
}

KafkaPartitionConsumer This consumer is the default way to consume messages from Apache Kafka.

It pulls messages from an specific partition inside an Apache Kafka cluster or Broker. This way of consuming messages is useful when actual parallelization of the process itself is required. When in a pool, it will pull messages for each consumer running in a Worker, running the process at the same time in a worker pool.

type KafkaProducerConfig

type KafkaProducerConfig struct {
	// Hooks
	OnSent func(ctx context.Context, message *sarama.ProducerMessage, partition int32, offset int64)
}

KafkaProducerConfig Apache Kafka producer configuration

type KafkaPublisher

type KafkaPublisher struct {
	// contains filtered or unexported fields
}

KafkaPublisher Quark default publisher for Kafka

func NewKafkaPublisher

func NewKafkaPublisher(cfg KafkaConfiguration, addrs ...string) *KafkaPublisher

NewKafkaPublisher allocates a new KafkaPublisher

func (*KafkaPublisher) Publish

func (d *KafkaPublisher) Publish(ctx context.Context, messages ...*quark.Message) error

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL