storage

package
v0.0.0-...-935c85c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2020 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)

Variables

View Source
var ErrorDeleted = errors.New("already deleted")
View Source
var ErrorNotFound = errors.New("not found")

Functions

func CheckVolumeDataIntegrity

func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error)

func ScanVolumeFile

func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
	needleMapKind NeedleMapType,
	volumeFileScanner VolumeFileScanner) (err error)

func ScanVolumeFileFrom

func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error)

func VolumeFileName

func VolumeFileName(dir string, collection string, id int) (fileName string)

Types

type DiskLocation

type DiskLocation struct {
	Directory           string
	MaxVolumeCount      int
	MinFreeSpacePercent float32
	// contains filtered or unexported fields
}

func NewDiskLocation

func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation

func (*DiskLocation) CheckDiskSpace

func (l *DiskLocation) CheckDiskSpace()

func (*DiskLocation) Close

func (l *DiskLocation) Close()

func (*DiskLocation) DeleteCollectionFromDiskLocation

func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)

func (*DiskLocation) DeleteVolume

func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error

func (*DiskLocation) DestroyEcVolume

func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId)

func (*DiskLocation) EcVolumesLen

func (l *DiskLocation) EcVolumesLen() int

func (*DiskLocation) FindEcShard

func (*DiskLocation) FindEcVolume

func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*DiskLocation) FindVolume

func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool)

func (*DiskLocation) LoadEcShard

func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error)

func (*DiskLocation) LoadVolume

func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool

func (*DiskLocation) LocateVolume

func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool)

func (*DiskLocation) SetVolume

func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume)

func (*DiskLocation) UnUsedSpace

func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64)

func (*DiskLocation) UnloadEcShard

func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool

func (*DiskLocation) UnloadVolume

func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error

func (*DiskLocation) VolumesLen

func (l *DiskLocation) VolumesLen() int

type LevelDbNeedleMap

type LevelDbNeedleMap struct {
	// contains filtered or unexported fields
}

func NewLevelDbNeedleMap

func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error)

func (*LevelDbNeedleMap) Close

func (m *LevelDbNeedleMap) Close()

func (*LevelDbNeedleMap) Delete

func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error

func (*LevelDbNeedleMap) Destroy

func (m *LevelDbNeedleMap) Destroy() error

func (*LevelDbNeedleMap) Get

func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*LevelDbNeedleMap) IndexFileSize

func (nm *LevelDbNeedleMap) IndexFileSize() uint64

func (*LevelDbNeedleMap) Put

func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*LevelDbNeedleMap) Sync

func (nm *LevelDbNeedleMap) Sync() error

type NeedleMap

type NeedleMap struct {
	// contains filtered or unexported fields
}

func LoadCompactNeedleMap

func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error)

func NewCompactNeedleMap

func NewCompactNeedleMap(file *os.File) *NeedleMap

func (*NeedleMap) Close

func (nm *NeedleMap) Close()

func (*NeedleMap) Delete

func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error

func (*NeedleMap) Destroy

func (nm *NeedleMap) Destroy() error

func (*NeedleMap) Get

func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*NeedleMap) IndexFileSize

func (nm *NeedleMap) IndexFileSize() uint64

func (*NeedleMap) Put

func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*NeedleMap) Sync

func (nm *NeedleMap) Sync() error

type NeedleMapType

type NeedleMapType int
const (
	NeedleMapInMemory      NeedleMapType = iota
	NeedleMapLevelDb                     // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
	NeedleMapLevelDbMedium               // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
	NeedleMapLevelDbLarge                // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)

type NeedleMapper

type NeedleMapper interface {
	Put(key NeedleId, offset Offset, size Size) error
	Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
	Delete(key NeedleId, offset Offset) error
	Close()
	Destroy() error
	ContentSize() uint64
	DeletedSize() uint64
	FileCount() int
	DeletedCount() int
	MaxFileKey() NeedleId
	IndexFileSize() uint64
	Sync() error
}

type ReadOption

type ReadOption struct {
	ReadDeleted bool
}

type SortedFileNeedleMap

type SortedFileNeedleMap struct {
	// contains filtered or unexported fields
}

func NewSortedFileNeedleMap

func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error)

func (*SortedFileNeedleMap) Close

func (m *SortedFileNeedleMap) Close()

func (*SortedFileNeedleMap) Delete

func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error

func (*SortedFileNeedleMap) Destroy

func (m *SortedFileNeedleMap) Destroy() error

func (*SortedFileNeedleMap) Get

func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*SortedFileNeedleMap) IndexFileSize

func (nm *SortedFileNeedleMap) IndexFileSize() uint64

func (*SortedFileNeedleMap) Put

func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error

func (*SortedFileNeedleMap) Sync

func (nm *SortedFileNeedleMap) Sync() error

type Store

type Store struct {
	MasterAddress string

	Ip        string
	Port      int
	PublicUrl string
	Locations []*DiskLocation

	NeedleMapType       NeedleMapType
	NewVolumesChan      chan master_pb.VolumeShortInformationMessage
	DeletedVolumesChan  chan master_pb.VolumeShortInformationMessage
	NewEcShardsChan     chan master_pb.VolumeEcShardInformationMessage
	DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
	// contains filtered or unexported fields
}

A VolumeServer contains one Store.

func NewStore

func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, needleMapKind NeedleMapType) (s *Store)

func (*Store) AddVolume

func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error

func (*Store) CheckCompactVolume

func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error)

func (*Store) Close

func (s *Store) Close()

func (*Store) CollectErasureCodingHeartbeat

func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat

func (*Store) CollectHeartbeat

func (s *Store) CollectHeartbeat() *master_pb.Heartbeat

func (*Store) CommitCleanupVolume

func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error

func (*Store) CommitCompactVolume

func (s *Store) CommitCompactVolume(vid needle.VolumeId) error

func (*Store) CompactVolume

func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error

func (*Store) ConfigureVolume

func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error

func (*Store) DeleteCollection

func (s *Store) DeleteCollection(collection string) (e error)

func (*Store) DeleteEcShardNeedle

func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error)

func (*Store) DeleteVolume

func (s *Store) DeleteVolume(i needle.VolumeId) error

func (*Store) DeleteVolumeNeedle

func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, error)

func (*Store) DestroyEcVolume

func (s *Store) DestroyEcVolume(vid needle.VolumeId)

func (*Store) EcVolumes

func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume)

func (*Store) FindEcVolume

func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*Store) FindFreeLocation

func (s *Store) FindFreeLocation() (ret *DiskLocation)

func (*Store) GetVolume

func (s *Store) GetVolume(i needle.VolumeId) *Volume

func (*Store) GetVolumeSizeLimit

func (s *Store) GetVolumeSizeLimit() uint64

func (*Store) HasVolume

func (s *Store) HasVolume(i needle.VolumeId) bool

func (*Store) MarkVolumeReadonly

func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error

func (*Store) MarkVolumeWritable

func (s *Store) MarkVolumeWritable(i needle.VolumeId) error

func (*Store) MaybeAdjustVolumeMax

func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool)

func (*Store) MountEcShards

func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) MountVolume

func (s *Store) MountVolume(i needle.VolumeId) error

func (*Store) ReadEcShardNeedle

func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error)

func (*Store) ReadVolumeNeedle

func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error)

func (*Store) SetDataCenter

func (s *Store) SetDataCenter(dataCenter string)

func (*Store) SetRack

func (s *Store) SetRack(rack string)

func (*Store) SetVolumeSizeLimit

func (s *Store) SetVolumeSizeLimit(x uint64)

func (*Store) String

func (s *Store) String() (str string)

func (*Store) UnmountEcShards

func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) UnmountVolume

func (s *Store) UnmountVolume(i needle.VolumeId) error

func (*Store) VolumeInfos

func (s *Store) VolumeInfos() (allStats []*VolumeInfo)

func (*Store) WriteVolumeNeedle

func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error)

type Volume

type Volume struct {
	Id needle.VolumeId

	Collection  string
	DataBackend backend.BackendStorageFile

	MemoryMapMaxSizeMb uint32

	super_block.SuperBlock
	// contains filtered or unexported fields
}

func NewVolume

func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error)

func (*Volume) BinarySearchByAppendAtNs

func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error)

BinarySearchByAppendAtNs runs on the server side.

func (*Volume) Close

func (v *Volume) Close()

Close cleanly shuts down this volume

func (*Volume) CommitCompact

func (v *Volume) CommitCompact() error

func (*Volume) Compact

func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error

Compact compacts a volume based on deletions in .dat files.

func (*Volume) Compact2

func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error

Compact2 compacts a volume based on deletions in .idx files.

func (*Volume) ContentSize

func (v *Volume) ContentSize() uint64

func (*Volume) DeletedCount

func (v *Volume) DeletedCount() uint64

func (*Volume) DeletedSize

func (v *Volume) DeletedSize() uint64

func (*Volume) Destroy

func (v *Volume) Destroy() (err error)

Destroy removes everything related to this volume

func (*Volume) FileCount

func (v *Volume) FileCount() uint64

func (*Volume) FileName

func (v *Volume) FileName() (fileName string)

func (*Volume) FileStat

func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)

func (*Volume) GetVolumeInfo

func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo

func (*Volume) GetVolumeSyncStatus

func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse

func (*Volume) HasRemoteFile

func (v *Volume) HasRemoteFile() bool

func (*Volume) IncrementalBackup

func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error

func (*Volume) IndexFileSize

func (v *Volume) IndexFileSize() uint64

func (*Volume) IsReadOnly

func (v *Volume) IsReadOnly() bool

func (*Volume) LoadRemoteFile

func (v *Volume) LoadRemoteFile() error

func (*Volume) MaxFileKey

func (v *Volume) MaxFileKey() types.NeedleId

func (*Volume) NeedToReplicate

func (v *Volume) NeedToReplicate() bool

func (*Volume) RemoteStorageNameKey

func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string)

func (*Volume) SaveVolumeInfo

func (v *Volume) SaveVolumeInfo() error

func (*Volume) String

func (v *Volume) String() string

func (*Volume) ToVolumeInformationMessage

func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage

func (*Volume) Version

func (v *Volume) Version() needle.Version

type VolumeFileScanner

type VolumeFileScanner interface {
	VisitSuperBlock(super_block.SuperBlock) error
	ReadNeedleBody() bool
	VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}

type VolumeFileScanner4GenIdx

type VolumeFileScanner4GenIdx struct {
	// contains filtered or unexported fields
}

VolumeFileScanner4GenIdx generates the volume idx.

func (*VolumeFileScanner4GenIdx) ReadNeedleBody

func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool

func (*VolumeFileScanner4GenIdx) VisitNeedle

func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error

func (*VolumeFileScanner4GenIdx) VisitSuperBlock

func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error

type VolumeFileScanner4Vacuum

type VolumeFileScanner4Vacuum struct {
	// contains filtered or unexported fields
}

func (*VolumeFileScanner4Vacuum) ReadNeedleBody

func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool

func (*VolumeFileScanner4Vacuum) VisitNeedle

func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error

func (*VolumeFileScanner4Vacuum) VisitSuperBlock

func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error

type VolumeInfo

type VolumeInfo struct {
	Id                needle.VolumeId
	Size              uint64
	ReplicaPlacement  *super_block.ReplicaPlacement
	Ttl               *needle.TTL
	Collection        string
	Version           needle.Version
	FileCount         int
	DeleteCount       int
	DeletedByteCount  uint64
	ReadOnly          bool
	CompactRevision   uint32
	ModifiedAtSecond  int64
	RemoteStorageName string
	RemoteStorageKey  string
}

func NewVolumeInfo

func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error)

func NewVolumeInfoFromShort

func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi VolumeInfo, err error)

func (VolumeInfo) IsRemote

func (vi VolumeInfo) IsRemote() bool

func (VolumeInfo) String

func (vi VolumeInfo) String() string

func (VolumeInfo) ToVolumeInformationMessage

func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL