datanode

package
v0.0.0-...-71dd0ca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2018 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
// Size thresholds expressed as multiples of util.MB.
// NOTE(review): judging by the names these bound repair read response
// packets; how "Max" and "Limit" differ is not visible here — confirm at the
// use sites.
const (
	PkgRepairCReadRespMaxSize   = 15 * util.MB
	PkgRepairCReadRespLimitSize = 10 * util.MB
)
View Source
// Consecutive uint32 values generated by iota: Standby is 0 and Stopped is 4.
// NOTE(review): presumably the lifecycle states of the data node server —
// confirm against the code that reads them.
const (
	Standby uint32 = iota
	Start
	Running
	Shutdown
	Stopped
)
View Source
// Action name strings. Every value is identical to its identifier except
// LocalProcessAddr, whose value is "LocalProcess".
// NOTE(review): presumably used to label the step being performed in error
// and log messages — confirm at the use sites.
const (
	ActionSendToNext                                 = "ActionSendToNext"
	LocalProcessAddr                                 = "LocalProcess"
	ActionReceiveFromNext                            = "ActionReceiveFromNext"
	ActionStreamRead                                 = "ActionStreamRead"
	ActionWriteToCli                                 = "ActionWriteToCli"
	ActionGetDataPartitionMetrics                    = "ActionGetDataPartitionMetrics"
	ActionCheckAndAddInfos                           = "ActionCheckAndAddInfos"
	ActionCheckBlobFileInfo                          = "ActionCheckBlobFileInfo"
	ActionPostToMaster                               = "ActionPostToMaster"
	ActionFollowerRequireBlobFileRepairCmd           = "ActionFollowerRequireBlobFileRepairCmd"
	ActionLeaderToFollowerOpRepairReadPackBuffer     = "ActionLeaderToFollowerOpRepairReadPackBuffer"
	ActionLeaderToFollowerOpRepairReadSendPackBuffer = "ActionLeaderToFollowerOpRepairReadSendPackBuffer"

	ActionGetFollowers    = "ActionGetFollowers"
	ActionCheckReplyAvail = "ActionCheckReplyAvail"
)
View Source
// Untyped integer role codes. The values are 1 and 3; no constant for 2 is
// declared in this group.
// NOTE(review): what consumes these role codes is not visible here.
const (
	ReportToMonitorRole = 1
	ReportToSelfRole    = 3
)

stats

View Source
// Flow direction selectors generated by iota: InFlow is 0, OutFlow is 1.
// NOTE(review): presumably distinguish inbound from outbound traffic in the
// stats code — confirm at the use sites.
const (
	InFlow = iota
	OutFlow
)
View Source
// Untyped integer markers with values 1 and -1.
// NOTE(review): presumably the values stored in GcanCompact to switch
// compaction on or off — confirm.
const (
	CanCompact    = 1
	NotCanCompact = -1
)
View Source
// NoFlag, ReadFlag and WriteFlag are the integer values 0, 1 and 2.
// MaxActiveExtents (50000) shares the group but is a count, not a flag.
// NOTE(review): where the flags are tested and what MaxActiveExtents limits
// is not visible here.
const (
	NoFlag           = 0
	ReadFlag         = 1
	WriteFlag        = 2
	MaxActiveExtents = 50000
)

pack cmd response

View Source
// Short string tags, each ending in a colon.
// NOTE(review): the Log prefix suggests they are prepended to log messages
// to identify the operation being logged — confirm at the use sites.
const (
	LogHeartbeat         = "HB:"
	LogStats             = "Stats:"
	LogLoad              = "Load:"
	LogExit              = "Exit:"
	LogShutdown          = "Shutdown:"
	LogCreatePartition   = "CRV:"
	LogCreateFile        = "CRF:"
	LogDelPartition      = "DELV:"
	LogDelFile           = "DELF:"
	LogMarkDel           = "MDEL:"
	LogPartitionSnapshot = "Snapshot:"
	LogGetWm             = "WM:"
	LogGetAllWm          = "AllWM:"
	LogCompactBlobFile   = "CompactBlobFile:"
	LogWrite             = "WR:"
	LogRead              = "RD:"
	LogRepairRead        = "RRD:"
	LogStreamRead        = "SRD:"
	LogRepairNeedles     = "RN:"
	LogRepair            = "Repair:"
	LogChecker           = "Checker:"
	LogTask              = "Master Task:"
	LogGetFlow           = "GetFlowInfo:"
)
View Source
// Naming and formatting constants for data partitions.
const (
	// DataPartitionPrefix is the same literal that starts the pattern in
	// RegexpDataPartitionDir ("datapartition").
	DataPartitionPrefix       = "datapartition"
	// NOTE(review): presumably the name of the per-partition meta file read
	// by LoadDataPartition — confirm.
	DataPartitionMetaFileName = "META"
	// TimeLayout is a Go reference-time layout (year-month-day
	// hour:minute:second) for time.Format / time.Parse.
	TimeLayout                = "2006-01-02 15:04:05"
)
View Source
const (
	// GetIpFromMaster re-exports master.AdminGetIp under a local name.
	GetIpFromMaster = master.AdminGetIp
	// NOTE(review): presumably the rack used when the configuration does not
	// set one — confirm.
	DefaultRackName = "huitian_rack1"
)
View Source
// Configuration key names. The trailing comment on each line is the
// original author's note of the value type stored under that key.
const (
	ConfigKeyPort       = "port"       // int
	ConfigKeyClusterID  = "clusterID"  // string
	ConfigKeyMasterAddr = "masterAddr" // array
	ConfigKeyRack       = "rack"       // string
	ConfigKeyDisks      = "disks"      // array
)
View Source
const (
	// NOTE(review): presumably the number of concurrent compaction workers —
	// confirm where it is read.
	CompactThreadNum = 4
)
View Source
const (
	// ConnIsNullErr is a plain string constant, not an error value, so it
	// cannot be compared with errors.Is.
	ConnIsNullErr = "ConnIsNullErr"
)
View Source
const (
	// NOTE(review): presumably caps how many blob files are repaired in one
	// round — confirm at the use site.
	MaxRepairBlobFileCount = 3
)
View Source
const (
	// NetType is the network name "tcp".
	// NOTE(review): presumably passed to the net package's dial/listen calls.
	NetType = "tcp"
)
View Source
const (
	// NOTE(review): presumably the encoded size of an object ID in bytes
	// (8 matches the width of a uint64) — confirm.
	ObjectIDSize = 8
)
View Source
const (
	// NOTE(review): presumably the buffer capacity of a request channel —
	// confirm where the channel is created.
	RequestChanSize = 10240
)
View Source
const (
	// NOTE(review): presumably the module name this package reports to the
	// UMP monitoring system — confirm.
	UmpModuleName = "dataNode"
)
View Source
const (
	// VerifyBlobFile is the integer value 1.
	// NOTE(review): what it is compared against is not visible here.
	VerifyBlobFile = 1
)

Variables

View Source
// Sentinel errors created with errors.New.
// NOTE(review): judging by the names they report problems with node and
// address lists — confirm where they are returned.
var (
	ErrBadNodes           = errors.New("BadNodesErr")
	ErrArgLenMismatch     = errors.New("ArgLenMismatchErr")
	ErrAddrsNodesMismatch = errors.New("AddrsNodesMismatchErr")
)
View Source
// Sentinel errors plus two pieces of package-level mutable state.
var (
	ErrStoreTypeMismatch        = errors.New("store type error")
	ErrPartitionNotExist        = errors.New("dataPartition not exists")
	// NOTE(review): the message reads "not mismatch", a double negative that
	// contradicts the identifier; it is left unchanged here in case callers
	// compare the text.
	ErrBlobFileOffsetMismatch   = errors.New("blobfile offset not mismatch")
	ErrNoDiskForCreatePartition = errors.New("no disk for create dataPartition")
	ErrBadConfFile              = errors.New("bad config file")

	// LocalIP starts as the empty string; nothing visible here assigns it.
	LocalIP string

	// MasterHelper is initialized once, at package load, from
	// util.NewMasterHelper.
	MasterHelper = util.NewMasterHelper()
)
View Source
var (
	// AdminGetDataPartition is initialized from master.AdminGetDataPartition.
	// Unlike GetIpFromMaster it is declared as a var, so it is assignable.
	AdminGetDataPartition = master.AdminGetDataPartition
)
View Source
var (
	// ErrDiskCompactChanFull is a sentinel error whose message is
	// "disk compact chan is full".
	// NOTE(review): presumably returned by AddCompactTask — confirm.
	ErrDiskCompactChanFull = errors.New("disk compact chan is full")
)
View Source
var (
	// ErrorUnknownOp is a sentinel error whose message is "unknown opcode".
	// Its name uses the Error prefix, whereas the other sentinels in this
	// package use Err.
	ErrorUnknownOp = errors.New("unknown opcode")
)
View Source
var (
	// GcanCompact is a package-level int that starts at zero.
	// NOTE(review): presumably holds CanCompact or NotCanCompact; no
	// synchronization is visible here — confirm how it is read and written.
	GcanCompact int
)
View Source
var (
	// RegexpDataPartitionDir matches data partition directory names of the
	// form "datapartition_<digits>_<digits>", anchored at both ends.
	//
	// The pattern is a fixed literal, so it is built with regexp.MustCompile:
	// an invalid pattern panics at package initialization instead of the
	// compile error being discarded and the variable silently left nil.
	//
	// NOTE(review): each group is written as (\d)+, so a submatch holds only
	// the last digit of its run; switch to (\d+) if callers need the full
	// number from FindStringSubmatch.
	RegexpDataPartitionDir = regexp.MustCompile("^datapartition_(\\d)+_(\\d)+$")
)

Functions

This section is empty.

Types

type CompactTask

// CompactTask exposes no exported fields. Values are handed to
// DataNode.AddCompactTask by pointer.
type CompactTask struct {
	// contains filtered or unexported fields
}

type DataNode

// DataNode exposes no exported fields. Instances come from NewServer, and
// the type has Start, Shutdown, Sync and AddCompactTask methods.
type DataNode struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer() *DataNode

func (*DataNode) AddCompactTask

func (s *DataNode) AddCompactTask(t *CompactTask) (err error)

func (*DataNode) Shutdown

func (s *DataNode) Shutdown()

func (*DataNode) Start

func (s *DataNode) Start(cfg *config.Config) (err error)

func (*DataNode) Sync

func (s *DataNode) Sync()

type DataPartition

// DataPartition is the method set a data partition exposes to the rest of
// the package. Implementations are returned by CreateDataPartition and
// LoadDataPartition.
type DataPartition interface {
	// Identity, location and replication information.
	ID() uint32
	Path() string
	IsLeader() bool
	ReplicaHosts() []string
	Disk() *Disk

	// Capacity accounting.
	// NOTE(review): units are not visible here (presumably bytes).
	Size() int
	Used() int
	Available() int

	// Status is read and replaced as a plain int; the meaning of the
	// individual values is not visible here.
	Status() int
	ChangeStatus(status int)

	// Accessors for the underlying storage-package stores.
	GetExtentStore() *storage.ExtentStore
	GetBlobStore() *storage.BlobStore

	// Object-level operations addressed by blob file ID.
	GetObjects(blobfileID uint32, startOid, lastOid uint64) (objects []*storage.Object)
	PackObject(dataBuf []byte, o *storage.Object, blobfileID uint32) (err error)
	DelObjects(blobfileId uint32, deleteBuf []byte) (err error)

	// Repair entry points; the Merge* methods take the MembersFileMetas
	// describing the repair work.
	LaunchRepair()
	MergeExtentStoreRepair(metas *MembersFileMetas)
	MergeBlobStoreRepair(metas *MembersFileMetas)
	FlushDelete() error

	// Latency recording; same signatures as the DataPartitionMetrics
	// methods of the same names.
	AddWriteMetrics(latency uint64)
	AddReadMetrics(latency uint64)

	Stop()
}

func CreateDataPartition

func CreateDataPartition(volId string, partitionId uint32, disk *Disk, size int, partitionType string) (dp DataPartition, err error)

func LoadDataPartition

func LoadDataPartition(partitionDir string, disk *Disk) (dp DataPartition, err error)

LoadDataPartition loads and returns a partition instance from the specified directory. It reads the partition meta file stored under that directory and creates the partition instance from it.

type DataPartitionMetrics

// DataPartitionMetrics holds read and write counters, summed latencies and
// floating-point latency figures for a data partition.
// NOTE(review): presumably WriteLatency and ReadLatency are averages derived
// from the Sum* and *Cnt fields; the unit of latency is not visible here.
type DataPartitionMetrics struct {
	WriteCnt        uint64
	ReadCnt         uint64
	SumWriteLatency uint64
	SumReadLatency  uint64
	WriteLatency    float64
	ReadLatency     float64
	// contains filtered or unexported fields
}

func NewDataPartitionMetrics

func NewDataPartitionMetrics() *DataPartitionMetrics

func (*DataPartitionMetrics) AddReadMetrics

func (metrics *DataPartitionMetrics) AddReadMetrics(latency uint64)

func (*DataPartitionMetrics) AddWriteMetrics

func (metrics *DataPartitionMetrics) AddWriteMetrics(latency uint64)

func (*DataPartitionMetrics) GetReadLatency

func (metrics *DataPartitionMetrics) GetReadLatency() float64

func (*DataPartitionMetrics) GetWriteLatency

func (metrics *DataPartitionMetrics) GetWriteLatency() float64

type Disk

// Disk describes one disk path together with its error counters and space
// accounting. It embeds sync.RWMutex, so a Disk must not be copied after
// first use; pass *Disk, as NewDisk and the methods do.
// NOTE(review): which fields the embedded lock guards, and the unit of the
// size fields (presumably bytes), are not visible here.
type Disk struct {
	sync.RWMutex
	Path        string
	ReadErrs    uint64
	WriteErrs   uint64
	Total       uint64
	Used        uint64
	Available   uint64
	Unallocated uint64
	Allocated   uint64
	MaxErrs     int
	Status      int
	RestSize    uint64
	// contains filtered or unexported fields
}

func NewDisk

func NewDisk(path string, restSize uint64, maxErrs int, space *spaceManager) (d *Disk)

func (*Disk) AttachDataPartition

func (d *Disk) AttachDataPartition(dp DataPartition)

func (*Disk) DataPartitionList

func (d *Disk) DataPartitionList() (partitionIds []uint32)

func (*Disk) DetachDataPartition

func (d *Disk) DetachDataPartition(dp DataPartition)

func (*Disk) PartitionCount

func (d *Disk) PartitionCount() int

func (*Disk) RestorePartition

func (d *Disk) RestorePartition(visitor PartitionVisitor)

type DiskMetrics

// DiskMetrics is a plain data struct carrying a disk's status, error
// counters and weight figures, identified by Path.
// NOTE(review): the meaning of the "weight" fields is not visible here;
// presumably they are sizes — confirm.
type DiskMetrics struct {
	Status               int32
	ReadErrs             int32
	WriteErrs            int32
	MaxDiskErrs          int32
	MinRestWeight        int64
	TotalWeight          int64
	RealAvailWeight      int64
	PartitionAvailWeight int64
	Path                 string
}

type DiskUsage

// DiskUsage groups five space counters. The field names and types match the
// Total, Used, Available, Unallocated and Allocated fields of Disk.
type DiskUsage struct {
	Total       uint64
	Used        uint64
	Available   uint64
	Unallocated uint64
	Allocated   uint64
}

type MembersFileMetas

// MembersFileMetas collects the task lists used by the repair code. It is
// the argument type of DataPartition.MergeExtentStoreRepair and
// DataPartition.MergeBlobStoreRepair.
type MembersFileMetas struct {
	NeedDeleteExtentsTasks   []*storage.FileInfo //generator delete extent file task
	NeedAddExtentsTasks      []*storage.FileInfo //generator add extent file task
	NeedFixExtentSizeTasks   []*storage.FileInfo //generator fixSize file task
	// NOTE(review): the int map key is presumably a blob file ID — confirm.
	NeedDeleteObjectsTasks   map[int][]byte      //generator deleteObject on blob file task
	NeedFixBlobFileSizeTasks []*storage.FileInfo
	// contains filtered or unexported fields
}

MembersFileMetas holds the file metas of every data partition, used for automatic repair.

func NewMemberFileMetas

func NewMemberFileMetas() (mf *MembersFileMetas)

type MessageHandler

// MessageHandler exposes no exported fields. Instances are built by
// NewMsgHandler from a *net.TCPConn.
type MessageHandler struct {
	// contains filtered or unexported fields
}

func NewMsgHandler

func NewMsgHandler(inConn *net.TCPConn) *MessageHandler

func (*MessageHandler) AllocateNextConn

func (msgH *MessageHandler) AllocateNextConn(pkg *Packet) (err error)

func (*MessageHandler) ClearReplys

func (msgH *MessageHandler) ClearReplys()

func (*MessageHandler) ClearReqs

func (msgH *MessageHandler) ClearReqs(s *DataNode)

func (*MessageHandler) DelListElement

func (msgH *MessageHandler) DelListElement(reply *Packet, e *list.Element, isForClose bool)

func (*MessageHandler) ExitSign

func (msgH *MessageHandler) ExitSign()

func (*MessageHandler) GetListElement

func (msgH *MessageHandler) GetListElement() (e *list.Element)

func (*MessageHandler) PushListElement

func (msgH *MessageHandler) PushListElement(e *Packet)

func (*MessageHandler) RenewList

func (msgH *MessageHandler) RenewList(isHeadNode bool)

func (*MessageHandler) Stop

func (msgH *MessageHandler) Stop()

type Packet

// Packet embeds proto.Packet and adds data-node-side routing state: the
// connection and address of the next hop, a return flag, and the
// DataPartition the packet is associated with.
// NOTE(review): who sets NextConn and DataPartition, and what IsReturn
// signals, is not visible here.
type Packet struct {
	proto.Packet
	NextConn      *net.TCPConn
	NextAddr      string
	IsReturn      bool
	DataPartition DataPartition
	// contains filtered or unexported fields
}

func NewBlobStoreGetAllWaterMarker

func NewBlobStoreGetAllWaterMarker(partitionId uint32) (p *Packet)

func NewExtentStoreGetAllWaterMarker

func NewExtentStoreGetAllWaterMarker(partitionId uint32) (p *Packet)

func NewNotifyBlobRepair

func NewNotifyBlobRepair(partitionId uint32) (p *Packet)

func NewNotifyCompactPkg

func NewNotifyCompactPkg(fileID, datapartitionId uint32) (p *Packet)

func NewNotifyExtentRepair

func NewNotifyExtentRepair(partitionId uint32) (p *Packet)

func NewPacket

func NewPacket() (p *Packet)

func NewStreamBlobFileRepairReadPacket

func NewStreamBlobFileRepairReadPacket(partitionId uint32, blobfileId int) (p *Packet)

func NewStreamReadPacket

func NewStreamReadPacket(partitionId uint32, extentId, offset, size int) (p *Packet)

func (*Packet) CheckCrc

func (p *Packet) CheckCrc() (err error)

func (*Packet) ClassifyErrorOp

func (p *Packet) ClassifyErrorOp(errLog string, errMsg string)

func (*Packet) CopyFrom

func (p *Packet) CopyFrom(src *Packet)

func (*Packet) GetNextAddr

func (p *Packet) GetNextAddr(addrs []string) error

func (*Packet) IsCreateFileOperation

func (p *Packet) IsCreateFileOperation() bool

func (*Packet) IsErrPack

func (p *Packet) IsErrPack() bool

func (*Packet) IsExtentWritePacket

func (p *Packet) IsExtentWritePacket() bool

func (*Packet) IsMarkDeleteOperation

func (p *Packet) IsMarkDeleteOperation() bool

func (*Packet) IsMarkDeleteReq

func (p *Packet) IsMarkDeleteReq() bool

func (*Packet) IsMasterCommand

func (p *Packet) IsMasterCommand() bool

func (*Packet) IsReadOperation

func (p *Packet) IsReadOperation() bool

func (*Packet) IsTailNode

func (p *Packet) IsTailNode() (ok bool)

func (*Packet) IsTransitPkg

func (p *Packet) IsTransitPkg() bool

func (*Packet) IsWriteOperation

func (p *Packet) IsWriteOperation() bool

func (*Packet) PackErrorBody

func (p *Packet) PackErrorBody(action, msg string)

func (*Packet) ReadFromConnFromCli

func (p *Packet) ReadFromConnFromCli(c net.Conn, deadlineTime time.Duration) (err error)

func (*Packet) ReadFull

func (p *Packet) ReadFull(c net.Conn, readSize int) (err error)

func (*Packet) UnmarshalAddrs

func (p *Packet) UnmarshalAddrs() (addrs []string, err error)

type PartitionVisitor

// PartitionVisitor is a callback that receives a DataPartition; it is the
// parameter type of Disk.RestorePartition.
type PartitionVisitor func(dp DataPartition)

type RepairBlobFileTask

// RepairBlobFileTask names a blob file by ID together with a start and end
// object number.
// NOTE(review): whether the StartObj..EndObj range is inclusive is not
// visible here.
type RepairBlobFileTask struct {
	BlobFileId int
	StartObj   uint64
	EndObj     uint64
}

func (*RepairBlobFileTask) ToString

func (repairTask *RepairBlobFileTask) ToString() string

type SpaceManager

// SpaceManager is the interface for managing disks and the data partitions
// placed on them. NewSpaceManager returns an implementation.
type SpaceManager interface {
	// Disk management. The LoadDisk parameters mirror those of NewDisk.
	LoadDisk(path string, restSize uint64, maxErrs int) (err error)
	GetDisk(path string) (d *Disk, err error)
	GetPartition(partitionId uint32) (dp DataPartition)
	Stats() *Stats
	GetDisks() []*Disk
	CreatePartition(volId string, partitionId uint32, storeSize int, storeType string) (DataPartition, error)
	DeletePartition(partitionId uint32)
	// NOTE(review): the bool returned by f presumably controls whether
	// iteration continues — confirm in the implementation.
	RangePartitions(f func(partition DataPartition) bool)
	Stop()
}

func NewSpaceManager

func NewSpaceManager(rack string) SpaceManager

type Stats

// Stats carries node-level identity, connection and space figures. It embeds
// sync.Mutex, so a Stats must not be copied after first use; NewStats
// returns *Stats.
// NOTE(review): which fields the embedded mutex guards is not visible here.
type Stats struct {
	Zone                            string
	CurrentConns                    int64
	ClusterID                       string
	TcpAddr                         string
	Start                           time.Time
	Total                           uint64
	Used                            uint64
	Available                       uint64
	CreatedPartitionWeights         uint64 //dataPartitionCnt*dataPartitionSize
	RemainWeightsForCreatePartition uint64 //all-usedDataPartitionsWeights
	CreatedPartitionCnt             uint64
	MaxWeightsForCreatePartition    uint64

	sync.Mutex
	// contains filtered or unexported fields
}

Stats holds various metrics such as free and total storage space, traffic, etc.

func NewStats

func NewStats(zone string) (s *Stats)

func (*Stats) AddConnection

func (s *Stats) AddConnection()

func (*Stats) AddInDataSize

func (s *Stats) AddInDataSize(size uint64)

func (*Stats) AddOutDataSize

func (s *Stats) AddOutDataSize(size uint64)

func (*Stats) GetConnectionNum

func (s *Stats) GetConnectionNum() int64

func (*Stats) RemoveConnection

func (s *Stats) RemoveConnection()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL