disklog

package
v0.0.0-...-e848abc
Warning

This package is not in the latest version of its module.

Published: Sep 6, 2015 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package disklog provides the main production-ready implementation of the LogStorage interface. The implementation expects a dedicated directory for its log files. Each file is named after the first ID (message index) it contains, followed by the "-forest-log" suffix (see LOG_NAME_FORMAT).

Constants

const DEFAULT_CACHE_SLOT_SIZE = 30

DEFAULT_CACHE_SLOT_SIZE is the number of message batches kept in the cache.

const DEFAULT_TARGET_MAX_SEGMENT_SIZE = 1024

DEFAULT_TARGET_MAX_SEGMENT_SIZE is the size, in MB, after which a segment is declared full.

const LOG_NAME_FORMAT = "*-forest-log"

LOG_NAME_FORMAT is the glob pattern that matches log file names.

const LOG_NAME_FORMAT_SUFFIX = "-forest-log"
const SEGMENT_CLEANUP_SCAN_INTERVAL = 1 * time.Hour

SEGMENT_CLEANUP_SCAN_INTERVAL is the interval at which a check is run to look for segments to clean up.

const SEGMENT_LAST_USED_TIMEOUT = 2 * time.Minute

SEGMENT_LAST_USED_TIMEOUT is how long a segment must go unused before it is closed.

const TARGET_OPEN_SEGMENTS = 2

TARGET_OPEN_SEGMENTS is the minimum number of segments to keep open.

const TOPIC_STORE_NAME = "topic-store"

Variables

var APPEND_WITH_NO_CONTENT = errors.New("No messages were given, unable to append.")
var ERR_INDEX_TOO_HIGH = errors.New("Index for message too high")
var ERR_SEGMENT_ALREADY_OPEN = errors.New("Segment is already open")

ERR_SEGMENT_ALREADY_OPEN is returned if Open is called on an already open segment.

var ERR_SEGMENT_MESSAGE_BODY_UNREADABLE = errors.New("Error reading message body - length in header and available bytes in stream did not match.")
var FIRST_APPEND_MULTIPLE_SEGMENTS = errors.New("First append attempted, but multiple segments already exist.")

Functions

This section is empty.

Types

type CacheEntry

type CacheEntry struct {
	// contains filtered or unexported fields
}

type CacheEntryInfo

type CacheEntryInfo struct {
	FirstIndex int64
	CacheSize  int
}

type DiskLogConfigFunction

type DiskLogConfigFunction func(*DiskLogStorage)

func SetCacheSlotSize

func SetCacheSlotSize(size int) DiskLogConfigFunction

func SetSegmentCleanupAge

func SetSegmentCleanupAge(age time.Duration) DiskLogConfigFunction

func SetTargetSegmentSize

func SetTargetSegmentSize(size int) DiskLogConfigFunction
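DiskLogConfigFunction follows Go's functional-options pattern: each Set* constructor returns a closure that mutates the storage, and LoadLog and ChangeConfiguration accept any number of them. The sketch below shows how such options are typically applied. DiskLogStorage's real fields are unexported, so the field names and the `newStorage` helper are hypothetical; only the option signatures match the documentation, and the defaults reuse DEFAULT_CACHE_SLOT_SIZE and DEFAULT_TARGET_MAX_SEGMENT_SIZE.

```go
package main

import (
	"fmt"
	"time"
)

// DiskLogStorage is a simplified stand-in; these field names are hypothetical.
type DiskLogStorage struct {
	cacheSlotSize     int
	segmentCleanupAge time.Duration
	targetSegmentSize int // in MB
}

// DiskLogConfigFunction mirrors the documented signature.
type DiskLogConfigFunction func(*DiskLogStorage)

func SetCacheSlotSize(size int) DiskLogConfigFunction {
	return func(d *DiskLogStorage) { d.cacheSlotSize = size }
}

func SetSegmentCleanupAge(age time.Duration) DiskLogConfigFunction {
	return func(d *DiskLogStorage) { d.segmentCleanupAge = age }
}

func SetTargetSegmentSize(size int) DiskLogConfigFunction {
	return func(d *DiskLogStorage) { d.targetSegmentSize = size }
}

// newStorage (hypothetical) sets defaults, then applies each option in
// order, so a later option overrides an earlier one.
func newStorage(opts ...DiskLogConfigFunction) *DiskLogStorage {
	d := &DiskLogStorage{
		cacheSlotSize:     30,   // DEFAULT_CACHE_SLOT_SIZE
		targetSegmentSize: 1024, // DEFAULT_TARGET_MAX_SEGMENT_SIZE
	}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := newStorage(SetCacheSlotSize(50), SetSegmentCleanupAge(24*time.Hour))
	fmt.Println(d.cacheSlotSize, d.segmentCleanupAge, d.targetSegmentSize)
}
```

With the real package, the same options would be passed straight to the documented constructor, e.g. `disklog.LoadLog(topicName, path, disklog.SetCacheSlotSize(50))`.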

type DiskLogStorage

type DiskLogStorage struct {
	// contains filtered or unexported fields
}

DiskLogStorage is the main implementation of commitlog.LogStorage intended for production use. It maintains a persistent log on disk using a set of segment files that together hold all of the log entries. A cache enables fast recall of recently written messages, which keeps the common cases (a leader sending messages to a follower, or a client reading recently written messages) fast and free of physical I/O.

func LoadLog

func LoadLog(topicName string, loadpath string, configFuncs ...DiskLogConfigFunction) (*DiskLogStorage, error)

LoadLog will open an existing DiskLogStorage structure from the given path or create a new one.

If existing files are found matching the LOG_NAME_FORMAT file format, then:

1 - Gets a list of all files matching the pattern [0-9]*-forest-log.
2 - Sorts these and determines the highest starting index.
3 - Validates the CRC of every message in the log file with the highest starting index, in case there was previously a crash.
4 - Opens that log file and generates its index-to-offset slice.

If no existing log files are found, a new segment is created with a first index of 1.

func (*DiskLogStorage) AppendFirstMessages

func (dlog *DiskLogStorage) AppendFirstMessages(msgs model.Messages, leaderFirstIndex int64) (lastIndex int64, err error)

AppendFirstMessages writes the given messages into a segment and presents them to the cache.

The following steps are followed:

1 - Confirm that only one segment is open
2 - Check whether the firstIndex of the segment matches leaderFirstIndex
3 - If required, delete the current segment and create a new one
4 - Append the messages and populate the cache

func (*DiskLogStorage) AppendMessages

func (dlog *DiskLogStorage) AppendMessages(msgs model.Messages) (lastIndex int64, err error)

AppendMessages writes the given messages into a segment and presents them to the cache.

The following steps are followed:

1 - Attempt to append to the current last open segment.
2 - If the segment is full, open a new segment.
3 - If more than TARGET_OPEN_SEGMENTS (2) segments are open, close the older ones until only the previous and new segments remain open.
4 - Present the new messages to the cache.
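A minimal sketch of steps 1 to 3, assuming a hypothetical in-memory `segment` whose capacity is counted in messages (the real package measures segment size in MB). Following the documented Segment.AppendMessages behaviour, the segment is checked for fullness before appending, so a batch may overshoot the target slightly. The cache step is omitted, and `storage` and `appendMessages` are hypothetical names.

```go
package main

import "fmt"

const TARGET_OPEN_SEGMENTS = 2

// segment is a hypothetical stand-in; capacity is a message count here.
type segment struct {
	firstIndex int64
	count      int
	capacity   int
	open       bool
}

func (s *segment) lastIndex() int64 { return s.firstIndex + int64(s.count) - 1 }

type storage struct {
	segments []*segment
	capacity int
}

// appendMessages appends n messages to the newest segment, starting a new
// segment if that one is already full, then closes all but the newest
// TARGET_OPEN_SEGMENTS segments. It returns the new last index.
func (st *storage) appendMessages(n int) int64 {
	cur := st.segments[len(st.segments)-1]
	if cur.count >= cur.capacity {
		cur = &segment{firstIndex: cur.lastIndex() + 1, capacity: st.capacity, open: true}
		st.segments = append(st.segments, cur)
	}
	cur.count += n
	for i := 0; i < len(st.segments)-TARGET_OPEN_SEGMENTS; i++ {
		st.segments[i].open = false
	}
	return cur.lastIndex()
}

func main() {
	st := &storage{capacity: 3, segments: []*segment{{firstIndex: 1, capacity: 3, open: true}}}
	for i := 0; i < 6; i++ {
		st.appendMessages(2)
	}
	for _, s := range st.segments {
		fmt.Println(s.firstIndex, s.count, s.open)
	}
}
```

After six appends of two messages there are three segments (starting at 1, 5 and 9); only the oldest has been closed.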

func (*DiskLogStorage) ChangeConfiguration

func (dlog *DiskLogStorage) ChangeConfiguration(configFuncs ...DiskLogConfigFunction)

ChangeConfiguration takes one or more DiskLogConfigFunction values and applies them to a live DiskLogStorage instance.

Changes to segment configuration are applied only to the latest open segment and to new segments.

func (*DiskLogStorage) ExpVar

func (dlog *DiskLogStorage) ExpVar() interface{}

ExpVar returns statistics for publishing via expvar.

func (*DiskLogStorage) GetFirstIndex

func (dlog *DiskLogStorage) GetFirstIndex() (firstIndex int64, err error)

func (*DiskLogStorage) GetLastIndex

func (dlog *DiskLogStorage) GetLastIndex() (lastIndex int64, err error)

func (*DiskLogStorage) GetMessages

func (dlog *DiskLogStorage) GetMessages(index int64, count int64) (model.Messages, error)

GetMessages returns as many messages as possible from the given start index up to count messages.

The following sequence is used to get the messages:

1 - Check the cache. If a message exists at this index, return it and the following cached messages, up to count messages.
2 - Work backwards through the segment list looking for a segment with a first index <= index.
3 - Ask the segment to retrieve up to count messages (this may trigger a segment load)
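Step 2 is a reverse linear search over the segments' first indexes; starting from the newest segment favours the common case of reading recent messages. A minimal sketch with a hypothetical `findSegment` helper (the cache check in step 1 and the read in step 3 are left out). A result of -1 means the index precedes the whole log; an index past the end of the last segment still maps to that segment, which is then responsible for reporting it.

```go
package main

import "fmt"

// findSegment (hypothetical) returns the position of the newest segment
// whose first index is <= index, or -1 if index precedes every segment.
// firstIndexes must be sorted in ascending order.
func findSegment(firstIndexes []int64, index int64) int {
	for i := len(firstIndexes) - 1; i >= 0; i-- {
		if firstIndexes[i] <= index {
			return i
		}
	}
	return -1
}

func main() {
	starts := []int64{1, 1001, 2001}
	for _, idx := range []int64{0, 1, 1500, 2001, 9999} {
		fmt.Println(idx, "->", findSegment(starts, idx))
	}
}
```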

func (*DiskLogStorage) Shutdown

func (dlog *DiskLogStorage) Shutdown(notifier *utils.ShutdownNotifier)

func (*DiskLogStorage) Sync

func (dlog *DiskLogStorage) Sync() error

Sync calls Sync on the underlying files that have been written to since the last sync.

This is only called by the commit log for the WRITE_SYNC and PERIODIC_SYNC policies.

func (*DiskLogStorage) TruncateMessages

func (dlog *DiskLogStorage) TruncateMessages(index int64) error

TruncateMessages is used by the commit log to remove messages that it knows are invalid, which happens during leadership changes.

These steps are followed:

1 - Truncate the messages in the cache.
2 - Find the newest segment with a firstIndex <= index.
3 - Ask that segment to truncate to index.
4 - Delete any segments that exist beyond it.
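Steps 2 to 4 can be sketched in memory, assuming a hypothetical `seg` type holding consecutive messages and a hypothetical `truncateLog` helper; the cache truncation in step 1 is omitted. Following Segment.Truncate, the message at index itself is removed. This sketch leaves the log unchanged when index precedes every segment, a case the documentation does not cover.

```go
package main

import "fmt"

// seg is a hypothetical in-memory segment holding consecutive messages.
type seg struct {
	firstIndex int64
	msgs       []string
}

// truncateLog removes the message at index and everything after it. The
// newest segment with firstIndex <= index is truncated and every later
// segment is dropped (a real implementation would delete their files).
// segs must be sorted by firstIndex.
func truncateLog(segs []*seg, index int64) []*seg {
	for i := len(segs) - 1; i >= 0; i-- {
		s := segs[i]
		if s.firstIndex > index {
			continue
		}
		keep := index - s.firstIndex // messages in s that precede index
		if keep < int64(len(s.msgs)) {
			s.msgs = s.msgs[:keep]
		}
		return segs[:i+1]
	}
	return segs // index precedes the log: unchanged in this sketch
}

func main() {
	segs := []*seg{
		{firstIndex: 1, msgs: []string{"a", "b", "c"}},
		{firstIndex: 4, msgs: []string{"d", "e", "f"}},
		{firstIndex: 7, msgs: []string{"g", "h"}},
	}
	segs = truncateLog(segs, 5)
	for _, s := range segs {
		fmt.Println(s.firstIndex, s.msgs)
	}
}
```

Truncating at 5 keeps messages 1 to 4, shortens the second segment to just "d", and drops the segment starting at 7.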

type DiskLogStorageInfo

type DiskLogStorageInfo struct {
	PathName                string
	SegmentCount            int
	LastIndex               int64
	FirstIndex              int64
	Target_Max_Segment_Size int
	Segment_Cleanup_Age     string
	Cache_Slot_Size         int
	SegmentInfo             []interface{}
	CacheInfo               interface{}
}

type DiskTopicPersistentStore

type DiskTopicPersistentStore struct {
	Term     int64
	VotedFor string
	// contains filtered or unexported fields
}

func NewDiskTopicPersistentStore

func NewDiskTopicPersistentStore(dirpath string) *DiskTopicPersistentStore

func (*DiskTopicPersistentStore) Load

func (store *DiskTopicPersistentStore) Load() (term int64, votedFor string, err error)

Load reads the topic's term and votedFor value from persistent storage.

func (*DiskTopicPersistentStore) SetTerm

func (store *DiskTopicPersistentStore) SetTerm(term int64, votedFor string) (err error)

type MessageCache

type MessageCache struct {
	// contains filtered or unexported fields
}

MessageCache provides DiskLogStorage cache facilities.

The cache is populated with the messages passed to appends, which are kept for a while and used to serve gets whenever possible. This significantly reduces the amount of file seeking and reading.

Note that the cache takes no locks itself; all read/write locking is done by the caller.
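A cache of this shape can be sketched as a fixed number of slots, each holding one appended batch, with the oldest batch evicted when the slots are full (matching DEFAULT_CACHE_SLOT_SIZE, which counts batches). The `messageCache` and `batch` types and their lower-case methods are hypothetical stand-ins for MessageCache's AppendMessages, GetMessages and Truncate; the real eviction and lookup logic may differ.

```go
package main

import "fmt"

// batch is one cached append: consecutive messages starting at firstIndex.
type batch struct {
	firstIndex int64
	msgs       []string
}

// messageCache (hypothetical) keeps the most recent `capacity` batches,
// oldest first, and does no locking of its own.
type messageCache struct {
	capacity int
	batches  []batch
}

// appendMessages caches a batch, evicting the oldest when every slot is used.
func (c *messageCache) appendMessages(index int64, msgs []string) {
	if c.capacity <= 0 {
		return
	}
	if len(c.batches) == c.capacity {
		c.batches = c.batches[1:]
	}
	c.batches = append(c.batches, batch{firstIndex: index, msgs: msgs})
}

// getMessages returns up to count consecutive messages starting at index,
// continuing into later batches; it returns nil on a cache miss.
func (c *messageCache) getMessages(index, count int64) []string {
	var out []string
	next := index
	for _, b := range c.batches {
		if int64(len(out)) >= count {
			break
		}
		end := b.firstIndex + int64(len(b.msgs)) // one past the batch's last index
		if next < b.firstIndex || next >= end {
			continue
		}
		for i := next - b.firstIndex; i < int64(len(b.msgs)) && int64(len(out)) < count; i++ {
			out = append(out, b.msgs[i])
		}
		next = end
	}
	return out
}

// truncate drops every cached message at index and beyond.
func (c *messageCache) truncate(index int64) {
	kept := c.batches[:0]
	for _, b := range c.batches {
		switch {
		case b.firstIndex >= index:
			// the whole batch is at or beyond index: drop it
		case b.firstIndex+int64(len(b.msgs)) > index:
			b.msgs = b.msgs[:index-b.firstIndex] // keep only messages before index
			kept = append(kept, b)
		default:
			kept = append(kept, b)
		}
	}
	c.batches = kept
}

func main() {
	c := &messageCache{capacity: 2}
	c.appendMessages(1, []string{"a", "b"})
	c.appendMessages(3, []string{"c", "d"})
	c.appendMessages(5, []string{"e"}) // evicts the batch starting at 1
	fmt.Println(c.getMessages(4, 10)) // [d e]
	fmt.Println(c.getMessages(1, 10)) // [] (miss: that batch was evicted)
	c.truncate(4)
	fmt.Println(c.getMessages(3, 10)) // [c]
}
```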

func CreateCache

func CreateCache(capacity int) *MessageCache

CreateCache creates a MessageCache that holds the given number of message batches.

func (*MessageCache) AppendMessages

func (msgCache *MessageCache) AppendMessages(index int64, msgs model.Messages)

AppendMessages caches the messages for a while, evicting old ones as required.

func (*MessageCache) ExpVar

func (msgCache *MessageCache) ExpVar() interface{}

func (*MessageCache) GetMessages

func (msgCache *MessageCache) GetMessages(index int64, count int64) model.Messages

GetMessages returns messages from the cache from the given index.

func (*MessageCache) Truncate

func (msgCache *MessageCache) Truncate(index int64)

Truncate is used to trim the cache in case the underlying commit log has been truncated.

type MessageCacheInfo

type MessageCacheInfo struct {
	CacheSlotsUsed int
	CacheEntryInfo []CacheEntryInfo
	TotalCacheSize int
	Hits           int
	Misses         int
}

MessageCacheInfo is used by ExpVars to present key stats about the MessageCache state.

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

func CreateNewSegment

func CreateNewSegment(topicName string, directory string, firstIndex int64, targetSize int) (*Segment, error)

CreateNewSegment creates a new segment file in the given directory, starting at the given index. The Segment is returned in an open state.

func ExistingSegment

func ExistingSegment(topicName string, fullfilename string, targetSize int) *Segment

func (*Segment) AppendMessages

func (seg *Segment) AppendMessages(msgs model.Messages) (appendIndex, lastIndex int64, segmentFull bool, err error)

AppendMessages checks to see whether the segment is full, and if not it appends the messages to it.

func (*Segment) Close

func (seg *Segment) Close() error

Close turns an open segment into a closed one: the file is closed and its memory freed.

func (*Segment) Delete

func (seg *Segment) Delete() error

Delete removes the whole segment file.

func (*Segment) ExpVar

func (seg *Segment) ExpVar() interface{}

func (*Segment) GetFirstIndex

func (seg *Segment) GetFirstIndex() int64

func (*Segment) GetLastAccessTime

func (seg *Segment) GetLastAccessTime() time.Time

func (*Segment) GetLastIndex

func (seg *Segment) GetLastIndex() int64

func (*Segment) GetLastModifiedTime

func (seg *Segment) GetLastModifiedTime() time.Time

func (*Segment) GetMessages

func (seg *Segment) GetMessages(index int64, targetMessageCount int) (msgs model.Messages, segmentRequiredOpening bool, err error)

GetMessages returns as many messages as possible from the given start index, up to the maximum message batch size.

The cache has already been checked at this stage, so the messages are located by seeking within the file itself.

func (*Segment) GetOpenStatus

func (seg *Segment) GetOpenStatus() bool

GetOpenStatus reports whether the segment is open. It is one of the segment's locking getters.

func (*Segment) Open

func (seg *Segment) Open(validate bool) error

Open reads the Segment from disk and generates a mapping between each index and its offset in the file. If validate is true, the CRCs of all messages in the log are checked and the log is truncated if any are invalid.

func (*Segment) Sync

func (seg *Segment) Sync() error

Sync syncs the file to disk if it has been written to since the last sync.

func (*Segment) Truncate

func (seg *Segment) Truncate(index int64) error

Truncate removes all messages at index and beyond.

type SegmentInfo

type SegmentInfo struct {
	Filename                   string
	FirstIndex                 int64
	SegmentOpen                bool
	NumberOfMessages           int
	LastAccessTime             time.Time
	LastModifiedTime           time.Time
	StatsGetMessageCalls       int
	StatsAppendMessageCalls    int
	StatsNoSeekGets            int
	StatsNoSeekAppends         int
	StatsSeekCount             int
	StatsSeekMessagesReadCount int
}
