data

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Apr 14, 2023 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

const (
	DirectoryPermissions os.FileMode = 0755
	FilePermissions      os.FileMode = 0644
	RetentionCheckMs     int         = 5 * 60 * 1000
)

Variables

This section is empty.

Functions

func MergeDataStructure

func MergeDataStructure(fileNames []string, topic *TopicDataId, offset int64, config conf.DatalogConfig) error

func NewEmptyChunk

func NewEmptyChunk(start int64) SegmentChunk

func ReadFileStructure

func ReadFileStructure(topicId *TopicDataId, offset int64, config conf.DatalogConfig) ([]string, error)

Reads the segment file names that will contain the data starting from the given offset.

Types

type Datalog

type Datalog interface {
	Initializer

	// Seeks the position and fills the buffer with chunks until maxSize or maxRecords is reached.
	// Opens and closes the file handle. It may issue several reads to reach the position.
	ReadFileFrom(
		buf []byte,
		maxSize int,
		segmentId int64,
		startOffset int64,
		maxRecords int,
		topic *TopicDataId,
	) ([]byte, error)

	// Blocks until there's an available buffer to be used to stream.
	// After use, the buffer must be released via ReleaseStreamBuffer().
	StreamBuffer() []byte

	// Releases the stream buffer
	ReleaseStreamBuffer(buf []byte)

	// Gets the max producer offset from local storage.
	// Returns an error when not found.
	ReadProducerOffset(topicId *TopicDataId) (int64, error)

	// Gets a sorted list of offsets representing the names of the segment files, where each offset is less than maxOffset
	SegmentFileList(topic *TopicDataId, maxOffset int64) ([]int64, error)
}
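The `StreamBuffer()`/`ReleaseStreamBuffer()` pair describes a blocking buffer pool: a fixed set of reusable buffers where acquiring one blocks until a buffer is free. A minimal sketch of that contract using a buffered channel (the `bufferPool` type, pool size, and buffer size are assumptions, not the package's implementation):

```go
package main

import "fmt"

// bufferPool illustrates the StreamBuffer/ReleaseStreamBuffer contract:
// a fixed number of reusable buffers, where acquiring blocks until one is free.
type bufferPool struct {
	buffers chan []byte
}

func newBufferPool(n, size int) *bufferPool {
	p := &bufferPool{buffers: make(chan []byte, n)}
	for i := 0; i < n; i++ {
		p.buffers <- make([]byte, size)
	}
	return p
}

// StreamBuffer blocks until an available buffer can be used to stream.
func (p *bufferPool) StreamBuffer() []byte { return <-p.buffers }

// ReleaseStreamBuffer returns the buffer to the pool for reuse.
func (p *bufferPool) ReleaseStreamBuffer(buf []byte) { p.buffers <- buf }

func main() {
	pool := newBufferPool(2, 1024)
	buf := pool.StreamBuffer()
	fmt.Println(len(buf)) // 1024
	pool.ReleaseStreamBuffer(buf)
}
```

A channel-backed pool like this gives the blocking semantics for free: when all buffers are checked out, `StreamBuffer()` parks the goroutine until a release occurs.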

func NewDatalog

func NewDatalog(config conf.DatalogConfig) Datalog

type LocalWriteItem

type LocalWriteItem interface {
	SegmentChunk
	Replication() ReplicationInfo
	SetResult(error)
}

type ReadItem

type ReadItem interface {
	Origin() string   // An identifier of the source of the poll, used to determine whether the reader should use the last stored offset and not auto-commit
	CommitOnly() bool // Determines whether it should only commit and not read as part of this request
	SetResult(error, SegmentChunk)
}

Represents a queued message to read from a segment. When the read is completed, `SetResult()` is invoked.

type ReadSegmentChunk

type ReadSegmentChunk struct {
	Buffer []byte
	Start  int64  // The offset of the first message
	Length uint32 // The number of messages in the chunk
}
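The fields relate as follows: `Start` is the offset of the first message and `Length` the message count, so a chunk covers the inclusive offset range `[Start, Start+Length-1]`. A self-contained sketch with a local mirror of the struct (the `lastOffset` helper is hypothetical, added only to make the range explicit):

```go
package main

import "fmt"

// readSegmentChunk mirrors the exported ReadSegmentChunk struct.
type readSegmentChunk struct {
	Buffer []byte
	Start  int64  // The offset of the first message
	Length uint32 // The number of messages in the chunk
}

func (s *readSegmentChunk) DataBlock() []byte    { return s.Buffer }
func (s *readSegmentChunk) StartOffset() int64   { return s.Start }
func (s *readSegmentChunk) RecordLength() uint32 { return s.Length }

// lastOffset is a hypothetical helper: the inclusive end of the chunk's range.
func lastOffset(c *readSegmentChunk) int64 {
	return c.Start + int64(c.Length) - 1
}

func main() {
	c := &readSegmentChunk{Buffer: []byte("payload"), Start: 100, Length: 8}
	fmt.Println(c.StartOffset(), lastOffset(c)) // 100 107
}
```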

func (*ReadSegmentChunk) DataBlock

func (s *ReadSegmentChunk) DataBlock() []byte

func (*ReadSegmentChunk) RecordLength

func (s *ReadSegmentChunk) RecordLength() uint32

func (*ReadSegmentChunk) StartOffset

func (s *ReadSegmentChunk) StartOffset() int64

type ReplicationDataItem

type ReplicationDataItem interface {
	SegmentChunk
	SegmentId() int64
	SetResult(error)
}

type ReplicationReader

type ReplicationReader interface {
	MergeFileStructure() (bool, error) // Merges the index file contents and file structures

	// Reads at least a chunk from a replica and returns the number of bytes written into the buffer
	StreamFile(
		segmentId int64,
		topic *TopicDataId,
		startOffset int64,
		maxRecords int,
		buf []byte) (int, error)
}

type SegmentReader

type SegmentReader struct {
	Items chan ReadItem

	Topic                 TopicDataId
	TopicRangeClusterSize int
	SourceVersion         GenId // The generation in which this reader was created. A consumer might be on Gen=v3 while the current is v4; in that case, SourceVersion would be v4 and Topic.Version = v3

	MaxProducedOffset *int64 // When set, it determines the last offset produced for this topicId for an old generation, inclusive
	// contains filtered or unexported fields
}

func NewSegmentReader

func NewSegmentReader(
	group string,
	isLeader bool,
	replicationReader ReplicationReader,
	topic TopicDataId,
	topicRangeClusterSize int,
	sourceVersion GenId,
	initialOffset int64,
	offsetState OffsetState,
	maxProducedOffset *int64,
	datalog Datalog,
	config conf.DatalogConfig,
) (*SegmentReader, error)

Returns a log file reader.

The segment reader instance is valid for a single generation, closed when the generation ends or the broker is no longer the leader.

It aggressively reads ahead and maintains a local cache, so there should be a different reader instance per consumer group.

func (*SegmentReader) HasStoppedReceiving

func (s *SegmentReader) HasStoppedReceiving() bool

Determines whether the reader has stopped polling the channel; no further ReadItems will be processed.

It signals that the current offset information changed in a way that generations no longer match.

func (*SegmentReader) StoredOffsetAsCompleted

func (s *SegmentReader) StoredOffsetAsCompleted() bool

Returns true when the offset state has been set as completed (previous generations only).

type SegmentWriter

type SegmentWriter struct {
	Items chan SegmentChunk
	Topic TopicDataId
	// contains filtered or unexported fields
}

SegmentWriter contains the logic to write segments on disk and replicate them.

There should be an instance per topic+token+generation. When the generation changes for a token, the channel should be closed.

func NewSegmentWriter

func NewSegmentWriter(
	topic TopicDataId,
	gossiper Replicator,
	config conf.DatalogConfig,
	segmentId *int64,
) (*SegmentWriter, error)

