dsort

Version: v1.3.22

Warning: This package is not in the latest version of its module.
Published: Feb 25, 2024 License: MIT Imports: 45 Imported by: 2

Documentation

Overview

Package dsort provides distributed massively parallel resharding for very large datasets.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

msgp -file <path to dsort/manager_types.go> -tests=false -marshal=false -unexported
Code generated by the command above; see docs/msgp.md. DO NOT EDIT.

Package dsort provides APIs for distributed archive file shuffling.


Index

Constants

const (
	Alphanumeric = "alphanumeric" // string comparison (decreasing or increasing)
	None         = "none"         // none (used for resharding)
	MD5          = "md5"          // compare md5(name)
	Shuffle      = "shuffle"      // random shuffle (use with the same seed to reproduce)
	Content      = "content"      // extract (int, string, float) from a given file, and compare
)
const (
	ExtractionPhase = "extraction"
	SortingPhase    = "sorting"
	CreationPhase   = "creation"
)
const DefaultExt = archive.ExtTar // default shard extension/format/MIME when spec's input_extension is empty
const (
	GeneralType = "dsort_general"
)
const (
	MemType = "dsort_mem"
)
const PrefixJobID = "srt-"

Variables

This section is empty.

Functions

func PabortHandler added in v1.3.19

func PabortHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort/abort

func PgetHandler added in v1.3.19

func PgetHandler(w http.ResponseWriter, r *http.Request)

GET /v1/sort

func Pinit added in v1.3.19

func Pinit(si core.Node, config *cmn.Config)

func PremoveHandler added in v1.3.19

func PremoveHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort

func PstartHandler added in v1.3.19

func PstartHandler(w http.ResponseWriter, r *http.Request, parsc *ParsedReq)

POST /v1/sort

func TargetHandler

func TargetHandler(w http.ResponseWriter, r *http.Request)

[METHOD] /v1/sort

func Tinit added in v1.3.19

func Tinit(tstats stats.Tracker, db kvdb.Driver, config *cmn.Config)

Types

type Algorithm added in v1.3.19

type Algorithm struct {
	// one of the `algorithms` above
	Kind string `json:"kind"`

	// used with two sorting algorithms: Alphanumeric and Content
	Decreasing bool `json:"decreasing"`

	// used when the sort is a random shuffle
	Seed string `json:"seed"`

	// usage: exclusively for Content sorting
	// e.g.: ".cls", a file containing the sorting key for each record (sample); see next
	// NOTE: not to be confused with the shards' "input_extension"
	Ext string `json:"extension"`

	// ditto: Content only
	// `shard.contentKeyTypes` enum values: {"int", "string", "float"}
	ContentKeyType string `json:"content_key_type"`
}

type CreationPhaseMetadata

type CreationPhaseMetadata struct {
	Shards    []*shard.Shard          `msg:"shards"`
	SendOrder map[string]*shard.Shard `msg:"send_order"`
}

func (*CreationPhaseMetadata) DecodeMsg

func (z *CreationPhaseMetadata) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*CreationPhaseMetadata) EncodeMsg

func (z *CreationPhaseMetadata) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*CreationPhaseMetadata) Msgsize

func (z *CreationPhaseMetadata) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type JobInfo

type JobInfo struct {
	ID                string        `json:"id"` // job ID == xact ID (aka managerUUID)
	SrcBck            cmn.Bck       `json:"src-bck"`
	DstBck            cmn.Bck       `json:"dst-bck"`
	StartedTime       time.Time     `json:"started_time,omitempty"`
	FinishTime        time.Time     `json:"finish_time,omitempty"`
	ExtractedDuration time.Duration `json:"started_meta_sorting,omitempty"`
	SortingDuration   time.Duration `json:"started_shard_creation,omitempty"`
	CreationDuration  time.Duration `json:"finished_shard_creation,omitempty"`
	Objs              int64         `json:"loc-objs,string"`  // locally processed
	Bytes             int64         `json:"loc-bytes,string"` //
	Metrics           *Metrics
	Aborted           bool `json:"aborted"`
	Archived          bool `json:"archived"`
}

JobInfo contains the stats that represent a single Dsort run in a job list.

func (*JobInfo) Aggregate

func (j *JobInfo) Aggregate(other *JobInfo)

func (*JobInfo) IsFinished

func (j *JobInfo) IsFinished() bool

func (*JobInfo) IsRunning

func (j *JobInfo) IsRunning() bool

type LocalExtraction

type LocalExtraction struct {

	// TotalCnt is the number of shards Dsort has to process in total.
	TotalCnt int64 `json:"total_count,string"`
	// ExtractedCnt is the cumulative number of extracted shards. In the
	// end, this should be roughly equal to TotalCnt/#Targets.
	ExtractedCnt int64 `json:"extracted_count,string"`
	// ExtractedSize is the uncompressed size of extracted shards.
	ExtractedSize int64 `json:"extracted_size,string"`
	// ExtractedRecordCnt - number of records extracted from all shards.
	ExtractedRecordCnt int64 `json:"extracted_record_count,string"`
	// ExtractedToDiskCnt is the number of shards extracted to disk. To
	// compute the number of shards extracted to memory, subtract it from
	// ExtractedCnt.
	ExtractedToDiskCnt int64 `json:"extracted_to_disk_count,string"`
	// ExtractedToDiskSize - uncompressed size of shards extracted to disk.
	ExtractedToDiskSize int64 `json:"extracted_to_disk_size,string"`
	// contains filtered or unexported fields
}

LocalExtraction contains metrics for the first phase (extraction) of Dsort.
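The comment on `ExtractedToDiskCnt` implies a simple derivation: shards extracted to memory equal `ExtractedCnt` minus `ExtractedToDiskCnt`. A minimal sketch, using a hypothetical `extractionCounters` type that mirrors the two fields and their string-encoded JSON tags:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractionCounters mirrors two dsort.LocalExtraction fields.
type extractionCounters struct {
	ExtractedCnt       int64 `json:"extracted_count,string"`
	ExtractedToDiskCnt int64 `json:"extracted_to_disk_count,string"`
}

// extractedToMem derives the number of shards extracted to memory.
func (c extractionCounters) extractedToMem() int64 {
	return c.ExtractedCnt - c.ExtractedToDiskCnt
}

func main() {
	var c extractionCounters
	raw := `{"extracted_count":"10","extracted_to_disk_count":"3"}`
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.extractedToMem()) // 7
}
```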

type Manager

type Manager struct {
	// tagged fields are the only fields persisted once dsort finishes
	ManagerUUID string         `json:"manager_uuid"`
	Metrics     *Metrics       `json:"metrics"`
	Pars        *parsedReqSpec `json:"pars"`
	// contains filtered or unexported fields
}

Manager maintains all the state required for a single run of a distributed archive file shuffle.

func (*Manager) String

func (m *Manager) String() string

type ManagerGroup

type ManagerGroup struct {
	// contains filtered or unexported fields
}

ManagerGroup abstracts multiple dsort managers into a single struct.

var Managers *ManagerGroup

func NewManagerGroup

func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup

NewManagerGroup returns a new, initialized manager group.

func (*ManagerGroup) AbortAll

func (mg *ManagerGroup) AbortAll(err error)

func (*ManagerGroup) Add

func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error)

Add adds a new, non-initialized manager with the given managerUUID to the manager group. The returned manager is locked; it is the caller's responsibility to unlock it. Add returns an error when a manager with the specified managerUUID already exists.

func (*ManagerGroup) Get

func (mg *ManagerGroup) Get(managerUUID string, inclArchived bool) (*Manager, bool)

Get returns the manager with the given managerUUID. If such a manager does not exist and the caller requested a persisted lookup (inclArchived), Get looks for it in persistent storage and returns it if found. The boolean result is false if the manager does not exist, true otherwise.

func (*ManagerGroup) List

func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo

func (*ManagerGroup) Remove

func (mg *ManagerGroup) Remove(managerUUID string) error

Remove removes the manager with the given managerUUID from history, to reduce clutter. It fails if the process has not been cleaned up yet.
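The Add/Get/Remove contract, in particular that Add hands back an already-locked manager, can be illustrated with a stripped-down, in-memory group. This is a hypothetical miniature, not the package's implementation: it has no persistent storage (so there is no archived lookup), and its Remove does not check whether the job has been cleaned up.

```go
package main

import (
	"fmt"
	"sync"
)

type manager struct {
	mu   sync.Mutex
	uuid string
}

type managerGroup struct {
	mu       sync.Mutex
	managers map[string]*manager
}

func newManagerGroup() *managerGroup {
	return &managerGroup{managers: make(map[string]*manager)}
}

// Add registers a new manager and returns it locked; the caller must unlock it.
func (mg *managerGroup) Add(uuid string) (*manager, error) {
	mg.mu.Lock()
	defer mg.mu.Unlock()
	if _, ok := mg.managers[uuid]; ok {
		return nil, fmt.Errorf("manager %q already exists", uuid)
	}
	m := &manager{uuid: uuid}
	m.mu.Lock() // locked before it becomes visible to other goroutines
	mg.managers[uuid] = m
	return m, nil
}

// Get returns the manager and whether it exists (in memory only).
func (mg *managerGroup) Get(uuid string) (*manager, bool) {
	mg.mu.Lock()
	defer mg.mu.Unlock()
	m, ok := mg.managers[uuid]
	return m, ok
}

// Remove drops the manager from the group.
func (mg *managerGroup) Remove(uuid string) error {
	mg.mu.Lock()
	defer mg.mu.Unlock()
	if _, ok := mg.managers[uuid]; !ok {
		return fmt.Errorf("manager %q not found", uuid)
	}
	delete(mg.managers, uuid)
	return nil
}

func main() {
	mg := newManagerGroup()
	m, err := mg.Add("srt-example")
	if err != nil {
		panic(err)
	}
	// ... initialize m while holding its lock ...
	m.mu.Unlock()

	_, err = mg.Add("srt-example")
	fmt.Println(err) // manager "srt-example" already exists
}
```

Locking the manager before publishing it in the map means no other goroutine can observe it half-initialized.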

type MetaSorting

type MetaSorting struct {

	// SentStats - time statistics about records sent to another target
	SentStats *TimeStats `json:"sent_stats,omitempty"`
	// RecvStats - time statistics about records received from another target
	RecvStats *TimeStats `json:"recv_stats,omitempty"`
	// contains filtered or unexported fields
}

MetaSorting contains metrics for the second phase (meta-sorting) of Dsort.

type Metrics

type Metrics struct {
	Extraction *LocalExtraction `json:"local_extraction,omitempty"`
	Sorting    *MetaSorting     `json:"meta_sorting,omitempty"`
	Creation   *ShardCreation   `json:"shard_creation,omitempty"`

	// job description
	Description string `json:"description,omitempty"`

	// warnings during the run
	Warnings []string `json:"warnings,omitempty"`
	// errors, if any
	Errors []string `json:"errors,omitempty"`

	// has been aborted
	Aborted atomic.Bool `json:"aborted,omitempty"`
	// has been archived to persistent storage
	Archived atomic.Bool `json:"archived,omitempty"`
}

Metrics is the general struct that contains all stats about a Dsort run.

func (*Metrics) ElapsedTime

func (m *Metrics) ElapsedTime() time.Duration

func (*Metrics) ToJobInfo

func (m *Metrics) ToJobInfo(id string, pars *parsedReqSpec) JobInfo

type ParsedReq added in v1.3.19

type ParsedReq struct {
	InputBck  cmn.Bck
	OutputBck cmn.Bck
	// contains filtered or unexported fields
}

type RemoteResponse

type RemoteResponse struct {
	Record    *shard.Record    `msg:"r"`
	RecordObj *shard.RecordObj `msg:"o"`
}

func (*RemoteResponse) DecodeMsg

func (z *RemoteResponse) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RemoteResponse) EncodeMsg

func (z *RemoteResponse) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RemoteResponse) Msgsize

func (z *RemoteResponse) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type RequestSpec

type RequestSpec struct {
	// Required
	InputBck        cmn.Bck       `json:"input_bck" yaml:"input_bck"`
	InputFormat     apc.ListRange `json:"input_format" yaml:"input_format"`
	OutputFormat    string        `json:"output_format" yaml:"output_format"`
	OutputShardSize string        `json:"output_shard_size" yaml:"output_shard_size"`

	// Desirable
	InputExtension string `json:"input_extension" yaml:"input_extension"`

	// Optional
	// Default: InputExtension
	OutputExtension string `json:"output_extension" yaml:"output_extension"`
	// Default: ""
	Description string `json:"description" yaml:"description"`
	// Default: same as `input_bck` field
	OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"`
	// Default: alphanumeric, increasing
	Algorithm Algorithm `json:"algorithm" yaml:"algorithm"`
	// Default: ""
	OrderFileURL string `json:"order_file" yaml:"order_file"`
	// Default: "\t"
	OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"`
	// Default: "80%"
	MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"`
	// Default: calcMaxLimit()
	ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"`
	// Default: calcMaxLimit()
	CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"`

	// debug
	DsorterType string `json:"dsorter_type"`
	DryRun      bool   `json:"dry_run"` // Default: false

	Config cmn.DsortConf
}

RequestSpec defines the user specification for requests to the endpoint /v1/sort.
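The defaults stated in the field comments above can be applied with a helper along these lines. It is a sketch under assumptions: `requestSpec` mirrors only fields whose defaults are given as literal values, `applyDefaults` is hypothetical, and defaults expressed as function calls (`calcMaxLimit()`) or bucket defaults are left out.

```go
package main

import "fmt"

type algorithm struct {
	Kind       string `json:"kind"`
	Decreasing bool   `json:"decreasing"`
}

// requestSpec mirrors a subset of dsort.RequestSpec.
type requestSpec struct {
	InputExtension  string    `json:"input_extension"`
	OutputExtension string    `json:"output_extension"`
	Algorithm       algorithm `json:"algorithm"`
	OrderFileSep    string    `json:"order_file_sep"`
	MaxMemUsage     string    `json:"max_mem_usage"`
}

// applyDefaults fills empty fields with the defaults listed in the RequestSpec comments.
func applyDefaults(rs *requestSpec) {
	if rs.OutputExtension == "" {
		rs.OutputExtension = rs.InputExtension
	}
	if rs.Algorithm.Kind == "" {
		rs.Algorithm.Kind = "alphanumeric" // Decreasing stays false, i.e. increasing
	}
	if rs.OrderFileSep == "" {
		rs.OrderFileSep = "\t"
	}
	if rs.MaxMemUsage == "" {
		rs.MaxMemUsage = "80%"
	}
}

func main() {
	rs := requestSpec{InputExtension: ".tar"}
	applyDefaults(&rs)
	fmt.Printf("%+v\n", rs)
}
```

Explicitly set values are left untouched; only empty fields are filled.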

func (*RequestSpec) ParseCtx added in v1.3.19

func (rs *RequestSpec) ParseCtx() (*ParsedReq, error)

type ShardCreation

type ShardCreation struct {

	// ToCreate - number of shards to be created in this phase.
	ToCreate int64 `json:"to_create,string"`
	// CreatedCnt is the number of shards created so far.
	// Should match ToCreate when phase finishes.
	CreatedCnt int64 `json:"created_count,string"`
	// MovedShardCnt specifies the number of shards that have migrated from this
	// target to another one. Applies only when dealing with compressed
	// data. Sometimes, rather than creating at the destination, it is faster
	// to create a shard on a specific target and send it over (to the destination).
	MovedShardCnt int64 `json:"moved_shard_count,string"`
	// RequestStats - time statistics: requests to other targets.
	RequestStats *TimeStats `json:"req_stats,omitempty"`
	// ResponseStats - time statistics: responses to other targets.
	ResponseStats *TimeStats `json:"resp_stats,omitempty"`
	// contains filtered or unexported fields
}

ShardCreation contains metrics for the third and last phase (shard creation) of Dsort.
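Since `CreatedCnt` should match `ToCreate` when the phase finishes, creation progress can be estimated as their ratio. A minimal sketch; `creationProgress` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// creationProgress returns the percentage of shards created so far (0 when nothing is planned).
func creationProgress(createdCnt, toCreate int64) float64 {
	if toCreate <= 0 {
		return 0
	}
	return 100 * float64(createdCnt) / float64(toCreate)
}

func main() {
	fmt.Printf("%.1f%%\n", creationProgress(25, 100)) // 25.0%
}
```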

type TimeStats

type TimeStats struct {
	// Total contains the total number of milliseconds spent on
	// a specific task.
	Total int64 `json:"total_ms,string"`
	// Count contains the number of times a specific task was triggered.
	Count int64 `json:"count,string"`
	MinMs int64 `json:"min_ms,string"`
	MaxMs int64 `json:"max_ms,string"`
	AvgMs int64 `json:"avg_ms,string"`
}

TimeStats contains statistics about the time spent on a specific task. It calculates the min, max, and average times.

Directories

Path    Synopsis
ct      Package ct provides additional dsort-specific content types
shard   Package shard provides Extract(shard), Create(shard), and associated methods across all supported archival formats (see cmn/archive/mime.go)
