storage

package
v0.0.0-...-9931aa1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2018 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
// Size limits and markers for needles and volumes.
const (
	// NeedleHeaderSize is the needle header length in bytes.
	// NOTE(review): 16 equals the combined width of Needle's Cookie (uint32),
	// Id (uint64) and Size (uint32) fields — presumably those form the header; confirm.
	NeedleHeaderSize      = 16 //should never change this
	// NeedlePaddingSize is the alignment unit in bytes; Needle.Padding is
	// described as "Aligned to 8 bytes".
	NeedlePaddingSize     = 8
	// NeedleChecksumSize is 4, the width of the uint32-based CRC type.
	NeedleChecksumSize    = 4
	// MaxPossibleVolumeSize is 4Gi * 8 = 32Gi bytes, the same range that
	// NeedleValue.Offset documents for a uint32 offset with 8-byte alignment.
	MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
	// TombstoneFileSize is the largest uint32 value.
	// NOTE(review): presumably a sentinel size marking a deleted entry — confirm.
	TombstoneFileSize     = math.MaxUint32
	// PairNamePrefix is the literal prefix "Seaweed-".
	// NOTE(review): presumably selects which request names become Needle.Pairs — confirm in ParseUpload.
	PairNamePrefix        = "Seaweed-"
)
View Source
// Bit masks for a needle's flags byte, followed by the byte widths of two
// optional needle fields.
const (
	// Each Flag* value is a distinct single bit; 0x40 is not defined in this block.
	// NOTE(review): presumably these are the bits behind Needle's Is*/Has*/Set*
	// methods and the Needle.Flags field — confirm.
	FlagGzip                = 0x01
	FlagHasName             = 0x02
	FlagHasMime             = 0x04
	FlagHasLastModifiedDate = 0x08
	FlagHasTtl              = 0x10
	FlagHasPairs            = 0x20
	FlagIsChunkManifest     = 0x80
	// LastModifiedBytesLength is the number of bytes of Needle.LastModified
	// written to disk (the field itself is a uint64).
	LastModifiedBytesLength = 5
	// TtlBytesLength is the number of bytes used for a stored TTL.
	TtlBytesLength          = 2
)
View Source
// Unit codes for a TTL, typed as byte. Empty is 0 and each following
// constant increments by one via iota (Minute=1 ... Year=6).
const (
	//stored unit types
	Empty byte = iota
	Minute
	Hour
	Day
	Week
	Month
	Year
)
View Source
// Known values of Version. The SuperBlock documentation states that byte 0
// of a super block holds the version, 1 or 2.
const (
	Version1       = Version(1)
	Version2       = Version(2)
	// CurrentVersion is an alias for Version2.
	// NOTE(review): presumably the version used for newly created volumes — confirm.
	CurrentVersion = Version2
)
View Source
// MAX_TTL_VOLUME_REMOVAL_DELAY is expressed in minutes (see trailing comment).
// NOTE(review): presumably an upper bound on how long an expired TTL volume is
// kept before removal — confirm against the code that reads it.
// The name does not follow Go's MixedCaps convention, but it is exported, so
// renaming it would break callers.
const (
	MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
View Source
// NeedleIndexSize is 16.
// NOTE(review): presumably the byte size of one index file entry; 16 matches
// the key (uint64) + offset (uint32) + size (uint32) triple that WalkIndexFile
// passes to its callback — confirm.
const (
	NeedleIndexSize = 16
)
View Source
// RowsToRead is 1024.
// NOTE(review): presumably the number of index rows read per batch when
// scanning an index file — confirm against its users.
const (
	RowsToRead = 1024
)
View Source
// SuperBlockSize is the number of bytes allocated for a volume's super block
// (the SuperBlock documentation describes an 8-byte layout).
const (
	SuperBlockSize = 8
)

Variables

View Source
// EMPTY_TTL is a package-level pointer to a zero-value TTL.
// NOTE(review): presumably the shared "no TTL" value — confirm against callers.
var EMPTY_TTL = &TTL{}

Functions

func CheckVolumeDataIntegrity

func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error

func ParseKeyHash

func ParseKeyHash(key_hash_string string) (uint64, uint32, error)

func ReadNeedleBlob

func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error)

func ScanVolumeFile

func ScanVolumeFile(dirname string, collection string, id VolumeId,
	needleMapKind NeedleMapType,
	visitSuperBlock func(SuperBlock) error,
	readNeedleBody bool,
	visitNeedle func(n *Needle, offset int64) error) (err error)

func WalkIndexFile

func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error

WalkIndexFile walks through the index file and calls fn with each entry's key, offset, and size. It stops and returns the error as soon as fn returns one.

Types

type BoltDbNeedleMap

// BoltDbNeedleMap is a needle map created by NewBoltDbNeedleMap from a
// database file name and an index file.
// NOTE(review): presumably backed by a BoltDB database and intended to
// satisfy NeedleMapper — confirm.
type BoltDbNeedleMap struct {
	// contains filtered or unexported fields
}

func NewBoltDbNeedleMap

func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error)

func (*BoltDbNeedleMap) Close

func (m *BoltDbNeedleMap) Close()

func (*BoltDbNeedleMap) Delete

func (m *BoltDbNeedleMap) Delete(key uint64, offset uint32) error

func (*BoltDbNeedleMap) Destroy

func (m *BoltDbNeedleMap) Destroy() error

func (*BoltDbNeedleMap) Get

func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool)

func (*BoltDbNeedleMap) IndexFileContent

func (nm *BoltDbNeedleMap) IndexFileContent() ([]byte, error)

func (*BoltDbNeedleMap) IndexFileName

func (nm *BoltDbNeedleMap) IndexFileName() string

func (*BoltDbNeedleMap) IndexFileSize

func (nm *BoltDbNeedleMap) IndexFileSize() uint64

func (*BoltDbNeedleMap) Put

func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error

type ByOffset

// ByOffset is a slice of needle.NeedleValue with Len, Less and Swap methods.
// NOTE(review): presumably a sort.Interface that orders values by Offset — confirm in Less.
type ByOffset []needle.NeedleValue

func (ByOffset) Len

func (a ByOffset) Len() int

func (ByOffset) Less

func (a ByOffset) Less(i, j int) bool

func (ByOffset) Swap

func (a ByOffset) Swap(i, j int)

type CRC

// CRC is a 32-bit checksum value; Needle.Checksum describes it as a CRC32
// used to check integrity.
type CRC uint32

func NewCRC

func NewCRC(b []byte) CRC

func (CRC) Update

func (c CRC) Update(b []byte) CRC

func (CRC) Value

func (c CRC) Value() uint32

type CompactMap

// CompactMap maps a Key to a NeedleValue (see its Get, Set, Delete and Visit methods).
type CompactMap struct {
	// contains filtered or unexported fields
}

This map assumes that keys are mostly inserted in increasing order.

func NewCompactMap

func NewCompactMap() CompactMap

func (*CompactMap) Delete

func (cm *CompactMap) Delete(key Key) uint32

func (*CompactMap) Get

func (cm *CompactMap) Get(key Key) (*NeedleValue, bool)

func (*CompactMap) Set

func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32)

func (*CompactMap) Visit

func (cm *CompactMap) Visit(visit func(NeedleValue) error) error

Visit visits all entries, stopping early if visiting any entry returns an error.

type CompactSection

// CompactSection is one section of keys, created by NewCompactSection with a
// starting Key.
// NOTE(review): presumably the building block of CompactMap — confirm.
type CompactSection struct {
	// Embedded lock; its Lock/RLock methods are promoted onto CompactSection.
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewCompactSection

func NewCompactSection(start Key) *CompactSection

func (*CompactSection) Delete

func (cs *CompactSection) Delete(key Key) uint32

return old entry size

func (*CompactSection) Get

func (cs *CompactSection) Get(key Key) (*NeedleValue, bool)

func (*CompactSection) Set

func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32)

Set returns the old entry's offset and size.

type DiskLocation

// DiskLocation describes one storage directory and the volumes held in it
// (see FindVolume, SetVolume, LoadVolume, DeleteVolume and VolumesLen).
type DiskLocation struct {
	// Directory and MaxVolumeCount mirror the arguments of NewDiskLocation.
	Directory      string
	MaxVolumeCount int

	// Embedded lock; its Lock/RLock methods are promoted onto DiskLocation.
	// NOTE(review): presumably guards the unexported volume collection — confirm.
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDiskLocation

func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation

func (*DiskLocation) Close

func (l *DiskLocation) Close()

func (*DiskLocation) DeleteCollectionFromDiskLocation

func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)

func (*DiskLocation) DeleteVolume

func (l *DiskLocation) DeleteVolume(vid VolumeId) error

func (*DiskLocation) FindVolume

func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool)

func (*DiskLocation) LoadVolume

func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool

func (*DiskLocation) SetVolume

func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume)

func (*DiskLocation) UnloadVolume

func (l *DiskLocation) UnloadVolume(vid VolumeId) error

func (*DiskLocation) VolumesLen

func (l *DiskLocation) VolumesLen() int

type FileId

// FileId identifies a stored file by volume, key and hash code. It can be
// parsed from a string with ParseFileId and rendered back with String.
type FileId struct {
	VolumeId VolumeId
	// NOTE(review): NewFileIdFromNeedle suggests Key and Hashcode correspond to
	// Needle.Id (uint64) and Needle.Cookie (uint32) — confirm.
	Key      uint64
	Hashcode uint32
}

func NewFileId

func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId

func NewFileIdFromNeedle

func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId

func ParseFileId

func ParseFileId(fid string) (*FileId, error)

func (*FileId) String

func (n *FileId) String() string

type Key

// Key is the 64-bit key type used by CompactMap, CompactSection and NeedleValue.
type Key uint64

func (Key) String

func (k Key) String() string

type LevelDbNeedleMap

// LevelDbNeedleMap is a needle map created by NewLevelDbNeedleMap from a
// database file name and an index file.
// NOTE(review): presumably backed by LevelDB and intended to satisfy
// NeedleMapper — confirm.
type LevelDbNeedleMap struct {
	// contains filtered or unexported fields
}

func NewLevelDbNeedleMap

func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error)

func (*LevelDbNeedleMap) Close

func (m *LevelDbNeedleMap) Close()

func (*LevelDbNeedleMap) Delete

func (m *LevelDbNeedleMap) Delete(key uint64, offset uint32) error

func (*LevelDbNeedleMap) Destroy

func (m *LevelDbNeedleMap) Destroy() error

func (*LevelDbNeedleMap) Get

func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool)

func (*LevelDbNeedleMap) IndexFileContent

func (nm *LevelDbNeedleMap) IndexFileContent() ([]byte, error)

func (*LevelDbNeedleMap) IndexFileName

func (nm *LevelDbNeedleMap) IndexFileName() string

func (*LevelDbNeedleMap) IndexFileSize

func (nm *LevelDbNeedleMap) IndexFileSize() uint64

func (*LevelDbNeedleMap) Put

func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error

type MasterNodes

// MasterNodes is created by NewMasterNodes from a bootstrap node address and
// exposes FindMaster, which returns a leader address or an error.
// NOTE(review): presumably it caches the known master list between lookups — confirm.
type MasterNodes struct {
	// contains filtered or unexported fields
}

func NewMasterNodes

func NewMasterNodes(bootstrapNode string) (mn *MasterNodes)

func (*MasterNodes) FindMaster

func (mn *MasterNodes) FindMaster() (leader string, err error)

func (*MasterNodes) Reset

func (mn *MasterNodes) Reset()

func (*MasterNodes) SetPossibleLeader

func (mn *MasterNodes) SetPossibleLeader(possibleLeader string)

func (*MasterNodes) String

func (mn *MasterNodes) String() string

type Needle

// Needle holds one stored entry: identifying fields, the data payload,
// optional metadata, and a checksum plus padding.
// Fields annotated //version2 are marked that way by the original author.
type Needle struct {
	// The widths of Cookie (4), Id (8) and Size (4) add up to NeedleHeaderSize (16).
	// NOTE(review): presumably these three fields form the on-disk header — confirm.
	Cookie uint32 `comment:"random number to mitigate brute force lookups"`
	Id     uint64 `comment:"needle id"`
	Size   uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`

	DataSize     uint32 `comment:"Data size"` //version2
	Data         []byte `comment:"The actual file data"`
	Flags        byte   `comment:"boolean flags"` //version2
	// NOTE(review): NameSize and MimeSize are uint8, so they can record at most
	// 255 bytes even though the tags below say "maximum 256 characters".
	NameSize     uint8  //version2
	Name         []byte `comment:"maximum 256 characters"` //version2
	MimeSize     uint8  //version2
	Mime         []byte `comment:"maximum 256 characters"` //version2
	// NOTE(review): PairsSize is a uint16, so at most 65535 bytes can be
	// recorded, one short of a full 64kB.
	PairsSize    uint16 //version2
	Pairs        []byte `comment:"additional name value pairs, json format, maximum 64kB"`
	LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
	Ttl          *TTL

	Checksum CRC    `comment:"CRC32 to check integrity"`
	Padding  []byte `comment:"Aligned to 8 bytes"`
}

A Needle represents an uploaded and stored file. Needle file size is limited to 4GB for now.

func NewNeedle

func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error)

func ReadNeedleHeader

func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error)

func (*Needle) Append

func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error)

func (*Needle) DiskSize

func (n *Needle) DiskSize() int64

func (*Needle) Etag

func (n *Needle) Etag() string

func (*Needle) HasLastModifiedDate

func (n *Needle) HasLastModifiedDate() bool

func (*Needle) HasMime

func (n *Needle) HasMime() bool

func (*Needle) HasName

func (n *Needle) HasName() bool

func (*Needle) HasPairs

func (n *Needle) HasPairs() bool

func (*Needle) HasTtl

func (n *Needle) HasTtl() bool

func (*Needle) IsChunkedManifest

func (n *Needle) IsChunkedManifest() bool

func (*Needle) IsGzipped

func (n *Needle) IsGzipped() bool

func (*Needle) ParseNeedleHeader

func (n *Needle) ParseNeedleHeader(bytes []byte)

func (*Needle) ParsePath

func (n *Needle) ParsePath(fid string) (err error)

func (*Needle) ReadData

func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error)

func (*Needle) ReadNeedleBody

func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error)

n should be a needle whose header has already been read. The input stream will be read until the next file entry.

func (*Needle) SetGzipped

func (n *Needle) SetGzipped()

func (*Needle) SetHasLastModifiedDate

func (n *Needle) SetHasLastModifiedDate()

func (*Needle) SetHasMime

func (n *Needle) SetHasMime()

func (*Needle) SetHasName

func (n *Needle) SetHasName()

func (*Needle) SetHasPairs

func (n *Needle) SetHasPairs()

func (*Needle) SetHasTtl

func (n *Needle) SetHasTtl()

func (*Needle) SetIsChunkManifest

func (n *Needle) SetIsChunkManifest()

func (*Needle) String

func (n *Needle) String() (str string)

type NeedleMap

// NeedleMap is the needle map returned by the Compact and Btree constructors
// and loaders (NewCompactNeedleMap, NewBtreeNeedleMap, LoadCompactNeedleMap,
// LoadBtreeNeedleMap), each of which takes the index file.
// NOTE(review): presumably keeps its entries in memory — confirm.
type NeedleMap struct {
	// contains filtered or unexported fields
}

func LoadBtreeNeedleMap

func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error)

func LoadCompactNeedleMap

func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error)

func NewBtreeNeedleMap

func NewBtreeNeedleMap(file *os.File) *NeedleMap

func NewCompactNeedleMap

func NewCompactNeedleMap(file *os.File) *NeedleMap

func (*NeedleMap) Close

func (nm *NeedleMap) Close()

func (*NeedleMap) Delete

func (nm *NeedleMap) Delete(key uint64, offset uint32) error

func (*NeedleMap) Destroy

func (nm *NeedleMap) Destroy() error

func (*NeedleMap) Get

func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool)

func (*NeedleMap) IndexFileContent

func (nm *NeedleMap) IndexFileContent() ([]byte, error)

func (*NeedleMap) IndexFileName

func (nm *NeedleMap) IndexFileName() string

func (*NeedleMap) IndexFileSize

func (nm *NeedleMap) IndexFileSize() uint64

func (*NeedleMap) Put

func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error

type NeedleMapType

// NeedleMapType selects a needle map implementation; it is passed to NewStore,
// NewVolume, LoadVolume and ScanVolumeFile.
type NeedleMapType int
// Values are assigned by iota starting at 0, so their order is part of the API.
const (
	NeedleMapInMemory NeedleMapType = iota
	NeedleMapLevelDb
	NeedleMapBoltDb
	NeedleMapBtree
)

type NeedleMapper

// NeedleMapper is the interface for a needle index: it maps a uint64 key to
// an offset and size, and reports statistics and index-file details.
type NeedleMapper interface {
	// Put records offset and size for key.
	Put(key uint64, offset uint32, size uint32) error
	// Get looks up key; ok reports whether an element was found.
	Get(key uint64) (element *needle.NeedleValue, ok bool)
	// Delete removes key; the caller also supplies an offset.
	// NOTE(review): presumably the offset of the deletion marker — confirm.
	Delete(key uint64, offset uint32) error
	// Close releases the map.
	// NOTE(review): presumably leaves on-disk files in place, unlike Destroy — confirm.
	Close()
	// Destroy — NOTE(review): presumably closes the map and removes its backing files; confirm.
	Destroy() error
	// Statistics accessors.
	// NOTE(review): units are presumably bytes for the *Size methods — confirm.
	ContentSize() uint64
	DeletedSize() uint64
	FileCount() int
	DeletedCount() int
	MaxFileKey() uint64
	// Index file accessors: size, full content, and file name.
	IndexFileSize() uint64
	IndexFileContent() ([]byte, error)
	IndexFileName() string
}

type NeedleValue

// NeedleValue is one index entry: a key together with the location and size
// of its data in the volume.
type NeedleValue struct {
	Key    Key
	// NOTE(review): the trailing comment implies Offset counts 8-byte units
	// rather than bytes, which is how a uint32 reaches 32G — confirm.
	Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
	Size   uint32 `comment:"Size of the data portion"`
}

type ReplicaPlacement

// ReplicaPlacement holds three replica counts. It can be built from a byte or
// a string and converted back with Byte and String; the SuperBlock
// documentation gives example strategies "000, 001, 002, 010".
// NOTE(review): presumably each digit of that string maps to one field below;
// confirm the digit order in NewReplicaPlacementFromString.
type ReplicaPlacement struct {
	SameRackCount       int
	DiffRackCount       int
	DiffDataCenterCount int
}

func NewReplicaPlacementFromByte

func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error)

func NewReplicaPlacementFromString

func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error)

func (*ReplicaPlacement) Byte

func (rp *ReplicaPlacement) Byte() byte

func (*ReplicaPlacement) GetCopyCount

func (rp *ReplicaPlacement) GetCopyCount() int

func (*ReplicaPlacement) String

func (rp *ReplicaPlacement) String() string

type Store

// Store is the storage state of a volume server: its network identity, its
// disk locations, and settings shared by its volumes.
type Store struct {
	// Ip, Port and PublicUrl mirror the corresponding NewStore arguments.
	Ip        string
	Port      int
	PublicUrl string
	// NOTE(review): presumably one DiskLocation per entry of NewStore's
	// dirnames/maxVolumeCounts — confirm.
	Locations []*DiskLocation

	VolumeSizeLimit uint64 //read from the master
	// Client is the heartbeat stream client type from the pb package.
	Client          pb.Seaweed_SendHeartbeatClient
	NeedleMapType   NeedleMapType
	// contains filtered or unexported fields
}

A VolumeServer contains one Store.

func NewStore

func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store)

func (*Store) AddVolume

func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error

func (*Store) CheckCompactVolume

func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool)

func (*Store) Close

func (s *Store) Close()

func (*Store) CollectHeartbeat

func (s *Store) CollectHeartbeat() *pb.Heartbeat

func (*Store) CommitCleanupVolume

func (s *Store) CommitCleanupVolume(volumeIdString string) error

func (*Store) CommitCompactVolume

func (s *Store) CommitCompactVolume(volumeIdString string) error

func (*Store) CompactVolume

func (s *Store) CompactVolume(volumeIdString string, preallocate int64) error

func (*Store) Delete

func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error)

func (*Store) DeleteCollection

func (s *Store) DeleteCollection(collection string) (e error)

func (*Store) DeleteVolume

func (s *Store) DeleteVolume(i VolumeId) error

func (*Store) GetVolume

func (s *Store) GetVolume(i VolumeId) *Volume

func (*Store) HasVolume

func (s *Store) HasVolume(i VolumeId) bool

func (*Store) MountVolume

func (s *Store) MountVolume(i VolumeId) error

func (*Store) ReadVolumeNeedle

func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error)

func (*Store) SetDataCenter

func (s *Store) SetDataCenter(dataCenter string)

func (*Store) SetRack

func (s *Store) SetRack(rack string)

func (*Store) Status

func (s *Store) Status() []*VolumeInfo

func (*Store) String

func (s *Store) String() (str string)

func (*Store) UnmountVolume

func (s *Store) UnmountVolume(i VolumeId) error

func (*Store) Write

func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error)

type SuperBlock

// SuperBlock is the parsed form of the fixed-size header of a volume; see
// ParseSuperBlock and Bytes for conversion from and to raw bytes.
type SuperBlock struct {
	// Per the package documentation: byte 1 of the block.
	ReplicaPlacement *ReplicaPlacement
	// Per the package documentation: bytes 2 and 3 of the block.
	Ttl              *TTL
	// Per the package documentation: bytes 4 and 5, the number of times the
	// volume has been compacted.
	CompactRevision  uint16
	// contains filtered or unexported fields
}

The super block currently has 8 bytes allocated for each volume:

  - Byte 0: version, 1 or 2
  - Byte 1: replica placement strategy: 000, 001, 002, 010, etc.
  - Bytes 2 and 3: time to live; see TTL for the definition
  - Bytes 4 and 5: the number of times the volume has been compacted
  - Remaining bytes: reserved

func ParseSuperBlock

func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error)

func (*SuperBlock) Bytes

func (s *SuperBlock) Bytes() []byte

func (*SuperBlock) Version

func (s *SuperBlock) Version() Version

type TTL

// TTL is a time-to-live value. It can be parsed from text with ReadTTL,
// loaded from stored form with LoadTTLFromBytes or LoadTTLFromUint32, and
// converted with Minutes, String, ToBytes and ToUint32.
// NOTE(review): presumably stored as a count plus one of the unit constants
// (Minute ... Year) — confirm.
type TTL struct {
	// contains filtered or unexported fields
}

func LoadTTLFromBytes

func LoadTTLFromBytes(input []byte) (t *TTL)

read stored bytes to a ttl

func LoadTTLFromUint32

func LoadTTLFromUint32(ttl uint32) (t *TTL)

LoadTTLFromUint32 reads a stored uint32 value into a TTL.

func ParseUpload

func ParseUpload(r *http.Request) (
	fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool,
	modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error)

func ReadTTL

func ReadTTL(ttlString string) (*TTL, error)

ReadTTL translates a human-readable TTL string into the internal TTL. Supported formats, by example:

  - 3m: 3 minutes
  - 4h: 4 hours
  - 5d: 5 days
  - 6w: 6 weeks
  - 7M: 7 months
  - 8y: 8 years

func (TTL) Minutes

func (t TTL) Minutes() uint32

func (*TTL) String

func (t *TTL) String() string

func (*TTL) ToBytes

func (t *TTL) ToBytes(output []byte)

ToBytes writes the TTL into output in its stored form, which takes 2 bytes.

func (*TTL) ToUint32

func (t *TTL) ToUint32() (output uint32)

type Version

// Version is a one-byte format version; the defined values are Version1 and Version2.
type Version uint8

type Volume

// Volume is one volume identified by Id within a Collection.
// NOTE(review): presumably backed by a data file (see DataFile) plus an index — confirm.
type Volume struct {
	Id VolumeId

	Collection string

	// Embedded, so SuperBlock's exported fields (ReplicaPlacement, Ttl,
	// CompactRevision) are promoted onto Volume.
	SuperBlock
	// contains filtered or unexported fields
}

func NewVolume

func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) (v *Volume, e error)

func (*Volume) AppendBlob

func (v *Volume) AppendBlob(b []byte) (offset int64, err error)

AppendBlob append a blob to end of the data file, used in replication

func (*Volume) Close

func (v *Volume) Close()

Close cleanly shuts down this volume

func (*Volume) Compact

func (v *Volume) Compact(preallocate int64) error

func (*Volume) Compact2

func (v *Volume) Compact2() error

func (*Volume) ContentSize

func (v *Volume) ContentSize() uint64

func (*Volume) DataFile

func (v *Volume) DataFile() *os.File

func (*Volume) Destroy

func (v *Volume) Destroy() (err error)

Destroy removes everything related to this volume

func (*Volume) FileName

func (v *Volume) FileName() (fileName string)

func (*Volume) GetVolumeSyncStatus

func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse

func (*Volume) IndexFileContent

func (v *Volume) IndexFileContent() ([]byte, error)

func (*Volume) NeedToReplicate

func (v *Volume) NeedToReplicate() bool

func (*Volume) Size

func (v *Volume) Size() int64

func (*Volume) String

func (v *Volume) String() string

func (*Volume) Synchronize

func (v *Volume) Synchronize(volumeServer string) (err error)

func (*Volume) Version

func (v *Volume) Version() Version

type VolumeId

// VolumeId is a 32-bit volume identifier; NewVolumeId parses one from a string.
type VolumeId uint32

func NewVolumeId

func NewVolumeId(vid string) (VolumeId, error)

func (*VolumeId) Next

func (vid *VolumeId) Next() VolumeId

func (*VolumeId) String

func (vid *VolumeId) String() string

type VolumeInfo

// VolumeInfo is a summary of one volume. Store.Status returns a slice of
// these, and NewVolumeInfo builds one from a pb.VolumeInformationMessage.
type VolumeInfo struct {
	Id               VolumeId
	// NOTE(review): Size and DeletedByteCount are presumably in bytes — confirm.
	Size             uint64
	ReplicaPlacement *ReplicaPlacement
	Ttl              *TTL
	Collection       string
	Version          Version
	FileCount        int
	DeleteCount      int
	DeletedByteCount uint64
	ReadOnly         bool
}

func NewVolumeInfo

func NewVolumeInfo(m *pb.VolumeInformationMessage) (vi VolumeInfo, err error)

func (VolumeInfo) String

func (vi VolumeInfo) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL