internal

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MarkerFolderName = "remote"
	MarkerFileName   = "segment_marker"

	MarkerFolderMode        os.FileMode = 0o700
	MarkerWindowsFolderMode os.FileMode = 0o777
	MarkerFileMode          os.FileMode = 0o600
	MarkerWindowsFileMode   os.FileMode = 0o666
)

Variables

This section is empty.

Functions

func DecodeMarkerV1

func DecodeMarkerV1(bs []byte) (uint64, error)

DecodeMarkerV1 decodes the segment number from a segment marker, encoded with EncodeMarkerV1.

func EncodeMarkerV1

func EncodeMarkerV1(segment uint64) ([]byte, error)

EncodeMarkerV1 encodes the segment number, from whom we need to create a marker, in the marker file format, which in v1 includes the segment number and a trailing CRC code of the first 10 bytes.

func FindMarkableSegment

func FindMarkableSegment(segmentDataCount map[int]*countDataItem, tooOldThreshold time.Duration) int

FindMarkableSegment finds, given the summary of data updates received, and a threshold on how much time can pass for a segment that hasn't received updates to be considered as "live", the segment that should be marked as last consumed. The algorithm will find the highest numbered segment that is considered as "consumed", with its all predecessors "consumed" as well.

A consumed segment is one with data count of zero, meaning that there's no data left in flight for it, or it hasn't received any updates for tooOldThreshold time.

Also, while reviewing the data items in segmentDataCount, those who are consumed will be deleted to clean up space.

This algorithm runs in O(N log N), being N the size of segmentDataCount, and allocates O(N) memory.

Types

type MarkerFileHandler

type MarkerFileHandler interface {
	wal.Marker

	// MarkSegment writes in the backing file-store that a particular segment is the last one marked.
	MarkSegment(segment int)
}

MarkerFileHandler is a file-backed wal.Marker, that also allows one to write to the backing store as particular segment number as the last one marked.

func NewMarkerFileHandler

func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error)

NewMarkerFileHandler creates a new markerFileHandler.

type MarkerHandler

type MarkerHandler interface {
	wal.Marker

	// UpdateReceivedData sends an update event to the handler, that informs that some dataUpdate, coming from a particular WAL
	// segment, has been read out of the WAL and enqueued for sending.
	UpdateReceivedData(segmentId, dataCount int)

	// UpdateSentData sends an update event to the handler, informing that some dataUpdate, coming from a particular WAL
	// segment, has been delivered, or the sender has given up on it.
	UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending

	// Stop stops the handler, and it's async processing of receive/send dataUpdate updates.
	Stop()
}

func NewMarkerHandler

func NewMarkerHandler(mfh MarkerFileHandler, maxSegmentAge time.Duration, logger log.Logger, metrics *MarkerMetrics) MarkerHandler

NewMarkerHandler creates a new markerHandler.

type MarkerMetrics

type MarkerMetrics struct {
	// contains filtered or unexported fields
}

func NewMarkerMetrics

func NewMarkerMetrics(reg prometheus.Registerer) *MarkerMetrics

func (*MarkerMetrics) WithCurriedId

func (m *MarkerMetrics) WithCurriedId(id string) *MarkerMetrics

WithCurriedId returns a curried version of MarkerMetrics, with the id label pre-filled. This is a helper that avoids having to move the id around where it's unnecessary, and won't change inside the consumer of the metrics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL