logstore

package
v0.0.0-...-16dfdc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2018 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Index file geometry.
const (
	// PageSize is the page size, in bytes, that index entries are packed into.
	PageSize = 4 << 10

	// IndexEntrySize is the size of one index entry in bytes. It equals the
	// summed field widths of IndexEntry (4+8+8+8+8).
	IndexEntrySize = 36

	// IndexCountPerPage is the number of whole index entries that fit in one
	// page: 4096 / 36 = 113 with integer division (the remainder is unused).
	IndexCountPerPage = PageSize / IndexEntrySize
)
View Source
// Record type tags, typed int32 like Record.RecordType.
// NOTE(review): presumably these are the values stored in the record_type
// field of each record, so they are spelled out explicitly — confirm.
const (
	// MetaDataType tags a metadata record.
	MetaDataType int32 = 0
	// RaftLogType tags a raft log record.
	RaftLogType int32 = 1
	// VdlLogType tags a VDL log record.
	VdlLogType int32 = 2
)
View Source
// Sizes, in bytes, of the fixed-width fields that precede a record's data.
const (
	// TermSize is the width of the term field.
	TermSize = 8
	// RindexSize is the width of the rindex (raft log index) field.
	RindexSize = 8
	// RaftTypeSize is the width of the raft_type field.
	RaftTypeSize = 4

	// RaftLogHeaderSize is term + rindex + raft_type = 20 bytes.
	RaftLogHeaderSize = TermSize + RindexSize + RaftTypeSize

	// RecordHeaderSize is the full fixed header = 44 bytes:
	// crc (4) + record_type (4) + data_length (8) + vindex (8) + raft log header (20).
	RecordHeaderSize = 4 + 4 + 8 + 8 + RaftLogHeaderSize
)

Record fields are laid out in the following order: crc|record_type|data_length|vindex|term|rindex|raft_type|data

Variables

View Source
// Sentinel errors returned by the logstore package. They are grouped by theme
// below; the message strings are part of the observable behavior.

// File and directory existence.
var (
	ErrFileNotFound = errors.New("logstore: file not found")
	ErrFileExist    = errors.New("logstore:file already exists")
	ErrFileNotExist = errors.New("logstore:the file or dir not exists")
)

// Arguments, lookups and index ranges.
var (
	ErrArgsNotAvailable = errors.New("logstore: args not available")
	ErrOutOfRange       = errors.New("logstore:starVindex is out of range")
	ErrEntryNotExist    = errors.New("logstore:entry not exists")
	ErrRangeNotExist    = errors.New("logstore:out of the range of logstore")
)

// On-disk integrity and naming.
var (
	ErrCrcNotMatch          = errors.New("logstore: Crc32 values do not match")
	ErrFieldNotMatch        = errors.New("logstore: index entry field do not match record")
	ErrTornWrite            = errors.New("logstore: file exist an incomplete write in the end")
	ErrBadSegmentName       = errors.New("logstore:bad segment name")
	ErrBadIndexName         = errors.New("logstore:bad index name")
	ErrNoFileMeta           = errors.New("logstore:segment has no metadata")
	ErrRangeMetadataDestory = errors.New("logstore: the range metadata file is destory")
	ErrNotContinuous        = errors.New("logstore:segments are not continuous")
)

// Segment state.
var (
	ErrNotAllowDelete = errors.New("logstore:segment doesn't allow to delete")
	ErrNotAllowWrite  = errors.New("logstore:segment doesn't allow to write")
	ErrSegmentClosed  = errors.New("logstore: the segment is closed")
)
View Source
var (
	// MaxSegmentSize is 512 MiB (536870912 bytes).
	// NOTE(review): presumably the upper bound on a segment file's size — confirm.
	MaxSegmentSize int64 = 512 << 20

	// GetRecordDuration starts at the zero duration.
	// NOTE(review): presumably a timing value set elsewhere in the package — confirm.
	GetRecordDuration time.Duration
)
View Source
// Prometheus collectors exported by the logstore package. Every metric uses
// the "logstore" namespace; the subsystem and name are given per metric.
// All summaries track the 0.99 quantile with an allowed error of 0.001, and
// take their MaxAge window from conf.DefaultMetricsConfig.
var (
	// Logstore-level metrics.

	// LogstoreReadTps is a counter for logstore reads (subsystem "read").
	// NOTE(review): a counter only accumulates; a per-second rate presumably
	// has to be derived at query time — confirm.
	LogstoreReadTps = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "logstore",
		Subsystem: "read",
		Name:      "read_tps",
		Help:      "logstore read tps",
	})
	// LogstoreReadLatency is a summary of logstore read latency.
	LogstoreReadLatency = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "logstore",
		Subsystem:  "read",
		Name:       "read_latency",
		Help:       "logstore read latency",
		MaxAge:     conf.DefaultMetricsConfig.LogstoreReadLatencySummaryDuration,
		Objectives: map[float64]float64{0.99: 0.001},
	})

	// LogstoreWriteTps is a counter for logstore writes (subsystem "write").
	LogstoreWriteTps = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "logstore",
		Subsystem: "write",
		Name:      "write_tps",
		Help:      "logstore write tps",
	})
	// LogstoreWriteLatency is a summary of logstore write latency.
	LogstoreWriteLatency = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "logstore",
		Subsystem:  "write",
		Name:       "write_latency",
		Help:       "logstore write latency",
		MaxAge:     conf.DefaultMetricsConfig.LogstoreWriteLatencySummaryDuration,
		Objectives: map[float64]float64{0.99: 0.001},
	})

	// LogstoreDeleteSegmentLatency is a summary of segment deletion latency;
	// its help text states the unit as milliseconds.
	LogstoreDeleteSegmentLatency = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "logstore",
		Subsystem:  "delete",
		Name:       "delete_segment_latency",
		Help:       "logstore delete segment latency(ms)",
		MaxAge:     conf.DefaultMetricsConfig.LogstoreDSLatencySummaryDuration,
		Objectives: map[float64]float64{0.99: 0.001},
	})

	// Segment-level metrics.

	// SegmentReadTps is a counter for segment reads (subsystem "segment").
	SegmentReadTps = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "logstore",
		Subsystem: "segment",
		Name:      "read_tps",
		Help:      "segment read tps",
	})

	// SegmentCutCounter counts segment cuts.
	// NOTE(review): the metric name "segment_cut_Interval" is mixed-case and
	// says "interval" while the help text describes a count; renaming would
	// change the exported metric, so it is left as is.
	SegmentCutCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "logstore",
		Subsystem: "segment",
		Name:      "segment_cut_Interval",
		Help:      "The count of segment cut",
	})
)
View Source
// Exit flag values. Per the FlagFile.NeedRecover documentation, recovery is
// needed when the exit flag file is missing or its content is not "normal_exit".
var (
	// NormalExitFlag is the value that marks a normal exit.
	NormalExitFlag = "normal_exit"
	// UnnormalExitFlag is the value that marks an abnormal exit.
	UnnormalExitFlag = "unnormal_exit"
)
View Source
// File name suffixes. The NewSegment documentation shows segment files named
// like "0000000000000000.log".
var (
	// IndexFileSuffix is the suffix of index files.
	IndexFileSuffix = ".idx"
	// LogFileSuffix is the suffix of segment (log) files.
	LogFileSuffix   = ".log"
)
View Source
// Encoding is the package-wide byte order: big-endian.
// NOTE(review): presumably used when encoding records and index entries — confirm.
var Encoding = binary.BigEndian
View Source
// ReserveRecordMemory is the amount of memory, in bytes, reserved for tail
// records: 512 MiB.
var ReserveRecordMemory uint64 = 512 << 20

Functions

func CheckAndRestoreDataDir

func CheckAndRestoreDataDir(dir string) error

issue #119 检查数据目录下的segment,index和range的一致性,并删除多余的信息或文件

func IsFileExist

func IsFileExist(filePath string) bool

func Min

func Min(a, b int64) int64

func MockIndexTornWrite

func MockIndexTornWrite(index *Index, count int64)

Only for fiu tests; do not use in production.

func MockSegmentTornWrite

func MockSegmentTornWrite(s *Segment, count int64)

Only for fiu tests.

func RecordsToRaftLog

func RecordsToRaftLog(records []*Record, maxSize uint64) []raftpb.Entry

Only for performance testing; do not use in production.

func RecordsToVdlLog

func RecordsToVdlLog(records []*Record, maxSize int32) []logstream.Entry

Types

type FileMeta

// FileMeta is the metadata attached to segment files (see LogStore.Meta and
// LogStoreConfig.Meta).
type FileMeta struct {
	// VdlVersion is a version string.
	// NOTE(review): presumably the VDL version that wrote the file — confirm.
	VdlVersion string
}

type FlagFile

type FlagFile struct {
	// contains filtered or unexported fields
}

func NewFlagFile

func NewFlagFile(dir string) *FlagFile

func (*FlagFile) NeedRecover

func (f *FlagFile) NeedRecover() bool

判断是否需要恢复index 文件,以下情形需要恢复: 1.exit_flag_file文件不存在 2.exit_flag_file文件中的内容不是normal_exit

func (*FlagFile) WriteExitFlag

func (f *FlagFile) WriteExitFlag(flag string)

如果文件不存在,创建文件并写入标识

type Index

// Index is the handle to one index file. Each Segment refers to one through
// its IndexFile field.
type Index struct {
	// Name is the index name.
	Name      string
	// IndexPath is the path of the index file.
	IndexPath string

	// IndexFile is the locked file handle for the index file.
	IndexFile *fileutil.LockedFile
	// Position is an offset into the index file.
	// NOTE(review): presumably the current write position — confirm.
	Position  int64
}

type IndexEntry

// IndexEntry is one entry of a segment's index. Its field widths sum to
// 36 bytes (4+8+8+8+8), which equals IndexEntrySize.
type IndexEntry struct {
	// Crc is the entry's checksum.
	// NOTE(review): which bytes it covers is not visible here — confirm.
	Crc      uint32
	Vindex   int64  // VDL log index
	Rindex   uint64 // raft log index
	Position int64  // position of the raft log within the segment file
	Length   int64  // length of the record's data field
}

type LogFile

// LogFile is the handle to a segment's log file (see Segment.Log).
type LogFile struct {
	// File is the locked file handle.
	File          *fileutil.LockedFile
	// Name is the file name.
	Name          string
	// MapData holds the file's bytes.
	// NOTE(review): presumably a memory mapping of the file — confirm.
	MapData       []byte
	// WritePosition is an offset into the file.
	// NOTE(review): presumably where the next write goes — confirm.
	WritePosition int64
	// SyncPosition is an offset into the file.
	// NOTE(review): presumably the offset up to which data has been synced — confirm.
	SyncPosition  int64
	// MaxBytes is the size limit, in bytes, for this file.
	MaxBytes      int64
}

type LogRange

// LogRange is the index range of one segment: its first and last VDL index
// (vindex) and raft index (rindex). It is JSON-encoded with the keys given in
// the field tags and stored per segment name in RangeInfo.
type LogRange struct {
	FirstVindex int64  `json:"first_vindex"`
	FirstRindex uint64 `json:"first_rindex"`
	LastVindex  int64  `json:"last_vindex"`
	LastRindex  uint64 `json:"last_rindex"`
}

type LogStore

// LogStore is the top-level log storage object. It owns an ordered list of
// segments, a memory cache of the most recent records, and the range-metadata
// and exit-flag files.
type LogStore struct {
	Dir                 string    // directory that holds the underlying files
	Meta                *FileMeta // metadata recorded at the head of each WAL
	SegmentSizeBytes    int64     // maximum size of a segment file
	// ReserveSegmentCount is a segment count.
	// NOTE(review): presumably how many segments are retained — confirm.
	ReserveSegmentCount int
	// IsNew is a flag; see IsNewStore.
	// NOTE(review): presumably true when the store was newly created — confirm.
	IsNew               bool

	// Mu is a read/write lock.
	// NOTE(review): presumably guards Segments — confirm.
	Mu       sync.RWMutex
	Segments []*Segment // log segment files; the last segment is the one written to, the others are read-only

	Mc            *MemCache // memory cache of the latest records, including all records of the last segment
	// RangeMetaFile is the file holding the per-segment index ranges.
	RangeMetaFile *RangeFile
	// ExitFlagFile is the exit flag file (see FlagFile.NeedRecover).
	ExitFlagFile  *FlagFile
}

func NewLogStore

func NewLogStore(cfg *LogStoreConfig) (*LogStore, error)

创建logstore

func (*LogStore) Close

func (s *LogStore) Close() error

func (*LogStore) CreateSnapshotMeta

func (s *LogStore) CreateSnapshotMeta(applyIndex uint64) ([]*logstream.SegmentFile, error)

func (*LogStore) DeleteFiles

func (s *LogStore) DeleteFiles(segmentNames []string) error

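DeleteFiles deletes the given segment files and their corresponding index files. Segments can only be deleted in order, starting from the first segment.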

func (*LogStore) DeleteRaftLog

func (s *LogStore) DeleteRaftLog(rindex uint64) error

删除rindex及之后的raft log,包含rindex

func (*LogStore) Entries

func (s *LogStore) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)

Entries returns a slice of log entries in the range [lo,hi). MaxSize limits the total size of the log entries returned, but Entries returns at least one entry if any.

func (*LogStore) FetchLogStreamMessages

func (s *LogStore) FetchLogStreamMessages(startVindex int64, endRindex uint64, maxBytes int32) ([]logstream.Entry, error, bool)

kafka接口调用该函数获取vdl log startVindex:表示获取vdl log的开始位置 endRindex:表示获取vdl log的最大rindex,小于等于该值 maxBytes:该次获取的vdl log总大小不能超过maxBytes,如果第一条vdl log就超过了maxBytes,则返回第一条 bool return whether read from cache

func (*LogStore) FirstIndex

func (s *LogStore) FirstIndex() (uint64, error)

FirstIndex returns the first index written. 0 for no entries.

func (*LogStore) FirstVindex

func (s *LogStore) FirstVindex() (int64, error)

FirstVindex returns the first vindex written, or -1 if there are no entries.

func (*LogStore) GetEntriesInMemCache

func (s *LogStore) GetEntriesInMemCache(count int) ([]raftpb.Entry, error)

仅用于测试使用,勿用于生产环境 only for performance test,don't use in production

func (*LogStore) GetFirstSegment

func (s *LogStore) GetFirstSegment() *Segment

func (*LogStore) GetLastSegment

func (s *LogStore) GetLastSegment() *Segment

func (*LogStore) GetStartSegmentByVindex

func (s *LogStore) GetStartSegmentByVindex(startVindex int64) (*Segment, bool)

GetStartSegmentByVindex returns the segment that corresponds to startVindex, and whether that segment is the last segment.

func (*LogStore) GetVindexByRindex

func (s *LogStore) GetVindexByRindex(rindex uint64) (int64, error)

根据rindex获取vindex

func (*LogStore) IsNewStore

func (s *LogStore) IsNewStore() bool

func (*LogStore) LastIndex

func (s *LogStore) LastIndex() (uint64, error)

LastIndex returns the last index written. 0 for no entries.

func (*LogStore) LastVindex

func (s *LogStore) LastVindex() (int64, error)

LastVindex returns the last vindex written, or -1 if there are no entries.

func (*LogStore) MaxVindex

func (s *LogStore) MaxVindex(maxRindex uint64) (int64, error)

获取不超过maxRindex,对应的最大vindex

func (*LogStore) MinVindex

func (s *LogStore) MinVindex() (int64, error)

func (*LogStore) ReadRaftLogByRindex

func (s *LogStore) ReadRaftLogByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, error)

获取[start,end)范围的raft entry,并且总大小不超过maxSize。 范围和大小两个限制,有一个突破,则立即返回 如果第一条raft entry大小就超过maxSize,则返回第一条raft entry

func (*LogStore) ReadRaftLogFromSegments

func (s *LogStore) ReadRaftLogFromSegments(start, end uint64, maxSize uint64) ([]raftpb.Entry, error)

从文件读取,获取[start,end)范围的raft entry,并且总大小不超过maxSize。 范围和大小两个限制,有一个突破,则立即返回 如果第一条raft entry大小就超过maxSize,则返回第一条raft entry

func (*LogStore) ReadRecordsByVindex

func (s *LogStore) ReadRecordsByVindex(startVindex int64, endRindex uint64,
	maxSize int32) ([]logstream.Entry, error, bool)

根据vindex获取对应的Entry startVindex:对应的vindex开始位置 endRindex:获取Entry的rindex,不超过endRindex,小于等于该值 maxSize:获取的Entry总大小不超过该值 范围和大小两个限制,有一个突破,则立即返回 如果第一条entry大小就超过maxSize,则返回第一条entry bool return whether read from cache

func (*LogStore) ReadVdlLogFromSegments

func (s *LogStore) ReadVdlLogFromSegments(startVindex int64, endRindex uint64,
	maxSize int32) ([]logstream.Entry, error)

从文件读取,根据vindex获取对应的Entry startVindex:对应的vindex开始位置 endRindex:获取Entry的rindex,不超过endRindex,小于等于该值 maxSize:获取的Entry总大小不超过该值 范围和大小两个限制,有一个突破,则立即返回 如果第一条entry大小就超过maxSize,则返回第一条entry

func (*LogStore) SegmentCount

func (s *LogStore) SegmentCount() int

func (*LogStore) Snapshot

func (s *LogStore) Snapshot() (raftpb.Snapshot, error)

func (*LogStore) StoreEntries

func (s *LogStore) StoreEntries(entries []raftpb.Entry) error

raft lib调用StoreEntries存储entries到logstore中

func (*LogStore) Term

func (s *LogStore) Term(i uint64) (uint64, error)

根据index(i)获取对应的Term

func (*LogStore) WriteRecords

func (s *LogStore) WriteRecords(records []*Record, recordSize int64) error

logstore调用该函数写Record recordSize为records的总大小

type LogStoreConfig

// LogStoreConfig is the configuration passed to NewLogStore.
type LogStoreConfig struct {
	// Dir is the data directory.
	Dir                 string
	Meta                *FileMeta // the segment metadata
	// SegmentSizeBytes is the segment size, in bytes.
	SegmentSizeBytes    int64
	// MaxSizePerMsg is the maximum size of a single message. Per the
	// NewMemCache documentation, the cache size is derived as reserved
	// memory / maximum single-message size.
	MaxSizePerMsg       int
	// MemCacheSizeByte is the memory cache size, in bytes.
	MemCacheSizeByte    uint64
	// ReserveSegmentCount is a segment count.
	// NOTE(review): presumably how many segments are retained — confirm.
	ReserveSegmentCount int
}

type MemCache

// MemCache caches records in memory. It keeps two record slices and the first
// and last raft/VDL indexes it currently covers.
type MemCache struct {
	Mu              sync.RWMutex // both the last segment and the logstore operate on the MemCache, so a lock is needed
	VindexToRecords []*Record    // records of the last segment file that are cached in memory
	// RindexToRecords is the second cached record slice.
	// NOTE(review): presumably ordered/addressed by rindex — confirm.
	RindexToRecords []*Record
	// CacheSize is the cache capacity. Per the NewMemCache documentation it is
	// derived as reserved memory / maximum single-message size.
	CacheSize       uint64
	// FirstRindex and LastRindex bound the cached raft log indexes.
	FirstRindex     uint64
	LastRindex      uint64
	// FirstVindex and LastVindex bound the cached VDL log indexes.
	FirstVindex     int64
	LastVindex      int64
}

func NewMemCache

func NewMemCache(MemCacheSizeByte uint64, maxSizePerMsg int) *MemCache

创建MemCache,CacheSize取决为:保留内存/单条消息最大Size

func (*MemCache) DeleteRecordsByRindex

func (m *MemCache) DeleteRecordsByRindex(rindex uint64)

将MemCache中的数据全部清空

func (*MemCache) GetFirstRindex

func (m *MemCache) GetFirstRindex() uint64

func (*MemCache) GetFirstVindex

func (m *MemCache) GetFirstVindex() int64

func (*MemCache) GetLastRindex

func (m *MemCache) GetLastRindex() uint64

func (*MemCache) GetLastVindex

func (m *MemCache) GetLastVindex() int64

func (*MemCache) GetRaftLogByRindex

func (m *MemCache) GetRaftLogByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, bool)

根据范围和消息总大小限制获取raft entry 范围:[start,end)

func (*MemCache) GetRecords

func (m *MemCache) GetRecords(count int) []*Record

仅用于测试 only for performance test,don't use in production

func (*MemCache) GetVdlLogByVindex

func (m *MemCache) GetVdlLogByVindex(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, bool)

获取从startVindex到endRindex之间的VDL Log,包括endRindex所在的记录 startVindex是VDL log index endRindex是raft log index

func (*MemCache) LoadRecords

func (m *MemCache) LoadRecords(records []*Record)

将records加载到MemCache中

func (*MemCache) WriteRecords

func (m *MemCache) WriteRecords(records []*Record)

type NewIndexConfig

// NewIndexConfig carries the directory and name used to create a new index.
type NewIndexConfig struct {
	// Dir is the directory for the index file.
	Dir  string
	// Name is the index name.
	Name string
}

type NewSegmentConfig

// NewSegmentConfig is the configuration passed to NewSegment, which creates a
// segment together with its index.
type NewSegmentConfig struct {
	// Dir is the directory for the segment.
	Dir           string
	// Name is the segment name; segment files are named like "0000000000000000.log".
	Name          string
	// MaxBytes is the size limit, in bytes, for the segment.
	MaxBytes      int64
	// Mc is the memory cache the segment uses.
	Mc            *MemCache
	// Meta is the segment metadata.
	Meta          *FileMeta
	// RangeMetaFile is the range-metadata file the segment uses.
	RangeMetaFile *RangeFile
}

type OpenIndexConfig

// OpenIndexConfig carries the name, directory and open mode used to open an
// existing index.
type OpenIndexConfig struct {
	// Name is the index name.
	Name          string
	// Dir is the directory that holds the index file.
	Dir           string
	// OpenIndexMode selects how the index is opened (see OpenMode).
	OpenIndexMode OpenMode
}

type OpenMode

// OpenMode selects how a segment or index is opened.
type OpenMode int8
const (
	// ReadWriteMode (0) opens for reading and writing.
	ReadWriteMode OpenMode = iota
	// ReadOnlyMode (1) opens for reading only.
	ReadOnlyMode
	// ReadWriteWithRecoverMode (2) opens for reading and writing with recovery.
	// NOTE(review): what recovery entails is not visible here — confirm.
	ReadWriteWithRecoverMode
)

type OpenSegmentConfig

// OpenSegmentConfig is the configuration passed to OpenSegment,
// OpenSegmentWithRead and OpenSegmentWithWrite to open an existing segment.
type OpenSegmentConfig struct {
	// Dir is the directory that holds the segment.
	Dir             string
	// Name is the segment name.
	Name            string
	// OpenSegmentMode selects how the segment is opened (see OpenMode).
	OpenSegmentMode OpenMode
	// MaxBytes is the size limit, in bytes, for the segment.
	MaxBytes        int64
	// Range is the segment's index range.
	// NOTE(review): presumably taken from the range-metadata file — confirm.
	Range           *LogRange
	// Mc is the memory cache the segment uses.
	Mc              *MemCache
	// RangeMetaFile is the range-metadata file the segment uses.
	RangeMetaFile   *RangeFile
}

type OpenSegmentsConfig

// OpenSegmentsConfig is the configuration passed to OpenSegments, which opens
// the segments of a directory.
type OpenSegmentsConfig struct {
	// Dir is the directory that holds the segments.
	Dir           string
	// MaxBytes is the size limit, in bytes, per segment.
	MaxBytes      int64
	// Mc is the memory cache the segments use.
	Mc            *MemCache
	// Meta is the segment metadata.
	Meta          *FileMeta
	// Ranges maps segment names to their index ranges.
	Ranges        *RangeInfo
	// RangeMetaFile is the range-metadata file.
	RangeMetaFile *RangeFile
	// NeedRecover reports whether recovery is required
	// (compare FlagFile.NeedRecover).
	NeedRecover   bool
}

type ParseIndexResult

// ParseIndexResult is the result returned by Segment.RebuildIndexFile: the
// index entries plus the first and last VDL and raft indexes.
type ParseIndexResult struct {
	// Entries are the index entries.
	Entries     []*IndexEntry
	FirstVindex int64
	FirstRindex uint64
	LastVindex  int64
	LastRindex  uint64
}

type RangeFile

type RangeFile struct {
	// contains filtered or unexported fields
}

func NewRangeFile

func NewRangeFile(dir string) *RangeFile

func (*RangeFile) AppendLogRange

func (m *RangeFile) AppendLogRange(segmentName string, l *LogRange) error

func (*RangeFile) DeleteLogRanges

func (m *RangeFile) DeleteLogRanges(deleteSegments []string) error

func (*RangeFile) GetRangeInfo

func (m *RangeFile) GetRangeInfo() (*RangeInfo, error)

type RangeInfo

// RangeInfo is the set of per-segment index ranges, as returned by
// RangeFile.GetRangeInfo. It is JSON-encoded under the key "range".
type RangeInfo struct {
	// NameToRange maps a segment name to that segment's index range.
	NameToRange map[string]*LogRange `json:"range"`
}

type Record

// Record is one log record. Its stored field order is
// Crc|RecordType|DataLen|Vindex|Term|Rindex|RaftType|Data.
type Record struct {
	Crc        uint32 // CRC over the remaining fields
	RecordType int32  // record type tag (MetaDataType, RaftLogType or VdlLogType)
	DataLen    int64  // length of Data
	Vindex     int64  // VDL log index

	Term     uint64           // raft term
	Rindex   uint64           // raft log index
	RaftType raftpb.EntryType // raft entry type
	Data     []byte           // record payload
}

Record store order Crc|RecordType|DataLen|Vindex|Term|Rindex|RaftType|Data

type RequestType

// RequestType distinguishes requests addressed by raft index from requests
// addressed by VDL index.
type RequestType int8
const (
	// RaftIndexType (0) selects addressing by raft log index.
	RaftIndexType RequestType = iota
	// VdlIndexType (1) selects addressing by VDL log index.
	VdlIndexType
)

type Segment

// Segment is one log segment: a log file, its index file, and the range of
// VDL and raft indexes it holds.
type Segment struct {
	// Mu is a read/write lock.
	// NOTE(review): presumably guards the segment's mutable state — confirm.
	Mu              sync.RWMutex
	SegmentPath     string // path of the segment file
	FirstVindex     int64  // first VDL log index
	FirstRindex     uint64 // first raft log index
	LastVindex      int64  // last VDL log index
	LastRindex      uint64 // last raft log index
	InitFirstVindex int64  // equal to FirstVindex when the segment has records

	// Log is the segment's log file.
	Log       *LogFile
	IndexFile *Index // the index for this segment
	// Status is the segment's state (see SegmentStatus).
	Status    SegmentStatus

	// Mc is the memory cache the segment uses.
	Mc            *MemCache
	// RangeMetaFile is the range-metadata file the segment uses.
	RangeMetaFile *RangeFile
}

func NewSegment

func NewSegment(cfg *NewSegmentConfig) (*Segment, error)

创建segment和index segment file name like this :"0000000000000000.log"

func OpenSegment

func OpenSegment(cfg *OpenSegmentConfig) (*Segment, error)

OpenSegment opens an existing segment file and its index file.

func OpenSegmentWithRead

func OpenSegmentWithRead(cfg *OpenSegmentConfig) (*Segment, error)

以只读方式打开segment

func OpenSegmentWithWrite

func OpenSegmentWithWrite(cfg *OpenSegmentConfig) (*Segment, error)

以读写方式打开segment

func OpenSegments

func OpenSegments(cfg *OpenSegmentsConfig) ([]*Segment, error)

打开segments

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) DeleteRecordsByRindex

func (s *Segment) DeleteRecordsByRindex(start uint64) error

only delete record in the last segment, and must guarantee the rindex(start) in this segment.

func (*Segment) GetEndPositionByRindex

func (s *Segment) GetEndPositionByRindex(rindex uint64) (int64, int64, error)

读取rindex在segment和index中对应记录的结束位置 用于记录snapshot中最后一个segment和index文件

func (*Segment) GetFirstRindex

func (s *Segment) GetFirstRindex() uint64

func (*Segment) GetFirstVindex

func (s *Segment) GetFirstVindex() int64

func (*Segment) GetLastRindex

func (s *Segment) GetLastRindex() uint64

func (*Segment) GetLastVindex

func (s *Segment) GetLastVindex() int64

func (*Segment) GetMaxBytes

func (s *Segment) GetMaxBytes() int64

func (*Segment) GetMaxVindexByRindex

func (s *Segment) GetMaxVindexByRindex(rindex uint64) (int64, error)

获取rindex对应的最大vindex

func (*Segment) GetName

func (s *Segment) GetName() string

func (*Segment) GetVindexByRindex

func (s *Segment) GetVindexByRindex(rindex uint64) (int64, error)

根据rindex读取对应的vindex

func (*Segment) ReOpenWithWrite

func (s *Segment) ReOpenWithWrite() error

将只读的segment以读写方式打开

func (*Segment) ReadRaftLogsByRindex

func (s *Segment) ReadRaftLogsByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, uint64, error)

读取[start,end)范围的raft log,同时满足整个raft log size大小不能超过maxSize,hasRead表示是否已经读取了部分raftlog 用于判断第一条raftlog 就超过maxSize时是否添加该raftlog 返回<raft_log,剩下应该读取的大小,error>

func (*Segment) ReadVdlLogsByVindex

func (s *Segment) ReadVdlLogsByVindex(startVindex int64, endRindex uint64, maxSize int32) (*VdlResult, error)

根据vindex读取vdl log

func (*Segment) RebuildIndexFile

func (s *Segment) RebuildIndexFile() (*ParseIndexResult, error)

只有在启动的时候,才有可能进行重建索引操作 所以不需要加锁

func (*Segment) Remove

func (s *Segment) Remove() error

remove the segment file and index file synchronously

func (*Segment) SetReadOnly

func (s *Segment) SetReadOnly()

func (*Segment) SyncIndexFile

func (s *Segment) SyncIndexFile() error

func (*Segment) WriteRecords

func (s *Segment) WriteRecords(records []*Record) error

写records

type SegmentStatus

// SegmentStatus is the state of a Segment (see Segment.Status).
type SegmentStatus int8
const (
	// SegmentReadOnly (0): the segment is read-only.
	SegmentReadOnly SegmentStatus = iota
	// SegmentRDWR (1): the segment is open for reading and writing.
	SegmentRDWR
	// SegmentClosed (2): the segment is closed.
	SegmentClosed
)

type VdlResult

type VdlResult struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL