core

package
v0.0.0-...-303e327
Published: May 22, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultDiskReservedSpaceB           = int64(60 << 30) // 60 GiB
	DefaultCompactReservedSpaceB        = int64(20 << 30) // 20 GiB
	DefaultChunkSize                    = int64(16 << 30) // 16 GiB
	DefaultMaxChunks                    = int32(1 << 13)  // 8192
	DefaultChunkReleaseProtectionM      = 1440            // 1 day
	DefaultChunkGcCreateTimeProtectionM = 1440            // 1 day
	DefaultChunkGcModifyTimeProtectionM = 1440            // 1 day
	DefaultChunkCompactIntervalSec      = 10 * 60         // 10 min
	DefaultChunkCleanIntervalSec        = 60              // 1 min
	DefaultDiskUsageIntervalSec         = 60              // 1 min
	DefaultDiskCleanTrashIntervalSec    = 60 * 60         // 60 min
	DefaultDiskTrashProtectionM         = 2880            // 2 days
	DefaultCompactBatchSize             = 1024            // 1024 counts
	DefaultCompactMinSizeThreshold      = 16 * (1 << 30)  // 16 GiB
	DefaultCompactTriggerThreshold      = 1 * (1 << 40)   // 1 TiB
	DefaultMetricReportIntervalS        = 30              // 30 Sec
	DefaultBlockBufferSize              = 64 * 1024       // 64k
	DefaultCompactEmptyRateThreshold    = float64(0.8)    // 80% rate
)
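
For orientation, a minimal sketch relating two of these defaults (the import path below is an assumption; only the constant names come from this package):

// Sketch: with the default 16 GiB chunk size and 8192 chunks per disk,
// a fully provisioned disk addresses 128 TiB of chunk space.
package main

import (
	"fmt"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed import path
)

func main() {
	total := core.DefaultChunkSize * int64(core.DefaultMaxChunks)
	fmt.Printf("max chunk space with defaults: %d TiB\n", total>>40) // 128 TiB
}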
View Source
const (
	CrcBlockUnitSize = 64 * 1024 // 64k
)
View Source
const (
	FormatMetaTypeV1 = "fs"
)

Variables

View Source
var (
	ErrChunkScanEOF      = errors.New("chunk scan occur eof")
	ErrEnoughShardNumber = errors.New("chunk scan enough shard number")
)
View Source
var (
	ErrFormatInfoCheckSum = errors.New("format info check sum error")
	ErrInvalidPathPrefix  = errors.New("invalid path prefix")
)
View Source
var (
	ErrShardHeaderMagic = errors.New("shard header magic")
	ErrShardHeaderSize  = errors.New("shard header size")
	ErrShardHeaderCrc   = errors.New("shard header crc not match")
	ErrShardFooterMagic = errors.New("shard footer magic")
	ErrShardFooterSize  = errors.New("shard footer size")
	ErrShardBufferSize  = errors.New("shard buffer size not match")
)
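
ErrChunkScanEOF is a sentinel meant to be matched with errors.Is and treated as the normal end of a metadata scan; a minimal sketch, assuming the import paths below (the Scan signature is taken from MetaHandler further down):

package example

import (
	"context"
	"errors"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
	"github.com/cubefs/cubefs/blobstore/common/proto"  // assumed path
)

// drainMeta scans shard metadata and treats ErrChunkScanEOF as clean termination.
func drainMeta(ctx context.Context, mh core.MetaHandler, start proto.BlobID) error {
	err := mh.Scan(ctx, start, 1024, func(bid proto.BlobID, sm *core.ShardMeta) error {
		// handle one shard's metadata entry here
		return nil
	})
	if errors.Is(err, core.ErrChunkScanEOF) {
		return nil // reached the end of the chunk: expected, not a failure
	}
	return err
}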

Functions

func AlignSize

func AlignSize(p int64, bound int64) (r int64)
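
A usage sketch, assuming AlignSize rounds p up to the next multiple of bound (inferred from the name, not stated on this page); the import path is also an assumption:

package example

import "github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path

// alignedOffset rounds a write offset up to a hypothetical 4 KiB boundary.
// The rounding direction is an assumption; check the source before relying on it.
func alignedOffset(p int64) int64 {
	const bound = 4096
	return core.AlignSize(p, bound)
}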

func Alignphysize

func Alignphysize(shardSize int64) int64

func EnsureDiskArea

func EnsureDiskArea(diskpath string, rootPrefix string) (err error)

func GetDataPath

func GetDataPath(diskRoot string) (path string)

func GetMetaPath

func GetMetaPath(diskRoot string, metaRootPrefix string) (path string)

func GetShardFooterSize

func GetShardFooterSize() int64

func GetShardHeaderSize

func GetShardHeaderSize() int64

func InitConfig

func InitConfig(conf *Config) error

func IsFormatConfigExist

func IsFormatConfigExist(diskRootPath string) (bool, error)

func OpenFile

func OpenFile(filename string, createIfMiss bool) (*os.File, error)

func SaveDiskFormatInfo

func SaveDiskFormatInfo(ctx context.Context, diskPath string, formatInfo *FormatInfo) (err error)

func SysTrashPath

func SysTrashPath(diskRoot string) (path string)

Types

type BaseConfig

type BaseConfig struct {
	Path        string `json:"path"`
	AutoFormat  bool   `json:"auto_format"`
	MaxChunks   int32  `json:"max_chunks"`
	DisableSync bool   `json:"disable_sync"`
}

Config for disk
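
The json tags map onto a config fragment like the one below; a decoding sketch with made-up values (import path assumed):

package example

import (
	"encoding/json"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
)

// decodeBaseConfig shows the json tags in use; the values are illustrative only.
func decodeBaseConfig() (core.BaseConfig, error) {
	raw := []byte(`{
		"path":         "/home/service/disks/disk1",
		"auto_format":  true,
		"max_chunks":   8192,
		"disable_sync": false
	}`)
	var bc core.BaseConfig
	err := json.Unmarshal(raw, &bc)
	return bc, err
}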

type BlobFile

type BlobFile interface {
	RawFile
	Allocate(off int64, size int64) (err error)
	Discard(off int64, size int64) (err error)
	SysStat() (sysstat syscall.Stat_t, err error)
}

func NewBlobFile

func NewBlobFile(file RawFile, handleIOError func(err error)) BlobFile

type ChunkAPI

type ChunkAPI interface {
	// infos
	ID() bnapi.ChunkId
	Vuid() proto.Vuid
	Disk() (disk DiskAPI)
	Status() bnapi.ChunkStatus
	VuidMeta() (vm *VuidMeta)
	ChunkInfo(ctx context.Context) (info bnapi.ChunkInfo)

	// method
	Write(ctx context.Context, b *Shard) (err error)
	Read(ctx context.Context, b *Shard) (n int64, err error)
	RangeRead(ctx context.Context, b *Shard) (n int64, err error)
	MarkDelete(ctx context.Context, bid proto.BlobID) (err error)
	Delete(ctx context.Context, bid proto.BlobID) (err error)
	ReadShardMeta(ctx context.Context, bid proto.BlobID) (sm *ShardMeta, err error)
	ListShards(ctx context.Context, startBid proto.BlobID, cnt int, status bnapi.ShardStatus) (infos []*bnapi.ShardInfo, next proto.BlobID, err error)
	Sync(ctx context.Context) (err error)
	SyncData(ctx context.Context) (err error)
	Close(ctx context.Context)

	// compact
	StartCompact(ctx context.Context) (ncs ChunkAPI, err error)
	CommitCompact(ctx context.Context, ncs ChunkAPI) (err error)
	StopCompact(ctx context.Context, ncs ChunkAPI) (err error)
	NeedCompact(ctx context.Context) bool
	IsDirty() bool
	IsClosed() bool
	AllowModify() (err error)
	HasEnoughSpace(needSize int64) bool
	HasPendingRequest() bool
	SetStatus(status bnapi.ChunkStatus) (err error)
	SetDirty(dirty bool)
}

chunk storage api
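
The method names suggest a two-phase removal, MarkDelete first and Delete afterwards; whether that ordering is enforced is not stated here, so the sketch below is an assumption (import paths assumed as well):

package example

import (
	"context"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
	"github.com/cubefs/cubefs/blobstore/common/proto"  // assumed path
)

// removeBlob sketches the apparent two-phase removal: mark, then delete.
func removeBlob(ctx context.Context, cs core.ChunkAPI, bid proto.BlobID) error {
	if err := cs.MarkDelete(ctx, bid); err != nil {
		return err
	}
	return cs.Delete(ctx, bid)
}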

type Config

type Config struct {
	BaseConfig
	RuntimeConfig
	HostInfo
	db.MetaConfig

	AllocDiskID      func(ctx context.Context) (proto.DiskID, error)
	HandleIOError    func(ctx context.Context, diskID proto.DiskID, diskErr error)
	NotifyCompacting func(ctx context.Context, args *cmapi.SetCompactChunkArgs) (err error)
}
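
Config bundles the disk, runtime, host, and meta-DB settings with three callbacks into the cluster layer. A wiring sketch with placeholder values; the import paths are assumptions, and that InitConfig fills unset fields with the defaults listed above is inferred from the constant names:

package example

import (
	"context"

	cmapi "github.com/cubefs/cubefs/blobstore/api/clustermgr" // assumed path
	"github.com/cubefs/cubefs/blobstore/blobnode/core"        // assumed path
	"github.com/cubefs/cubefs/blobstore/common/proto"         // assumed path
)

// newConfig wires the callbacks with placeholder bodies; in practice they
// would talk to the cluster manager.
func newConfig() (*core.Config, error) {
	conf := &core.Config{
		BaseConfig: core.BaseConfig{Path: "/home/service/disks/disk1", AutoFormat: true},
		HostInfo:   core.HostInfo{IDC: "idc-a", Rack: "rack-1", Host: "127.0.0.1:8889"},
		AllocDiskID: func(ctx context.Context) (proto.DiskID, error) {
			return proto.DiskID(1), nil // placeholder: would ask the cluster manager
		},
		HandleIOError: func(ctx context.Context, diskID proto.DiskID, diskErr error) {
			// placeholder: report the failing disk
		},
		NotifyCompacting: func(ctx context.Context, args *cmapi.SetCompactChunkArgs) error {
			return nil // placeholder: notify the cluster manager
		},
	}
	return conf, core.InitConfig(conf)
}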

type ConsistencyController

type ConsistencyController struct {
	// contains filtered or unexported fields
}

func NewConsistencyController

func NewConsistencyController() (cc *ConsistencyController)

func (*ConsistencyController) Begin

func (cc *ConsistencyController) Begin(item interface{}) (elem *list.Element)

func (*ConsistencyController) CurrentTime

func (cc *ConsistencyController) CurrentTime() uint64

func (*ConsistencyController) End

func (cc *ConsistencyController) End(elem *list.Element)

func (*ConsistencyController) Synchronize

func (cc *ConsistencyController) Synchronize() uint64
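
The names suggest that Begin/End bracket an in-flight operation and Synchronize waits for everything begun before it, returning a consistent time point; that reading is inferred, not documented here. A sketch (import path assumed):

package example

import "github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path

// bracketOp marks an operation as in flight for the duration of do().
func bracketOp(cc *core.ConsistencyController, do func()) {
	elem := cc.Begin(nil) // the item payload is opaque to this sketch
	defer cc.End(elem)
	do()
}

// waitForInflight blocks until earlier operations have ended (inferred behavior).
func waitForInflight(cc *core.ConsistencyController) uint64 {
	return cc.Synchronize()
}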

type DataHandler

type DataHandler interface {
	Write(ctx context.Context, shard *Shard) error
	Read(ctx context.Context, shard *Shard, from, to uint32) (r io.Reader, err error)
	Stat() (stat *StorageStat, err error)
	Flush() (err error)
	Delete(ctx context.Context, shard *Shard) (err error)
	Destroy(ctx context.Context) (err error)
	Close()
}

type DiskAPI

type DiskAPI interface {
	ID() proto.DiskID
	Status() (status proto.DiskStatus)
	DiskInfo() (info bnapi.DiskInfo)
	Stats() (stat DiskStats)
	GetChunkStorage(vuid proto.Vuid) (cs ChunkAPI, found bool)
	GetConfig() (config *Config)
	GetIoQos() (ioQos qos.Qos)
	GetDataPath() (path string)
	GetMetaPath() (path string)
	SetStatus(status proto.DiskStatus)
	LoadDiskInfo(ctx context.Context) (dm DiskMeta, err error)
	UpdateDiskStatus(ctx context.Context, status proto.DiskStatus) (err error)
	CreateChunk(ctx context.Context, vuid proto.Vuid, chunksize int64) (cs ChunkAPI, err error)
	ReleaseChunk(ctx context.Context, vuid proto.Vuid, force bool) (err error)
	UpdateChunkStatus(ctx context.Context, vuid proto.Vuid, status bnapi.ChunkStatus) (err error)
	UpdateChunkCompactState(ctx context.Context, vuid proto.Vuid, compacting bool) (err error)
	ListChunks(ctx context.Context) (chunks []VuidMeta, err error)
	EnqueueCompact(ctx context.Context, vuid proto.Vuid)
	GcRubbishChunk(ctx context.Context) (mayBeLost []bnapi.ChunkId, err error)
	WalkChunksWithLock(ctx context.Context, fn func(cs ChunkAPI) error) (err error)
	ResetChunks(ctx context.Context)
	Close(ctx context.Context)
}
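
A sketch of creating a chunk and fetching its storage handle back from the same disk; the DiskAPI value comes from a concrete disk implementation elsewhere, and the import paths are assumptions:

package example

import (
	"context"
	"fmt"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
	"github.com/cubefs/cubefs/blobstore/common/proto"  // assumed path
)

// createAndFetch creates a default-sized chunk for vuid and looks it up again.
func createAndFetch(ctx context.Context, ds core.DiskAPI, vuid proto.Vuid) (core.ChunkAPI, error) {
	if _, err := ds.CreateChunk(ctx, vuid, core.DefaultChunkSize); err != nil {
		return nil, err
	}
	cs, found := ds.GetChunkStorage(vuid)
	if !found {
		return nil, fmt.Errorf("chunk for vuid %v not registered on disk %v", vuid, ds.ID())
	}
	return cs, nil
}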

type DiskMeta

type DiskMeta struct {
	FormatInfo
	Host       string           `json:"host"`
	Path       string           `json:"path"`
	Status     proto.DiskStatus `json:"status"`
	Registered bool             `json:"registered"`
	Mtime      int64            `json:"mtime"`
}

disk meta data for rocksdb

type DiskStats

type DiskStats struct {
	Used          int64 `json:"used"`            // actual physical space usage
	Free          int64 `json:"free"`            // actual remaining physical space on the disk
	Reserved      int64 `json:"reserved"`        // reserve space on the disk
	TotalDiskSize int64 `json:"total_disk_size"` // total actual disk size
}

type FormatInfo

type FormatInfo struct {
	FormatInfoProtectedField
	CheckSum uint32 `json:"check_sum"`
}

func ReadFormatInfo

func ReadFormatInfo(ctx context.Context, diskRootPath string) (
	formatInfo *FormatInfo, err error)

func (*FormatInfo) CalCheckSum

func (fi *FormatInfo) CalCheckSum() (uint32, error)

func (*FormatInfo) Verify

func (fi *FormatInfo) Verify() error
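
A sketch of loading and validating a disk's format info, assuming Verify surfaces ErrFormatInfoCheckSum on a checksum mismatch (import path assumed):

package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
)

// checkFormat reads the on-disk format info and verifies its checksum.
func checkFormat(ctx context.Context, diskRoot string) (*core.FormatInfo, error) {
	fi, err := core.ReadFormatInfo(ctx, diskRoot)
	if err != nil {
		return nil, err
	}
	if err := fi.Verify(); err != nil {
		if errors.Is(err, core.ErrFormatInfoCheckSum) {
			return nil, fmt.Errorf("disk %s: format info corrupted: %w", diskRoot, err)
		}
		return nil, err
	}
	return fi, nil
}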

type FormatInfoProtectedField

type FormatInfoProtectedField struct {
	DiskID  proto.DiskID `json:"diskid"`
	Version uint8        `json:"version"`
	Ctime   int64        `json:"ctime"`
	Format  string       `json:"format"`
}

type HostInfo

type HostInfo struct {
	ClusterID proto.ClusterID `json:"cluster_id"`
	IDC       string          `json:"idc"`
	Rack      string          `json:"rack"`
	Host      string          `json:"host"`
}

type MetaHandler

type MetaHandler interface {
	ID() bnapi.ChunkId
	InnerDB() db.MetaHandler
	SupportInline() bool
	Write(ctx context.Context, bid proto.BlobID, value ShardMeta) (err error)
	Read(ctx context.Context, bid proto.BlobID) (value ShardMeta, err error)
	Delete(ctx context.Context, bid proto.BlobID) (err error)
	Scan(ctx context.Context, startBid proto.BlobID, limit int,
		fn func(bid proto.BlobID, sm *ShardMeta) error) (err error)
	Destroy(ctx context.Context) (err error)
	Close()
}

type Option

type Option struct {
	DB               db.MetaHandler
	Disk             DiskAPI
	Conf             *Config
	IoQos            qos.Qos
	CreateDataIfMiss bool
}

create chunk option

type OptionFunc

type OptionFunc func(option *Option)

type RawFile

type RawFile interface {
	Name() string
	Fd() uintptr
	ReadAt(b []byte, off int64) (n int, err error)
	WriteAt(b []byte, off int64) (n int, err error)
	Stat() (info os.FileInfo, err error)
	Sync() error
	Close() error
}

type Request

type Request struct {
	// contains filtered or unexported fields
}

type RuntimeConfig

type RuntimeConfig struct {
	DiskReservedSpaceB           int64   `json:"disk_reserved_space_B"`             // threshold
	CompactReservedSpaceB        int64   `json:"compact_reserved_space_B"`          // compact reserve
	ChunkReleaseProtectionM      int64   `json:"chunk_protection_M"`                // protect
	ChunkCompactIntervalSec      int64   `json:"chunk_compact_interval_S"`          // loop
	ChunkCleanIntervalSec        int64   `json:"chunk_clean_interval_S"`            // loop
	ChunkGcCreateTimeProtectionM int64   `json:"chunk_gc_create_time_protection_M"` // protect
	ChunkGcModifyTimeProtectionM int64   `json:"chunk_gc_modify_time_protection_M"` // protect
	DiskUsageIntervalSec         int64   `json:"disk_usage_interval_S"`             // loop
	DiskCleanTrashIntervalSec    int64   `json:"disk_clean_trash_interval_S"`       // loop
	DiskTrashProtectionM         int64   `json:"disk_trash_protection_M"`           // protect
	AllowCleanTrash              bool    `json:"allow_clean_trash"`
	DisableModifyInCompacting    bool    `json:"disable_modify_in_compacting"`
	CompactMinSizeThreshold      int64   `json:"compact_min_size_threshold"`
	CompactTriggerThreshold      int64   `json:"compact_trigger_threshold"`
	CompactEmptyRateThreshold    float64 `json:"compact_empty_rate_threshold"`
	NeedCompactCheck             bool    `json:"need_compact_check"`
	AllowForceCompact            bool    `json:"allow_force_compact"`
	CompactBatchSize             int     `json:"compact_batch_size"`
	MustMountPoint               bool    `json:"must_mount_point"`
	IOStatFileDryRun             bool    `json:"iostat_file_dryrun"`
	MetricReportIntervalS        int64   `json:"metric_report_interval_S"`
	BlockBufferSize              int64   `json:"block_buffer_size"`
	EnableDataInspect            bool    `json:"enable_data_inspect"`

	DataQos qos.Config `json:"data_qos"`
}
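
The json tag suffixes encode units: _B bytes, _M minutes, _S seconds. A decoding sketch of a partial override with illustrative values (import path assumed; that unset fields are later defaulted by InitConfig is an assumption):

package example

import (
	"encoding/json"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
)

// decodeRuntime decodes a partial runtime config; 64424509440 B is 60 GiB.
func decodeRuntime() (core.RuntimeConfig, error) {
	raw := []byte(`{
		"disk_reserved_space_B":   64424509440,
		"disk_trash_protection_M": 2880,
		"chunk_clean_interval_S":  60,
		"compact_batch_size":      1024,
		"must_mount_point":        true
	}`)
	var rc core.RuntimeConfig
	err := json.Unmarshal(raw, &rc)
	return rc, err
}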

type Shard

type Shard struct {
	Bid  proto.BlobID // shard id
	Vuid proto.Vuid   // volume unit id

	Size   uint32            // size for shard
	Offset int64             // offset in data file. align when write
	Crc    uint32            // crc for shard data
	Flag   bnapi.ShardStatus // shard status

	Inline bool   // shard data inline
	Buffer []byte // inline data

	Body     io.Reader // for put: shard body
	From, To int64     // for get: range (note: may fix in cs)
	Writer   io.Writer // for get: transmission to network

	PrepareHook func(shard *Shard)
	AfterHook   func(shard *Shard)
}

Blob Shard in memory

func NewShardReader

func NewShardReader(id proto.BlobID, vuid proto.Vuid, from int64, to int64, writer io.Writer) *Shard

for read

func NewShardWriter

func NewShardWriter(id proto.BlobID, vuid proto.Vuid, size uint32, body io.Reader) *Shard

for write
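
A sketch of a put followed by a ranged get through a ChunkAPI, using the two constructors above; import paths are assumptions and buffering is simplified:

package example

import (
	"bytes"
	"context"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
	"github.com/cubefs/cubefs/blobstore/common/proto"  // assumed path
)

// putThenGet writes one blob into a chunk and reads the same byte range back.
func putThenGet(ctx context.Context, cs core.ChunkAPI, bid proto.BlobID, vuid proto.Vuid, data []byte) ([]byte, error) {
	// put: the body reader supplies exactly Size bytes
	w := core.NewShardWriter(bid, vuid, uint32(len(data)), bytes.NewReader(data))
	if err := cs.Write(ctx, w); err != nil {
		return nil, err
	}

	// get: stream the range [0, len(data)) into an in-memory writer
	var buf bytes.Buffer
	r := core.NewShardReader(bid, vuid, 0, int64(len(data)), &buf)
	if _, err := cs.RangeRead(ctx, r); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}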

func ShardCopy

func ShardCopy(src *Shard) (dest *Shard)

func (*Shard) FillMeta

func (b *Shard) FillMeta(meta ShardMeta)

func (*Shard) ParseFooter

func (b *Shard) ParseFooter(buf []byte) (err error)

func (*Shard) ParseHeader

func (b *Shard) ParseHeader(buf []byte) (err error)

func (*Shard) String

func (b *Shard) String() string

func (*Shard) WriterFooter

func (b *Shard) WriterFooter(buf []byte) (err error)

func (*Shard) WriterHeader

func (b *Shard) WriterHeader(buf []byte) (err error)

type ShardKey

type ShardKey struct {
	Chunk bnapi.ChunkId `json:"chunk"`
	Bid   proto.BlobID  `json:"bid"`
}

meta db key

type ShardMeta

type ShardMeta struct {
	Version uint8
	Flag    bnapi.ShardStatus
	Offset  int64
	Size    uint32
	Crc     uint32
	Padding [8]byte
	Inline  bool
	Buffer  []byte
}

meta db value

func (*ShardMeta) Marshal

func (sm *ShardMeta) Marshal() ([]byte, error)

func (*ShardMeta) Unmarshal

func (sm *ShardMeta) Unmarshal(data []byte) error
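
Marshal and Unmarshal form a symmetric binary round trip for the meta-db value; a sketch with illustrative field values (import path assumed):

package example

import "github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path

// roundTrip encodes a ShardMeta and decodes it back into a fresh value.
func roundTrip() (core.ShardMeta, error) {
	in := core.ShardMeta{Version: 1, Offset: 4096, Size: 1 << 20, Crc: 0xdeadbeef}
	raw, err := in.Marshal()
	if err != nil {
		return core.ShardMeta{}, err
	}
	var out core.ShardMeta
	err = out.Unmarshal(raw)
	return out, err
}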

type Storage

type Storage interface {
	ID() bnapi.ChunkId
	MetaHandler() MetaHandler
	DataHandler() DataHandler
	RawStorage() Storage
	Write(ctx context.Context, b *Shard) (err error)
	ReadShardMeta(ctx context.Context, bid proto.BlobID) (sm *ShardMeta, err error)
	NewRangeReader(ctx context.Context, b *Shard, from, to int64) (rc io.Reader, err error)
	MarkDelete(ctx context.Context, bid proto.BlobID) (err error)
	Delete(ctx context.Context, bid proto.BlobID) (n int64, err error)
	ScanMeta(ctx context.Context, startBid proto.BlobID, limit int,
		fn func(bid proto.BlobID, sm *ShardMeta) error) (err error)
	SyncData(ctx context.Context) (err error)
	Sync(ctx context.Context) (err error)
	Stat(ctx context.Context) (stat *StorageStat, err error)
	PendingError() error
	PendingRequest() int64
	IncrPendingCnt()
	DecrPendingCnt()
	Close(ctx context.Context)
	Destroy(ctx context.Context)
}
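
IncrPendingCnt/DecrPendingCnt and PendingRequest look like a reference count that lets Close and Destroy wait for in-flight requests; that reading is inferred from the names. A bracketing sketch (import path assumed):

package example

import (
	"context"

	"github.com/cubefs/cubefs/blobstore/blobnode/core" // assumed path
)

// withPending brackets an operation with the pending-request counter.
func withPending(ctx context.Context, stg core.Storage, op func(context.Context) error) error {
	stg.IncrPendingCnt()
	defer stg.DecrPendingCnt()
	return op(ctx)
}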

type StorageStat

type StorageStat struct {
	FileSize   int64         `json:"file_size"`
	PhySize    int64         `json:"phy_size"`
	ParentID   bnapi.ChunkId `json:"parent_id"`
	CreateTime int64         `json:"create_time"`
}

type VuidMeta

type VuidMeta struct {
	Version     uint8             `json:"version"`
	Vuid        proto.Vuid        `json:"vuid"`
	DiskID      proto.DiskID      `json:"diskid"`
	ChunkId     bnapi.ChunkId     `json:"chunkname"`
	ParentChunk bnapi.ChunkId     `json:"parentchunk"`
	ChunkSize   int64             `json:"chunksize"`
	Ctime       int64             `json:"ctime"` // nsec
	Mtime       int64             `json:"mtime"` // nsec
	Compacting  bool              `json:"compacting"`
	Status      bnapi.ChunkStatus `json:"status"` // normal, release
	Reason      string            `json:"reason"`
}

chunk meta data for kv db

