fwdp

package
v0.0.0-...-1e60831 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: NIST-PD-fallback Imports: 32 Imported by: 0

README

ndn-dpdk/app/fwdp

This package implements the forwarder's data plane.

Input Thread (FwInput)

An FwInput thread runs an iface.RxLoop as its main loop ("RX" role), which reads and decodes packets from one or more network interfaces. Bursts of received L3 packets are processed by InputDemux, configured to use the NDT for Interests and the PIT token for Data and Nacks.

Forwarding Thread (FwFwd)

An FwFwd thread runs the FwFwd_Run function as its main loop ("FWD" role). The main loop first performs some maintenance work:

  • Mark a URCU quiescent state, as required by the FIB.
  • Trigger the PIT timeout scheduler.

Then it reads packets from the input queues and handles each packet separately:

  • FwFwd_RxInterest function handles an incoming Interest.
  • FwFwd_RxData function handles an incoming Data.
  • FwFwd_RxNack function handles an incoming Nack.
Data Structure Usage

All FwFwd threads have read-only access to a shared FIB replica on the same NUMA socket. Each FwFwd thread has read-write access to a FibEntryDyn struct associated with each FIB entry.

Each FwFwd has a private partition of PIT and CS. An outgoing Interest from a FwFwd must carry the identifier of this FwFwd as the first 8 bits of its PIT token, so that returning Data or Nack can be dispatched to the same FwFwd and thus use the same PIT-CS partition.

Congestion Control

Each FwFwd has three CoDel queues, one for each L3 packet type. They are backed by DPDK rings in multi-producer single-consumer mode. An FwInput thread enqueues packets to these queues; in case the DPDK ring is full, the packet is dropped. An FwFwd dequeues packets from these queues; if the CoDel algorithm indicates a packet should be dropped, FwFwd places a congestion mark on the packet but does not drop it. The ratio of dequeue burst size among the three queues determines the relative weight among L3 packet types; for example, dequeuing up to 48 Interests, 64 Data, and 64 Nacks would give Data/Nacks priority over Interests.

Note that congestion mark handling is currently incomplete. Some limitations are:

  • FwFwd can place a congestion mark only on the ingress side (e.g., to signal that the forwarder cannot sustain the current rate of incoming packets), not on the egress side (e.g., to signal link congestion).
  • FwFwd does not add or remove the congestion mark during Interest aggregation or Data caching.
Per-Packet Logging

FwFwd C code uses the DEBUG log level for per-packet logging. Generally, a log line has several key-value pairs delimited by whitespace. Keys use "kebab-case". Common keys include:

  • "interest-from", "data-from", "nack-from": incoming FaceID in packet arrival.
  • "interest-to", "data-to", "nack-to": outgoing FaceID in packet transmission.
  • "npkt" (meaning "NDN packet"): memory address of a packet.
  • "dn-token": PIT token at the downstream node.
  • "up-token": PIT token assigned by this node, which is sent upstream.
  • "drop": reason for dropping a packet.
  • "pit-entry", "cs-entry": memory address of a table entry.
  • "pit-key": debug string of a PIT entry.
  • "sg-id": strategy identifier.
  • "sg-res": return value of a strategy invocation.
  • "helper": handing off to a helper.

Crypto Helper (FwCrypto)

FwCrypto provides implicit digest computation for Data packets. When an FwFwd processes an incoming Data packet and finds a PIT entry whose Interest carries an ImplicitSha256DigestComponent, it needs to know the Data's implicit digest in order to determine whether the Data satisfies the Interest. Instead of performing the digest computation synchronously, which would block the processing of other packets, the FwFwd passes the Data to FwCrypto. After the digest is computed, the Data packet goes back to FwFwd, which can then re-process it and use the computed digest to determine whether it satisfies the pending Interest.

An FwCrypto thread runs the FwCrypto_Run function as its main loop ("CRYPTO" role). It receives Data packets from FwFwd threads through a queue, and enqueues crypto operations toward a DPDK cryptodev. The cryptodev computes the SHA-256 digest of the packet and stores it in the mbuf header. The FwCrypto then dequeues the completed crypto operations from the cryptodev and re-dispatches the Data to FwFwd in the same fashion as an input thread.

It is possible to disable FwCrypto by assigning zero lcores to "CRYPTO" role. In this case, the forwarder does not support implicit digest computation, and incoming Interests with implicit digest component are dropped.

Disk Helper (FwDisk)

FwDisk enables on-disk caching in the Content Store. See package disk for general concepts.

This implementation is work in progress. Currently, it can only use emulated block device with Malloc or file backend, but not a hardware NVMe device.

Documentation

Overview

Package fwdp implements the forwarder's data plane.

Index

Constants

View Source
const (
	RoleInput  = iface.RoleRx
	RoleOutput = iface.RoleTx
	RoleCrypto = "CRYPTO"
	RoleDisk   = "DISK"
	RoleFwd    = "FWD"
)

Thread roles.

Variables

View Source
var (
	GqlDispatchThreadInterface *gqlserver.Interface
	GqlInputType               *gqlserver.NodeType[*Input]
	GqlCryptoType              *gqlserver.NodeType[*Crypto]
	GqlDiskType                *gqlserver.NodeType[*Disk]
	GqlFwdType                 *gqlserver.NodeType[*Fwd]
	GqlDataPlaneType           *graphql.Object
	GqlDispatchCountersType    *graphql.Object
	GqlFwdCountersType         *graphql.Object
	GqlFibNexthopRttType       *graphql.Object
)

GraphQL types.

Functions

func DefaultAlloc

func DefaultAlloc() (m map[string]eal.LCores, e error)

DefaultAlloc is the default lcore allocation algorithm.

Types

type Config

type Config struct {
	LCoreAlloc ealthread.Config `json:"-"`

	Ndt      ndt.Config         `json:"ndt,omitempty"`
	Fib      fibdef.Config      `json:"fib,omitempty"`
	Pcct     pcct.Config        `json:"pcct,omitempty"`
	Suppress pit.SuppressConfig `json:"suppress,omitempty"`

	Crypto                CryptoConfig         `json:"crypto,omitempty"`
	Disk                  DiskConfig           `json:"disk,omitempty"`
	FwdInterestQueue      iface.PktQueueConfig `json:"fwdInterestQueue,omitempty"`
	FwdDataQueue          iface.PktQueueConfig `json:"fwdDataQueue,omitempty"`
	FwdNackQueue          iface.PktQueueConfig `json:"fwdNackQueue,omitempty"`
	LatencySampleInterval int                  `json:"latencySampleInterval,omitempty"`
}

Config contains data plane configuration.

type Crypto

type Crypto struct {
	ealthread.ThreadWithCtrl
	// contains filtered or unexported fields
}

Crypto represents a crypto helper thread.

func (*Crypto) Close

func (fwc *Crypto) Close() error

Close stops and releases the thread.

func (*Crypto) DemuxOf

func (fwc *Crypto) DemuxOf(t ndni.PktType) *iface.InputDemux

DemuxOf implements DispatchThread interface.

func (*Crypto) DispatchThreadID

func (fwc *Crypto) DispatchThreadID() int

DispatchThreadID implements DispatchThread interface.

func (*Crypto) String

func (fwc *Crypto) String() string

func (Crypto) ThreadRole

func (Crypto) ThreadRole() string

ThreadRole implements ealthread.ThreadWithRole interface.

type CryptoConfig

type CryptoConfig struct {
	InputCapacity  int `json:"inputCapacity,omitempty"`
	OpPoolCapacity int `json:"opPoolCapacity,omitempty"`
}

CryptoConfig contains crypto helper thread configuration.

type CryptoShared

type CryptoShared struct {
	// contains filtered or unexported fields
}

CryptoShared contains per NUMA socket shared resources for crypto helper threads.

func (*CryptoShared) AssignTo

func (fwcsh *CryptoShared) AssignTo(fwcs []*Crypto)

AssignTo assigns shared resources to crypto helper threads.

func (*CryptoShared) Close

func (fwcsh *CryptoShared) Close() error

Close deletes resources.

func (*CryptoShared) ConnectTo

func (fwcsh *CryptoShared) ConnectTo(fwd *Fwd)

ConnectTo connects forwarding thread to crypto input queue.

type DataPlane

type DataPlane struct {
	// contains filtered or unexported fields
}

DataPlane represents the forwarder data plane.

var (
	// GqlDataPlane is the DataPlane instance accessible via GraphQL.
	GqlDataPlane *DataPlane
)

func New

func New(cfg Config) (dp *DataPlane, e error)

New creates and launches forwarder data plane.

func (*DataPlane) Close

func (dp *DataPlane) Close() error

Close stops the data plane and releases resources.

func (*DataPlane) Fib

func (dp *DataPlane) Fib() *fib.Fib

Fib returns the FIB.

func (*DataPlane) Fwds

func (dp *DataPlane) Fwds() []*Fwd

Fwds returns a list of forwarding threads.

func (*DataPlane) Ndt

func (dp *DataPlane) Ndt() *ndt.Ndt

Ndt returns the NDT.

type Disk

type Disk struct {
	*spdkenv.Thread
	// contains filtered or unexported fields
}

Disk represents a disk helper thread.

func (*Disk) Close

func (fwdisk *Disk) Close() error

Close stops and releases the thread.

func (*Disk) DemuxOf

func (fwdisk *Disk) DemuxOf(t ndni.PktType) *iface.InputDemux

DemuxOf implements DispatchThread interface.

func (*Disk) DispatchThreadID

func (fwdisk *Disk) DispatchThreadID() int

DispatchThreadID implements DispatchThread interface.

func (*Disk) String

func (fwdisk *Disk) String() string

func (Disk) ThreadRole

func (Disk) ThreadRole() string

ThreadRole implements ealthread.ThreadWithRole interface.

type DiskConfig

type DiskConfig struct {
	// Locator describes where to create or attach a block device.
	bdev.Locator

	// Overprovision is the ratio of block device size divided by CS disk capacity.
	// Setting this above 1.00 can reduce disk full errors due to some slots still occupied by async I/O.
	// Default is 1.05.
	Overprovision float64 `json:"overprovision"`

	// Bdev specifies the block device.
	// If set, Locator and Overprovision are ignored.
	Bdev bdev.Device `json:"-"`

	// BdevCloser allows closing the block device.
	BdevCloser io.Closer `json:"-"`
	// contains filtered or unexported fields
}

DiskConfig contains disk service thread configuration.

type DispatchCounters

type DispatchCounters struct {
	NInterestsQueued  []uint64 `json:"nInterestsQueued" gqldesc:"Interests enqueued toward each forwarding thread."`
	NInterestsDropped []uint64 `json:"nInterestsDropped" gqldesc:"Interests dropped toward each forwarding thread."`
	NDataQueued       []uint64 `json:"nDataQueued" gqldesc:"Data enqueued toward each forwarding thread."`
	NDataDropped      []uint64 `json:"nDataDropped" gqldesc:"Data dropped toward each forwarding thread."`
	NNacksQueued      []uint64 `json:"nNacksQueued" gqldesc:"Nacks enqueued toward each forwarding thread."`
	NNacksDropped     []uint64 `json:"nNacksDropped" gqldesc:"Nacks dropped toward each forwarding thread."`
}

DispatchCounters contains counters of packets dispatched from a thread toward forwarding threads.

func ReadDispatchCounters

func ReadDispatchCounters(th DispatchThread, nFwds int) (cnt DispatchCounters)

ReadDispatchCounters retrieves DispatchCounters.

type DispatchThread

type DispatchThread interface {
	// DispatchThreadID returns numeric index of the dispatch thread.
	// IDs should be sequentially assigned.
	DispatchThreadID() int

	// WithInputDemuxes contains DemuxOf function that returns InputDemux.
	// If the dispatch thread does not handle a particular packet type, that function returns nil.
	iface.WithInputDemuxes
}

DispatchThread represents a thread that dispatches packets to forwarding threads. It could be an input, crypto, or disk service thread.

type Fwd

type Fwd struct {
	ealthread.ThreadWithCtrl
	// contains filtered or unexported fields
}

Fwd represents a forwarding thread.

func (*Fwd) Close

func (fwd *Fwd) Close() error

Close stops and releases the thread.

func (*Fwd) Counters

func (fwd *Fwd) Counters() (cnt FwdCounters)

Counters retrieves forwarding thread counters.

func (*Fwd) Cs

func (fwd *Fwd) Cs() *cs.Cs

Cs returns the CS.

func (*Fwd) GetFib

func (fwd *Fwd) GetFib() (replica unsafe.Pointer, dynIndex int)

GetFib implements fib.LookupThread interface.

func (*Fwd) GetFibSgGlobal

func (fwd *Fwd) GetFibSgGlobal() unsafe.Pointer

GetFibSgGlobal implements fib.LookupThread interface.

func (*Fwd) LatencyStat

func (fwd *Fwd) LatencyStat() *runningstat.RunningStat

LatencyStat returns latency statistics collector. Its reading reflects the latency since packet arrival until forwarding thread starts processing the packet.

func (*Fwd) NumaSocket

func (fwd *Fwd) NumaSocket() eal.NumaSocket

NumaSocket implements fib.LookupThread interface.

func (*Fwd) Pit

func (fwd *Fwd) Pit() *pit.Pit

Pit returns the PIT.

func (*Fwd) PktQueueOf

func (fwd *Fwd) PktQueueOf(t ndni.PktType) *iface.PktQueue

PktQueueOf returns PktQueue of specified PktType.

func (*Fwd) SetFib

func (fwd *Fwd) SetFib(replica unsafe.Pointer, dynIndex int)

SetFib implements fib.LookupThread interface.

func (*Fwd) String

func (fwd *Fwd) String() string

func (Fwd) ThreadRole

func (Fwd) ThreadRole() string

ThreadRole implements ealthread.ThreadWithRole interface.

type FwdCounters

type FwdCounters struct {
	NInterestsCongMarked uint64               `json:"nInterestsCongMarked" gqldesc:"Congestion marked added to Interests."`
	NDataCongMarked      uint64               `json:"nDataCongMarked" gqldesc:"Congestion marked added to Data."`
	NNacksCongMarked     uint64               `json:"nNacksCongMarked" gqldesc:"Congestion marked added to Nacks."`
	InputLatency         runningstat.Snapshot `json:"inputLatency" gqldesc:"Latency between packet arrival and dequeuing at forwarding thread, in nanoseconds."`

	NNoFibMatch   uint64 `json:"nNoFibMatch" gqldesc:"Interests dropped due to no FIB match."`
	NDupNonce     uint64 `json:"nDupNonce" gqldesc:"Interests dropped due to duplicate nonce."`
	NSgNoFwd      uint64 `json:"nSgNoFwd" gqldesc:"Interests not forwarded by strategy."`
	NNackMismatch uint64 `json:"nNackMismatch" gqldesc:"Nacks dropped due to outdated nonce."`
}

FwdCounters contains forwarding thread counters.

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input represents an input thread.

func (*Input) Close

func (fwi *Input) Close() error

Close stops the thread.

func (*Input) DemuxOf

func (fwi *Input) DemuxOf(t ndni.PktType) *iface.InputDemux

DemuxOf implements DispatchThread interface.

func (*Input) DispatchThreadID

func (fwi *Input) DispatchThreadID() int

DispatchThreadID implements DispatchThread interface.

func (*Input) String

func (fwi *Input) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL