storage

package
v0.0.0-...-3ee4cf2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2021 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	//FileMode is default file mode
	FileMode = 0600
	//DirMode is default dir mode
	DirMode = 0700
)
View Source
const (
	//MetadataFile is the file name of storage metadata
	MetadataFile = "kingbus_meta.db"
	//BucketName is the bucket name using in bolt db
	BucketName = "kingbus_meta_bucket"
)
View Source
const (
	//ReadonlySegmentPattern is the read only segment file name pattern
	ReadonlySegmentPattern = "%020d-%020d.log"
	//ReadonlyIndexPattern is the read only index file name pattern
	ReadonlyIndexPattern = "%020d-%020d.index"
	//ReadWriteSegmentPattern is the read write segment file name pattern
	ReadWriteSegmentPattern = "%020d-inprogress.log"
	//ReadWriteIndexPattern is the read only index file name pattern
	ReadWriteIndexPattern = "%020d-inprogress.index"
	//RecordLengthSize is the record length size
	RecordLengthSize int = 4
)
View Source
const (
	//HardStateKey represents hard state in raft
	HardStateKey = "hard_state"
	//ConfStateKey represents conf state in raft
	ConfStateKey = "conf_state"

	//ExecutedGtidSetKey represents executed gtid set key
	ExecutedGtidSetKey = "executed_gtids"
	//GtidPurgedKey represents gtid purged key
	GtidPurgedKey = "gtid_purged"
	//FdePrefix is the key prefix of FORMAT_DESCRIPTION_EVENT
	FdePrefix = "fde"
	//NextBinlogPrefix is the key prefix of nex binlog file name
	NextBinlogPrefix = "next_binlog"
	//MasterInfoKey is master info key
	MasterInfoKey = "master_info"
	//PgePrefix if the key prefix of previous gtid event
	PgePrefix = "pre_gtids"
	//SyncerArgsKey is the key of syncer start args
	SyncerArgsKey = "syncer_args"
	//NeedRecoverKey is the key of storage need recover
	NeedRecoverKey = "ds_need_recover"
	//RaftClusterKey is the key of raft cluster information
	RaftClusterKey = "raft_cluster"
	//AppliedIndexKey is the key of applied index
	AppliedIndexKey = "apply_index"
)
View Source
const (
	//IndexEntrySize is the size of IndexEntry
	IndexEntrySize = 12
)

Variables

View Source
var (
	//ErrKeyNotFound return for key is not found
	ErrKeyNotFound = errors.New("storage:key is not found")
	//ErrKeyIsNil return for key is nil
	ErrKeyIsNil = errors.New("storage:key is nil")
	//ErrArgsIllegal return for args are illegal
	ErrArgsIllegal = errors.New("storage:args are illegal")
	//ErrNotContain return for gtids not be contained
	ErrNotContain = errors.New("storage:gtids not be contained")
	//ErrOutOfBound return for index out of bound
	ErrOutOfBound = errors.New("storage:index out of bound")
	//ErrNotContinuous return for  index is not continuous
	ErrNotContinuous = errors.New("storage:index is not continuous")
	//ErrClosed return for segment is closed
	ErrClosed = errors.New("storage:segment is closed")
	//ErrNotWritable return for segment not writable
	ErrNotWritable = errors.New("storage:segment not writable")
)
View Source
var (

	//SegmentSize is the size of segment file
	SegmentSize int64 = 1024 * 1024 * 1024 //1GB

)

Functions

This section is empty.

Types

type DiskEntryReader

type DiskEntryReader struct {
	// contains filtered or unexported fields
}

DiskEntryReader is a raft entry reader

func (*DiskEntryReader) GetNext

func (r *DiskEntryReader) GetNext() (*raftpb.Entry, error)

GetNext get the next raft entry

func (*DiskEntryReader) NextRaftIndex

func (r *DiskEntryReader) NextRaftIndex() uint64

NextRaftIndex get the next raft index

type DiskStorage

type DiskStorage struct {
	Dir string

	Mu       sync.RWMutex
	Segments []*Segment

	ReserveSegmentCount int

	MetaStorage
	// contains filtered or unexported fields
}

DiskStorage is the Storage store data in disk

func NewDiskStorage

func NewDiskStorage(dir string, reserveSizeInGB int) (*DiskStorage, error)

NewDiskStorage create a disk storage

func (*DiskStorage) Close

func (s *DiskStorage) Close() error

Close storage

func (*DiskStorage) Entries

func (s *DiskStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)

Entries get raft entries in [lo,hi)

func (*DiskStorage) FirstIndex

func (s *DiskStorage) FirstIndex() (uint64, error)

FirstIndex get first raft index

func (*DiskStorage) LastIndex

func (s *DiskStorage) LastIndex() (uint64, error)

LastIndex get last raft index

func (*DiskStorage) NewEntryReaderAt

func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)

NewEntryReaderAt create a DiskEntryReader at raftIndex

func (*DiskStorage) SaveRaftEntries

func (s *DiskStorage) SaveRaftEntries(entries []raftpb.Entry) error

SaveRaftEntries save raft entries in storage

func (*DiskStorage) Snapshot

func (s *DiskStorage) Snapshot() (raftpb.Snapshot, error)

Snapshot not implement

func (*DiskStorage) StartPurgeLog

func (s *DiskStorage) StartPurgeLog()

StartPurgeLog implements purge the expired log files

func (*DiskStorage) StopPurgeLog

func (s *DiskStorage) StopPurgeLog()

StopPurgeLog stop purge log

func (*DiskStorage) Term

func (s *DiskStorage) Term(i uint64) (uint64, error)

Term get raft term of raft index

func (*DiskStorage) TruncateSuffix

func (s *DiskStorage) TruncateSuffix(raftIndex uint64) error

TruncateSuffix truncate raft entry to the raft index is i, include i.

type EntryReader

type EntryReader interface {
	GetNext() (*raftpb.Entry, error)
	NextRaftIndex() uint64
}

EntryReader is a raft entry reader interface

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index is a index of segment file

type IndexEntry

type IndexEntry struct {
	RaftIndex    uint64
	FilePosition uint32
}

IndexEntry is index entry in index file

func (*IndexEntry) Marshal

func (i *IndexEntry) Marshal() ([]byte, error)

Marshal encode index entry into byte

func (*IndexEntry) Unmarshal

func (i *IndexEntry) Unmarshal(data []byte) error

Unmarshal decode byte into index entry

type MemoryEventReader

type MemoryEventReader struct {
}

MemoryEventReader not implement

func (*MemoryEventReader) GetNext

func (s *MemoryEventReader) GetNext() (*raftpb.Entry, error)

GetNext not implement

func (*MemoryEventReader) NextRaftIndex

func (s *MemoryEventReader) NextRaftIndex() uint64

NextRaftIndex not implement

type MemoryStorage

type MemoryStorage struct {
	MetaStorage
	// contains filtered or unexported fields
}

MemoryStorage is Storage implemented in memory

func NewMemoryStorage

func NewMemoryStorage(dir string) (*MemoryStorage, error)

NewMemoryStorage create a memory storage

func (*MemoryStorage) Close

func (s *MemoryStorage) Close() error

Close memory storage

func (*MemoryStorage) Entries

func (s *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)

Entries get raft entries in [lo,hi)

func (*MemoryStorage) FirstIndex

func (s *MemoryStorage) FirstIndex() (uint64, error)

FirstIndex get first raft index

func (*MemoryStorage) LastIndex

func (s *MemoryStorage) LastIndex() (uint64, error)

LastIndex get last raft index

func (*MemoryStorage) NewEntryReaderAt

func (s *MemoryStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)

NewEntryReaderAt not implement

func (*MemoryStorage) SaveRaftEntries

func (s *MemoryStorage) SaveRaftEntries(entries []raftpb.Entry) error

SaveRaftEntries save raft entries in storage

func (*MemoryStorage) Snapshot

func (s *MemoryStorage) Snapshot() (raftpb.Snapshot, error)

Snapshot do not support

func (*MemoryStorage) StartPurgeLog

func (s *MemoryStorage) StartPurgeLog()

StartPurgeLog not implement

func (*MemoryStorage) StopPurgeLog

func (s *MemoryStorage) StopPurgeLog()

StopPurgeLog not implement

func (*MemoryStorage) Term

func (s *MemoryStorage) Term(i uint64) (uint64, error)

Term get raft term of raft index

func (*MemoryStorage) TruncateSuffix

func (s *MemoryStorage) TruncateSuffix(i uint64) error

TruncateSuffix not implement

type MetaStorage

type MetaStorage interface {
	InitialState() (raftpb.HardState, raftpb.ConfState, error)
	SaveHardState(st raftpb.HardState) error

	Get(key []byte) ([]byte, error)
	Set(key, value []byte) error
	Delete(key []byte) error

	//key is key+flavor
	SetGtidSet(flavor string, key string, gtidSet gomysql.GTIDSet) error
	GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)
	SetBinlogProgress(appliedIndex uint64, executedGtidSet gomysql.GTIDSet) error

	GetFde(preGtidEventIndex uint64) ([]byte, error)

	//get the raft index of PreviousGtidSet
	GetPreviousGtidSet(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)
	SetPreviousGtidSet(raftIndex uint64, previousGtidSet *gomysql.MysqlGTIDSet) error

	GetNextBinlogFile(startRaftIndex uint64) (string, error)
	UpdatePugedGtidset(firstIndex uint64) error

	Close2() error
}

MetaStorage is metadata storage interface

type MetaStore

type MetaStore struct {
	DB         *bolt.DB
	Dir        string
	BucketName []byte
}

MetaStore is a metadata store

func NewMetaStore

func NewMetaStore(dir string) (*MetaStore, error)

NewMetaStore create a meta store

func (*MetaStore) Close2

func (s *MetaStore) Close2() error

Close2 close meta store

func (*MetaStore) Delete

func (s *MetaStore) Delete(key []byte) error

Delete key

func (*MetaStore) Get

func (s *MetaStore) Get(key []byte) ([]byte, error)

Get value by key

func (*MetaStore) GetFde

func (s *MetaStore) GetFde(preGtidEventIndex uint64) ([]byte, error)

GetFde get FORMAT_DESCRIPTION_EVENT

func (*MetaStore) GetGtidSet

func (s *MetaStore) GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)

GetGtidSet get gtid set

func (*MetaStore) GetNextBinlogFile

func (s *MetaStore) GetNextBinlogFile(startRaftIndex uint64) (string, error)

GetNextBinlogFile get the next binlog file name

func (*MetaStore) GetPreviousGtidSet

func (s *MetaStore) GetPreviousGtidSet(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)

GetPreviousGtidSet Iterate over all Previous Gtid keys in order, and read only the Previous_gtids_log_event, to find the last one, that is the subset of slaveExecutedGtids. Since every binary log file begins with a Previous_gtids_log_event, that contains all GTIDs in all previous binary logs. We also ask for the first GTID in the binary log to know if we should send the FD event with the "created" field cleared or not.

func (*MetaStore) InitialState

func (s *MetaStore) InitialState() (pb.HardState, pb.ConfState, error)

InitialState init raft state

func (*MetaStore) SaveHardState

func (s *MetaStore) SaveHardState(st pb.HardState) error

SaveHardState save hard state in raft

func (*MetaStore) Set

func (s *MetaStore) Set(key []byte, value []byte) error

Set value by key

func (*MetaStore) SetBinlogProgress

func (s *MetaStore) SetBinlogProgress(appliedIndex uint64, executedGtidSet gomysql.GTIDSet) error

SetBinlogProgress save appliedIndex and executedGtidSet at the same time

func (*MetaStore) SetGtidSet

func (s *MetaStore) SetGtidSet(flavor string, key string, gtidSet gomysql.GTIDSet) error

SetGtidSet set mysql gtid

func (*MetaStore) SetPreviousGtidSet

func (s *MetaStore) SetPreviousGtidSet(raftIndex uint64, previousGtidSet *gomysql.MysqlGTIDSet) error

SetPreviousGtidSet set previous gtid set

func (*MetaStore) UpdatePugedGtidset

func (s *MetaStore) UpdatePugedGtidset(firstIndex uint64) error

UpdatePugedGtidset update purged gtid when segment purged or updated by master gtid_purged

type MmapFile

type MmapFile struct {
	// contains filtered or unexported fields
}

MmapFile is the file mmaped in os

type RWFile

type RWFile struct {
	// contains filtered or unexported fields
}

RWFile is a index file with RW mode

type Segment

type Segment struct {
	Mu         sync.RWMutex
	FirstIndex uint64
	LastIndex  uint64

	LogFile  *MmapFile
	LogIndex *Index
	Status   SegmentStatus
}

Segment is segment file

type SegmentStatus

type SegmentStatus int8

SegmentStatus is the status of segment file

const (
	//SegmentReadOnly represents segment file read only
	SegmentReadOnly SegmentStatus = iota
	//SegmentRDWR represents segment file read write
	SegmentRDWR
	//SegmentClosed represents segment file is closed
	SegmentClosed
)

type Storage

type Storage interface {
	// entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.SaveHardState
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (raftpb.Snapshot, error)
	SaveRaftEntries(entries []raftpb.Entry) error
	TruncateSuffix(i uint64) error
	StartPurgeLog()
	StopPurgeLog()

	MetaStorage
	NewEntryReaderAt(raftIndex uint64) (EntryReader, error)
	Close() error
}

Storage is kingbus server storage

Directories

Path Synopsis
Package storagepb is a generated protocol buffer package.
Package storagepb is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL