commitlog

v1.9.0

This package is not in the latest version of its module.

Published: Sep 9, 2022 License: Apache-2.0 Imports: 26 Imported by: 1

Documentation

Overview

Package commitlog provides an implementation for a file-backed write-ahead log.

Constants

This section is empty.

Variables

var (
	// ErrEntryNotFound is returned when a segment search cannot find a
	// specific entry.
	ErrEntryNotFound = errors.New("entry not found")

	// ErrSegmentClosed is returned on reads/writes to a closed segment.
	ErrSegmentClosed = errors.New("segment has been closed")

	// ErrSegmentExists is returned when attempting to create a segment that
	// already exists.
	ErrSegmentExists = errors.New("segment already exists")

	// ErrSegmentReplaced is returned when attempting to read from a segment
	// that has been replaced due to log compaction. When this error is
	// encountered, operations should be retried in order to run against the
	// new segment.
	ErrSegmentReplaced = errors.New("segment was replaced")

	// ErrCommitLogDeleted is returned when attempting to read from a commit
	// log that has been deleted.
	ErrCommitLogDeleted = errors.New("commit log was deleted")

	// ErrCommitLogClosed is returned when attempting to read from a commit
	// log that has been closed.
	ErrCommitLogClosed = errors.New("commit log was closed")
)
var ErrCommitLogReadonly = errors.New("end of readonly log")

ErrCommitLogReadonly is returned when the end of a readonly CommitLog has been reached.

var ErrIncorrectOffset = errors.New("incorrect offset")

ErrIncorrectOffset is returned if the offset is incorrect. It is only used when optimistic concurrency control is enabled (see Options.ConcurrencyControl).

var ErrSegmentNotFound = errors.New("segment not found")

ErrSegmentNotFound is returned if the segment could not be found.
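
The comment on ErrSegmentReplaced says the operation should be retried against the new segment. The following is a minimal sketch of such a retry loop, not the package's own code. It assumes callers compare errors with errors.Is, and it uses a local stand-in variable (errSegmentReplaced) and a hypothetical helper (retryOnReplaced) so that it compiles without importing the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errSegmentReplaced is a local stand-in for commitlog.ErrSegmentReplaced so
// that this sketch compiles on its own.
var errSegmentReplaced = errors.New("segment was replaced")

// retryOnReplaced is a hypothetical helper. It runs op and retries it while
// op fails with errSegmentReplaced, up to maxAttempts attempts in total. Any
// other result, including success, is returned immediately.
func retryOnReplaced(maxAttempts int, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = op()
		if !errors.Is(err, errSegmentReplaced) {
			return err
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retryOnReplaced(3, func() error {
		calls++
		if calls < 2 {
			// Simulate a read that raced with compaction once.
			return fmt.Errorf("read failed: %w", errSegmentReplaced)
		}
		return nil
	})
	fmt.Println(calls, err) // 2 <nil>
}
```

Using errors.Is rather than == keeps the check working when an intermediate layer wraps the sentinel error with additional context.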

Functions

This section is empty.

Types

type CommitLog

type CommitLog interface {
	// Delete closes the log and removes all data associated with it from the
	// filesystem.
	Delete() error

	// NewReader creates a new Reader starting at the given offset. If
	// uncommitted is true, the Reader will read uncommitted messages from the
	// log. Otherwise, it will only return committed messages.
	NewReader(offset int64, uncommitted bool) (*Reader, error)

	// Truncate removes all messages from the log starting at the given offset.
	Truncate(offset int64) error

	// NewestOffset returns the offset of the last message in the log or -1 if
	// empty.
	NewestOffset() int64

	// OldestOffset returns the offset of the first message in the log or -1 if
	// empty.
	OldestOffset() int64

	// EarliestOffsetAfterTimestamp returns the earliest offset whose timestamp
	// is greater than or equal to the given timestamp.
	EarliestOffsetAfterTimestamp(timestamp int64) (int64, error)

	// LatestOffsetBeforeTimestamp returns the latest offset whose timestamp is
	// less than or equal to the given timestamp.
	LatestOffsetBeforeTimestamp(timestamp int64) (int64, error)

	// SetHighWatermark sets the high watermark on the log. All messages up to
	// and including the high watermark are considered committed.
	SetHighWatermark(hw int64)

	// OverrideHighWatermark sets the high watermark on the log using the given
	// value, even if the value is less than the current HW. This is used for
	// unit testing purposes.
	OverrideHighWatermark(hw int64)

	// HighWatermark returns the high watermark for the log.
	HighWatermark() int64

	// NewLeaderEpoch indicates the log is entering a new leader epoch.
	NewLeaderEpoch(epoch uint64) error

	// LastOffsetForLeaderEpoch returns the start offset of the first leader
	// epoch larger than the provided one or the log end offset if the current
	// epoch equals the provided one.
	LastOffsetForLeaderEpoch(epoch uint64) int64

	// LastLeaderEpoch returns the latest leader epoch for the log.
	LastLeaderEpoch() uint64

	// Append writes the given batch of messages to the log and returns their
	// corresponding offsets in the log. This will return ErrCommitLogReadonly
	// if the log is in readonly mode.
	Append(msg []*Message) ([]int64, error)

	// AppendMessageSet writes the given message set data to the log and
	// returns the corresponding offsets in the log. This can be called even if
	// the log is in readonly mode to allow for reconciliation, e.g. when
	// replicating from another log.
	AppendMessageSet(ms []byte) ([]int64, error)

	// Clean applies retention and compaction rules against the log, if
	// applicable.
	Clean() error

	// NotifyLEO registers and returns a channel which is closed when messages
	// past the given log end offset are added to the log. If the given offset
	// is no longer the log end offset, the channel is closed immediately.
	// Waiter is an opaque value that uniquely identifies the entity waiting
	// for data.
	NotifyLEO(waiter interface{}, leo int64) <-chan struct{}

	// SetReadonly marks the log as readonly. When in readonly mode, new
	// messages cannot be added to the log with Append and committed readers
	// will read up to the log end offset (LEO), if the HW allows so, and then
	// will receive an ErrCommitLogReadonly error. This will unblock committed
	// readers waiting for data if they are at the LEO. Readers will continue
	// to block if the HW is less than the LEO. This does not affect
	// uncommitted readers. Messages can still be written to the log with
	// AppendMessageSet for reconciliation purposes, e.g. when replicating from
	// another log.
	SetReadonly(readonly bool)

	// IsReadonly indicates if the log is in readonly mode.
	IsReadonly() bool

	// IsConcurrencyControlEnabled indicates if optimistic concurrency control
	// is enabled, in which case the log checks offsets before appending
	// messages.
	IsConcurrencyControlEnabled() bool

	// Close closes each log segment file and stops the background goroutine
	// checkpointing the high watermark to disk.
	Close() error
}

CommitLog is the durable write-ahead log interface used to back each stream.
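
To make the offset and high watermark semantics in the interface comments concrete, here is a toy in-memory model. It is only an illustration under stated assumptions: memLog, entry, and all method names are hypothetical, offsets are assumed to start at 0, timestamps are assumed to be non-decreasing, and setHighWatermark is assumed to ignore values below the current HW (inferred from the OverrideHighWatermark comment). The real implementation is file-backed and safe for concurrent use; this sketch is neither.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical in-memory record; the real log stores serialized
// messages in segment files.
type entry struct {
	offset    int64
	timestamp int64
}

// memLog is a toy, single-goroutine stand-in for a commit log.
type memLog struct {
	entries []entry
	hw      int64
}

// newMemLog returns an empty log with no committed messages.
func newMemLog() *memLog { return &memLog{hw: -1} }

// add stores a message with the given timestamp and returns its offset.
func (l *memLog) add(timestamp int64) int64 {
	offset := int64(len(l.entries))
	l.entries = append(l.entries, entry{offset: offset, timestamp: timestamp})
	return offset
}

// oldestOffset returns the first offset in the log or -1 if empty.
func (l *memLog) oldestOffset() int64 {
	if len(l.entries) == 0 {
		return -1
	}
	return l.entries[0].offset
}

// newestOffset returns the last offset in the log or -1 if empty.
func (l *memLog) newestOffset() int64 {
	if len(l.entries) == 0 {
		return -1
	}
	return l.entries[len(l.entries)-1].offset
}

// setHighWatermark only moves the high watermark forward (an assumption).
func (l *memLog) setHighWatermark(hw int64) {
	if hw > l.hw {
		l.hw = hw
	}
}

// committed reports whether offset is at or below the high watermark.
func (l *memLog) committed(offset int64) bool {
	return offset >= 0 && offset <= l.hw
}

// earliestOffsetAfterTimestamp returns the first offset whose timestamp is
// greater than or equal to ts, or false if there is none.
func (l *memLog) earliestOffsetAfterTimestamp(ts int64) (int64, bool) {
	i := sort.Search(len(l.entries), func(i int) bool {
		return l.entries[i].timestamp >= ts
	})
	if i == len(l.entries) {
		return -1, false
	}
	return l.entries[i].offset, true
}

func main() {
	l := newMemLog()
	fmt.Println(l.oldestOffset(), l.newestOffset()) // -1 -1
	for _, ts := range []int64{100, 200, 300} {
		l.add(ts)
	}
	l.setHighWatermark(1)
	fmt.Println(l.oldestOffset(), l.newestOffset()) // 0 2
	fmt.Println(l.committed(1), l.committed(2))     // true false
	off, ok := l.earliestOffsetAfterTimestamp(150)
	fmt.Println(off, ok) // 1 true
}
```

In this model a reader created with uncommitted set to false would stop at offset 1, while an uncommitted reader would also see offset 2.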

func New

func New(opts Options) (CommitLog, error)

New creates a new CommitLog and starts a background goroutine which periodically checkpoints the high watermark to disk.

type Message

type Message struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Key        []byte
	Value      []byte
	Headers    map[string][]byte

	// Transient fields
	Timestamp     int64
	LeaderEpoch   uint64
	AckInbox      string
	CorrelationID string
	AckPolicy     client.AckPolicy
	Offset        int64
}

Message is the object that gets serialized and written to the log.

func (*Message) Encode

func (m *Message) Encode(e packetEncoder) error

Encode encodes the Message into the given packetEncoder.

type Options

type Options struct {
	Name                 string        // commitLog name
	Path                 string        // Path to log directory
	MaxSegmentBytes      int64         // Max bytes a Segment can contain before creating a new one
	MaxSegmentAge        time.Duration // Max time before a new log segment is rolled out.
	MaxLogBytes          int64         // Retention by bytes
	MaxLogMessages       int64         // Retention by messages
	MaxLogAge            time.Duration // Retention by age
	Compact              bool          // Run compaction on log clean
	CompactMaxGoroutines int           // Max number of goroutines to use in a log compaction
	CleanerInterval      time.Duration // Frequency to enforce retention policy
	HWCheckpointInterval time.Duration // Frequency to checkpoint HW to disk
	ConcurrencyControl   bool          // Optimistic Concurrency Control
	Logger               logger.Logger
}

Options contains settings for configuring a CommitLog.
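
The MaxSegmentBytes and MaxSegmentAge comments describe when a new segment is created. One way to read those two limits is sketched below. This is not the package's logic: rollOptions and shouldRoll are hypothetical names, and treating a zero value as "limit disabled" is an assumption made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// rollOptions mirrors only the two segment-related fields of Options so the
// sketch compiles without importing the package.
type rollOptions struct {
	MaxSegmentBytes int64
	MaxSegmentAge   time.Duration
}

// shouldRoll is a hypothetical helper. It reports whether a new segment
// should be started before writing incomingBytes to a segment that currently
// holds segmentBytes and was created segmentAge ago. A zero limit is treated
// as disabled, which is an assumption of this sketch.
func shouldRoll(opts rollOptions, segmentBytes, incomingBytes int64, segmentAge time.Duration) bool {
	if opts.MaxSegmentBytes > 0 && segmentBytes+incomingBytes > opts.MaxSegmentBytes {
		return true
	}
	if opts.MaxSegmentAge > 0 && segmentAge >= opts.MaxSegmentAge {
		return true
	}
	return false
}

func main() {
	opts := rollOptions{MaxSegmentBytes: 1024, MaxSegmentAge: time.Hour}

	fmt.Println(shouldRoll(opts, 1000, 10, time.Minute))  // false
	fmt.Println(shouldRoll(opts, 1000, 100, time.Minute)) // true
	fmt.Println(shouldRoll(opts, 10, 10, 2*time.Hour))    // true
}
```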

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads messages atomically from a CommitLog. Readers should not be used concurrently.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context, headersBuf []byte) (SerializedMessage, int64, int64, uint64, error)

ReadMessage reads a single message from the underlying CommitLog or blocks until one is available. It returns the SerializedMessage in addition to its offset, timestamp, and leader epoch. This may return uncommitted messages if the reader was created with the uncommitted flag set to true.

ReadMessage should not be called concurrently, and the headersBuf slice should have a capacity of at least 28 bytes.

TODO: Should this just return a MessageSet directly instead of a Message and the MessageSet header values?
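
The 28-byte minimum for headersBuf is consistent with a header holding the three values ReadMessage returns plus a message size. The sketch below shows one such layout: an 8-byte offset, 8-byte timestamp, 8-byte leader epoch, and 4-byte size, all big-endian. The field order, the size field, and the byte order are assumptions and are not confirmed by this page; header, encodeHeader, and decodeHeader are hypothetical names. It also shows why capacity, rather than length, is what matters when a caller reuses one buffer across reads.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLen matches the minimum headersBuf capacity named in the docs.
const headerLen = 28

// header holds the values of the assumed layout.
type header struct {
	Offset      int64
	Timestamp   int64
	LeaderEpoch uint64
	Size        int32
}

// encodeHeader writes h into buf, reusing buf's backing array when its
// capacity is at least headerLen and allocating otherwise.
func encodeHeader(buf []byte, h header) []byte {
	if cap(buf) < headerLen {
		buf = make([]byte, headerLen)
	}
	buf = buf[:headerLen]
	binary.BigEndian.PutUint64(buf[0:8], uint64(h.Offset))
	binary.BigEndian.PutUint64(buf[8:16], uint64(h.Timestamp))
	binary.BigEndian.PutUint64(buf[16:24], h.LeaderEpoch)
	binary.BigEndian.PutUint32(buf[24:28], uint32(h.Size))
	return buf
}

// decodeHeader parses the first headerLen bytes of buf.
func decodeHeader(buf []byte) (header, error) {
	if len(buf) < headerLen {
		return header{}, errors.New("header buffer too short")
	}
	return header{
		Offset:      int64(binary.BigEndian.Uint64(buf[0:8])),
		Timestamp:   int64(binary.BigEndian.Uint64(buf[8:16])),
		LeaderEpoch: binary.BigEndian.Uint64(buf[16:24]),
		Size:        int32(binary.BigEndian.Uint32(buf[24:28])),
	}, nil
}

func main() {
	// Allocate once with zero length and sufficient capacity, then reuse.
	scratch := make([]byte, 0, headerLen)

	buf := encodeHeader(scratch, header{Offset: 42, Timestamp: 1000, LeaderEpoch: 3, Size: 128})
	h, err := decodeHeader(buf)
	fmt.Println(h, err)
}
```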

type SerializedMessage

type SerializedMessage []byte

SerializedMessage is a serialized message read from the log.

func (SerializedMessage) Attributes

func (m SerializedMessage) Attributes() int8

Attributes returns the byte used for message flags.

func (SerializedMessage) Crc

func (m SerializedMessage) Crc() uint32

Crc returns the CRC32 digest of the message.
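
A CRC32 digest is typically used to detect corruption by recomputing it over the stored bytes and comparing. The sketch below shows that check with the standard library. It assumes the IEEE polynomial and hashes an arbitrary payload; which polynomial the package uses and which bytes of the message the digest covers are not stated on this page. checksumMatches is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// checksumMatches reports whether payload hashes to want under CRC-32 with
// the IEEE polynomial (an assumption of this sketch).
func checksumMatches(payload []byte, want uint32) bool {
	return crc32.ChecksumIEEE(payload) == want
}

func main() {
	payload := []byte("hello")
	sum := crc32.ChecksumIEEE(payload)
	fmt.Println(checksumMatches(payload, sum)) // true

	// Flip the bits of one byte to simulate corruption on disk.
	payload[0] ^= 0xff
	fmt.Println(checksumMatches(payload, sum)) // false
}
```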

func (SerializedMessage) Headers

func (m SerializedMessage) Headers() map[string][]byte

Headers returns the message headers map.

func (SerializedMessage) Key

func (m SerializedMessage) Key() []byte

Key returns the message key.

func (SerializedMessage) MagicByte

func (m SerializedMessage) MagicByte() int8

MagicByte returns the byte used for encoding protocol version detection.

func (SerializedMessage) Value

func (m SerializedMessage) Value() []byte

Value returns the message value.
