storage

package
v0.0.0-...-0b345dd Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2022 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)

Variables

View Source
var ErrVolumeNotFound = fmt.Errorf("volume not found")
View Source
var ErrorDeleted = errors.New("already deleted")
View Source
var ErrorNotFound = errors.New("not found")
View Source
var ErrorSizeMismatch = errors.New("size mismatch")

Functions

func CheckAndFixVolumeDataIntegrity

func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error)

func ScanVolumeFile

func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
	needleMapKind NeedleMapKind,
	volumeFileScanner VolumeFileScanner) (err error)

func ScanVolumeFileFrom

func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error)

func VolumeFileName

func VolumeFileName(dir string, collection string, id int) (fileName string)

Types

type DiskLocation

type DiskLocation struct {
	Directory              string
	IdxDirectory           string
	DiskType               types.DiskType
	MaxVolumeCount         int
	OriginalMaxVolumeCount int
	MinFreeSpace           util.MinFreeSpace
	// contains filtered or unexported fields
}

func NewDiskLocation

func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation

func (*DiskLocation) CheckDiskSpace

func (l *DiskLocation) CheckDiskSpace()

func (*DiskLocation) Close

func (l *DiskLocation) Close()

func (*DiskLocation) DeleteCollectionFromDiskLocation

func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)

func (*DiskLocation) DeleteVolume

func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error

func (*DiskLocation) DestroyEcVolume

func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId)

func (*DiskLocation) EcVolumesLen

func (l *DiskLocation) EcVolumesLen() int

func (*DiskLocation) FindEcShard

func (*DiskLocation) FindEcVolume

func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*DiskLocation) FindVolume

func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool)

func (*DiskLocation) LoadEcShard

func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error)

func (*DiskLocation) LoadVolume

func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool

func (*DiskLocation) LocateVolume

func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool)

func (*DiskLocation) SetStopping

func (l *DiskLocation) SetStopping()

func (*DiskLocation) SetVolume

func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume)

func (*DiskLocation) UnUsedSpace

func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64)

func (*DiskLocation) UnloadEcShard

func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool

func (*DiskLocation) UnloadVolume

func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error

func (*DiskLocation) VolumesLen

func (l *DiskLocation) VolumesLen() int

type LevelDbNeedleMap

type LevelDbNeedleMap struct {
	// contains filtered or unexported fields
}

func NewLevelDbNeedleMap

func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error)

func (*LevelDbNeedleMap) Close

func (m *LevelDbNeedleMap) Close()

func (*LevelDbNeedleMap) Delete

func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error

func (*LevelDbNeedleMap) Destroy

func (m *LevelDbNeedleMap) Destroy() error

func (*LevelDbNeedleMap) Get

func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*LevelDbNeedleMap) IndexFileSize

func (nm *LevelDbNeedleMap) IndexFileSize() uint64

func (*LevelDbNeedleMap) Put

func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*LevelDbNeedleMap) ReadIndexEntry

func (nm *LevelDbNeedleMap) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)

func (*LevelDbNeedleMap) Sync

func (nm *LevelDbNeedleMap) Sync() error

type NeedleMap

type NeedleMap struct {
	// contains filtered or unexported fields
}

func LoadCompactNeedleMap

func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error)

func NewCompactNeedleMap

func NewCompactNeedleMap(file *os.File) *NeedleMap

func (*NeedleMap) Close

func (nm *NeedleMap) Close()

func (*NeedleMap) Delete

func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error

func (*NeedleMap) Destroy

func (nm *NeedleMap) Destroy() error

func (*NeedleMap) Get

func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*NeedleMap) IndexFileSize

func (nm *NeedleMap) IndexFileSize() uint64

func (*NeedleMap) Put

func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*NeedleMap) ReadIndexEntry

func (nm *NeedleMap) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)

func (*NeedleMap) Sync

func (nm *NeedleMap) Sync() error

type NeedleMapKind

type NeedleMapKind int
const (
	NeedleMapInMemory      NeedleMapKind = iota
	NeedleMapLevelDb                     // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
	NeedleMapLevelDbMedium               // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
	NeedleMapLevelDbLarge                // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)

type NeedleMapper

type NeedleMapper interface {
	Put(key NeedleId, offset Offset, size Size) error
	Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
	Delete(key NeedleId, offset Offset) error
	Close()
	Destroy() error
	ContentSize() uint64
	DeletedSize() uint64
	FileCount() int
	DeletedCount() int
	MaxFileKey() NeedleId
	IndexFileSize() uint64
	Sync() error
	ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
}

type ProgressFunc

type ProgressFunc func(processed int64) bool

type ReadOption

type ReadOption struct {
	ReadDeleted bool
}

type SortedFileNeedleMap

type SortedFileNeedleMap struct {
	// contains filtered or unexported fields
}

func NewSortedFileNeedleMap

func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error)

func (*SortedFileNeedleMap) Close

func (m *SortedFileNeedleMap) Close()

func (*SortedFileNeedleMap) Delete

func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error

func (*SortedFileNeedleMap) Destroy

func (m *SortedFileNeedleMap) Destroy() error

func (*SortedFileNeedleMap) Get

func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*SortedFileNeedleMap) IndexFileSize

func (nm *SortedFileNeedleMap) IndexFileSize() uint64

func (*SortedFileNeedleMap) Put

func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*SortedFileNeedleMap) ReadIndexEntry

func (nm *SortedFileNeedleMap) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)

func (*SortedFileNeedleMap) Sync

func (nm *SortedFileNeedleMap) Sync() error

type Store

type Store struct {
	MasterAddress pb.ServerAddress

	Ip        string
	Port      int
	GrpcPort  int
	PublicUrl string
	Locations []*DiskLocation

	NeedleMapKind       NeedleMapKind
	NewVolumesChan      chan master_pb.VolumeShortInformationMessage
	DeletedVolumesChan  chan master_pb.VolumeShortInformationMessage
	NewEcShardsChan     chan master_pb.VolumeEcShardInformationMessage
	DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
	// contains filtered or unexported fields
}

A VolumeServer contains one Store.

func NewStore

func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int,
	minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store)

func (*Store) AddVolume

func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error

func (*Store) CheckCompactVolume

func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error)

func (*Store) Close

func (s *Store) Close()

func (*Store) CollectErasureCodingHeartbeat

func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat

func (*Store) CollectHeartbeat

func (s *Store) CollectHeartbeat() *master_pb.Heartbeat

func (*Store) CommitCleanupVolume

func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error

func (*Store) CommitCompactVolume

func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, error)

func (*Store) CompactVolume

func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error

func (*Store) ConfigureVolume

func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error

func (*Store) DeleteCollection

func (s *Store) DeleteCollection(collection string) (e error)

func (*Store) DeleteEcShardNeedle

func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error)

func (*Store) DeleteVolume

func (s *Store) DeleteVolume(i needle.VolumeId) error

func (*Store) DeleteVolumeNeedle

func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, error)

func (*Store) DestroyEcVolume

func (s *Store) DestroyEcVolume(vid needle.VolumeId)

func (*Store) EcVolumes

func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume)

func (*Store) FindEcVolume

func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*Store) FindFreeLocation

func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation)

func (*Store) GetDataCenter

func (s *Store) GetDataCenter() string

func (*Store) GetRack

func (s *Store) GetRack() string

func (*Store) GetVolume

func (s *Store) GetVolume(i needle.VolumeId) *Volume

func (*Store) GetVolumeSizeLimit

func (s *Store) GetVolumeSizeLimit() uint64

func (*Store) HasVolume

func (s *Store) HasVolume(i needle.VolumeId) bool

func (*Store) MarkVolumeReadonly

func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error

func (*Store) MarkVolumeWritable

func (s *Store) MarkVolumeWritable(i needle.VolumeId) error

func (*Store) MaybeAdjustVolumeMax

func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool)

func (*Store) MountEcShards

func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) MountVolume

func (s *Store) MountVolume(i needle.VolumeId) error

func (*Store) ReadEcShardNeedle

func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error)

func (*Store) ReadVolumeNeedle

func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error)

func (*Store) SetDataCenter

func (s *Store) SetDataCenter(dataCenter string)

func (*Store) SetRack

func (s *Store) SetRack(rack string)

func (*Store) SetStopping

func (s *Store) SetStopping()

func (*Store) SetVolumeSizeLimit

func (s *Store) SetVolumeSizeLimit(x uint64)

func (*Store) String

func (s *Store) String() (str string)

func (*Store) UnmountEcShards

func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) UnmountVolume

func (s *Store) UnmountVolume(i needle.VolumeId) error

func (*Store) VolumeInfos

func (s *Store) VolumeInfos() (allStats []*VolumeInfo)

func (*Store) WriteVolumeNeedle

func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, checkCookie bool, fsync bool) (isUnchanged bool, err error)

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

func (*StreamReader) Read

func (sr *StreamReader) Read(p []byte) (n int, err error)

type Volume

type Volume struct {
	Id needle.VolumeId

	Collection  string
	DataBackend backend.BackendStorageFile

	MemoryMapMaxSizeMb uint32

	super_block.SuperBlock
	// contains filtered or unexported fields
}

func NewVolume

func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error)

func (*Volume) BinarySearchByAppendAtNs

func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error)

on server side

func (*Volume) Close

func (v *Volume) Close()

Close cleanly shuts down this volume

func (*Volume) CommitCompact

func (v *Volume) CommitCompact() error

func (*Volume) Compact

func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error

compact a volume based on deletions in .dat files

func (*Volume) Compact2

func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error

compact a volume based on deletions in .idx files

func (*Volume) ContentSize

func (v *Volume) ContentSize() uint64

func (*Volume) DataFileName

func (v *Volume) DataFileName() (fileName string)

func (*Volume) DeletedCount

func (v *Volume) DeletedCount() uint64

func (*Volume) DeletedSize

func (v *Volume) DeletedSize() uint64

func (*Volume) Destroy

func (v *Volume) Destroy() (err error)

Destroy removes everything related to this volume

func (*Volume) DiskType

func (v *Volume) DiskType() types.DiskType

func (*Volume) FileCount

func (v *Volume) FileCount() uint64

func (*Volume) FileName

func (v *Volume) FileName(ext string) (fileName string)

func (*Volume) FileStat

func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)

func (*Volume) GetVolumeInfo

func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo

func (*Volume) GetVolumeSyncStatus

func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse

func (*Volume) HasRemoteFile

func (v *Volume) HasRemoteFile() bool

func (*Volume) IncrementalBackup

func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption) error

func (*Volume) IndexFileName

func (v *Volume) IndexFileName() (fileName string)

func (*Volume) IndexFileSize

func (v *Volume) IndexFileSize() uint64

func (*Volume) IsReadOnly

func (v *Volume) IsReadOnly() bool

func (*Volume) LoadRemoteFile

func (v *Volume) LoadRemoteFile() error

func (*Volume) MaxFileKey

func (v *Volume) MaxFileKey() types.NeedleId

func (*Volume) NeedToReplicate

func (v *Volume) NeedToReplicate() bool

func (*Volume) ReadNeedleBlob

func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error)

read fills in Needle content by looking up n.Id from NeedleMapper

func (*Volume) RemoteStorageNameKey

func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string)

func (*Volume) SaveVolumeInfo

func (v *Volume) SaveVolumeInfo() error

func (*Volume) SetStopping

func (v *Volume) SetStopping()

func (*Volume) StreamRead

func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error)

func (*Volume) StreamWrite

func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error)

func (*Volume) String

func (v *Volume) String() string

func (*Volume) SyncToDisk

func (v *Volume) SyncToDisk()

func (*Volume) ToVolumeInformationMessage

func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage)

func (*Volume) Version

func (v *Volume) Version() needle.Version

func (*Volume) WriteNeedleBlob

func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error

type VolumeFileScanner

type VolumeFileScanner interface {
	VisitSuperBlock(super_block.SuperBlock) error
	ReadNeedleBody() bool
	VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}

type VolumeFileScanner4GenIdx

type VolumeFileScanner4GenIdx struct {
	// contains filtered or unexported fields
}

generate the volume idx

func (*VolumeFileScanner4GenIdx) ReadNeedleBody

func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool

func (*VolumeFileScanner4GenIdx) VisitNeedle

func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error

func (*VolumeFileScanner4GenIdx) VisitSuperBlock

func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error

type VolumeFileScanner4ReadAll

type VolumeFileScanner4ReadAll struct {
	Stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
	V      *Volume
}

func (*VolumeFileScanner4ReadAll) ReadNeedleBody

func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool

func (*VolumeFileScanner4ReadAll) VisitNeedle

func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error

func (*VolumeFileScanner4ReadAll) VisitSuperBlock

func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error

type VolumeFileScanner4Vacuum

type VolumeFileScanner4Vacuum struct {
	// contains filtered or unexported fields
}

func (*VolumeFileScanner4Vacuum) ReadNeedleBody

func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool

func (*VolumeFileScanner4Vacuum) VisitNeedle

func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error

func (*VolumeFileScanner4Vacuum) VisitSuperBlock

func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error

type VolumeInfo

type VolumeInfo struct {
	Id                needle.VolumeId
	Size              uint64
	ReplicaPlacement  *super_block.ReplicaPlacement
	Ttl               *needle.TTL
	DiskType          string
	Collection        string
	Version           needle.Version
	FileCount         int
	DeleteCount       int
	DeletedByteCount  uint64
	ReadOnly          bool
	CompactRevision   uint32
	ModifiedAtSecond  int64
	RemoteStorageName string
	RemoteStorageKey  string
}

func NewVolumeInfo

func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error)

func NewVolumeInfoFromShort

func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi VolumeInfo, err error)

func (VolumeInfo) IsRemote

func (vi VolumeInfo) IsRemote() bool

func (VolumeInfo) String

func (vi VolumeInfo) String() string

func (VolumeInfo) ToVolumeInformationMessage

func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL