storage

package
v0.0.0-...-f1e43fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2019 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)

Variables

View Source
var ErrorNotFound = errors.New("not found")

Functions

func CheckVolumeDataIntegrity

func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error)

func ScanVolumeFile

func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
	needleMapKind NeedleMapType,
	volumeFileScanner VolumeFileScanner) (err error)

func ScanVolumeFileFrom

func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error)

func ScanVolumeFileNeedleFrom

func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error)

func VolumeFileName

func VolumeFileName(dir string, collection string, id int) (fileName string)

Types

type DiskLocation

type DiskLocation struct {
	Directory      string
	MaxVolumeCount int

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDiskLocation

func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation

func (*DiskLocation) Close

func (l *DiskLocation) Close()

func (*DiskLocation) DeleteCollectionFromDiskLocation

func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)

func (*DiskLocation) DeleteVolume

func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error

func (*DiskLocation) DestroyEcVolume

func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId)

func (*DiskLocation) FindEcShard

func (*DiskLocation) FindEcVolume

func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*DiskLocation) FindVolume

func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool)

func (*DiskLocation) LoadEcShard

func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error)

func (*DiskLocation) LoadVolume

func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool

func (*DiskLocation) SetVolume

func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume)

func (*DiskLocation) UnloadEcShard

func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool

func (*DiskLocation) UnloadVolume

func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error

func (*DiskLocation) VolumesLen

func (l *DiskLocation) VolumesLen() int

type LevelDbNeedleMap

type LevelDbNeedleMap struct {
	// contains filtered or unexported fields
}

func NewLevelDbNeedleMap

func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error)

func (*LevelDbNeedleMap) Close

func (m *LevelDbNeedleMap) Close()

func (*LevelDbNeedleMap) Delete

func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error

func (*LevelDbNeedleMap) Destroy

func (m *LevelDbNeedleMap) Destroy() error

func (*LevelDbNeedleMap) Get

func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*LevelDbNeedleMap) IndexFileContent

func (nm *LevelDbNeedleMap) IndexFileContent() ([]byte, error)

func (*LevelDbNeedleMap) IndexFileName

func (nm *LevelDbNeedleMap) IndexFileName() string

func (*LevelDbNeedleMap) IndexFileSize

func (nm *LevelDbNeedleMap) IndexFileSize() uint64

func (*LevelDbNeedleMap) Put

func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error

type NeedleMap

type NeedleMap struct {
	// contains filtered or unexported fields
}

func LoadBtreeNeedleMap

func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error)

func LoadCompactNeedleMap

func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error)

func NewBtreeNeedleMap

func NewBtreeNeedleMap(file *os.File) *NeedleMap

func NewCompactNeedleMap

func NewCompactNeedleMap(file *os.File) *NeedleMap

func (*NeedleMap) Close

func (nm *NeedleMap) Close()

func (*NeedleMap) Delete

func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error

func (*NeedleMap) Destroy

func (nm *NeedleMap) Destroy() error

func (*NeedleMap) Get

func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)

func (*NeedleMap) IndexFileContent

func (nm *NeedleMap) IndexFileContent() ([]byte, error)

func (*NeedleMap) IndexFileName

func (nm *NeedleMap) IndexFileName() string

func (*NeedleMap) IndexFileSize

func (nm *NeedleMap) IndexFileSize() uint64

func (*NeedleMap) Put

func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error

type NeedleMapType

type NeedleMapType int
const (
	NeedleMapInMemory      NeedleMapType = iota
	NeedleMapLevelDb                     // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
	NeedleMapLevelDbMedium               // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
	NeedleMapLevelDbLarge                // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)

type NeedleMapper

type NeedleMapper interface {
	Put(key NeedleId, offset Offset, size uint32) error
	Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
	Delete(key NeedleId, offset Offset) error
	Close()
	Destroy() error
	ContentSize() uint64
	DeletedSize() uint64
	FileCount() int
	DeletedCount() int
	MaxFileKey() NeedleId
	IndexFileSize() uint64
	IndexFileContent() ([]byte, error)
	IndexFileName() string
}

type ReplicaPlacement

type ReplicaPlacement struct {
	SameRackCount       int
	DiffRackCount       int
	DiffDataCenterCount int
}

func NewReplicaPlacementFromByte

func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error)

func NewReplicaPlacementFromString

func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error)

func (*ReplicaPlacement) Byte

func (rp *ReplicaPlacement) Byte() byte

func (*ReplicaPlacement) GetCopyCount

func (rp *ReplicaPlacement) GetCopyCount() int

func (*ReplicaPlacement) String

func (rp *ReplicaPlacement) String() string

type Store

type Store struct {
	MasterAddress string

	Ip        string
	Port      int
	PublicUrl string
	Locations []*DiskLocation

	NeedleMapType       NeedleMapType
	NewVolumesChan      chan master_pb.VolumeShortInformationMessage
	DeletedVolumesChan  chan master_pb.VolumeShortInformationMessage
	NewEcShardsChan     chan master_pb.VolumeEcShardInformationMessage
	DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
	// contains filtered or unexported fields
}

A VolumeServer contains one Store.

func NewStore

func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store)

func (*Store) AddVolume

func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error

func (*Store) CheckCompactVolume

func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error)

func (*Store) Close

func (s *Store) Close()

func (*Store) CollectErasureCodingHeartbeat

func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat

func (*Store) CollectHeartbeat

func (s *Store) CollectHeartbeat() *master_pb.Heartbeat

func (*Store) CommitCleanupVolume

func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error

func (*Store) CommitCompactVolume

func (s *Store) CommitCompactVolume(vid needle.VolumeId) error

func (*Store) CompactVolume

func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error

func (*Store) DeleteCollection

func (s *Store) DeleteCollection(collection string) (e error)

func (*Store) DeleteEcShardNeedle

func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error)

func (*Store) DeleteVolume

func (s *Store) DeleteVolume(i needle.VolumeId) error

func (*Store) DeleteVolumeNeedle

func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error)

func (*Store) DestroyEcVolume

func (s *Store) DestroyEcVolume(vid needle.VolumeId)

func (*Store) EcVolumes

func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume)

func (*Store) FindEcVolume

func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)

func (*Store) FindFreeLocation

func (s *Store) FindFreeLocation() (ret *DiskLocation)

func (*Store) GetVolume

func (s *Store) GetVolume(i needle.VolumeId) *Volume

func (*Store) GetVolumeSizeLimit

func (s *Store) GetVolumeSizeLimit() uint64

func (*Store) HasVolume

func (s *Store) HasVolume(i needle.VolumeId) bool

func (*Store) MarkVolumeReadonly

func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error

func (*Store) MountEcShards

func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) MountVolume

func (s *Store) MountVolume(i needle.VolumeId) error

func (*Store) ReadEcShardNeedle

func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error)

func (*Store) ReadVolumeNeedle

func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error)

func (*Store) SetDataCenter

func (s *Store) SetDataCenter(dataCenter string)

func (*Store) SetRack

func (s *Store) SetRack(rack string)

func (*Store) SetVolumeSizeLimit

func (s *Store) SetVolumeSizeLimit(x uint64)

func (*Store) Status

func (s *Store) Status() []*VolumeInfo

func (*Store) String

func (s *Store) String() (str string)

func (*Store) UnmountEcShards

func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error

func (*Store) UnmountVolume

func (s *Store) UnmountVolume(i needle.VolumeId) error

func (*Store) WriteVolumeNeedle

func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error)

type SuperBlock

type SuperBlock struct {
	ReplicaPlacement   *ReplicaPlacement
	Ttl                *needle.TTL
	CompactionRevision uint16
	Extra              *master_pb.SuperBlockExtra
	// contains filtered or unexported fields
}

The super block currently has 8 bytes allocated for each volume:
- Byte 0: version, 1 or 2
- Byte 1: replica placement strategy, 000, 001, 002, 010, etc.
- Bytes 2 and 3: time to live (see TTL for the definition)
- Bytes 4 and 5: the number of times the volume has been compacted
- Remaining bytes: reserved

func ReadSuperBlock

func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error)

ReadSuperBlock reads the super block from the data file and loads it into the volume's super block.

func (*SuperBlock) BlockSize

func (s *SuperBlock) BlockSize() int

func (*SuperBlock) Bytes

func (s *SuperBlock) Bytes() []byte

func (*SuperBlock) Version

func (s *SuperBlock) Version() needle.Version

type Volume

type Volume struct {
	Id needle.VolumeId

	Collection string

	SuperBlock
	// contains filtered or unexported fields
}

func NewVolume

func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error)

func (*Volume) BinarySearchByAppendAtNs

func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error)

on server side

func (*Volume) Close

func (v *Volume) Close()

Close cleanly shuts down this volume

func (*Volume) CommitCompact

func (v *Volume) CommitCompact() error

func (*Volume) Compact

func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error

func (*Volume) Compact2

func (v *Volume) Compact2() error

func (*Volume) ContentSize

func (v *Volume) ContentSize() uint64

func (*Volume) DataFile

func (v *Volume) DataFile() *os.File

func (*Volume) Destroy

func (v *Volume) Destroy() (err error)

Destroy removes everything related to this volume

func (*Volume) FileCount

func (v *Volume) FileCount() uint64

func (*Volume) FileName

func (v *Volume) FileName() (fileName string)

func (*Volume) FileStat

func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)

func (*Volume) GetVolumeSyncStatus

func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse

func (*Volume) IncrementalBackup

func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error

func (*Volume) IndexFileSize

func (v *Volume) IndexFileSize() uint64

func (*Volume) NeedToReplicate

func (v *Volume) NeedToReplicate() bool

func (*Volume) String

func (v *Volume) String() string

func (*Volume) ToVolumeInformationMessage

func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage

func (*Volume) Version

func (v *Volume) Version() needle.Version

type VolumeFileScanner

type VolumeFileScanner interface {
	VisitSuperBlock(SuperBlock) error
	ReadNeedleBody() bool
	VisitNeedle(n *needle.Needle, offset int64) error
}

type VolumeFileScanner4GenIdx

type VolumeFileScanner4GenIdx struct {
	// contains filtered or unexported fields
}

VolumeFileScanner4GenIdx generates the volume idx.

func (*VolumeFileScanner4GenIdx) ReadNeedleBody

func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool

func (*VolumeFileScanner4GenIdx) VisitNeedle

func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error

func (*VolumeFileScanner4GenIdx) VisitSuperBlock

func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error

type VolumeFileScanner4Vacuum

type VolumeFileScanner4Vacuum struct {
	// contains filtered or unexported fields
}

func (*VolumeFileScanner4Vacuum) ReadNeedleBody

func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool

func (*VolumeFileScanner4Vacuum) VisitNeedle

func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error

func (*VolumeFileScanner4Vacuum) VisitSuperBlock

func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error

type VolumeInfo

type VolumeInfo struct {
	Id               needle.VolumeId
	Size             uint64
	ReplicaPlacement *ReplicaPlacement
	Ttl              *needle.TTL
	Collection       string
	Version          needle.Version
	FileCount        int
	DeleteCount      int
	DeletedByteCount uint64
	ReadOnly         bool
	CompactRevision  uint32
	ModifiedAtSecond int64
}

func NewVolumeInfo

func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error)

func NewVolumeInfoFromShort

func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi VolumeInfo, err error)

func (VolumeInfo) String

func (vi VolumeInfo) String() string

func (VolumeInfo) ToVolumeInformationMessage

func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL