Documentation ¶
Index ¶
- Constants
- Variables
- type CompactTask
- type DataNode
- type DataPartition
- type DataPartitionMetrics
- type Disk
- type DiskMetrics
- type DiskUsage
- type MembersFileMetas
- type MessageHandler
- func (msgH *MessageHandler) AllocateNextConn(pkg *Packet) (err error)
- func (msgH *MessageHandler) ClearReplys()
- func (msgH *MessageHandler) ClearReqs(s *DataNode)
- func (msgH *MessageHandler) DelListElement(reply *Packet, e *list.Element, isForClose bool)
- func (msgH *MessageHandler) ExitSign()
- func (msgH *MessageHandler) GetListElement() (e *list.Element)
- func (msgH *MessageHandler) PushListElement(e *Packet)
- func (msgH *MessageHandler) RenewList(isHeadNode bool)
- func (msgH *MessageHandler) Stop()
- type Packet
- func NewBlobStoreGetAllWaterMarker(partitionId uint32) (p *Packet)
- func NewExtentStoreGetAllWaterMarker(partitionId uint32) (p *Packet)
- func NewNotifyBlobRepair(partitionId uint32) (p *Packet)
- func NewNotifyCompactPkg(fileID, datapartitionId uint32) (p *Packet)
- func NewNotifyExtentRepair(partitionId uint32) (p *Packet)
- func NewPacket() (p *Packet)
- func NewStreamBlobFileRepairReadPacket(partitionId uint32, blobfileId int) (p *Packet)
- func NewStreamReadPacket(partitionId uint32, extentId, offset, size int) (p *Packet)
- func (p *Packet) CheckCrc() (err error)
- func (p *Packet) ClassifyErrorOp(errLog string, errMsg string)
- func (p *Packet) CopyFrom(src *Packet)
- func (p *Packet) GetNextAddr(addrs []string) error
- func (p *Packet) IsCreateFileOperation() bool
- func (p *Packet) IsErrPack() bool
- func (p *Packet) IsExtentWritePacket() bool
- func (p *Packet) IsMarkDeleteOperation() bool
- func (p *Packet) IsMarkDeleteReq() bool
- func (p *Packet) IsMasterCommand() bool
- func (p *Packet) IsReadOperation() bool
- func (p *Packet) IsTailNode() (ok bool)
- func (p *Packet) IsTransitPkg() bool
- func (p *Packet) IsWriteOperation() bool
- func (p *Packet) PackErrorBody(action, msg string)
- func (p *Packet) ReadFromConnFromCli(c net.Conn, deadlineTime time.Duration) (err error)
- func (p *Packet) ReadFull(c net.Conn, readSize int) (err error)
- func (p *Packet) UnmarshalAddrs() (addrs []string, err error)
- type PartitionVisitor
- type RepairBlobFileTask
- type SpaceManager
- type Stats
Constants ¶
View Source
const ( PkgRepairCReadRespMaxSize = 15 * util.MB PkgRepairCReadRespLimitSize = 10 * util.MB )
View Source
const ( Standby uint32 = iota Start Running Shutdown Stopped )
View Source
const ( ActionSendToNext = "ActionSendToNext" LocalProcessAddr = "LocalProcess" ActionReceiveFromNext = "ActionReceiveFromNext" ActionStreamRead = "ActionStreamRead" ActionWriteToCli = "ActionWriteToCli" ActionGetDataPartitionMetrics = "ActionGetDataPartitionMetrics" ActionCheckAndAddInfos = "ActionCheckAndAddInfos" ActionCheckBlobFileInfo = "ActionCheckBlobFileInfo" ActionPostToMaster = "ActionPostToMaster" ActionFollowerRequireBlobFileRepairCmd = "ActionFollowerRequireBlobFileRepairCmd" ActionLeaderToFollowerOpRepairReadPackBuffer = "ActionLeaderToFollowerOpRepairReadPackBuffer" ActionLeaderToFollowerOpRepairReadSendPackBuffer = "ActionLeaderToFollowerOpRepairReadSendPackBuffer" ActionGetFollowers = "ActionGetFollowers" ActionCheckReplyAvail = "ActionCheckReplyAvail" )
View Source
const ( ReportToMonitorRole = 1 ReportToSelfRole = 3 )
stats
View Source
const ( InFlow = iota OutFlow )
View Source
const ( CanCompact = 1 NotCanCompact = -1 )
View Source
const ( NoFlag = 0 ReadFlag = 1 WriteFlag = 2 MaxActiveExtents = 50000 )
pack cmd response
View Source
const ( LogHeartbeat = "HB:" LogStats = "Stats:" LogLoad = "Load:" LogExit = "Exit:" LogShutdown = "Shutdown:" LogCreatePartition = "CRV:" LogCreateFile = "CRF:" LogDelPartition = "DELV:" LogDelFile = "DELF:" LogMarkDel = "MDEL:" LogPartitionSnapshot = "Snapshot:" LogGetWm = "WM:" LogGetAllWm = "AllWM:" LogCompactBlobFile = "CompactBlobFile:" LogWrite = "WR:" LogRead = "RD:" LogRepairRead = "RRD:" LogStreamRead = "SRD:" LogRepairNeedles = "RN:" LogRepair = "Repair:" LogChecker = "Checker:" LogTask = "Master Task:" LogGetFlow = "GetFlowInfo:" )
View Source
const ( DataPartitionPrefix = "datapartition" DataPartitionMetaFileName = "META" TimeLayout = "2006-01-02 15:04:05" )
View Source
const ( GetIpFromMaster = master.AdminGetIp DefaultRackName = "huitian_rack1" )
View Source
const ( ConfigKeyPort = "port" // int ConfigKeyClusterID = "clusterID" // string ConfigKeyMasterAddr = "masterAddr" // array ConfigKeyRack = "rack" // string ConfigKeyDisks = "disks" // array )
View Source
const (
CompactThreadNum = 4
)
View Source
const (
ConnIsNullErr = "ConnIsNullErr"
)
View Source
const (
MaxRepairBlobFileCount = 3
)
View Source
const (
NetType = "tcp"
)
View Source
const (
ObjectIDSize = 8
)
View Source
const (
RequestChanSize = 10240
)
View Source
const (
UmpModuleName = "dataNode"
)
View Source
const (
VerifyBlobFile = 1
)
Variables ¶
View Source
var ( ErrBadNodes = errors.New("BadNodesErr") ErrArgLenMismatch = errors.New("ArgLenMismatchErr") ErrAddrsNodesMismatch = errors.New("AddrsNodesMismatchErr") )
View Source
var ( ErrStoreTypeMismatch = errors.New("store type error") ErrPartitionNotExist = errors.New("dataPartition not exists") ErrBlobFileOffsetMismatch = errors.New("blobfile offset not mismatch") ErrNoDiskForCreatePartition = errors.New("no disk for create dataPartition") ErrBadConfFile = errors.New("bad config file") LocalIP string MasterHelper = util.NewMasterHelper() )
View Source
var (
AdminGetDataPartition = master.AdminGetDataPartition
)
View Source
var (
ErrDiskCompactChanFull = errors.New("disk compact chan is full")
)
View Source
var (
ErrorUnknownOp = errors.New("unknown opcode")
)
View Source
var (
GcanCompact int
)
View Source
var (
// Regexp pattern used to validate data partition directory names.
RegexpDataPartitionDir, _ = regexp.Compile("^datapartition_(\\d)+_(\\d)+$")
)
Functions ¶
This section is empty.
Types ¶
type CompactTask ¶
type CompactTask struct {
// contains filtered or unexported fields
}
type DataNode ¶
type DataNode struct {
// contains filtered or unexported fields
}
func (*DataNode) AddCompactTask ¶
func (s *DataNode) AddCompactTask(t *CompactTask) (err error)
type DataPartition ¶
type DataPartition interface { ID() uint32 Path() string IsLeader() bool ReplicaHosts() []string Disk() *Disk Size() int Used() int Available() int Status() int ChangeStatus(status int) GetExtentStore() *storage.ExtentStore GetBlobStore() *storage.BlobStore GetObjects(blobfileID uint32, startOid, lastOid uint64) (objects []*storage.Object) PackObject(dataBuf []byte, o *storage.Object, blobfileID uint32) (err error) DelObjects(blobfileId uint32, deleteBuf []byte) (err error) LaunchRepair() MergeExtentStoreRepair(metas *MembersFileMetas) MergeBlobStoreRepair(metas *MembersFileMetas) FlushDelete() error AddWriteMetrics(latency uint64) AddReadMetrics(latency uint64) Stop() }
func CreateDataPartition ¶
func LoadDataPartition ¶
func LoadDataPartition(partitionDir string, disk *Disk) (dp DataPartition, err error)
LoadDataPartition loads and returns a partition instance from the specified directory. It reads the partition meta file stored under that directory and creates the partition instance.
type DataPartitionMetrics ¶
type DataPartitionMetrics struct { WriteCnt uint64 ReadCnt uint64 SumWriteLatency uint64 SumReadLatency uint64 WriteLatency float64 ReadLatency float64 // contains filtered or unexported fields }
func NewDataPartitionMetrics ¶
func NewDataPartitionMetrics() *DataPartitionMetrics
func (*DataPartitionMetrics) AddReadMetrics ¶
func (metrics *DataPartitionMetrics) AddReadMetrics(latency uint64)
func (*DataPartitionMetrics) AddWriteMetrics ¶
func (metrics *DataPartitionMetrics) AddWriteMetrics(latency uint64)
func (*DataPartitionMetrics) GetReadLatency ¶
func (metrics *DataPartitionMetrics) GetReadLatency() float64
func (*DataPartitionMetrics) GetWriteLatency ¶
func (metrics *DataPartitionMetrics) GetWriteLatency() float64
type Disk ¶
type Disk struct { sync.RWMutex Path string ReadErrs uint64 WriteErrs uint64 Total uint64 Used uint64 Available uint64 Unallocated uint64 Allocated uint64 MaxErrs int Status int RestSize uint64 // contains filtered or unexported fields }
func (*Disk) AttachDataPartition ¶
func (d *Disk) AttachDataPartition(dp DataPartition)
func (*Disk) DataPartitionList ¶
func (*Disk) DetachDataPartition ¶
func (d *Disk) DetachDataPartition(dp DataPartition)
func (*Disk) PartitionCount ¶
func (*Disk) RestorePartition ¶
func (d *Disk) RestorePartition(visitor PartitionVisitor)
type DiskMetrics ¶
type MembersFileMetas ¶
type MembersFileMetas struct { NeedDeleteExtentsTasks []*storage.FileInfo //generator delete extent file task NeedAddExtentsTasks []*storage.FileInfo //generator add extent file task NeedFixExtentSizeTasks []*storage.FileInfo //generator fixSize file task NeedDeleteObjectsTasks map[int][]byte //generator deleteObject on blob file task NeedFixBlobFileSizeTasks []*storage.FileInfo // contains filtered or unexported fields }
File metas of every data partition, used for auto repair.
func NewMemberFileMetas ¶
func NewMemberFileMetas() (mf *MembersFileMetas)
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
func NewMsgHandler ¶
func NewMsgHandler(inConn *net.TCPConn) *MessageHandler
func (*MessageHandler) AllocateNextConn ¶
func (msgH *MessageHandler) AllocateNextConn(pkg *Packet) (err error)
func (*MessageHandler) ClearReplys ¶
func (msgH *MessageHandler) ClearReplys()
func (*MessageHandler) ClearReqs ¶
func (msgH *MessageHandler) ClearReqs(s *DataNode)
func (*MessageHandler) DelListElement ¶
func (msgH *MessageHandler) DelListElement(reply *Packet, e *list.Element, isForClose bool)
func (*MessageHandler) ExitSign ¶
func (msgH *MessageHandler) ExitSign()
func (*MessageHandler) GetListElement ¶
func (msgH *MessageHandler) GetListElement() (e *list.Element)
func (*MessageHandler) PushListElement ¶
func (msgH *MessageHandler) PushListElement(e *Packet)
func (*MessageHandler) RenewList ¶
func (msgH *MessageHandler) RenewList(isHeadNode bool)
func (*MessageHandler) Stop ¶
func (msgH *MessageHandler) Stop()
type Packet ¶
type Packet struct { proto.Packet NextConn *net.TCPConn NextAddr string IsReturn bool DataPartition DataPartition // contains filtered or unexported fields }
func NewNotifyBlobRepair ¶
func NewNotifyCompactPkg ¶
func NewNotifyExtentRepair ¶
func NewStreamReadPacket ¶
func (*Packet) ClassifyErrorOp ¶
func (*Packet) GetNextAddr ¶
func (*Packet) IsCreateFileOperation ¶
func (*Packet) IsExtentWritePacket ¶
func (*Packet) IsMarkDeleteOperation ¶
func (*Packet) IsMarkDeleteReq ¶
func (*Packet) IsMasterCommand ¶
func (*Packet) IsReadOperation ¶
func (*Packet) IsTailNode ¶
func (*Packet) IsTransitPkg ¶
func (*Packet) IsWriteOperation ¶
func (*Packet) PackErrorBody ¶
func (*Packet) ReadFromConnFromCli ¶
func (*Packet) UnmarshalAddrs ¶
type PartitionVisitor ¶
type PartitionVisitor func(dp DataPartition)
type RepairBlobFileTask ¶
func (*RepairBlobFileTask) ToString ¶
func (repairTask *RepairBlobFileTask) ToString() string
type SpaceManager ¶
type SpaceManager interface { LoadDisk(path string, restSize uint64, maxErrs int) (err error) GetDisk(path string) (d *Disk, err error) GetPartition(partitionId uint32) (dp DataPartition) Stats() *Stats GetDisks() []*Disk CreatePartition(volId string, partitionId uint32, storeSize int, storeType string) (DataPartition, error) DeletePartition(partitionId uint32) RangePartitions(f func(partition DataPartition) bool) Stop() }
func NewSpaceManager ¶
func NewSpaceManager(rack string) SpaceManager
type Stats ¶
type Stats struct { Zone string CurrentConns int64 ClusterID string TcpAddr string Start time.Time Total uint64 Used uint64 Available uint64 CreatedPartitionWeights uint64 //dataPartitionCnt*dataPartitionSize RemainWeightsForCreatePartition uint64 //all-useddataPartitionsWieghts CreatedPartitionCnt uint64 MaxWeightsForCreatePartition uint64 sync.Mutex // contains filtered or unexported fields }
Various metrics, such as free and total storage space, traffic, etc.
func (*Stats) AddConnection ¶
func (s *Stats) AddConnection()
func (*Stats) AddInDataSize ¶
func (*Stats) AddOutDataSize ¶
func (*Stats) GetConnectionNum ¶
func (*Stats) RemoveConnection ¶
func (s *Stats) RemoveConnection()
Click to show internal directories.
Click to hide internal directories.