blobnode

package
v0.0.0-...-303e327 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 57 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultHeartbeatIntervalSec        = 30           // 30 s
	DefaultChunkReportIntervalSec      = 60           // 1 min
	DefaultCleanExpiredStatIntervalSec = 60 * 60      // 60 min
	DefaultChunkGcIntervalSec          = 30 * 60      // 30 min
	DefaultChunkInspectIntervalSec     = 2 * 60 * 60  // 2 hour
	DefaultChunkProtectionPeriodSec    = 48 * 60 * 60 // 48 hour
	DefaultDiskStatusCheckIntervalSec  = 2 * 60       // 2 min

	DefaultPutQpsLimitPerDisk    = 128
	DefaultGetQpsLimitPerDisk    = 512
	DefaultGetQpsLimitPerKey     = 64
	DefaultDeleteQpsLimitPerDisk = 128
)
View Source
const (
	ShardListPageLimit = 65536
	BidBatchReadLimit  = 1024
)
View Source
const (
	TickInterval   = 1
	HeartbeatTicks = 30
	ExpiresTicks   = 60
)
View Source
const (
	TaskInit uint8 = iota + 1
	TaskRunning
	TaskStopping
	TaskSuccess
	TaskStopped
)

task runner status

View Source
const (
	DefaultShutdownTimeout = 60 * time.Second // 60 s
)
View Source
const (
	// StatusInterrupt interrupt error status
	StatusInterrupt = 596
)

Variables

View Source
var (
	ErrNotSupportKey     = errors.New("not support this key")
	ErrValueType         = errors.New("value type not match this key")
	ErrNotConfigPrevious = errors.New("level previously not config")
	ErrNotConfigNow      = errors.New("level not config now")
	ErrValueOutOfLimit   = errors.New("value out of limit")
)
View Source
var (
	ErrNotEnoughWellReplicaCnt = errors.New("well replicas cnt is not enough")
	ErrNotEnoughBidsInTasklet  = errors.New("check len of tasklet and bids is not equal")
	ErrTaskletSizeInvalid      = errors.New("tasklet size is invalid")
	ErrBidSizeOverTaskletSize  = errors.New("bid size is over tasklet size")
	ErrUnexpected              = errors.New("unexpected error when get bench bids")
)
View Source
var (
	// ErrBidMissing bid is missing
	ErrBidMissing = errors.New("bid is missing")
	// ErrBidNotMatch bid not match
	ErrBidNotMatch = errors.New("bid not match")
)
View Source
var ErrNotReadyForMigrate = errors.New("not ready for migrate")

ErrNotReadyForMigrate not ready for migrate

Functions

func AllShardsCanNotDownload

func AllShardsCanNotDownload(shardDownloadFail error) bool

AllShardsCanNotDownload judge whether all shards can download or not accord by download error

func BidsSplit

func BidsSplit(ctx context.Context, bids []*ShardInfoSimple, taskletSize int) ([]Tasklet, *WorkError)

BidsSplit split bids list to many tasklets by taskletSize

func GenMigrateBids

func GenMigrateBids(ctx context.Context, blobnodeCli client.IBlobNode, srcReplicas Vunits,
	dst proto.VunitLocation, mode codemode.CodeMode, badIdxs []uint8,
) (migBids, benchmarkBids []*ShardInfoSimple, wErr *WorkError)

GenMigrateBids generates migrate blob ids

func GetBids

func GetBids(shardMetas []*ShardInfoSimple) []proto.BlobID

GetBids returns bids

func GetReplicasBids

func GetReplicasBids(ctx context.Context, cli client.IBlobNode, replicas Vunits) map[proto.Vuid]*ReplicaBidsRet

GetReplicasBids returns replicas bids info

func NewHandler

func NewHandler(service *Service) *rpc.Router

func ScanShard

func ScanShard(ctx context.Context, cs core.ChunkAPI, startBid proto.BlobID, inspectFunc func(ctx context.Context, batchShards []*bnapi.ShardInfo) (err error)) (err error)

func ShouldReclaim

func ShouldReclaim(e *WorkError) bool

ShouldReclaim returns true if the task should reclaim

Types

type Config

type Config struct {
	cmd.Config
	core.HostInfo
	WorkerConfig
	Disks         []core.Config      `json:"disks"`
	DiskConfig    core.RuntimeConfig `json:"disk_config"`
	MetaConfig    db.MetaConfig      `json:"meta_config"`
	FlockFilename string             `json:"flock_filename"`

	Clustermgr *cmapi.Config `json:"clustermgr"`

	HeartbeatIntervalSec        int `json:"heartbeat_interval_S"`
	ChunkReportIntervalSec      int `json:"chunk_report_interval_S"`
	ChunkGcIntervalSec          int `json:"chunk_gc_interval_S"`
	ChunkInspectIntervalSec     int `json:"chunk_inspect_interval_S"`
	ChunkProtectionPeriodSec    int `json:"chunk_protection_period_S"`
	CleanExpiredStatIntervalSec int `json:"clean_expired_stat_interval_S"`
	DiskStatusCheckIntervalSec  int `json:"disk_status_check_interval_S"`

	PutQpsLimitPerDisk    int `json:"put_qps_limit_per_disk"`
	GetQpsLimitPerDisk    int `json:"get_qps_limit_per_disk"`
	GetQpsLimitPerKey     int `json:"get_qps_limit_per_key"`
	DeleteQpsLimitPerDisk int `json:"delete_qps_limit_per_disk"`
}

type ITaskWorker

type ITaskWorker interface {
	// split tasklets accord by volume benchmark bids
	GenTasklets(ctx context.Context) ([]Tasklet, *WorkError)
	// define tasklet execution operator ,eg:disk repair & migrate
	ExecTasklet(ctx context.Context, t Tasklet) *WorkError
	// check whether the task is executed successfully when volume task finish
	Check(ctx context.Context) *WorkError
	OperateArgs() scheduler.OperateTaskArgs
	TaskType() (taskType proto.TaskType)
	GetBenchmarkBids() []*ShardInfoSimple
}

ITaskWorker define interface used for task execution

func NewMigrateWorker

func NewMigrateWorker(task MigrateTaskEx) ITaskWorker

NewMigrateWorker returns migrate worker

type InspectTaskMgr

type InspectTaskMgr struct {
	// contains filtered or unexported fields
}

InspectTaskMgr inspect task manager

func NewInspectTaskMgr

func NewInspectTaskMgr(concurrency int, bidGetter client.IBlobNode, reporter scheduler.IInspector) *InspectTaskMgr

NewInspectTaskMgr returns inspect task manager

func (*InspectTaskMgr) AddTask

func (mgr *InspectTaskMgr) AddTask(ctx context.Context, task *proto.VolumeInspectTask) error

AddTask adds inspect task

func (*InspectTaskMgr) RunningTaskSize

func (mgr *InspectTaskMgr) RunningTaskSize() int

RunningTaskSize returns running inspect task size

type MigrateTaskEx

type MigrateTaskEx struct {
	// contains filtered or unexported fields
}

MigrateTaskEx migrate task execution machine

type MigrateWorker

type MigrateWorker struct {
	// contains filtered or unexported fields
}

MigrateWorker used to manager migrate task

func (*MigrateWorker) Check

func (w *MigrateWorker) Check(ctx context.Context) *WorkError

Check checks migrate task execute result

func (*MigrateWorker) ExecTasklet

func (w *MigrateWorker) ExecTasklet(ctx context.Context, tasklet Tasklet) *WorkError

ExecTasklet execute migrate tasklet

func (*MigrateWorker) GenTasklets

func (w *MigrateWorker) GenTasklets(ctx context.Context) ([]Tasklet, *WorkError)

GenTasklets generates migrate tasklets

func (*MigrateWorker) GetBenchmarkBids

func (w *MigrateWorker) GetBenchmarkBids() []*ShardInfoSimple

GetBenchmarkBids returns benchmark bids

func (*MigrateWorker) OperateArgs

func (w *MigrateWorker) OperateArgs() scheduler.OperateTaskArgs

OperateArgs args for cancel, complete, reclaim.

func (*MigrateWorker) TaskType

func (w *MigrateWorker) TaskType() (taskType proto.TaskType)

TaskType returns task type

type ReplicaBidsRet

type ReplicaBidsRet struct {
	RetErr error
	Bids   map[proto.BlobID]*client.ShardInfo
}

ReplicaBidsRet with bids info and error message

type Service

type Service struct {
	Disks         map[proto.DiskID]core.DiskAPI
	WorkerService *WorkerService

	// client handler
	ClusterMgrClient *cmapi.Client

	Conf *Config

	// limiter
	PutQpsLimitPerDisk    limit.ResettableLimiter
	GetQpsLimitPerDisk    limit.Limiter
	GetQpsLimitPerKey     limit.Limiter
	DeleteQpsLimitPerKey  limit.Limiter
	DeleteQpsLimitPerDisk limit.ResettableLimiter
	ChunkLimitPerVuid     limit.Limiter
	DiskLimitPerKey       limit.Limiter
	InspectLimiterPerKey  limit.Limiter

	RequestCount int64
	// contains filtered or unexported fields
}

func NewService

func NewService(conf Config) (svr *Service, err error)

func (*Service) ChunkCompact

func (s *Service) ChunkCompact(c *rpc.Context)

* method: POST * url: /chunk/compact/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(CompactChunkArgs)

func (*Service) ChunkCreate

func (s *Service) ChunkCreate(c *rpc.Context)

* method: POST * url: /chunk/create/diskid/{diskid}/vuid/{vuid}?chunksize={chunksize} * request body: json.Marshal(bnapi.ChunkCreateArgs)

func (*Service) ChunkInspect

func (s *Service) ChunkInspect(c *rpc.Context)

* method: POST * url: /chunk/inspect/diskid/{diskid}/vuid/{vuid}

func (*Service) ChunkList

func (s *Service) ChunkList(c *rpc.Context)

* method: GET * url: /chunk/list/diskid/{diskid}

func (*Service) ChunkReadonly

func (s *Service) ChunkReadonly(c *rpc.Context)

* method: POST * url: /chunk/readonly/diskid/{diskid}/vuid/{vuid}/ * request body: json.Marshal(ChunkArgs)

func (*Service) ChunkReadwrite

func (s *Service) ChunkReadwrite(c *rpc.Context)

* method: POST * url: /chunk/readwrite/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(ChunkArgs)

func (*Service) ChunkRelease

func (s *Service) ChunkRelease(c *rpc.Context)

* method: POST * url: /chunk/release/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(ChunkArgs)

func (*Service) ChunkStat

func (s *Service) ChunkStat(c *rpc.Context)

* method: GET * url: /chunk/stat/diskid/{diskid}/vuid/{vuid} * response body: json.Marshal(ChunkInfo)

func (*Service) Close

func (s *Service) Close()

func (*Service) DebugStat

func (s *Service) DebugStat(c *rpc.Context)

* method: GET * url: /debug/stat * response body: json.Marshal([]DiskInfo)

func (*Service) DiskProbe

func (s *Service) DiskProbe(c *rpc.Context)

* method: POST * url: /disk/probe * request body: json.Marshal(DiskProbeArgs)

func (*Service) DiskStat

func (s *Service) DiskStat(c *rpc.Context)

* method: GET * url: /disk/stat/diskid/{diskid} * response body: json.Marshal(DiskInfo)

func (*Service) GcRubbishChunk

func (s *Service) GcRubbishChunk()

func (*Service) ShardDelete

func (s *Service) ShardDelete(c *rpc.Context)

* method: POST * url: /shard/delete/diskid/{diskid}/vuid/{vuid}/bid/{bid} * request body: json.Marshal(deleteArgs)

func (*Service) ShardGet

func (s *Service) ShardGet(c *rpc.Context)

* method: GET * url: /shard/get/diskid/{diskid}/vuid/{vuid}/bid/{bid}?iotype={iotype} * response body: bidData

func (*Service) ShardList

func (s *Service) ShardList(c *rpc.Context)

* method: GET * url: /shard/list/diskid/{diskid}/vuid/{vuid}/startbid/{bid}/status/{status}/count/{count} * response body: Marshal([]*bnapi.ShardInfo)

func (*Service) ShardMarkdelete

func (s *Service) ShardMarkdelete(c *rpc.Context)

* method: POST * url: /shard/markdelete/diskid/{diskid}/vuid/{vuid}/bid/{bid} * request body: json.Marshal(deleteArgs)

func (*Service) ShardPut

func (s *Service) ShardPut(c *rpc.Context)

* method: POST * url: /shard/put/diskid/{diskid}/vuid/{vuid}/bid/{bid}/size/{size}?iotype={iotype} * request body: bidData

func (*Service) ShardStat

func (s *Service) ShardStat(c *rpc.Context)

* method: GET * url: /shard/stat/diskid/{diskid}/vuid/{vuidValue}/bid/{bidValue} * response body: json.Marshal(ShardMeta)

func (*Service) Stat

func (s *Service) Stat(c *rpc.Context)

* method: GET * url: /stat * response body: json.Marshal([]DiskInfo)

type ShardInfoEx

type ShardInfoEx struct {
	// contains filtered or unexported fields
}

ShardInfoEx shard info execution

func (*ShardInfoEx) BadReason

func (shard *ShardInfoEx) BadReason() error

BadReason returns bad reason

func (*ShardInfoEx) IsBad

func (shard *ShardInfoEx) IsBad() bool

IsBad returns true is shard return error

func (*ShardInfoEx) MarkDeleted

func (shard *ShardInfoEx) MarkDeleted() bool

MarkDeleted returns true if no err return and shard is mark deleted

func (*ShardInfoEx) Normal

func (shard *ShardInfoEx) Normal() bool

Normal returns true if no err return and shard status is normal

func (*ShardInfoEx) NotExist

func (shard *ShardInfoEx) NotExist() bool

NotExist returns true if no err return and shard is not exist

func (*ShardInfoEx) ShardSize

func (shard *ShardInfoEx) ShardSize() int64

ShardSize returns shard size

type ShardInfoSimple

type ShardInfoSimple struct {
	Bid  proto.BlobID
	Size int64
}

ShardInfoSimple with blob id and size

func GetBenchmarkBids

func GetBenchmarkBids(ctx context.Context, cli client.IBlobNode, replicas Vunits,
	mode codemode.CodeMode, badIdxs []uint8) (bids []*ShardInfoSimple, err error)

GetBenchmarkBids returns bench mark bids

func MergeBids

func MergeBids(replicasBids map[proto.Vuid]*ReplicaBidsRet) []*ShardInfoSimple

MergeBids merge bids

type ShardInfoWithCrc

type ShardInfoWithCrc struct {
	Bid   proto.BlobID
	Size  int64
	Crc32 uint32
}

ShardInfoWithCrc with blob id and size and crc

func GetSingleVunitNormalBids

func GetSingleVunitNormalBids(ctx context.Context, cli client.IBlobNode, replica proto.VunitLocation) (bids []*ShardInfoWithCrc, err error)

GetSingleVunitNormalBids returns single volume unit bids info

type ShardRecover

type ShardRecover struct {
	// contains filtered or unexported fields
}

ShardRecover used to recover shard data

func NewShardRecover

func NewShardRecover(replicas Vunits, mode codemode.CodeMode, bidInfos []*ShardInfoSimple,
	shardGetter client.IBlobNode, vunitShardGetConcurrency int, taskType proto.TaskType,
) *ShardRecover

NewShardRecover returns shard recover

func (*ShardRecover) GetShard

func (r *ShardRecover) GetShard(idx uint8, bid proto.BlobID) ([]byte, error)

GetShard returns shards data

func (*ShardRecover) RecoverShards

func (r *ShardRecover) RecoverShards(ctx context.Context, repairIdxs []uint8, direct bool) error

RecoverShards recover shards

func (*ShardRecover) ReleaseBuf

func (r *ShardRecover) ReleaseBuf()

ReleaseBuf release chunks shards buffer

type ShardRepairer

type ShardRepairer struct {
	// contains filtered or unexported fields
}

ShardRepairer used to repair shard data

func NewShardRepairer

func NewShardRepairer(cli client.IBlobNode) *ShardRepairer

NewShardRepairer returns shard repairer

func (*ShardRepairer) RepairShard

func (repairer *ShardRepairer) RepairShard(ctx context.Context, task *proto.ShardRepairTask) error

RepairShard repair shard data

type ShardsBuf

type ShardsBuf struct {
	// contains filtered or unexported fields
}

ShardsBuf used to store shard data in memory

func NewShardsBuf

func NewShardsBuf(buf []byte) *ShardsBuf

NewShardsBuf returns shards buffer

func (*ShardsBuf) FetchShard

func (shards *ShardsBuf) FetchShard(bid proto.BlobID) ([]byte, error)

FetchShard returns shard data

func (*ShardsBuf) PlanningDataLayout

func (shards *ShardsBuf) PlanningDataLayout(bids []*ShardInfoSimple) error

PlanningDataLayout planning data layout

func (*ShardsBuf) PutShard

func (shards *ShardsBuf) PutShard(bid proto.BlobID, input io.Reader) error

PutShard put shard data to shardsBuf

func (*ShardsBuf) ShardCrc32

func (shards *ShardsBuf) ShardCrc32(bid proto.BlobID) (crc uint32, err error)

ShardCrc32 returns shard crc32

func (*ShardsBuf) ShardSizeIsZero

func (shards *ShardsBuf) ShardSizeIsZero(bid proto.BlobID) bool

ShardSizeIsZero return true if shard size is zero

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner used to manage task

func NewTaskRunner

func NewTaskRunner(ctx context.Context, taskID string, w ITaskWorker, idc string,
	taskletRunConcurrency int, taskCounter *taskCounter, schedulerCli scheduler.IMigrator) *TaskRunner

NewTaskRunner return task runner

func (*TaskRunner) Alive

func (r *TaskRunner) Alive() bool

Alive returns true if task is alive

func (*TaskRunner) Run

func (r *TaskRunner) Run()

Run runs task

func (*TaskRunner) Stop

func (r *TaskRunner) Stop()

Stop stops task

func (*TaskRunner) Stopped

func (r *TaskRunner) Stopped() bool

Stopped returns true if task is stopped

type TaskRunnerMgr

type TaskRunnerMgr struct {
	// contains filtered or unexported fields
}

TaskRunnerMgr task runner manager

func NewTaskRunnerMgr

func NewTaskRunnerMgr(idc string, meter WorkerConfigMeter, genWorker WorkerGenerator,
	renewalCli, schedulerCli scheduler.IMigrator) *TaskRunnerMgr

NewTaskRunnerMgr returns task runner manager

func (*TaskRunnerMgr) AddTask

func (tm *TaskRunnerMgr) AddTask(ctx context.Context, task MigrateTaskEx) error

AddTask add migrate task.

func (*TaskRunnerMgr) GetAliveTasks

func (tm *TaskRunnerMgr) GetAliveTasks() map[proto.TaskType][]string

GetAliveTasks returns all alive migrate task.

func (*TaskRunnerMgr) RenewalTaskLoop

func (tm *TaskRunnerMgr) RenewalTaskLoop(stopCh <-chan struct{})

RenewalTaskLoop renewal task.

func (*TaskRunnerMgr) RunningTaskCnt

func (tm *TaskRunnerMgr) RunningTaskCnt() map[proto.TaskType]int

RunningTaskCnt return running task count

func (*TaskRunnerMgr) StopAllAliveRunner

func (tm *TaskRunnerMgr) StopAllAliveRunner()

StopAllAliveRunner stops all alive runner

func (*TaskRunnerMgr) TaskStats

func (tm *TaskRunnerMgr) TaskStats() blobnode.WorkerStats

TaskStats task counter result.

type Tasklet

type Tasklet struct {
	// contains filtered or unexported fields
}

Tasklet is the smallest unit of task exe

func (*Tasklet) DataSizeByte

func (t *Tasklet) DataSizeByte() uint64

DataSizeByte returns total bids size

type Vunits

type Vunits []proto.VunitLocation

Vunits volume stripe locations.

func (Vunits) Indexes

func (locs Vunits) Indexes() []uint8

func (Vunits) IntactGlobalSet

func (locs Vunits) IntactGlobalSet(mode codemode.CodeMode, bad []uint8) Vunits

func (Vunits) IsValid

func (locs Vunits) IsValid() bool

func (Vunits) Subset

func (locs Vunits) Subset(idxes []int) Vunits

type WokeErrorType

type WokeErrorType uint8

WokeErrorType worker error type

const (
	DstErr WokeErrorType = iota + 1
	SrcErr
	OtherErr
)

task runner error type

type WorkError

type WorkError struct {
	// contains filtered or unexported fields
}

WorkError with error type and error

func CheckVunit

func CheckVunit(ctx context.Context, expectBids []*ShardInfoSimple, dest proto.VunitLocation, blobnodeCli client.IBlobNode) *WorkError

CheckVunit checks volume unit info

func DstError

func DstError(err error) *WorkError

DstError returns destination error type

func MigrateBids

func MigrateBids(ctx context.Context, shardRecover *ShardRecover, badIdx uint8, destLocation proto.VunitLocation,
	direct bool, bids []*ShardInfoSimple, blobnodeCli client.IBlobNode) *WorkError

MigrateBids migrate the bids data to destination

func OtherError

func OtherError(err error) *WorkError

OtherError returns other error type

func SrcError

func SrcError(err error) *WorkError

SrcError returns source error type

func (*WorkError) Error

func (e *WorkError) Error() string

Error returns error info

func (*WorkError) String

func (e *WorkError) String() string

String return error message with error type and error detail message

type WorkerConfig

type WorkerConfig struct {
	WorkerConfigMeter

	// buffer pool use for migrate and repair shard repair
	BufPoolConf base.BufConfig `json:"buf_pool_conf"`

	// acquire task period
	AcquireIntervalMs int `json:"acquire_interval_ms"`

	// scheduler client config
	Scheduler scheduler.Config `json:"scheduler"`
	// blbonode client config
	BlobNode bnapi.Config `json:"blobnode"`

	DroppedBidRecord *recordlog.Config `json:"dropped_bid_record"`
}

WorkerConfig worker service config

type WorkerConfigMeter

type WorkerConfigMeter struct {
	// max task run count of disk repair & balance & disk drop
	MaxTaskRunnerCnt int `json:"max_task_runner_cnt"`
	// tasklet concurrency of single repair task
	RepairConcurrency int `json:"repair_concurrency"`
	// tasklet concurrency of single balance task
	BalanceConcurrency int `json:"balance_concurrency"`
	// tasklet concurrency of single disk drop task
	DiskDropConcurrency int `json:"disk_drop_concurrency"`
	// tasklet concurrency of single manual migrate task
	ManualMigrateConcurrency int `json:"manual_migrate_concurrency"`
	// shard repair concurrency
	ShardRepairConcurrency int `json:"shard_repair_concurrency"`
	// volume inspect concurrency
	InspectConcurrency int `json:"inspect_concurrency"`

	// batch download concurrency of single tasklet
	DownloadShardConcurrency int `json:"download_shard_concurrency"`
}

WorkerConfigMeter worker controller meter.

type WorkerGenerator

type WorkerGenerator = func(task MigrateTaskEx) ITaskWorker

WorkerGenerator generates task worker.

type WorkerService

type WorkerService struct {
	closer.Closer
	WorkerConfig
	// contains filtered or unexported fields
}

WorkerService worker worker_service

func NewWorkerService

func NewWorkerService(cfg *WorkerConfig, service cmapi.APIService, clusterID proto.ClusterID, idc string) (*WorkerService, error)

NewWorkerService returns rpc worker_service

func (*WorkerService) Run

func (s *WorkerService) Run()

Run runs backend task

func (*WorkerService) ShardRepair

func (s *WorkerService) ShardRepair(c *rpc.Context)

ShardRepair repair shard

func (*WorkerService) WorkerStats

func (s *WorkerService) WorkerStats(c *rpc.Context)

WorkerStats returns worker_service stats

Directories

Path Synopsis
qos

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL