Documentation ¶
Overview ¶
disklog is the package that implements the main production suitable implementation of the LogStorage interface. The implementation expects a dedicated directory for the log files. Filenames consist of the first ID included in the file.
Index ¶
- Constants
- Variables
- type CacheEntry
- type CacheEntryInfo
- type DiskLogConfigFunction
- type DiskLogStorage
- func (dlog *DiskLogStorage) AppendFirstMessages(msgs model.Messages, leaderFirstIndex int64) (lastIndex int64, err error)
- func (dlog *DiskLogStorage) AppendMessages(msgs model.Messages) (lastIndex int64, err error)
- func (dlog *DiskLogStorage) ChangeConfiguration(configFuncs ...DiskLogConfigFunction)
- func (dlog *DiskLogStorage) ExpVar() interface{}
- func (dlog *DiskLogStorage) GetFirstIndex() (firstIndex int64, err error)
- func (dlog *DiskLogStorage) GetLastIndex() (lastIndex int64, err error)
- func (dlog *DiskLogStorage) GetMessages(index int64, count int64) (model.Messages, error)
- func (dlog *DiskLogStorage) Shutdown(notifier *utils.ShutdownNotifier)
- func (dlog *DiskLogStorage) Sync() error
- func (dlog *DiskLogStorage) TruncateMessages(index int64) error
- type DiskLogStorageInfo
- type DiskTopicPersistentStore
- type MessageCache
- type MessageCacheInfo
- type Segment
- func (seg *Segment) AppendMessages(msgs model.Messages) (appendIndex, lastIndex int64, segmentFull bool, err error)
- func (seg *Segment) Close() error
- func (seg *Segment) Delete() error
- func (seg *Segment) ExpVar() interface{}
- func (seg *Segment) GetFirstIndex() int64
- func (seg *Segment) GetLastAccessTime() time.Time
- func (seg *Segment) GetLastIndex() int64
- func (seg *Segment) GetLastModifiedTime() time.Time
- func (seg *Segment) GetMessages(index int64, targetMessageCount int) (msgs model.Messages, segmentRequiredOpening bool, err error)
- func (seg *Segment) GetOpenStatus() bool
- func (seg *Segment) Open(validate bool) error
- func (seg *Segment) Sync() error
- func (seg *Segment) Truncate(index int64) error
- type SegmentInfo
Constants ¶
const DEFAULT_CACHE_SLOT_SIZE = 30
DEFAULT_CACHE_SLOT_SIZE is how many batches of messages are to be kept in cache.
const DEFAULT_TARGET_MAX_SEGMENT_SIZE = 1024
DEFAULT_TARGET_MAX_SEGMENT_SIZE is the size in MB after which we declare the segment full
const LOG_NAME_FORMAT = "*-forest-log"
LOG_NAME_FORMAT is the glob matching our logfile filenames.
const LOG_NAME_FORMAT_SUFFIX = "-forest-log"
const SEGMENT_CLEANUP_SCAN_INTERVAL = 1 * time.Hour
SEGMENT_CLEANUP_SCAN_INTERVAL is the interval at which a check is run to look for segments to clean up.
const SEGMENT_LAST_USED_TIMEOUT = 2 * time.Minute
SEGMENT_LAST_USED_TIMEOUT is the duration for which a segment must not have been used prior to it being closed.
const TARGET_OPEN_SEGMENTS = 2
TARGET_OPEN_SEGMETNS is the minimum number of segments to keep open
const TOPIC_STORE_NAME = "topic-store"
Variables ¶
var APPEND_WITH_NO_CONTENT = errors.New("No messages were given, unable to append.")
var ERR_INDEX_TOO_HIGH = errors.New("Index for message too high")
var ERR_SEGMENT_ALREADY_OPEN = errors.New("Segment is already open")
ERR_SEGMENT_ALREADY_OPEN is returned if Open is called on an already open segment.
var ERR_SEGMENT_MESSAGE_BODY_UNREADABLE = errors.New("Error reading message body - length in header and available bytes in stream did not match.")
var FIRST_APPEND_MULTIPLE_SEGMENTS = errors.New("First append attempted, but multiple segments already exist.")
Functions ¶
This section is empty.
Types ¶
type CacheEntry ¶
type CacheEntry struct {
// contains filtered or unexported fields
}
type CacheEntryInfo ¶
type DiskLogConfigFunction ¶
type DiskLogConfigFunction func(*DiskLogStorage)
func SetCacheSlotSize ¶
func SetCacheSlotSize(size int) DiskLogConfigFunction
func SetSegmentCleanupAge ¶
func SetSegmentCleanupAge(age time.Duration) DiskLogConfigFunction
func SetTargetSegmentSize ¶
func SetTargetSegmentSize(size int) DiskLogConfigFunction
type DiskLogStorage ¶
type DiskLogStorage struct {
// contains filtered or unexported fields
}
DiskLogStorage is the main implementation of commitlog.LogStorage intended for production use. It maintains a persistent log to disk using a set of segment files that together hold all of the log entries. A cache is used to enable fast recall of messages in the log that have just been written, this ensures that the usual case of a leader sending messages to a follower, or a cilent reading recently written messages is fast and requires no physical IO.
func LoadLog ¶
func LoadLog(topicName string, loadpath string, configFuncs ...DiskLogConfigFunction) (*DiskLogStorage, error)
LoadLog will open an existing DiskLogStorage structure from the given path or create a new one.
If existing files are found matching the LOG_NAME_FORMAT file format, then:
1 - Get a list of all files match the pattern [0-9]*-forest-log 2 - Sorts these and determine the highest index start value 3 - Validates all message CRC in the highest index value Logfile in case there was previously a crash 4 - Opens the highest index value Logfile and generates the index / offset slice
If no existing log files are found then a new segment is created for first Index 1.
func (*DiskLogStorage) AppendFirstMessages ¶
func (dlog *DiskLogStorage) AppendFirstMessages(msgs model.Messages, leaderFirstIndex int64) (lastIndex int64, err error)
AppendFirstMessages writes the given messages into a segment and presents it to the cache.
The following steps are followed:
1 - Confirm that only one segment is open 2 - Check whether the firstIndex of the segment matches leaderFirstIndex 3 - If required delete the current segment and create a new one 4 - Append the messages and populate the cache
func (*DiskLogStorage) AppendMessages ¶
func (dlog *DiskLogStorage) AppendMessages(msgs model.Messages) (lastIndex int64, err error)
AppendMessages writes the given messages into a segment and presents it to the cache.
The following steps are followed:
1 - Attempt to append to the current last open segment. 2 - If the segment is full, open a new segment. 3 - If more than TARGET_OPEN_SEGMENTS (2) segments are open then close the older ones until just the previous and new segments are open. 4 - Present the new messages to the cache
func (*DiskLogStorage) ChangeConfiguration ¶
func (dlog *DiskLogStorage) ChangeConfiguration(configFuncs ...DiskLogConfigFunction)
ChangeConfiguration takes one or more DisklogConfigFunctions and applies to them to a live DiskLogStorage instance.
Changes to segment configuration will only be carried out to the latest open configuration and new segments.
func (*DiskLogStorage) ExpVar ¶
func (dlog *DiskLogStorage) ExpVar() interface{}
Returns stats for expvar
func (*DiskLogStorage) GetFirstIndex ¶
func (dlog *DiskLogStorage) GetFirstIndex() (firstIndex int64, err error)
func (*DiskLogStorage) GetLastIndex ¶
func (dlog *DiskLogStorage) GetLastIndex() (lastIndex int64, err error)
func (*DiskLogStorage) GetMessages ¶
GetMessages returns as many messages as possible from the given start index up to count messages.
The following sequence is used to get the messages:
1 - Check the cache. If a message exists at this index, return it and all other messages in the cache up to count messages. 2 - Work backwards through the segments list looking for a segment with a start Index <= index 3 - Ask the segment to retrieve up to count messages (this may trigger a segment load)
func (*DiskLogStorage) Shutdown ¶
func (dlog *DiskLogStorage) Shutdown(notifier *utils.ShutdownNotifier)
func (*DiskLogStorage) Sync ¶
func (dlog *DiskLogStorage) Sync() error
Sync calls Sync on the underlying files that have been written to since the last sync.
This only called by the Commit Log for policies WRITE_SYNC and PERIODIC_SYNC.
func (*DiskLogStorage) TruncateMessages ¶
func (dlog *DiskLogStorage) TruncateMessages(index int64) error
TruncateMessages is used by CommitLog to remove messages that it knows are invalid (happens during leadership changes)
These steps are followed:
1 - Truncate the messages in the cache. 2 - The newest segment with an firstIndex <= index is found. 3 - The segment is requested to truncate to index. 4 - If any segments existing beyond the one found, they are deleted.
type DiskLogStorageInfo ¶
type DiskTopicPersistentStore ¶
type DiskTopicPersistentStore struct { Term int64 VotedFor string // contains filtered or unexported fields }
func NewDiskTopicPersistentStore ¶
func NewDiskTopicPersistentStore(dirpath string) *DiskTopicPersistentStore
type MessageCache ¶
type MessageCache struct {
// contains filtered or unexported fields
}
MessageCache provides DiskLogStorage cache facilities.
The cache is populated by the data passed to appends being kept around for a while and used for gets when it can be. This significantly reduces the amount of file seeking and reading.
Note that locks are not required - all read / write locking has been done by the caller.
func CreateCache ¶
func CreateCache(capacity int) *MessageCache
CreateCache generates the MessageCache for a give number of message batches.
func (*MessageCache) AppendMessages ¶
func (msgCache *MessageCache) AppendMessages(index int64, msgs model.Messages)
AppendMessages caches the messages for a while, bumping out old ones as requied.
func (*MessageCache) ExpVar ¶
func (msgCache *MessageCache) ExpVar() interface{}
func (*MessageCache) GetMessages ¶
func (msgCache *MessageCache) GetMessages(index int64, count int64) model.Messages
GetMessages returns messages from the cache from the given index.
func (*MessageCache) Truncate ¶
func (msgCache *MessageCache) Truncate(index int64)
Truncate is used to trim the cache in case the underlying commit log has been truncated.
type MessageCacheInfo ¶
type MessageCacheInfo struct { CacheSlotsUsed int CacheEntryInfo []CacheEntryInfo TotalCacheSize int Hits int Misses int }
MessageCacheInfo is used by ExpVars to present key stats about the MessageCache state.
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
func CreateNewSegment ¶
func CreateNewSegment(topicName string, directory string, firstIndex int64, targetSize int) (*Segment, error)
Create generates a new Segment file in the given directory, starting at the given index. The Segment is returned in an open state.
func ExistingSegment ¶
func (*Segment) AppendMessages ¶
func (seg *Segment) AppendMessages(msgs model.Messages) (appendIndex, lastIndex int64, segmentFull bool, err error)
AppendMessages checks to see whether the segment is full, and if not it appends the messages to it.
func (*Segment) Close ¶
Close turns an open sgement into a close one. The file is closed and memory freed.
func (*Segment) GetFirstIndex ¶
func (*Segment) GetLastAccessTime ¶
func (*Segment) GetLastIndex ¶
func (*Segment) GetLastModifiedTime ¶
func (*Segment) GetMessages ¶
func (seg *Segment) GetMessages(index int64, targetMessageCount int) (msgs model.Messages, segmentRequiredOpening bool, err error)
GetMessages returns as many messages as possible from the given start index up to the maximum message batch size
Cache has already been checked at this stage, so we use seeking in the file itself to find the messages.