redelivery

package
v0.0.0-...-8f4c080 Latest Latest
Published: Nov 17, 2022 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssignedPartition

type AssignedPartition interface {
	// AddMessage adds a marker message to this partition.
	// `now` is the local, monotonic clock time
	AddMessage(msg *confluentKafka.Message, now time.Time) error

	// SmallestMarkerOffset returns the smallest offset of all marker messages currently being tracked
	SmallestMarkerOffset() (int64, bool)

	// Redeliver determines which messages have expired redelivery deadlines and initiates redelivery.
	// `now` is the local, monotonic clock time
	Redeliver(now time.Time)

	// Close releases the resources consumed by this partition instance
	Close()
}

AssignedPartition allows the marker consumer to track consumption progress as well as which messages require redelivery for a specific partition in the marker topic
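
To illustrate how a consumer loop might drive this interface, here is a minimal sketch. It is not the package's marker consumer: `partitionTracker`, `fakePartition`, and `drive` are hypothetical names, and the trimmed interface takes a raw offset instead of a `*confluentKafka.Message` so the example has no Kafka dependency.

```go
package main

import (
	"fmt"
	"time"
)

// partitionTracker is a hypothetical, trimmed-down variant of AssignedPartition.
// It takes a raw marker offset instead of a *confluentKafka.Message.
type partitionTracker interface {
	AddMessage(offset int64, now time.Time) error
	SmallestMarkerOffset() (int64, bool)
	Redeliver(now time.Time)
	Close()
}

// fakePartition is a test double that records how it was called.
type fakePartition struct {
	offsets        []int64
	redeliverCalls int
	closed         bool
}

func (p *fakePartition) AddMessage(offset int64, _ time.Time) error {
	p.offsets = append(p.offsets, offset)
	return nil
}

func (p *fakePartition) SmallestMarkerOffset() (int64, bool) {
	if len(p.offsets) == 0 {
		return 0, false
	}
	smallest := p.offsets[0]
	for _, o := range p.offsets[1:] {
		if o < smallest {
			smallest = o
		}
	}
	return smallest, true
}

func (p *fakePartition) Redeliver(_ time.Time) { p.redeliverCalls++ }
func (p *fakePartition) Close()                { p.closed = true }

// drive feeds each consumed marker offset to the tracker, runs a redelivery
// check after each one, and finally reports the smallest offset still tracked.
func drive(p partitionTracker, consumed []int64) (int64, bool, error) {
	defer p.Close()
	for _, off := range consumed {
		now := time.Now() // time.Now carries a monotonic clock reading
		if err := p.AddMessage(off, now); err != nil {
			return 0, false, err
		}
		p.Redeliver(now)
	}
	smallest, ok := p.SmallestMarkerOffset()
	return smallest, ok, nil
}

func main() {
	p := &fakePartition{}
	smallest, ok, err := drive(p, []int64{4, 2, 9})
	fmt.Println(smallest, ok, err, p.redeliverCalls, p.closed)
}
```

Passing the same `now` value to both `AddMessage` and `Redeliver` keeps one loop iteration consistent with a single clock reading; whether the real consumer does this is an assumption.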

func NewAssignedPartition

func NewAssignedPartition(markersQueue MarkersQueue, redeliverer Redeliverer, durationUseNowIfNoMarkerSeen time.Duration) AssignedPartition

NewAssignedPartition constructs a new AssignedPartition instance

type MarkerConsumer

type MarkerConsumer interface {
	// Start the `MarkerConsumer`
	Start()

	// Stop the `MarkerConsumer`, it is unusable after it is stopped
	Stop()
}

MarkerConsumer encapsulates the ability to consume markers from the markers topic and redeliver messages whose redelivery deadlines have expired

func NewMarkerConsumer

func NewMarkerConsumer(config queue.Config, clients kafka.ClientFactory) (MarkerConsumer, error)

NewMarkerConsumer constructs a new `MarkerConsumer` instance

type MarkerDeadline

type MarkerDeadline struct {
	MessageID          MessageID
	RedeliveryDeadline time.Time
	// contains filtered or unexported fields
}

MarkerDeadline associates a message with its redelivery deadline

type MarkerDeadlinesPQ

type MarkerDeadlinesPQ struct {
	// contains filtered or unexported fields
}

MarkerDeadlinesPQ is a priority queue to track messages by their redelivery deadline in ascending order

func NewMarkerDeadlinesPQ

func NewMarkerDeadlinesPQ() *MarkerDeadlinesPQ

NewMarkerDeadlinesPQ constructs a new `MarkerDeadlinesPQ` instance.

func (*MarkerDeadlinesPQ) Dequeue

func (pq *MarkerDeadlinesPQ) Dequeue() (MarkerDeadline, bool)

Dequeue removes the `MarkerDeadline` item with the earliest redelivery deadline from the head of the priority queue

func (*MarkerDeadlinesPQ) Enqueue

func (pq *MarkerDeadlinesPQ) Enqueue(markerDeadline MarkerDeadline) bool

Enqueue adds a new `MarkerDeadline` item to the priority queue

func (*MarkerDeadlinesPQ) Head

func (pq *MarkerDeadlinesPQ) Head() (MarkerDeadline, bool)

Head peeks at the `MarkerDeadline` item with the earliest redelivery deadline at the head of the queue. Returns `(MarkerDeadline{}, false)` if the queue is empty.

func (*MarkerDeadlinesPQ) Update

func (pq *MarkerDeadlinesPQ) Update(messageID MessageID, redeliveryDeadline time.Time) bool

Update modifies the redelivery deadline of a marker in the priority queue.
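
A deadline-ordered priority queue with an in-place `Update` is commonly built on Go's `container/heap`. The following is a minimal sketch under that assumption, not the package's actual implementation: `deadlineItem`, `deadlineHeap`, and `deadlinePQ` are hypothetical names, a string stands in for `MessageID`, and the meaning given to the `bool` results (false for a duplicate or unknown ID) is a guess.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// deadlineItem is a hypothetical stand-in for MarkerDeadline.
type deadlineItem struct {
	id       string    // stand-in for MessageID
	deadline time.Time // redelivery deadline
	index    int       // position in the heap, maintained by Push and Swap
}

// deadlineHeap implements heap.Interface, ordered by earliest deadline.
type deadlineHeap []*deadlineItem

func (h deadlineHeap) Len() int           { return len(h) }
func (h deadlineHeap) Less(i, j int) bool { return h[i].deadline.Before(h[j].deadline) }
func (h deadlineHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}
func (h *deadlineHeap) Push(x any) {
	it := x.(*deadlineItem)
	it.index = len(*h)
	*h = append(*h, it)
}
func (h *deadlineHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

// deadlinePQ pairs the heap with a lookup by ID so deadlines can be updated.
type deadlinePQ struct {
	items deadlineHeap
	byID  map[string]*deadlineItem
}

func newDeadlinePQ() *deadlinePQ { return &deadlinePQ{byID: map[string]*deadlineItem{}} }

// Enqueue adds an item; this sketch reports false if the ID is already tracked.
func (pq *deadlinePQ) Enqueue(id string, deadline time.Time) bool {
	if _, ok := pq.byID[id]; ok {
		return false
	}
	it := &deadlineItem{id: id, deadline: deadline}
	pq.byID[id] = it
	heap.Push(&pq.items, it)
	return true
}

// Update changes a tracked deadline and restores heap order with heap.Fix.
func (pq *deadlinePQ) Update(id string, deadline time.Time) bool {
	it, ok := pq.byID[id]
	if !ok {
		return false
	}
	it.deadline = deadline
	heap.Fix(&pq.items, it.index)
	return true
}

// Dequeue removes and returns the ID with the earliest deadline.
func (pq *deadlinePQ) Dequeue() (string, bool) {
	if len(pq.items) == 0 {
		return "", false
	}
	it := heap.Pop(&pq.items).(*deadlineItem)
	delete(pq.byID, it.id)
	return it.id, true
}

func main() {
	base := time.Unix(0, 0)
	pq := newDeadlinePQ()
	pq.Enqueue("0:10", base.Add(30*time.Second))
	pq.Enqueue("0:11", base.Add(10*time.Second))
	pq.Update("0:11", base.Add(60*time.Second)) // push the deadline back
	id, _ := pq.Dequeue()
	fmt.Println(id) // prints 0:10
}
```

Storing each item's heap index is what makes `Update` cheap: `heap.Fix` re-sorts a single element in O(log n) instead of rebuilding the queue.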

type MarkerID

type MarkerID = RecordID

MarkerID uniquely identifies a record from the marker topic

func NewMarkerID

func NewMarkerID(partition int32, offset int64) MarkerID

NewMarkerID constructs an id that uniquely identifies a marker record.

type MarkerOffset

type MarkerOffset struct {
	MessageID      MessageID
	OffsetOfMarker int64
}

MarkerOffset associates a message with the location of the marker that tracks it in the marker topic

type MarkerOffsetsPQ

type MarkerOffsetsPQ struct {
	// contains filtered or unexported fields
}

MarkerOffsetsPQ is a priority queue to track markers by their offset in the marker topic in ascending order

func NewMarkerOffsetsPQ

func NewMarkerOffsetsPQ() *MarkerOffsetsPQ

NewMarkerOffsetsPQ constructs a new `MarkerOffsetsPQ` instance.

func (*MarkerOffsetsPQ) Dequeue

func (pq *MarkerOffsetsPQ) Dequeue() (MarkerOffset, bool)

Dequeue removes the `MarkerOffset` item with the smallest offset value from the head of the priority queue

func (*MarkerOffsetsPQ) Enqueue

func (pq *MarkerOffsetsPQ) Enqueue(markerOffset MarkerOffset) bool

Enqueue adds a new `MarkerOffset` item to the priority queue

func (*MarkerOffsetsPQ) Head

func (pq *MarkerOffsetsPQ) Head() (MarkerOffset, bool)

Head peeks at the `MarkerOffset` item with the smallest offset value at the head of the queue. Returns `(MarkerOffset{}, false)` if the queue is empty.

type MarkersQueue

type MarkersQueue interface {
	// Partition returns the ordinal of the partition that the queue is tracking
	Partition() int32

	// AddMarker adds a marker to the queue.
	AddMarker(offset int64, timestamp time.Time, marker *kafkamq.Marker)

	// MarkersToRedeliver returns the markers that require redelivery given the specified timestamp.
	MarkersToRedeliver(now time.Time) []*kafkamq.Marker

	// SmallestMarkerOffset returns the smallest marker offset tracked by the queue.
	// Returns `(0, false)` if the queue is empty.
	SmallestMarkerOffset() (int64, bool)
}

MarkersQueue is the data structure that tracks markers consumed from the markers topic. It determines:

  1. Which messages are currently in progress, including their payload.
  2. Which messages require redelivery (based on marker redelivery deadlines).
  3. The offset up to which markers have been consumed from the markers topic; this offset is used when committing marker consumer offsets to Kafka.

Note that the MarkersQueue is not thread-safe as it is only meant to be used by a single Kafka consumer thread.
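
The three responsibilities listed above can be sketched with a single map and linear scans. This is an illustration only: the real queue presumably relies on the priority queues documented on this page, and `marker`, `tracked`, and `simpleMarkersQueue` are hypothetical names. The sketch also assumes that a "start" marker carries the redelivery deadline and that an "end" marker means the message was processed; neither is confirmed by this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// marker is a hypothetical, simplified stand-in for kafkamq.Marker.
type marker struct {
	messageID string    // stand-in for MessageID
	end       bool      // assumption: an end marker means processing finished
	deadline  time.Time // assumption: a start marker carries the redelivery deadline
}

// tracked is the state kept for one in-progress message.
type tracked struct {
	markerOffset int64
	marker       marker
}

// simpleMarkersQueue keeps in-progress messages in a map and scans it linearly.
// Like MarkersQueue, it is not safe for concurrent use.
type simpleMarkersQueue struct {
	inProgress map[string]tracked
}

func newSimpleMarkersQueue() *simpleMarkersQueue {
	return &simpleMarkersQueue{inProgress: map[string]tracked{}}
}

// AddMarker starts tracking a message on a start marker and stops on an end marker.
func (q *simpleMarkersQueue) AddMarker(offset int64, m marker) {
	if m.end {
		delete(q.inProgress, m.messageID)
		return
	}
	q.inProgress[m.messageID] = tracked{markerOffset: offset, marker: m}
}

// MarkersToRedeliver returns the IDs of messages whose deadline is not after now,
// sorted so the result is deterministic. Entries stay tracked in this sketch.
func (q *simpleMarkersQueue) MarkersToRedeliver(now time.Time) []string {
	var expired []string
	for id, t := range q.inProgress {
		if !t.marker.deadline.After(now) {
			expired = append(expired, id)
		}
	}
	sort.Strings(expired)
	return expired
}

// SmallestMarkerOffset returns (0, false) when nothing is tracked.
func (q *simpleMarkersQueue) SmallestMarkerOffset() (int64, bool) {
	first := true
	var smallest int64
	for _, t := range q.inProgress {
		if first || t.markerOffset < smallest {
			smallest, first = t.markerOffset, false
		}
	}
	return smallest, !first
}

func main() {
	base := time.Unix(0, 0)
	q := newSimpleMarkersQueue()
	q.AddMarker(100, marker{messageID: "0:1", deadline: base.Add(10 * time.Second)})
	q.AddMarker(101, marker{messageID: "0:2", deadline: base.Add(60 * time.Second)})
	q.AddMarker(102, marker{messageID: "0:1", end: true}) // message 0:1 was processed

	fmt.Println(q.MarkersToRedeliver(base.Add(90 * time.Second)))
	fmt.Println(q.SmallestMarkerOffset())
}
```

The linear scans cost O(n) per call; ordering entries by deadline and by offset in two heaps would bring the head lookups down to O(1).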

func NewMarkersQueue

func NewMarkersQueue(partition int32) MarkersQueue

NewMarkersQueue constructs a new MarkersQueue instance

type MessageID

type MessageID = RecordID

MessageID uniquely identifies a record from the queue topic

func MessageIDFromMarker

func MessageIDFromMarker(marker *kafkamq.Marker) MessageID

MessageIDFromMarker returns an identifier that uniquely identifies the message record from the queue topic that the specified `marker` tracks.

func NewMessageID

func NewMessageID(partition int32, offset int64) MessageID

NewMessageID constructs an id that uniquely identifies a message record.

type OffsetCommitter

type OffsetCommitter interface {
	// AddMarker registers a marker to be committed
	AddMarker(MarkerID)

	// DropOffsets drops any cached offsets for the specified partition
	DropOffsets(partition int32)
}

OffsetCommitter allows the redelivery tracker to periodically commit batches of marker offsets to Kafka

func NewOffsetCommitter

func NewOffsetCommitter(markerTopic string, numOffsetsPerCommit uint, kConsumer kafka.Consumer) OffsetCommitter

NewOffsetCommitter constructs a new OffsetCommitter instance that can be used to periodically commit consumer offsets for the specified `markerTopic`. `numOffsetsPerCommit` is the number of offsets that must be added before a commit is performed.
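
Count-based batching of this kind can be sketched as follows. The sketch is an assumption about the general technique rather than the package's code: `batchingCommitter` and its `commit` callback are hypothetical, the callback stands in for whatever call hands offsets to the Kafka consumer, and tracking the highest offset per partition is one possible policy. The only Kafka-specific fact relied on is that a committed offset names the next record to read, hence the `+ 1`.

```go
package main

import "fmt"

// batchingCommitter is a hypothetical sketch of count-based offset batching.
type batchingCommitter struct {
	perCommit uint
	pending   uint
	highest   map[int32]int64 // partition -> highest marker offset added
	commit    func(offsets map[int32]int64)
}

func newBatchingCommitter(perCommit uint, commit func(map[int32]int64)) *batchingCommitter {
	return &batchingCommitter{perCommit: perCommit, highest: map[int32]int64{}, commit: commit}
}

// AddMarker records an offset and triggers a commit once perCommit offsets
// have been added since the previous commit.
func (c *batchingCommitter) AddMarker(partition int32, offset int64) {
	if cur, ok := c.highest[partition]; !ok || offset > cur {
		c.highest[partition] = offset
	}
	c.pending++
	if c.pending < c.perCommit {
		return
	}
	// A committed Kafka offset is the position of the next record to consume.
	next := make(map[int32]int64, len(c.highest))
	for p, o := range c.highest {
		next[p] = o + 1
	}
	if len(next) > 0 {
		c.commit(next)
	}
	c.pending = 0
	c.highest = map[int32]int64{}
}

// DropOffsets forgets cached offsets, for example after a partition is revoked.
func (c *batchingCommitter) DropOffsets(partition int32) { delete(c.highest, partition) }

func main() {
	c := newBatchingCommitter(2, func(offsets map[int32]int64) {
		fmt.Println("commit", offsets)
	})
	c.AddMarker(0, 5) // below the threshold, nothing committed yet
	c.AddMarker(1, 7) // second offset reaches the threshold and triggers a commit
}
```

Batching trades commit traffic against replay: a larger `numOffsetsPerCommit` means fewer commits but more markers re-read after a restart.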

type RecordID

type RecordID struct {
	// contains filtered or unexported fields
}

RecordID uniquely identifies a record in a Kafka topic by using the record's location (partition:offset)
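
Because the fields of `RecordID` are unexported, its layout is not visible here. A plausible shape, shown purely as an assumption, is a small comparable struct, which is what lets aliases such as `MessageID` and `MarkerID` serve directly as map keys. The names `recordID`, `messageID`, and `markerID` below are hypothetical.

```go
package main

import "fmt"

// recordID is a hypothetical equivalent of RecordID: a record's location.
type recordID struct {
	partition int32
	offset    int64
}

// String renders the location in partition:offset form.
func (r recordID) String() string { return fmt.Sprintf("%d:%d", r.partition, r.offset) }

// messageID and markerID are aliases, mirroring `type MessageID = RecordID`.
type (
	messageID = recordID
	markerID  = recordID
)

func main() {
	// Structs with only comparable fields can be used as map keys.
	state := map[messageID]string{}
	state[messageID{partition: 0, offset: 42}] = "pending"

	_, found := state[recordID{partition: 0, offset: 42}]
	fmt.Println(found, markerID{partition: 3, offset: 7}) // prints true 3:7
}
```

Since the aliases denote the same type, the compiler will not stop a marker ID from being passed where a message ID is expected; the distinct names only document intent.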

type Redeliverer

type Redeliverer interface {
	Redeliver(markers []*kafkamq.Marker)
	Stop()
}

Redeliverer provides functionality to redeliver messages associated with markers whose deadlines have expired.

func NewRedeliverer

func NewRedeliverer(partition int32, messageSender internal.MessageSender, markerProducer internal.MarkerProducer, backoffPolicy backoff.BackOff) Redeliverer

NewRedeliverer constructs a `Redeliverer` instance that can be used to redeliver messages associated with markers whose deadlines have expired.
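
The `backoffPolicy` parameter suggests that failed redelivery attempts are retried with increasing delays. The sketch below shows one way such a loop might look. It is an assumption, not the package's behavior: it uses only the standard library instead of the `backoff` package named in the signature, and `retryWithBackoff`, `errTransient`, and the `send` callback are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error representing a failed send.
var errTransient = errors.New("transient send failure")

// retryWithBackoff calls send until it succeeds or maxAttempts is reached,
// doubling the wait between attempts up to maxDelay. It returns the number
// of attempts made and the last error, if any.
func retryWithBackoff(send func() error, initial, maxDelay time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	delay := initial
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = send(); err == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break // no point sleeping after the final attempt
		}
		time.Sleep(delay)
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	send := func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	}
	attempts, err := retryWithBackoff(send, time.Millisecond, 8*time.Millisecond, 5)
	fmt.Println(attempts, err)
}
```

Capping the delay at `maxDelay` keeps a long outage from stretching the wait indefinitely, at the cost of retrying at a steady rate once the cap is reached.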
