fetch

package
v0.0.0-...-1e60831 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: NIST-PD-fallback Imports: 28 Imported by: 0

README

ndn-dpdk/app/fetch

This package is the congestion aware fetcher, used in the traffic generator. It implements a consumer that follows the TCP CUBIC congestion control algorithm, simulating traffic patterns similar to bulk file transfer. It requires at least one thread, running the FetchThread_Run function.

Documentation

Overview

Package fetch retrieves segmented objects.

Index

Constants

This section is empty.

Variables

View Source
var (
	GqlConfigInput     *graphql.InputObject
	GqlTaskDefInput    *graphql.InputObject
	GqlTaskDefType     *graphql.Object
	GqlTaskContextType *gqlserver.NodeType[*TaskContext]
	GqlFetcherType     *gqlserver.NodeType[*Fetcher]
)

GraphQL types.

View Source
var GqlRetrieveByFaceID func(id iface.ID) *Fetcher

GqlRetrieveByFaceID returns *Fetcher associated with a face. It is assigned during package tg initialization.

Functions

This section is empty.

Types

type Config

type Config struct {
	TaskSlotConfig

	// NThreads is the number of worker threads.
	// Each worker thread can serve multiple fetch tasks.
	NThreads int `json:"nThreads,omitempty"`

	// NTasks is the number of task slots.
	// Each task retrieves one segmented object and has independent congestion control.
	NTasks int `json:"nTasks,omitempty"`
}

Config contains Fetcher configuration.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate applies defaults and validates the configuration.

type Counters

type Counters struct {
	Elapsed   time.Duration  `json:"elapsed" gqldesc:"Duration since start fetching."`
	Finished  *time.Duration `json:"finished" gqldesc:"Duration between start and finish; null if not finished."`
	LastRtt   time.Duration  `json:"lastRtt" gqldesc:"Last RTT sample."`
	SRtt      time.Duration  `json:"sRtt" gqldesc:"Smoothed RTT."`
	Rto       time.Duration  `json:"rto" gqldesc:"RTO."`
	Cwnd      int            `json:"cwnd" gqldesc:"Congestion window."`
	NInFlight uint32         `json:"nInFlight" gqldesc:"Currently in-flight Interests."`
	NTxRetx   uint64         `json:"nTxRetx" gqldesc:"Retransmitted Interests."`
	NRxData   uint64         `json:"nRxData" gqldesc:"Data satisfying pending Interests."`
}

Counters contains counters of Logic.

func (Counters) String

func (cnt Counters) String() string

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher controls worker threads and task slots on a face.

func New

func New(face iface.Face, cfg Config) (*Fetcher, error)

New creates a Fetcher.

func (*Fetcher) Close

func (fetcher *Fetcher) Close() error

Close deallocates data structures.

func (*Fetcher) ConnectRxQueues

func (fetcher *Fetcher) ConnectRxQueues(demuxD, demuxN *iface.InputDemux)

ConnectRxQueues connects Data InputDemux to RxQueues. Nack InputDemux is set to drop packets because fetcher does not support Nacks.

func (*Fetcher) Face

func (fetcher *Fetcher) Face() iface.Face

Face returns the face.

func (*Fetcher) Fetch

func (fetcher *Fetcher) Fetch(d TaskDef) (task *TaskContext, e error)

Fetch starts a fetch task.

func (*Fetcher) Launch

func (fetcher *Fetcher) Launch()

Launch launches all worker threads.

func (*Fetcher) Reset

func (fetcher *Fetcher) Reset()

Reset aborts all tasks and stops all worker threads.

func (*Fetcher) Stop

func (fetcher *Fetcher) Stop() error

Stop stops all worker threads.

func (*Fetcher) Tasks

func (fetcher *Fetcher) Tasks() (list []*TaskContext)

Tasks returns running fetch tasks.

func (*Fetcher) Workers

func (fetcher *Fetcher) Workers() []ealthread.ThreadWithRole

Workers returns worker threads.

type Logic

type Logic C.FetchLogic

Logic implements fetcher congestion control and scheduling logic.

func (*Logic) Close

func (fl *Logic) Close() error

Close deallocates data structures.

func (*Logic) Counters

func (fl *Logic) Counters() (cnt Counters)

Counters retrieves counters.

func (*Logic) Finished

func (fl *Logic) Finished() bool

Finished determines if all segments have been fetched.

func (*Logic) Init

func (fl *Logic) Init(winCapacity int, socket eal.NumaSocket)

Init initializes the logic and allocates data structures.

func (*Logic) Reset

func (fl *Logic) Reset(r segmented.SegmentRange)

Reset resets this to initial state.

type TaskContext

type TaskContext struct {
	// contains filtered or unexported fields
}

TaskContext provides contextual information about an active fetch task.

func (*TaskContext) Counters

func (task *TaskContext) Counters() Counters

Counters returns congestion control and scheduling counters.

func (*TaskContext) Finished

func (task *TaskContext) Finished() bool

Finished determines if all segments have been fetched.

func (*TaskContext) Stop

func (task *TaskContext) Stop()

Stop aborts/stops the fetch task. This should be called even if the fetch task has succeeded.

type TaskDef

type TaskDef struct {
	// InterestTemplateConfig contains the name prefix, InterestLifetime, etc.
	//
	// The fetcher neither retrieves metadata nor performs version discovery.
	// If the content is published with version component, it should appear in the name prefix.
	//
	// CanBePrefix and MustBeFresh are not normally used, but they may be included for benchmarking purpose.
	ndni.InterestTemplateConfig

	// SegmentRange specifies range of segment numbers.
	// If writing to a file, SegmentEnd must be explicitly specified.
	segmented.SegmentRange

	// Filename is the output file name.
	// If omitted, payload is not written to a file.
	Filename string `json:"filename,omitempty"`

	// SegmentLen is the payload length in each segment.
	// This is only needed when writing to a file.
	// If any segment has incorrect Content TLV-LENGTH, the output file would not contain correct payload.
	SegmentLen int `json:"segmentLen,omitempty"`
}

TaskDef defines a fetch task that retrieves one segmented object.

type TaskSlotConfig

type TaskSlotConfig struct {
	// RxQueue configures the RX queue of Data packets going to each task slot.
	// CoDel cannot be used in these queues.
	RxQueue iface.PktQueueConfig `json:"rxQueue,omitempty"`

	// WindowCapacity is the maximum distance between lower and upper bounds of segment numbers in an ongoing fetch logic.
	WindowCapacity int `json:"windowCapacity,omitempty"`
}

TaskSlotConfig contains task slot configuration.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL