commitlog

package
Version: v0.1.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 1, 2019 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package commitlog provides an implementation for a file-backed write-ahead log.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEntryNotFound is returned when a segment search cannot find a
	// specific entry.
	ErrEntryNotFound = errors.New("entry not found")

	// ErrSegmentClosed is returned on reads/writes to a closed segment.
	ErrSegmentClosed = errors.New("segment has been closed")

	// ErrSegmentExists is returned when attempting to create a segment that
	// already exists.
	ErrSegmentExists = errors.New("segment already exists")

	// ErrSegmentReplaced is returned when attempting to read from a segment
	// that has been replaced due to log compaction. When this error is
	// encountered, operations should be retried in order to run against the
	// new segment.
	ErrSegmentReplaced = errors.New("segment was replaced")
)
View Source
var (
	ErrIndexCorrupt = errors.New("corrupt index file")
)
View Source
var ErrSegmentNotFound = errors.New("segment not found")

Functions

func NewMessageSetFromProto

func NewMessageSetFromProto(baseOffset, basePos int64, msgs []*proto.Message) (
	MessageSet, []*Entry, error)

Types

type CommitLog

type CommitLog struct {
	Options
	// contains filtered or unexported fields
}

CommitLog implements the server.CommitLog interface, which is a durable write-ahead log.

func New

func New(opts Options) (*CommitLog, error)

New creates a new CommitLog and starts a background goroutine which periodically checkpoints the high watermark to disk.

func (*CommitLog) Append

func (l *CommitLog) Append(msgs []*proto.Message) ([]int64, error)

Append writes the given batch of messages to the log and returns their corresponding offsets in the log.

func (*CommitLog) AppendMessageSet

func (l *CommitLog) AppendMessageSet(ms []byte) ([]int64, error)

AppendMessageSet writes the given message set data to the log and returns the corresponding offsets in the log.

func (*CommitLog) Clean

func (l *CommitLog) Clean() error

Clean applies retention and compaction rules against the log, if applicable.

func (*CommitLog) Close

func (l *CommitLog) Close() error

Close closes each log segment file and stops the background goroutine checkpointing the high watermark to disk.

func (*CommitLog) Delete

func (l *CommitLog) Delete() error

Delete closes the log and removes all data associated with it from the filesystem.

func (*CommitLog) HighWatermark

func (l *CommitLog) HighWatermark() int64

HighWatermark returns the high watermark for the log.

func (*CommitLog) NewReader

func (l *CommitLog) NewReader(offset int64, uncommitted bool) (*Reader, error)

NewReader creates a new Reader starting at the given offset. If uncommitted is true, the Reader will read uncommitted messages from the log. Otherwise, it will only return committed messages.

func (*CommitLog) NewestOffset

func (l *CommitLog) NewestOffset() int64

NewestOffset returns the offset of the last message in the log or -1 if empty.

func (*CommitLog) OffsetForTimestamp

func (l *CommitLog) OffsetForTimestamp(timestamp int64) (int64, error)

OffsetForTimestamp returns the earliest offset whose timestamp is greater than or equal to the given timestamp.

func (*CommitLog) OldestOffset

func (l *CommitLog) OldestOffset() int64

OldestOffset returns the offset of the first message in the log or -1 if empty.

func (*CommitLog) Segments

func (l *CommitLog) Segments() []*Segment

func (*CommitLog) SetHighWatermark

func (l *CommitLog) SetHighWatermark(hw int64)

SetHighWatermark sets the high watermark on the log. All messages up to and including the high watermark are considered committed.

func (*CommitLog) Truncate

func (l *CommitLog) Truncate(offset int64) error

Truncate removes all messages from the log starting at the given offset.

type CompactCleaner

type CompactCleaner struct {
	CompactCleanerOptions
}

CompactCleaner implements the compaction policy which replaces segments with compacted ones, i.e. retaining only the last message for a given key.

func NewCompactCleaner

func NewCompactCleaner(opts CompactCleanerOptions) *CompactCleaner

NewCompactCleaner returns a new Cleaner which performs log compaction by rewriting segments such that they contain only the last message for a given key.

func (*CompactCleaner) Compact

func (c *CompactCleaner) Compact(hw int64, segments []*Segment) ([]*Segment, error)

Compact performs log compaction by rewriting segments such that they contain only the last message for a given key. Compaction is applied to all segments up to but excluding the active (last) segment or the provided HW, whichever comes first.

type CompactCleanerOptions

type CompactCleanerOptions struct {
	Logger        logger.Logger
	Name          string
	MaxGoroutines int
}

CompactCleanerOptions contains configuration settings for the CompactCleaner.

type DeleteCleaner

type DeleteCleaner struct {
	DeleteCleanerOptions
}

DeleteCleaner implements the delete cleanup policy which deletes old log segments based on the retention policy.

func NewDeleteCleaner

func NewDeleteCleaner(opts DeleteCleanerOptions) *DeleteCleaner

NewDeleteCleaner returns a new Cleaner which enforces log retention policies by deleting segments.

func (*DeleteCleaner) Clean

func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error)

Clean will enforce the log retention policy by deleting old segments. Deletion only occurs at the segment granularity.

type DeleteCleanerOptions

type DeleteCleanerOptions struct {
	Retention struct {
		Bytes    int64
		Messages int64
		Age      time.Duration
	}
	Logger logger.Logger
	Name   string
}

DeleteCleanerOptions contains configuration settings for the DeleteCleaner.

type Entry

type Entry struct {
	Offset    int64
	Timestamp int64
	Position  int64
	Size      int32
}

func EntriesForMessageSet

func EntriesForMessageSet(basePos int64, ms []byte) []*Entry

type Index

type Index struct {
	// contains filtered or unexported fields
}

func NewIndex

func NewIndex(opts options) (idx *Index, err error)

func (*Index) Close

func (idx *Index) Close() error

func (*Index) CountEntries

func (idx *Index) CountEntries() int64

func (*Index) InitializePosition

func (idx *Index) InitializePosition() (*Entry, error)

func (*Index) Name

func (idx *Index) Name() string

func (*Index) Position

func (idx *Index) Position() int64

Position returns the current position in the index to write to next. This value also represents the total length of the index.

func (*Index) ReadAt

func (idx *Index) ReadAt(p []byte, offset int64) (n int, err error)

func (*Index) ReadEntryAtFileOffset

func (idx *Index) ReadEntryAtFileOffset(e *Entry, fileOffset int64) (err error)

ReadEntryAtFileOffset is used to read an Index entry at the given byte offset of the Index file. ReadEntryAtLogOffset is generally more useful for higher-level use.

func (*Index) ReadEntryAtLogOffset

func (idx *Index) ReadEntryAtLogOffset(e *Entry, logOffset int64) error

ReadEntryAtLogOffset is used to read an Index entry at the given log offset of the Index file.

func (*Index) Shrink

func (idx *Index) Shrink() error

Shrink truncates the memory-mapped index file to the size of its contents.

func (*Index) Sync

func (idx *Index) Sync() error

func (*Index) TruncateEntries

func (idx *Index) TruncateEntries(number int) error

type IndexScanner

type IndexScanner struct {
	// contains filtered or unexported fields
}

func NewIndexScanner

func NewIndexScanner(idx *Index) *IndexScanner

func (*IndexScanner) Scan

func (s *IndexScanner) Scan() (*Entry, error)

type Message

type Message []byte

func (Message) Attributes

func (m Message) Attributes() int8

func (Message) Crc

func (m Message) Crc() uint32

func (Message) Headers

func (m Message) Headers() map[string][]byte

func (Message) Key

func (m Message) Key() []byte

func (Message) MagicByte

func (m Message) MagicByte() int8

func (Message) Value

func (m Message) Value() []byte

type MessageSet

type MessageSet []byte

func (MessageSet) Message

func (ms MessageSet) Message() Message

func (MessageSet) Offset

func (ms MessageSet) Offset() int64

func (MessageSet) Size

func (ms MessageSet) Size() int32

func (MessageSet) Timestamp

func (ms MessageSet) Timestamp() int64

type Options

type Options struct {
	Path                 string        // Path to log directory
	MaxSegmentBytes      int64         // Max bytes a Segment can contain before creating a new one
	MaxLogBytes          int64         // Retention by bytes
	MaxLogMessages       int64         // Retention by messages
	MaxLogAge            time.Duration // Retention by age
	Compact              bool          // Run compaction on log clean
	CompactMaxGoroutines int           // Max number of goroutines to use in a log compaction
	CleanerInterval      time.Duration // Frequency to enforce retention policy
	HWCheckpointInterval time.Duration // Frequency to checkpoint HW to disk
	LogRollTime          time.Duration // Max time before a new log segment is rolled out.
	Logger               logger.Logger
}

Options contains settings for configuring a CommitLog.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads messages atomically from a CommitLog. Readers should not be used concurrently.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context, headersBuf []byte) (Message, int64, int64, error)

ReadMessage reads a single message from the underlying CommitLog or blocks until one is available. It returns the Message in addition to its offset and timestamp. This may return uncommitted messages if the Reader was created with the uncommitted flag set to true.

ReadMessage should not be called concurrently, and the headersBuf slice should have a capacity of at least 20.

type Segment

type Segment struct {
	Index      *Index
	BaseOffset int64

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSegment

func NewSegment(path string, baseOffset, maxBytes int64, isNew bool, suffix string) (*Segment, error)

func (*Segment) CheckSplit

func (s *Segment) CheckSplit(logRollTime time.Duration) bool

CheckSplit determines if a new log segment should be rolled out either because this segment is full or LogRollTime has passed since the first message was written to the segment.

func (*Segment) Cleaned

func (s *Segment) Cleaned() (*Segment, error)

Cleaned creates a cleaned segment for this segment.

func (*Segment) Close

func (s *Segment) Close() error

Close closes the segment such that it can no longer be read from or written to. This operation is idempotent.

func (*Segment) Delete

func (s *Segment) Delete() error

Delete closes the segment and then deletes its log and index files.

func (*Segment) FirstOffset

func (s *Segment) FirstOffset() int64

func (*Segment) IsEmpty

func (s *Segment) IsEmpty() bool

func (*Segment) LastOffset

func (s *Segment) LastOffset() int64

func (*Segment) MessageCount

func (s *Segment) MessageCount() int64

func (*Segment) NextOffset

func (s *Segment) NextOffset() int64

func (*Segment) Position

func (s *Segment) Position() int64

func (*Segment) ReadAt

func (s *Segment) ReadAt(p []byte, off int64) (n int, err error)

func (*Segment) Replace

func (s *Segment) Replace(old *Segment) error

Replace replaces the given old segment with the receiver segment (s).

func (*Segment) Seal

func (s *Segment) Seal()

Seal seals the segment so that it can no longer be written to. This is called on the former active segment after a new segment is rolled. This is a no-op if the segment is already sealed.

func (*Segment) Truncated

func (s *Segment) Truncated() (*Segment, error)

Truncated creates a truncated segment for this segment.

func (*Segment) WriteMessageSet

func (s *Segment) WriteMessageSet(ms []byte, entries []*Entry) error

type SegmentScanner

type SegmentScanner struct {
	// contains filtered or unexported fields
}

func NewSegmentScanner

func NewSegmentScanner(segment *Segment) *SegmentScanner

func (*SegmentScanner) Scan

func (s *SegmentScanner) Scan() (MessageSet, *Entry, error)

Scan should be called repeatedly to iterate over the messages in the segment. It returns io.EOF when there are no more messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL