package commitlog

import ""

Package commitlog provides an implementation for a file-backed write-ahead log.


Package Files

commitlog.go compact_cleaner.go delete_cleaner.go encoder.go index.go interface.go leader_epoch_cache.go message.go message_set.go reader.go segment.go util.go


var (
    // ErrEntryNotFound is returned when a segment search cannot find a
    // specific entry.
    ErrEntryNotFound = errors.New("entry not found")

    // ErrSegmentClosed is returned on reads/writes to a closed segment.
    ErrSegmentClosed = errors.New("segment has been closed")

    // ErrSegmentExists is returned when attempting to create a segment that
    // already exists.
    ErrSegmentExists = errors.New("segment already exists")

    // ErrSegmentReplaced is returned when attempting to read from a segment
    // that has been replaced due to log compaction. When this error is
    // encountered, operations should be retried in order to run against the
    // new segment.
    ErrSegmentReplaced = errors.New("segment was replaced")

    // ErrCommitLogDeleted is returned when attempting to read from a commit
    // log that has been deleted.
    ErrCommitLogDeleted = errors.New("commit log was deleted")

    // ErrCommitLogClosed is returned when attempting to read from a commit
    // log that has been closed.
    ErrCommitLogClosed = errors.New("commit log was closed")
)

var ErrSegmentNotFound = errors.New("segment not found")

ErrSegmentNotFound is returned if the segment could not be found.
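
The comment on ErrSegmentReplaced says the operation should be retried so that it runs against the new segment. A minimal sketch of such a retry loop follows. It is not taken from the package: readWithRetry and the read callback are hypothetical names, and the sentinel error is redeclared locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSegmentReplaced is a local stand-in for the package's sentinel error.
var ErrSegmentReplaced = errors.New("segment was replaced")

// readWithRetry (hypothetical helper) calls read until it succeeds, fails
// with an error other than ErrSegmentReplaced, or maxAttempts is reached.
func readWithRetry(read func() ([]byte, error), maxAttempts int) ([]byte, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		data, err := read()
		if err == nil {
			return data, nil
		}
		// Only a replaced segment is worth retrying; anything else is final.
		if !errors.Is(err, ErrSegmentReplaced) {
			return nil, err
		}
		lastErr = err
	}
	return nil, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	// Simulated read that hits a compacted segment twice before succeeding.
	read := func() ([]byte, error) {
		calls++
		if calls < 3 {
			return nil, ErrSegmentReplaced
		}
		return []byte("payload"), nil
	}
	data, err := readWithRetry(read, 5)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(string(data), "after", calls, "calls")
}
```

Bounding the number of attempts keeps a caller from spinning if compaction keeps replacing segments; whether the package itself bounds its retries is not stated here.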

type CommitLog

type CommitLog interface {
    // Delete closes the log and removes all data associated with it from the
    // filesystem.
    Delete() error

    // NewReader creates a new Reader starting at the given offset. If
    // uncommitted is true, the Reader will read uncommitted messages from the
    // log. Otherwise, it will only return committed messages.
    NewReader(offset int64, uncommitted bool) (*Reader, error)

    // Truncate removes all messages from the log starting at the given offset.
    Truncate(offset int64) error

    // NewestOffset returns the offset of the last message in the log or -1 if
    // empty.
    NewestOffset() int64

    // OldestOffset returns the offset of the first message in the log or -1 if
    // empty.
    OldestOffset() int64

    // OffsetForTimestamp returns the earliest offset whose timestamp is
    // greater than or equal to the given timestamp.
    OffsetForTimestamp(timestamp int64) (int64, error)

    // SetHighWatermark sets the high watermark on the log. All messages up to
    // and including the high watermark are considered committed.
    SetHighWatermark(hw int64)

    // OverrideHighWatermark sets the high watermark on the log using the given
    // value, even if the value is less than the current HW. This is used for
    // unit testing purposes.
    OverrideHighWatermark(hw int64)

    // HighWatermark returns the high watermark for the log.
    HighWatermark() int64

    // NewLeaderEpoch indicates the log is entering a new leader epoch.
    NewLeaderEpoch(epoch uint64) error

    // LastOffsetForLeaderEpoch returns the start offset of the first leader
    // epoch larger than the provided one or the log end offset if the current
    // epoch equals the provided one.
    LastOffsetForLeaderEpoch(epoch uint64) int64

    // LastLeaderEpoch returns the latest leader epoch for the log.
    LastLeaderEpoch() uint64

    // Append writes the given batch of messages to the log and returns their
    // corresponding offsets in the log.
    Append(msg []*Message) ([]int64, error)

    // AppendMessageSet writes the given message set data to the log and
    // returns the corresponding offsets in the log.
    AppendMessageSet(ms []byte) ([]int64, error)

    // Clean applies retention and compaction rules against the log, if
    // applicable.
    Clean() error

    // NotifyLEO registers and returns a channel which is closed when messages
    // past the given log end offset are added to the log. If the given offset
    // is no longer the log end offset, the channel is closed immediately.
    // Waiter is an opaque value that uniquely identifies the entity waiting
    // for data.
    NotifyLEO(waiter interface{}, leo int64) <-chan struct{}

    // Close closes each log segment file and stops the background goroutine
    // checkpointing the high watermark to disk.
    Close() error
}

CommitLog is the durable write-ahead log interface used to back each stream.
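
The NotifyLEO contract described above (a channel that is closed when data arrives past the given log end offset, or immediately if that offset is already stale) can be illustrated with a small in-memory sketch. This is an assumption-laden illustration, not the package's implementation: leoNotifier, advance, and isClosed are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// leoNotifier is a hypothetical stand-in for the log's waiter bookkeeping.
type leoNotifier struct {
	mu      sync.Mutex
	leo     int64                         // current log end offset
	waiters map[interface{}]chan struct{} // keyed by the opaque waiter value
}

func newLEONotifier(leo int64) *leoNotifier {
	return &leoNotifier{leo: leo, waiters: make(map[interface{}]chan struct{})}
}

// NotifyLEO returns a channel that is closed once the log end offset moves
// past leo. If leo is already stale, the returned channel is closed at once.
func (n *leoNotifier) NotifyLEO(waiter interface{}, leo int64) <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if leo != n.leo {
		ch := make(chan struct{})
		close(ch)
		return ch
	}
	// Reuse the channel if this waiter is already registered.
	if ch, ok := n.waiters[waiter]; ok {
		return ch
	}
	ch := make(chan struct{})
	n.waiters[waiter] = ch
	return ch
}

// advance records a new log end offset and wakes every registered waiter.
func (n *leoNotifier) advance(newLEO int64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.leo = newLEO
	for w, ch := range n.waiters {
		close(ch)
		delete(n.waiters, w)
	}
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newLEONotifier(10)
	ch := n.NotifyLEO("follower-1", 10)
	fmt.Println("closed before append:", isClosed(ch))
	n.advance(11) // simulate an append that moves the log end offset
	fmt.Println("closed after append:", isClosed(ch))
	fmt.Println("stale offset closed immediately:", isClosed(n.NotifyLEO("follower-1", 10)))
}
```

Closing a channel, rather than sending on it, lets the notification be observed any number of times and never blocks the appender.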

func New

func New(opts Options) (CommitLog, error)

New creates a new CommitLog and starts a background goroutine which periodically checkpoints the high watermark to disk.

type Message

type Message struct {
    Crc        int32
    MagicByte  int8
    Attributes int8
    Key        []byte
    Value      []byte
    Headers    map[string][]byte

    // Transient fields
    Timestamp     int64
    LeaderEpoch   uint64
    AckInbox      string
    CorrelationID string
    AckPolicy     client.AckPolicy
}

Message is the object that gets serialized and written to the log.

func (*Message) Encode

func (m *Message) Encode(e packetEncoder) error

Encode encodes the Message into the packetEncoder.

type Options

type Options struct {
    Name                 string        // commitLog name
    Path                 string        // Path to log directory
    MaxSegmentBytes      int64         // Max bytes a Segment can contain before creating a new one
    MaxSegmentAge        time.Duration // Max time before a new log segment is rolled out.
    MaxLogBytes          int64         // Retention by bytes
    MaxLogMessages       int64         // Retention by messages
    MaxLogAge            time.Duration // Retention by age
    Compact              bool          // Run compaction on log clean
    CompactMaxGoroutines int           // Max number of goroutines to use in a log compaction
    CleanerInterval      time.Duration // Frequency to enforce retention policy
    HWCheckpointInterval time.Duration // Frequency to checkpoint HW to disk
    Logger               logger.Logger
}

Options contains settings for configuring a commitLog.
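
The three retention settings (MaxLogBytes, MaxLogMessages, MaxLogAge) can be read as rules for dropping the oldest segments. The sketch below shows one plausible way to combine them. It makes several assumptions that the text above does not confirm: a zero limit disables a rule, segments are deleted whole from the oldest end, and the newest (active) segment is always kept. segmentInfo, retention, and segmentsToDelete are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// segmentInfo is a hypothetical summary of one log segment.
type segmentInfo struct {
	baseOffset int64
	bytes      int64
	messages   int64
	age        time.Duration // time since the segment was last written
}

// retention mirrors the three retention fields of Options.
// A zero value disables the corresponding rule (an assumption).
type retention struct {
	maxBytes    int64
	maxMessages int64
	maxAge      time.Duration
}

// segmentsToDelete returns the oldest segments that violate any enabled rule.
// segs must be ordered oldest first; the last segment is never returned.
func segmentsToDelete(segs []segmentInfo, r retention) []segmentInfo {
	var totalBytes, totalMsgs int64
	for _, s := range segs {
		totalBytes += s.bytes
		totalMsgs += s.messages
	}
	n := 0
	for n < len(segs)-1 {
		s := segs[n]
		overBytes := r.maxBytes > 0 && totalBytes > r.maxBytes
		overMsgs := r.maxMessages > 0 && totalMsgs > r.maxMessages
		tooOld := r.maxAge > 0 && s.age > r.maxAge
		if !overBytes && !overMsgs && !tooOld {
			break
		}
		// Dropping this segment shrinks the log for the next check.
		totalBytes -= s.bytes
		totalMsgs -= s.messages
		n++
	}
	return segs[:n]
}

func main() {
	segs := []segmentInfo{
		{baseOffset: 0, bytes: 100, messages: 10, age: 2 * time.Hour},
		{baseOffset: 10, bytes: 100, messages: 10, age: time.Minute},
		{baseOffset: 20, bytes: 100, messages: 10, age: time.Minute},
		{baseOffset: 30, bytes: 100, messages: 10, age: 0},
	}
	byBytes := segmentsToDelete(segs, retention{maxBytes: 250})
	byAge := segmentsToDelete(segs, retention{maxAge: time.Hour})
	fmt.Println("deleted by bytes:", len(byBytes), "deleted by age:", len(byAge))
}
```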

type Reader

type Reader struct {
    // contains filtered or unexported fields
}

Reader reads messages atomically from a CommitLog. Readers should not be used concurrently.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context, headersBuf []byte) (SerializedMessage, int64, int64, uint64, error)

ReadMessage reads a single message from the underlying CommitLog or blocks until one is available. It returns the SerializedMessage in addition to its offset, timestamp, and leader epoch. This may return uncommitted messages if the reader was created with the uncommitted flag set to true.

ReadMessage should not be called concurrently, and the headersBuf slice should have a capacity of at least 28.
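
The 28-byte minimum for headersBuf matches three 8-byte values (offset, timestamp, leader epoch) plus a 4-byte size. The sketch below decodes a header under that assumed layout. The field order, the big-endian byte order, and the names header, encodeHeader, and decodeHeader are assumptions made for illustration and are not confirmed by the documentation above.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLen is the minimum headersBuf capacity required by ReadMessage.
const headerLen = 28

// header holds the values assumed to precede each message in the log.
type header struct {
	Offset      int64
	Timestamp   int64
	LeaderEpoch uint64
	Size        int32
}

// encodeHeader writes h into a new 28-byte buffer (assumed layout).
func encodeHeader(h header) []byte {
	buf := make([]byte, headerLen)
	binary.BigEndian.PutUint64(buf[0:8], uint64(h.Offset))
	binary.BigEndian.PutUint64(buf[8:16], uint64(h.Timestamp))
	binary.BigEndian.PutUint64(buf[16:24], h.LeaderEpoch)
	binary.BigEndian.PutUint32(buf[24:28], uint32(h.Size))
	return buf
}

// decodeHeader parses the first 28 bytes of buf (assumed layout).
func decodeHeader(buf []byte) (header, error) {
	if len(buf) < headerLen {
		return header{}, errors.New("header buffer shorter than 28 bytes")
	}
	return header{
		Offset:      int64(binary.BigEndian.Uint64(buf[0:8])),
		Timestamp:   int64(binary.BigEndian.Uint64(buf[8:16])),
		LeaderEpoch: binary.BigEndian.Uint64(buf[16:24]),
		Size:        int32(binary.BigEndian.Uint32(buf[24:28])),
	}, nil
}

func main() {
	in := header{Offset: 42, Timestamp: 1597881600000000000, LeaderEpoch: 7, Size: 128}
	out, err := decodeHeader(encodeHeader(in))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Allocating the buffer once with make([]byte, 28) and passing it to every ReadMessage call avoids a per-message allocation, which is presumably why the caller supplies it.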

TODO: Should this just return a MessageSet directly instead of a Message and the MessageSet header values?

type SerializedMessage

type SerializedMessage []byte

SerializedMessage is a serialized message read from the log.

func (SerializedMessage) Attributes

func (m SerializedMessage) Attributes() int8

Attributes returns the byte used for message flags.

func (SerializedMessage) Crc

func (m SerializedMessage) Crc() uint32

Crc returns the CRC32 digest of the message.
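
A reader can use the stored digest to detect corruption by recomputing it over the message bytes. The sketch below uses the standard hash/crc32 package. Which polynomial the package uses and which bytes the digest covers are not stated above; the Castagnoli table and the checksum and verify helpers are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// castagnoli is the CRC-32C table; the polynomial choice is an assumption.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// checksum returns the CRC32 digest of payload.
func checksum(payload []byte) uint32 {
	return crc32.Checksum(payload, castagnoli)
}

// verify reports whether payload still matches the stored digest.
func verify(payload []byte, stored uint32) bool {
	return checksum(payload) == stored
}

func main() {
	payload := []byte("hello, log")
	stored := checksum(payload)
	fmt.Println("intact:", verify(payload, stored))

	// Flip one bit to simulate on-disk corruption.
	corrupted := append([]byte(nil), payload...)
	corrupted[0] ^= 0x01
	fmt.Println("corrupted:", verify(corrupted, stored))
}
```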

func (SerializedMessage) Headers

func (m SerializedMessage) Headers() map[string][]byte

Headers returns the message headers map.

func (SerializedMessage) Key

func (m SerializedMessage) Key() []byte

Key returns the message key.

func (SerializedMessage) MagicByte

func (m SerializedMessage) MagicByte() int8

MagicByte returns the byte used for encoding protocol version detection.

func (SerializedMessage) Value

func (m SerializedMessage) Value() []byte

Value returns the message value.

Package commitlog imports 26 packages and is imported by 2 packages. Updated 2020-08-20.