dsort

package
v1.3.19
Published: Jun 6, 2023 License: MIT Imports: 45 Imported by: 0

Documentation

Overview

Package dsort provides distributed massively parallel resharding for very large datasets.

Package dsort provides APIs for distributed archive file shuffling.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

msgp -file <path to dsort/manager_types.go> -tests=false -marshal=false -unexported

Code generated by the command above; see docs/msgp.md. DO NOT EDIT.

Index

Constants

View Source
const (
	ExtractionPhase = "extraction"
	SortingPhase    = "sorting"
	CreationPhase   = "creation"
)
View Source
const (
	SortKindAlphanumeric = "alphanumeric" // sort the records (decreasing or increasing)
	SortKindNone         = "none"         // none, used for resharding
	SortKindMD5          = "md5"
	SortKindShuffle      = "shuffle" // shuffle randomly, can be used with seed to get reproducible results
	SortKindContent      = "content" // sort by content of given file
)
View Source
const DSortName = "dsort"
View Source
const (
	DSorterGeneralType = "dsort_general"
)
View Source
const (
	DSorterMemType = "dsort_mem"
)
View Source
const PrefixJobID = "srt-"
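
For orientation, here is a minimal sketch of using PrefixJobID to recognize dsort job IDs among arbitrary job IDs. The helper and the sample IDs are hypothetical, and the import path (github.com/NVIDIA/aistore/dsort) is an assumption:

package main

import (
	"fmt"
	"strings"

	"github.com/NVIDIA/aistore/dsort" // import path assumed
)

// isDSortJobID reports whether the given job ID carries the dsort prefix ("srt-").
func isDSortJobID(id string) bool {
	return strings.HasPrefix(id, dsort.PrefixJobID)
}

func main() {
	for _, id := range []string{"srt-HfjJkrGm", "rebalance-42"} {
		fmt.Printf("%-14s dsort job: %v\n", id, isDSortJobID(id))
	}
}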

Variables

This section is empty.

Functions

func InitManagers

func InitManagers(db kvdb.Driver)

func ProxyAbortSortHandler

func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort/abort

func ProxyGetHandler

func ProxyGetHandler(w http.ResponseWriter, r *http.Request)

GET /v1/sort

func ProxyRemoveSortHandler

func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort

func ProxyStartSortHandler

func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, parsedRS *ParsedRequestSpec)

POST /v1/sort

func RegisterNode

func RegisterNode(smapOwner meta.Sowner, bmdOwner meta.Bowner, snode *meta.Snode, t cluster.Target,
	stats stats.Tracker)

func TargetHandler

func TargetHandler(w http.ResponseWriter, r *http.Request)

[METHOD] /v1/sort
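
The handlers above follow the standard net/http handler signature (except ProxyStartSortHandler, which additionally takes a ParsedRequestSpec). Purely as a sketch, and not how AIStore itself mounts them, they could be wired onto a plain mux as shown below; the mux, address, and import path are assumptions, and real routing also dispatches on the HTTP method:

package main

import (
	"net/http"

	"github.com/NVIDIA/aistore/dsort" // import path assumed
)

func main() {
	mux := http.NewServeMux()

	// GET /v1/sort (ProxyGetHandler) and DELETE /v1/sort (ProxyRemoveSortHandler)
	// share the same path; a real router branches on r.Method first.
	mux.HandleFunc("/v1/sort", dsort.ProxyGetHandler)

	// DELETE /v1/sort/abort
	mux.HandleFunc("/v1/sort/abort", dsort.ProxyAbortSortHandler)

	_ = http.ListenAndServe("127.0.0.1:8080", mux) // placeholder address
}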

Types

type CreationPhaseMetadata

type CreationPhaseMetadata struct {
	Shards    []*extract.Shard          `msg:"shards"`
	SendOrder map[string]*extract.Shard `msg:"send_order"`
}

func (*CreationPhaseMetadata) DecodeMsg

func (z *CreationPhaseMetadata) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*CreationPhaseMetadata) EncodeMsg

func (z *CreationPhaseMetadata) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*CreationPhaseMetadata) Msgsize

func (z *CreationPhaseMetadata) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
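
For illustration, a round-trip of CreationPhaseMetadata through the generated EncodeMsg/DecodeMsg methods, compared against the Msgsize upper bound. This is a sketch; the dsort and msgp import paths are assumed and the payload is left empty for brevity:

package main

import (
	"bytes"
	"fmt"

	"github.com/NVIDIA/aistore/dsort"   // import path assumed
	"github.com/tinylib/msgp/msgp"
)

func main() {
	src := &dsort.CreationPhaseMetadata{} // empty metadata, enough to show the mechanics

	var buf bytes.Buffer
	w := msgp.NewWriter(&buf)
	if err := src.EncodeMsg(w); err != nil {
		panic(err)
	}
	if err := w.Flush(); err != nil { // msgp.Writer buffers internally
		panic(err)
	}

	dst := &dsort.CreationPhaseMetadata{}
	if err := dst.DecodeMsg(msgp.NewReader(&buf)); err != nil {
		panic(err)
	}
	fmt.Printf("encoded %d bytes; Msgsize upper bound: %d\n", buf.Len(), src.Msgsize())
}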

type DetailedStats

type DetailedStats struct {
	*TimeStats
	*ThroughputStats
}

DetailedStats contains time and throughput statistics.

type JobInfo

type JobInfo struct {
	ID                string        `json:"id"`
	Description       string        `json:"description"`
	StartedTime       time.Time     `json:"started_time,omitempty"`
	FinishTime        time.Time     `json:"finish_time,omitempty"`
	ExtractedDuration time.Duration `json:"started_meta_sorting,omitempty"`
	SortingDuration   time.Duration `json:"started_shard_creation,omitempty"`
	CreationDuration  time.Duration `json:"finished_shard_creation,omitempty"`
	Aborted           bool          `json:"aborted"`
	Archived          bool          `json:"archived"`
}

JobInfo contains the stats that represent a single DSort run in a job listing.

func (*JobInfo) Aggregate

func (j *JobInfo) Aggregate(other *JobInfo)

func (*JobInfo) IsFinished

func (j *JobInfo) IsFinished() bool

func (*JobInfo) IsRunning

func (j *JobInfo) IsRunning() bool
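
As a small usage sketch, IsRunning and IsFinished can be used to partition a job listing (for example, the result of ManagerGroup.List below). The package name and helper are hypothetical; only the JobInfo methods come from this package:

package example

import "github.com/NVIDIA/aistore/dsort" // import path assumed

// splitJobs partitions jobs by state using JobInfo.IsRunning and JobInfo.IsFinished.
func splitJobs(jobs []dsort.JobInfo) (running, finished []dsort.JobInfo) {
	for _, j := range jobs {
		switch {
		case j.IsRunning():
			running = append(running, j)
		case j.IsFinished():
			finished = append(finished, j)
		}
	}
	return running, finished
}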

type LocalExtraction

type LocalExtraction struct {
	PhaseInfo
	// TotalCnt is the number of shards DSort has to process in total.
	TotalCnt int64 `json:"total_count,string"`
	// ExtractedCnt is the number of shards extracted so far. At the end it
	// should be roughly equal to TotalCnt/#Targets.
	ExtractedCnt int64 `json:"extracted_count,string"`
	// ExtractedSize is the uncompressed size of the shards extracted so far.
	ExtractedSize int64 `json:"extracted_size,string"`
	// ExtractedRecordCnt is the number of records extracted from all shards.
	ExtractedRecordCnt int64 `json:"extracted_record_count,string"`
	// ExtractedToDiskCnt is the number of shards extracted to disk. To compute
	// the number of shards extracted to memory, subtract it from ExtractedCnt.
	ExtractedToDiskCnt int64 `json:"extracted_to_disk_count,string"`
	// ExtractedToDiskSize is the uncompressed size of the shards extracted to
	// disk so far.
	ExtractedToDiskSize int64 `json:"extracted_to_disk_size,string"`
	// ShardExtractionStats contains time statistics for single-shard extraction.
	ShardExtractionStats *DetailedStats `json:"single_shard_stats,omitempty"`
}

LocalExtraction contains metrics for the first phase of DSort.
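
As noted in the ExtractedToDiskCnt comment, the number of shards extracted to memory is simply the difference of two counters. A tiny, hypothetical helper to make that concrete (package name and import path assumed):

package example

import "github.com/NVIDIA/aistore/dsort" // import path assumed

// extractedToMemCnt returns the number of shards extracted to memory:
// everything extracted so far minus what went to disk.
func extractedToMemCnt(le *dsort.LocalExtraction) int64 {
	return le.ExtractedCnt - le.ExtractedToDiskCnt
}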

type Manager

type Manager struct {
	// Fields with json tags are the only fields that are persisted
	// to disk once the dSort is finished.
	ManagerUUID string   `json:"manager_uuid"`
	Metrics     *Metrics `json:"metrics"`
	// contains filtered or unexported fields
}

Manager maintains all the state required for a single run of a distributed archive file shuffle.

func (*Manager) ListenSmapChanged

func (m *Manager) ListenSmapChanged()

func (*Manager) String

func (m *Manager) String() string

type ManagerGroup

type ManagerGroup struct {
	// contains filtered or unexported fields
}

ManagerGroup abstracts multiple dsort managers into a single struct.

var Managers *ManagerGroup

func NewManagerGroup

func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup

NewManagerGroup returns a new, initialized manager group.

func (*ManagerGroup) AbortAll

func (mg *ManagerGroup) AbortAll(err error)

func (*ManagerGroup) Add

func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error)

Add adds a new, non-initialized manager with the given managerUUID to the manager group. The returned manager is locked; it is the caller's responsibility to unlock it. Returns an error if a manager with the specified managerUUID already exists.

func (*ManagerGroup) Get

func (mg *ManagerGroup) Get(managerUUID string, ap ...bool) (*Manager, bool)

Get returns the manager with the given managerUUID. When the manager does not exist in memory and the caller requested a persisted lookup, Get searches persistent storage and returns the manager if found. The second return value is false if the manager does not exist, true otherwise.

func (*ManagerGroup) List

func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo

func (*ManagerGroup) Remove

func (mg *ManagerGroup) Remove(managerUUID string) error

Remove removes the managerUUID from history; used to reduce clutter. Fails if the process has not been cleaned up yet.
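
A usage sketch for the package-level Managers group, assuming InitManagers has already been called during node startup; the regexp, flags, and UUID below are placeholders, and the import path is assumed:

package main

import (
	"fmt"
	"regexp"

	"github.com/NVIDIA/aistore/dsort" // import path assumed
)

func main() {
	// List all jobs (not only active ones) whose description matches "nightly".
	jobs := dsort.Managers.List(regexp.MustCompile("nightly"), false /*onlyActive*/)
	for _, j := range jobs {
		fmt.Printf("%s aborted=%v archived=%v\n", j.ID, j.Aborted, j.Archived)
	}

	// Look up a single manager by UUID; the trailing bool requests a persisted lookup.
	if m, ok := dsort.Managers.Get("srt-placeholder-uuid", true); ok {
		fmt.Println("found:", m.String())
	}
}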

type MetaSorting

type MetaSorting struct {
	PhaseInfo
	// SentStats contains time statistics for sending records to other targets.
	SentStats *TimeStats `json:"sent_stats,omitempty"`
	// RecvStats contains time statistics for receiving records from other targets.
	RecvStats *TimeStats `json:"recv_stats,omitempty"`
}

MetaSorting contains metrics for the second phase of DSort.

type Metrics

type Metrics struct {
	Extraction *LocalExtraction `json:"local_extraction,omitempty"`
	Sorting    *MetaSorting     `json:"meta_sorting,omitempty"`
	Creation   *ShardCreation   `json:"shard_creation,omitempty"`

	// Aborted specifies if the DSort has been aborted or not.
	Aborted atomic.Bool `json:"aborted,omitempty"`
	// Archived specifies if the DSort has been archived to persistent storage.
	Archived atomic.Bool `json:"archived,omitempty"`

	// Description of the job.
	Description string `json:"description,omitempty"`

	// Warnings which were produced during the job.
	Warnings []string `json:"warnings,omitempty"`
	// Errors which happened during the job.
	Errors []string `json:"errors,omitempty"`
	// contains filtered or unexported fields
}

Metrics is the general struct that contains all stats about a DSort run.

func (*Metrics) ElapsedTime

func (m *Metrics) ElapsedTime() time.Duration

func (*Metrics) Marshal

func (m *Metrics) Marshal() []byte

func (*Metrics) ToJobInfo

func (m *Metrics) ToJobInfo(id string) JobInfo
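
A brief sketch of consuming Metrics: converting to the flat JobInfo form, checking elapsed time, and marshaling to JSON. The metrics value and job ID are placeholders, and the package name and import path are assumptions:

package example

import (
	"fmt"

	"github.com/NVIDIA/aistore/dsort" // import path assumed
)

// report prints a one-line summary of a DSort run plus its full JSON metrics.
func report(m *dsort.Metrics, id string) {
	ji := m.ToJobInfo(id)
	fmt.Printf("job %s: finished=%v elapsed=%v\n", ji.ID, ji.IsFinished(), m.ElapsedTime())
	fmt.Println(string(m.Marshal()))
}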

type ParsedRequestSpec

type ParsedRequestSpec struct {
	Bck                 cmn.Bck               `json:"bck"`
	Description         string                `json:"description"`
	OutputBck           cmn.Bck               `json:"output_bck"`
	Extension           string                `json:"extension"`
	OutputShardSize     int64                 `json:"output_shard_size,string"`
	InputFormat         *parsedInputTemplate  `json:"input_format"`
	OutputFormat        *parsedOutputTemplate `json:"output_format"`
	Algorithm           *SortAlgorithm        `json:"algorithm"`
	OrderFileURL        string                `json:"order_file"`
	OrderFileSep        string                `json:"order_file_sep"`
	MaxMemUsage         cos.ParsedQuantity    `json:"max_mem_usage"`
	TargetOrderSalt     []byte                `json:"target_order_salt"`
	ExtractConcMaxLimit int                   `json:"extract_concurrency_max_limit"`
	CreateConcMaxLimit  int                   `json:"create_concurrency_max_limit"`
	StreamMultiplier    int                   `json:"stream_multiplier"` // TODO: should be removed
	ExtendedMetrics     bool                  `json:"extended_metrics"`

	// debug
	DSorterType string `json:"dsorter_type"`
	DryRun      bool   `json:"dry_run"`

	cmn.DSortConf
}

type PhaseInfo

type PhaseInfo struct {
	Start time.Time `json:"started_time"`
	End   time.Time `json:"end_time"`
	// Elapsed time (in seconds) from the start to the given point in time, or
	// to the end once the phase has finished.
	Elapsed time.Duration `json:"elapsed"`
	// Running specifies if the phase is in progress.
	Running bool `json:"running"`
	// Finished specifies if the phase has finished. If both Running and
	// Finished are false, the phase has not started yet.
	Finished bool `json:"finished"`
	// contains filtered or unexported fields
}

PhaseInfo contains general stats and state for a given phase. It is the base struct embedded by the actual phase structs.

type RemoteResponse

type RemoteResponse struct {
	Record    *extract.Record    `msg:"r"`
	RecordObj *extract.RecordObj `msg:"o"`
}

func (*RemoteResponse) DecodeMsg

func (z *RemoteResponse) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RemoteResponse) EncodeMsg

func (z *RemoteResponse) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RemoteResponse) Msgsize

func (z *RemoteResponse) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type RequestSpec

type RequestSpec struct {
	// Required
	Bck             cmn.Bck `json:"bck" yaml:"bck"`
	Extension       string  `json:"extension" yaml:"extension"`
	InputFormat     string  `json:"input_format" yaml:"input_format"`
	OutputFormat    string  `json:"output_format" yaml:"output_format"`
	OutputShardSize string  `json:"output_shard_size" yaml:"output_shard_size"`

	// Optional
	Description string `json:"description" yaml:"description"`
	// Default: same as `bck` field
	OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"`
	// Default: alphanumeric, increasing
	Algorithm SortAlgorithm `json:"algorithm" yaml:"algorithm"`
	// Default: ""
	OrderFileURL string `json:"order_file" yaml:"order_file"`
	// Default: "\t"
	OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"`
	// Default: "80%"
	MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"`
	// Default: calcMaxLimit()
	ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"`
	// Default: calcMaxLimit()
	CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"`
	// Default: bundle.Multiplier
	StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"`
	// Default: false
	ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"`

	// debug
	DSorterType string `json:"dsorter_type"`
	DryRun      bool   `json:"dry_run"` // Default: false

	cmn.DSortConf
}

RequestSpec defines the user specification for requests to the endpoint /v1/sort.

func (*RequestSpec) Parse

func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error)

Parse returns a non-nil error if the RequestSpec is invalid. When the RequestSpec is valid, Parse parses all the fields, sets the values, and returns a ParsedRequestSpec.
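
An illustrative construction of a RequestSpec followed by Parse. Field names match the definition above, but the bucket, provider, brace-range templates, shard size, and memory limit are placeholder values, the import paths are assumed, and Parse may additionally consult node configuration, so treat this as a sketch rather than a ready-to-run snippet:

package main

import (
	"fmt"

	"github.com/NVIDIA/aistore/cmn"   // import path assumed
	"github.com/NVIDIA/aistore/dsort" // import path assumed
)

func main() {
	rs := dsort.RequestSpec{
		Bck:             cmn.Bck{Name: "src-shards", Provider: "ais"},
		Extension:       ".tar",
		InputFormat:     "shard-{0000..0999}",
		OutputFormat:    "sorted-shard-{0000..0999}",
		OutputShardSize: "64MB",
		Description:     "reshard and sort alphanumerically",
		Algorithm:       dsort.SortAlgorithm{Kind: dsort.SortKindAlphanumeric},
		MaxMemUsage:     "60%",
	}

	parsed, err := rs.Parse()
	if err != nil {
		fmt.Println("invalid spec:", err)
		return
	}
	fmt.Printf("output bucket: %q, extension: %q\n", parsed.OutputBck.Name, parsed.Extension)
}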

type ShardCreation

type ShardCreation struct {
	PhaseInfo
	// ToCreate specifies the number of shards that have to be created in this phase.
	ToCreate int64 `json:"to_create,string"`
	// CreatedCnt specifies the number of shards created so far.
	// It should match ToCreate when the phase is finished.
	CreatedCnt int64 `json:"created_count,string"`
	// MovedShardCnt specifies the number of shards that have migrated from this
	// target to another target in the cluster. Applies only when dealing with
	// compressed data: sometimes it is faster to create a shard on a specific
	// target and send it over (rather than creating it on the destination target).
	MovedShardCnt int64 `json:"moved_shard_count,string"`
	// RequestStats contains time statistics for requests to other targets.
	RequestStats *TimeStats `json:"req_stats,omitempty"`
	// ResponseStats contains time statistics for responses to other targets.
	ResponseStats *TimeStats `json:"resp_stats,omitempty"`
	// LocalSendStats contains time statistics for sending record content to other targets.
	LocalSendStats *DetailedStats `json:"local_send_stats,omitempty"`
	// LocalRecvStats contains time statistics for receiving record content from other targets.
	LocalRecvStats *DetailedStats `json:"local_recv_stats,omitempty"`
	// ShardCreationStats contains time statistics for single-shard creation.
	ShardCreationStats *DetailedStats `json:"single_shard_stats,omitempty"`
}

ShardCreation contains metrics for the third and final phase of DSort.

type SortAlgorithm

type SortAlgorithm struct {
	Kind string `json:"kind"`

	// Kind: alphanumeric, content
	Decreasing bool `json:"decreasing"`

	// Kind: shuffle
	Seed string `json:"seed"` // seed provided to random generator

	// Kind: content
	Extension  string `json:"extension"`
	FormatType string `json:"format_type"`
}
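
For reference, illustrative SortAlgorithm values covering the sort kinds listed in the Constants section; a sketch only, with the seed, extension, and format type chosen as examples and the package name and import path assumed:

package example

import "github.com/NVIDIA/aistore/dsort" // import path assumed

var (
	// Alphanumeric sort, descending.
	byNameDesc = dsort.SortAlgorithm{Kind: dsort.SortKindAlphanumeric, Decreasing: true}

	// Reproducible shuffle: the same seed yields the same order.
	shuffled = dsort.SortAlgorithm{Kind: dsort.SortKindShuffle, Seed: "42"}

	// Sort by the content of a companion file within each record
	// (here assumed to be a ".cls" entry interpreted as an integer).
	byContent = dsort.SortAlgorithm{Kind: dsort.SortKindContent, Extension: ".cls", FormatType: "int"}
)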

type ThroughputStats

type ThroughputStats struct {
	MinTp int64 `json:"min_throughput,string"`
	MaxTp int64 `json:"max_throughput,string"`
	AvgTp int64 `json:"avg_throughput,string"`
	// contains filtered or unexported fields
}

ThroughputStats contains statistics about the throughput of a specific task.

type TimeStats

type TimeStats struct {
	// Total contains the total number of milliseconds spent on
	// the specific task.
	Total int64 `json:"total_ms,string"`
	// Count contains the number of times the specific task was triggered.
	Count int64 `json:"count,string"`
	MinMs int64 `json:"min_ms,string"`
	MaxMs int64 `json:"max_ms,string"`
	AvgMs int64 `json:"avg_ms,string"`
}

TimeStats contains statistics about the time spent on a specific task. It tracks min, max, and avg times.

Directories

Path	Synopsis
extract	Package extract provides functions for working with compressed files.
filetype	Package filetype provides the implementation of a custom content file type for dsort.
