ec

package
v1.3.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2023 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

* Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Package ec provides erasure coding (EC) based data protection for AIStore.

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Index

Constants

View Source
const (
	ActSplit   = "split"
	ActRestore = "restore"
	ActDelete  = "delete"

	RespStreamName = "ec-resp"
	ReqStreamName  = "ec-req"

	ActClearRequests  = "clear-requests"
	ActEnableRequests = "enable-requests"

	URLCT   = "ct"   // for using in URL path - requests for slices/replicas
	URLMeta = "meta" /// .. - metadata requests

)
View Source
const MDVersionLast = 1 // current version of metadata

Variables

View Source
var (
	ErrorECDisabled = errors.New("EC is disabled for bucket")
	ErrorNoMetafile = errors.New("no metafile")
	ErrorNotFound   = errors.New("not found")
)

Functions

func Init

func Init(t cluster.Target)

func IsECCopy

func IsECCopy(size int64, ecConf *cmn.ECConf) bool

func SliceSize

func SliceSize(fileSize int64, slices int) int64

SliceSize returns the size of one slice that EC will create for the object

func WriteReplicaAndMeta

func WriteReplicaAndMeta(t cluster.Target, lom *cluster.LOM, args *WriteArgs) (err error)

WriteReplicaAndMeta saves replica and its metafile

func WriteSliceAndMeta

func WriteSliceAndMeta(t cluster.Target, hdr *transport.ObjHdr, args *WriteArgs) error

WriteSliceAndMeta saves slice and its metafile

Types

type BckXacts

type BckXacts struct {
	// contains filtered or unexported fields
}

func (*BckXacts) AbortGet

func (xacts *BckXacts) AbortGet()

func (*BckXacts) AbortPut

func (xacts *BckXacts) AbortPut()

func (*BckXacts) Get

func (xacts *BckXacts) Get() *XactGet

func (*BckXacts) Put

func (xacts *BckXacts) Put() *XactPut

func (*BckXacts) Req

func (xacts *BckXacts) Req() *XactRespond

func (*BckXacts) SetGet

func (xacts *BckXacts) SetGet(xctn *XactGet)

func (*BckXacts) SetPut

func (xacts *BckXacts) SetPut(xctn *XactPut)

func (*BckXacts) SetReq

func (xacts *BckXacts) SetReq(xctn *XactRespond)

type ExtECGetStats

type ExtECGetStats struct {
	AvgTime     cos.Duration `json:"ec.decode.ns"`
	ErrCount    int64        `json:"ec.decode.err.n,string"`
	AvgObjTime  cos.Duration `json:"ec.obj.process.ns"`
	AvgQueueLen float64      `json:"ec.queue.len.f"`
	IsIdle      bool         `json:"is_idle"`
}

extended x-ec-get statistics

type ExtECPutStats

type ExtECPutStats struct {
	AvgEncodeTime  cos.Duration `json:"ec.encode.ns"`
	AvgDeleteTime  cos.Duration `json:"ec.delete.ns"`
	EncodeCount    int64        `json:"ec.encode.n,string"`
	DeleteCount    int64        `json:"ec.delete.n,string"`
	EncodeSize     int64        `json:"ec.encode.size,string"`
	EncodeErrCount int64        `json:"ec.encode.err.n,string"`
	DeleteErrCount int64        `json:"ec.delete.err.n,string"`
	AvgObjTime     cos.Duration `json:"ec.obj.process.ns"`
	AvgQueueLen    float64      `json:"ec.queue.len.f"`
	IsIdle         bool         `json:"is_idle"`
}

extended x-ec-put statistics

type Manager

type Manager struct {
	// contains filtered or unexported fields
}
var (
	ECM *Manager
)

func (*Manager) BucketsMDChanged

func (mgr *Manager) BucketsMDChanged() error

func (*Manager) CleanupObject

func (mgr *Manager) CleanupObject(lom *cluster.LOM)

func (*Manager) EncodeObject

func (mgr *Manager) EncodeObject(lom *cluster.LOM, cb ...cluster.OnFinishObj) error

EncodeObject generates slices using Reed-Solom algorithm:

  • lom - object to encode
  • intra - if true, it is internal request and has low priority
  • cb - optional callback that is called after the object is encoded

func (*Manager) ListenSmapChanged

func (mgr *Manager) ListenSmapChanged()

func (*Manager) NewGetXact

func (mgr *Manager) NewGetXact(bck *cmn.Bck) *XactGet

func (*Manager) NewPutXact

func (mgr *Manager) NewPutXact(bck *cmn.Bck) *XactPut

func (*Manager) NewRespondXact

func (mgr *Manager) NewRespondXact(bck *cmn.Bck) *XactRespond

func (*Manager) RestoreBckGetXact

func (mgr *Manager) RestoreBckGetXact(bck *meta.Bck) (xget *XactGet)

func (*Manager) RestoreBckPutXact

func (mgr *Manager) RestoreBckPutXact(bck *meta.Bck) (xput *XactPut)

func (*Manager) RestoreBckRespXact

func (mgr *Manager) RestoreBckRespXact(bck *meta.Bck) (xrsp *XactRespond)

func (*Manager) RestoreObject

func (mgr *Manager) RestoreObject(lom *cluster.LOM) error

func (*Manager) String

func (*Manager) String() string

implementing cluster.Slistener interface

type Metadata

type Metadata struct {
	Size        int64            `json:"obj_size"`      // obj size (after EC'ing sum size of slices differs from the original)
	Generation  int64            `json:"generation"`    // Timestamp when the object was EC'ed
	ObjCksum    string           `json:"obj_cksum"`     // checksum of the original object
	ObjVersion  string           `json:"obj_version"`   // object version
	CksumType   string           `json:"cksum_type"`    // slice checksum type
	CksumValue  string           `json:"slice_cksum"`   // slice checksum of the slice if EC is used
	FullReplica string           `json:"replica_node"`  // daemon ID where full(main) replica is
	Daemons     cos.MapStrUint16 `json:"nodes"`         // Locations of all slices: DaemonID <-> SliceID
	Data        int              `json:"data_slices"`   // the number of data slices
	Parity      int              `json:"parity_slices"` // the number of parity slices
	SliceID     int              `json:"slice_id"`      // 0 for full replica, 1 to N for slices
	MDVersion   uint32           `json:"md_version"`    // Metadata format version
	IsCopy      bool             `json:"is_copy"`       // object is replicated(true) or encoded(false)
}

Metadata - EC information stored in metafiles for every encoded object

func LoadMetadata

func LoadMetadata(fqn string) (*Metadata, error)

LoadMetadata loads and parses EC metadata from a file

func MetaFromReader

func MetaFromReader(reader io.Reader) (*Metadata, error)

func NewMetadata

func NewMetadata() *Metadata

func ObjectMetadata

func ObjectMetadata(bck *meta.Bck, objName string) (*Metadata, error)

ObjectMetadata returns metadata for an object or its slice if any exists

func RequestECMeta

func RequestECMeta(bck *cmn.Bck, objName string, si *meta.Snode, client *http.Client) (md *Metadata, err error)

RequestECMeta returns an EC metadata found on a remote target.

func (*Metadata) Clone

func (md *Metadata) Clone() *Metadata

func (*Metadata) NewPack

func (md *Metadata) NewPack() []byte

Do not use MM.SGL for a byte buffer: as the buffer is sent via HTTP, it can result in hard to debug errors when SGL is freed. For details: https://gitlab-master.nvidia.com/aistorage/aistore/issues/472#note_4212419

func (*Metadata) Pack

func (md *Metadata) Pack(packer *cos.BytePack)

func (*Metadata) PackedSize

func (md *Metadata) PackedSize() int

func (*Metadata) RemoteTargets

func (md *Metadata) RemoteTargets(t cluster.Target) []*meta.Snode

RemoteTargets returns list of Snodes that contain a slice or replica. This target(`t`) is removed from the list.

func (*Metadata) Unpack

func (md *Metadata) Unpack(unpacker *cos.ByteUnpack) (err error)

type RequestsControlMsg

type RequestsControlMsg struct {
	Action string
}

type Stats

type Stats struct {
	// mpathrunner(not ecrunner) queue len
	QueueLen float64
	// time between ecrunner receives an object and mpathrunner starts processing it
	WaitTime time.Duration
	// EC encoding time (for both EC'ed and replicated objects)
	EncodeTime time.Duration
	// size of a file put into encode queue
	EncodeSize int64
	// total number of errors while encoding objects
	EncodeErr int64
	// total number of errors while restoring objects
	DecodeErr int64
	// time to restore an object(for both EC'ed and replicated objects)
	DecodeTime time.Duration
	// time to cleanup object's slices(for both EC'ed and replicated objects)
	DeleteTime time.Duration
	// total number of errors while cleaning up object slices
	DeleteErr int64
	// total object processing time: from putting to ecrunner queue to
	// completing the request by mpathrunner
	ObjTime time.Duration
	// total number of cleanup requests
	DelReq int64
	// total number of restore requests
	GetReq int64
	// total number of encode requests
	PutReq int64
	// name of the bucket
	Bck cmn.Bck
	// xaction state: working or waiting for commands
	IsIdle bool
}

Stats are EC-specific stats for clients-side apps - calculated from raw counters All numbers except number of errors and requests are average ones

func (*Stats) String

func (s *Stats) String() string

type WriteArgs

type WriteArgs struct {
	MD         []byte       // CT's metafile content
	Reader     io.Reader    // CT content
	BID        uint64       // bucket ID
	Cksum      *cos.Cksum   // object checksum
	Generation int64        // EC Generation
	Xact       cluster.Xact // xaction that drives it
}

type XactBckEncode

type XactBckEncode struct {
	xact.Base
	// contains filtered or unexported fields
}

func (*XactBckEncode) Run

func (r *XactBckEncode) Run(wg *sync.WaitGroup)

func (*XactBckEncode) Snap

func (r *XactBckEncode) Snap() (snap *cluster.Snap)

type XactGet

type XactGet struct {
	// contains filtered or unexported fields
}

Erasure coding runner: accepts requests and dispatches them to a correct mountpath runner. Runner uses dedicated to EC memory manager inherited by dependent mountpath runners

func NewGetXact

func NewGetXact(t cluster.Target, bck *cmn.Bck, mgr *Manager) *XactGet

func (*XactGet) ClearRequests

func (r *XactGet) ClearRequests()

ClearRequests disables receiving new EC requests, they will be terminated with error Then it starts draining a channel from pending EC requests It does not enable receiving new EC requests, it has to be done explicitly, when EC is enabled again

func (*XactGet) DispatchResp

func (r *XactGet) DispatchResp(iReq intraReq, hdr *transport.ObjHdr, bck *meta.Bck, reader io.Reader)

func (*XactGet) ECStats

func (r *XactGet) ECStats() *Stats

func (*XactGet) EnableRequests

func (r *XactGet) EnableRequests()

func (*XactGet) Run

func (r *XactGet) Run(*sync.WaitGroup)

func (*XactGet) Snap

func (r *XactGet) Snap() (snap *cluster.Snap)

func (*XactGet) Stop

func (r *XactGet) Stop(err error)

type XactPut

type XactPut struct {
	// contains filtered or unexported fields
}

Erasure coding runner: accepts requests and dispatches them to a correct mountpath runner. Runner uses dedicated to EC memory manager inherited by dependent mountpath runners

func NewPutXact

func NewPutXact(t cluster.Target, bck *cmn.Bck, mgr *Manager) *XactPut

func (*XactPut) ClearRequests

func (r *XactPut) ClearRequests()

ClearRequests disables receiving new EC requests, they will be terminated with error Then it starts draining a channel from pending EC requests It does not enable receiving new EC requests, it has to be done explicitly, when EC is enabled again

func (*XactPut) ECStats

func (r *XactPut) ECStats() *Stats

func (*XactPut) EnableRequests

func (r *XactPut) EnableRequests()

func (*XactPut) Run

func (r *XactPut) Run(*sync.WaitGroup)

func (*XactPut) Snap

func (r *XactPut) Snap() (snap *cluster.Snap)

func (*XactPut) Stop

func (r *XactPut) Stop(err error)

type XactRespond

type XactRespond struct {
	// contains filtered or unexported fields
}

Xaction responsible for responding to EC requests of other targets. Should not be stopped if number of known targets is small.

func NewRespondXact

func NewRespondXact(t cluster.Target, bck *cmn.Bck, mgr *Manager) *XactRespond

func (*XactRespond) DispatchReq

func (r *XactRespond) DispatchReq(iReq intraReq, hdr *transport.ObjHdr, bck *meta.Bck)

DispatchReq is responsible for handling request from other targets

func (*XactRespond) DispatchResp

func (r *XactRespond) DispatchResp(iReq intraReq, hdr *transport.ObjHdr, object io.Reader)

func (*XactRespond) ECStats

func (r *XactRespond) ECStats() *Stats

func (*XactRespond) Run

func (r *XactRespond) Run(*sync.WaitGroup)

func (*XactRespond) Snap

func (r *XactRespond) Snap() *cluster.Snap

(compare w/ XactGet/Put)

func (*XactRespond) Stop

func (r *XactRespond) Stop(err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL