package access

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewHandler

func NewHandler(service *Service) *rpc.Router

NewHandler returns the app server's RPC handler

Types

type CodeModePair

type CodeModePair struct {
	Policy codemode.Policy
	Tactic codemode.Tactic
}

CodeModePair pairs a code mode's policy with its tactic

type CodeModePairs

type CodeModePairs map[codemode.CodeMode]CodeModePair

CodeModePairs maps each code mode to its CodeModePair

func (CodeModePairs) SelectCodeMode

func (c CodeModePairs) SelectCodeMode(size int64) codemode.CodeMode

SelectCodeMode selects a code mode by file size
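
A minimal usage sketch, assuming pairs was already built from the cluster's code mode policies (construction elided):

func selectExample(pairs CodeModePairs) codemode.CodeMode {
	// Pick a code mode for a 1 GiB file based on the configured policies.
	return pairs.SelectCodeMode(1 << 30)
}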

type Config

type Config struct {
	cmd.Config

	ServiceRegister consul.Config `json:"service_register"`
	Stream          StreamConfig  `json:"stream"`
	Limit           LimitConfig   `json:"limit"`
}

Config defines the access service configuration

type Handler

type Handler struct {
	StreamConfig
	// contains filtered or unexported fields
}

Handler is the access stream handler

func (*Handler) Admin

func (h *Handler) Admin() interface{}

Admin returns internal admin interface.

func (*Handler) Alloc

func (h *Handler) Alloc(ctx context.Context, size uint64, blobSize uint32,
	assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error)

Alloc access interface /alloc

required: size, the file size
optional: blobSize > 0, allocate with this blob size
          assignClusterID > 0, allocate in this specific cluster
          codeMode > 0, allocate with this code mode
return: a location of the file
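
A hypothetical call, assuming h is an initialized *Handler; zero values for the optional arguments let the handler choose:

func allocExample(ctx context.Context, h *Handler) (*access.Location, error) {
	// Allocate a location for a 128 MiB file; blobSize, cluster, and
	// code mode are zero, so the handler selects them itself.
	return h.Alloc(ctx, 128<<20, 0, 0, 0)
}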

func (*Handler) Delete

func (h *Handler) Delete(ctx context.Context, location *access.Location) error

Delete deletes all blobs in this location

func (*Handler) Get

func (h *Handler) Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error)

Get reads a file

   required: location, readSize
   optional: offset (default is 0)

   the first return value is a transfer function that copies the data
   after argument checking

Data shards are read first if the blob is small or only a few bytes are
requested; otherwise an EC reconstruct-read is attempted, trying from N+X
up to N+M shards. During reconstruct-read only the essential bytes of
each shard are read.

The sorted N+X order, for example with mode EC6P10L2, X=2, reading from
idc=2, the shards are laid out like this

            data N 6        |    parity M 10     | local L 2
      d1  d2  d3  d4  d5  d6  p1 .. p5  p6 .. p10  l1  l2
 idc   1   1   1   2   2   2     1         2        1   2

sorted  d4  d5  d6  p6 .. p10  d1  d2  d3  p1 .. p5
read-1 [d4                p10]
read-2 [d4                p10  d1]
read-3 [d4                p10  d1  d2]
...
read-9 [d4                                       p5]
failed
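
A sketch of reading an entire location into memory, assuming h is an initialized *Handler and that location.Size holds the file size:

func getExample(ctx context.Context, h *Handler, location access.Location) ([]byte, error) {
	buf := bytes.NewBuffer(nil)
	// Get only validates the arguments; the returned transfer
	// function performs the actual copy into buf.
	transfer, err := h.Get(ctx, buf, location, location.Size, 0)
	if err != nil {
		return nil, err
	}
	if err = transfer(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}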

func (*Handler) Put

func (h *Handler) Put(ctx context.Context, rc io.Reader, size int64,
	hasherMap access.HasherMap) (*access.Location, error)

Put puts one object

required: size, the file size
optional: hasherMap, to calculate hash.Hash checksums
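
A sketch of uploading a byte slice, assuming h is an initialized *Handler; the access.HashAlgMD5 key is an assumption about the access API:

func putExample(ctx context.Context, h *Handler, data []byte) (*access.Location, error) {
	// Compute an MD5 alongside the upload; hasherMap is optional and
	// nil would skip hashing entirely.
	hasherMap := access.HasherMap{access.HashAlgMD5: md5.New()}
	return h.Put(ctx, bytes.NewReader(data), int64(len(data)), hasherMap)
}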

func (*Handler) PutAt

func (h *Handler) PutAt(ctx context.Context, rc io.Reader,
	clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, size int64,
	hasherMap access.HasherMap) error

PutAt access interface /putat, puts one blob

required: rc, the blob reader
required: clusterID, vid, bid
required: size, the size of one blob
optional: hasherMap, to compute hashes
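
A sketch of writing one blob at an exact position, assuming the cluster, volume, and blob IDs were obtained beforehand (e.g. from an Alloc location):

func putAtExample(ctx context.Context, h *Handler,
	clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, blob []byte) error {
	// nil hasherMap: skip hash computation for this blob.
	return h.PutAt(ctx, bytes.NewReader(blob), clusterID, vid, bid, int64(len(blob)), nil)
}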

type LimitConfig

type LimitConfig struct {
	NameRps    map[string]int `json:"name_rps"`    // requests per second, per name
	ReaderMBps int            `json:"reader_mbps"` // read bandwidth in MB/s
	WriterMBps int            `json:"writer_mbps"` // write bandwidth in MB/s
}

LimitConfig is the limiter configuration

type Limiter

type Limiter interface {
	// Acquire takes one slot under the named rps limit
	Acquire(name string) error
	// Release releases the slot taken under the named rps limit
	Release(name string)

	// Reader return io.Reader with bandwidth rate limit
	Reader(ctx context.Context, r io.Reader) io.Reader
	// Writer return io.Writer with bandwidth rate limit
	Writer(ctx context.Context, w io.Writer) io.Writer

	// Status returns running status
	// TODO: calculate rate limit wait concurrent
	Status() Status
}

Limiter limits requests per second (rps) and bandwidth (bps)

func NewLimiter

func NewLimiter(cfg LimitConfig) Limiter

NewLimiter returns a Limiter
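
A sketch of guarding a named request and throttling a reader; the "put" name and the numbers are illustrative:

func limitExample(ctx context.Context, r io.Reader) error {
	l := NewLimiter(LimitConfig{
		NameRps:    map[string]int{"put": 100}, // at most 100 put requests per second
		ReaderMBps: 64,                         // throttle reads to 64 MB/s
		WriterMBps: 64,                         // throttle writes to 64 MB/s
	})
	if err := l.Acquire("put"); err != nil {
		return err // rps limit exceeded
	}
	defer l.Release("put")

	limited := l.Reader(ctx, r) // reads through limited respect ReaderMBps
	_, err := io.Copy(io.Discard, limited)
	return err
}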

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a rate-limited reader

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the access RPC service

func New

func New(cfg Config) *Service

New returns an access service
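
A sketch of wiring the service into an HTTP server, assuming *rpc.Router can serve HTTP; the listen address is illustrative:

func serveExample(cfg Config) error {
	service := New(cfg)
	defer service.Close()

	// NewHandler builds the RPC router over the service's endpoints.
	router := NewHandler(service)
	return http.ListenAndServe(":9500", router)
}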

func (*Service) Alloc

func (s *Service) Alloc(c *rpc.Context)

Alloc allocates one location

func (*Service) Close

func (s *Service) Close()

Close closes the server

func (*Service) Delete

func (s *Service) Delete(c *rpc.Context)

Delete deletes all blobs in this location

func (*Service) DeleteBlob

func (s *Service) DeleteBlob(c *rpc.Context)

DeleteBlob deletes one blob

func (*Service) Get

func (s *Service) Get(c *rpc.Context)

Get reads a file

func (*Service) Limit

func (s *Service) Limit(c *rpc.Context)

Limit is the rps controller handler

func (*Service) Put

func (s *Service) Put(c *rpc.Context)

Put puts one object

func (*Service) PutAt

func (s *Service) PutAt(c *rpc.Context)

PutAt puts one blob

func (*Service) RegisterAdminHandler

func (s *Service) RegisterAdminHandler()

RegisterAdminHandler registers the admin handler to the profiler

func (*Service) RegisterService

func (s *Service) RegisterService()

RegisterService registers the service to rpc

func (*Service) Sign

func (s *Service) Sign(c *rpc.Context)

Sign generates a CRC signature for the given locations

type Status

type Status struct {
	Config    LimitConfig    `json:"config"`     // limiter configuration
	Running   map[string]int `json:"running"`    // currently running requests, per name
	ReadWait  int            `json:"read_wait"`  // waiting duration for reads
	WriteWait int            `json:"write_wait"` // waiting duration for writes
}

Status is the limiter's running status

type StreamConfig

type StreamConfig struct {
	IDC string `json:"idc"`

	MaxBlobSize                uint32 `json:"max_blob_size"`
	DiskPunishIntervalS        int    `json:"disk_punish_interval_s"`
	DiskTimeoutPunishIntervalS int    `json:"disk_timeout_punish_interval_s"`
	ServicePunishIntervalS     int    `json:"service_punish_interval_s"`
	AllocRetryTimes            int    `json:"alloc_retry_times"`
	AllocRetryIntervalMS       int    `json:"alloc_retry_interval_ms"`
	EncoderEnableVerify        bool   `json:"encoder_enableverify"`
	EncoderConcurrency         int    `json:"encoder_concurrency"`
	MinReadShardsX             int    `json:"min_read_shards_x"`
	ShardCrcDisabled           bool   `json:"shard_crc_disabled"`

	MemPoolSizeClasses map[int]int `json:"mem_pool_size_classes"`

	// CodeModesPutQuorums sets the put quorum per code mode,
	// for when one AZ is down and a quorum across all AZs cannot be written
	CodeModesPutQuorums map[codemode.CodeMode]int `json:"code_mode_put_quorums"`

	ClusterConfig  controller.ClusterConfig `json:"cluster_config"`
	BlobnodeConfig blobnode.Config          `json:"blobnode_config"`
	ProxyConfig    proxy.Config             `json:"proxy_config"`

	// hystrix command config
	AllocCommandConfig hystrix.CommandConfig `json:"alloc_command_config"`
	RWCommandConfig    hystrix.CommandConfig `json:"rw_command_config"`
}

StreamConfig is the access stream handler configuration
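
An illustrative fragment of a StreamConfig; every value below is an assumption, not a recommended setting:

var exampleStreamConfig = StreamConfig{
	IDC:                  "z0",    // the IDC this access node serves
	MaxBlobSize:          1 << 22, // split files into 4 MiB blobs
	AllocRetryTimes:      3,       // retry failed allocations up to 3 times
	AllocRetryIntervalMS: 100,     // wait 100 ms between retries
	MinReadShardsX:       2,       // reconstruct-read starts from N+2 shards
}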

type StreamHandler

type StreamHandler interface {
	// Alloc access interface /alloc
	//     required: size, the file size
	//     optional: blobSize > 0, allocate with this blob size
	//               assignClusterID > 0, allocate in this specific cluster
	//               codeMode > 0, allocate with this code mode
	//     return: a location of the file
	Alloc(ctx context.Context, size uint64, blobSize uint32,
		assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error)

	// PutAt access interface /putat, puts one blob
	//     required: rc, the blob reader
	//     required: clusterID, vid, bid
	//     required: size, the size of one blob
	//     optional: hasherMap, to compute hashes
	PutAt(ctx context.Context, rc io.Reader,
		clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, size int64, hasherMap access.HasherMap) error

	// Put puts one object
	//     required: size, the file size
	//     optional: hasherMap, to calculate hash.Hash checksums
	Put(ctx context.Context, rc io.Reader, size int64, hasherMap access.HasherMap) (*access.Location, error)

	// Get reads a file
	//     required: location, readSize
	//     optional: offset (default is 0)
	//
	//     the first return value is a transfer function that copies the
	//     data after argument checking
	//
	//  Data shards are read first if the blob is small or only a few bytes
	//  are requested; otherwise an EC reconstruct-read is attempted, trying
	//  from N+X up to N+M shards
	//
	//  The sorted N+X order, for example with mode EC6P10L2, X=2, reading
	//  from idc=2, the shards are laid out like this
	//              data N 6        |    parity M 10     | local L 2
	//        d1  d2  d3  d4  d5  d6  p1 .. p5  p6 .. p10  l1  l2
	//   idc   1   1   1   2   2   2     1         2        1   2
	//
	// sorted  d4  d5  d6  p6 .. p10  d1  d2  d3  p1 .. p5
	// read-1 [d4                p10]
	// read-2 [d4                p10  d1]
	// read-3 [d4                p10  d1  d2]
	// ...
	// read-9 [d4                                       p5]
	// failed
	Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error)

	// Delete delete all blobs in this location
	Delete(ctx context.Context, location *access.Location) error

	// Admin returns internal admin interface.
	Admin() interface{}
}

StreamHandler is the stream HTTP handler

func NewStreamHandler

func NewStreamHandler(cfg *StreamConfig, stopCh <-chan struct{}) StreamHandler

NewStreamHandler returns a stream handler
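
A sketch showing the stop channel contract, under the assumption that closing stopCh signals the handler's background tasks to exit:

func streamHandlerExample(cfg *StreamConfig) (StreamHandler, func()) {
	stopCh := make(chan struct{})
	handler := NewStreamHandler(cfg, stopCh)
	stop := func() { close(stopCh) } // call to stop background goroutines
	return handler, stop
}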

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is a rate-limited writer

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)
