core

package
v0.0.0-...-7d840d9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CrawlHandler

type CrawlHandler[I PeerInfo[I]] struct {

	// A map that maps peer IDs to their database IDs. This speeds up the insertion of neighbor information as
	// the database does not need to look up every peer ID but only the ones not yet present in the database.
	// Speed up for ~11k peers: 5.5 min -> 30s
	PeerMappings map[peer.ID]int

	// A map that keeps track of all k-bucket entries of a particular peer.
	RoutingTables map[peer.ID]*RoutingTable[I]

	// A map of agent versions and their occurrences that happened during the crawl.
	AgentVersion map[string]int

	// A map of protocols and their occurrences that happened during the crawl.
	Protocols map[string]int

	// A map of errors that happened when trying to dial a peer.
	ConnErrs map[string]int

	// A map of errors that happened during the crawl.
	CrawlErrs map[string]int

	// The number of peers we would still need to crawl after the Run method has returned.
	QueuedPeers int

	// The number of peers that were crawled.
	CrawledPeers int
	// contains filtered or unexported fields
}

CrawlHandler is the default implementation for a Handler that can be used as the basis for crawl operations.

func NewCrawlHandler

func NewCrawlHandler[I PeerInfo[I]](cfg *CrawlHandlerConfig) *CrawlHandler[I]

func (*CrawlHandler[I]) HandlePeerResult

func (h *CrawlHandler[I]) HandlePeerResult(ctx context.Context, result Result[CrawlResult[I]]) []I

func (*CrawlHandler[I]) HandleWriteResult

func (h *CrawlHandler[I]) HandleWriteResult(ctx context.Context, result Result[WriteResult])

func (*CrawlHandler[I]) TotalErrors

func (h *CrawlHandler[I]) TotalErrors() int

TotalErrors counts the total amount of errors - equivalent to undialable peers during this crawl.

type CrawlHandlerConfig

type CrawlHandlerConfig struct {
	// a flag that indicates whether we want to track and keep routing table
	// configurations of all peers in memory and write them to disk after the
	// crawl has finished.
	TrackNeighbors bool
}

type CrawlResult

type CrawlResult[I PeerInfo[I]] struct {
	// The crawler that generated this result
	CrawlerID string

	// Information about crawled peer
	Info I

	// The neighbors of the crawled peer
	RoutingTable *RoutingTable[I]

	// The agent version of the crawled peer
	Agent string

	// The protocols the peer supports
	Protocols []string

	// Indicates whether the above routing table information was queried through the API.
	// The API routing table does not include MultiAddresses, so we won't use them for further crawls.
	RoutingTableFromAPI bool

	// Any error that has occurred when connecting to the peer
	ConnectError error

	// The above error transferred to a known error
	ConnectErrorStr string

	// Any error that has occurred during fetching neighbor information
	CrawlError error

	// The above error transferred to a known error
	CrawlErrorStr string

	// When was the crawl started
	CrawlStartTime time.Time

	// When did this crawl end
	CrawlEndTime time.Time

	// When was the connection attempt made
	ConnectStartTime time.Time

	// As it can take some time to handle the result we track the timestamp explicitly
	ConnectEndTime time.Time

	// Additional properties of that specific peer we have crawled
	Properties json.RawMessage

	// Debug flag that indicates whether to log the full error string
	LogErrors bool
}

CrawlResult captures data that is gathered from crawling a single peer.

func (CrawlResult[I]) ConnectDuration

func (r CrawlResult[I]) ConnectDuration() time.Duration

ConnectDuration returns the time it took to connect to the peer. This includes dialing and the identity protocol.

func (CrawlResult[I]) CrawlDuration

func (r CrawlResult[I]) CrawlDuration() time.Duration

CrawlDuration returns the time it took to crawl to the peer (connecting + fetching neighbors)

func (CrawlResult[I]) IsSuccess

func (r CrawlResult[I]) IsSuccess() bool

func (CrawlResult[I]) LogEntry

func (r CrawlResult[I]) LogEntry() *log.Entry

func (CrawlResult[I]) PeerInfo

func (r CrawlResult[I]) PeerInfo() I

type CrawlWriter

type CrawlWriter[I PeerInfo[I]] struct {
	// contains filtered or unexported fields
}

CrawlWriter handles the insert/upsert/update operations for a particular crawl result.

func NewCrawlWriter

func NewCrawlWriter[I PeerInfo[I]](id string, dbc db.Client, dbCrawlID int, cfg *CrawlWriterConfig) *CrawlWriter[I]

func (*CrawlWriter[I]) Work

func (w *CrawlWriter[I]) Work(ctx context.Context, task CrawlResult[I]) (WriteResult, error)

Work takes a crawl result (persist job) and inserts a denormalized database entry of the results.

type CrawlWriterConfig

type CrawlWriterConfig struct {
	AddrTrackType config.AddrType
}

type DialHandler

type DialHandler[I PeerInfo[I]] struct {
	// contains filtered or unexported fields
}

func NewDialHandler

func NewDialHandler[I PeerInfo[I]](cfg *DialHandlerConfig) *DialHandler[I]

func (*DialHandler[I]) HandlePeerResult

func (h *DialHandler[I]) HandlePeerResult(ctx context.Context, result Result[DialResult[I]]) []I

func (*DialHandler[I]) HandleWriteResult

func (h *DialHandler[I]) HandleWriteResult(ctx context.Context, result Result[WriteResult])

type DialHandlerConfig

type DialHandlerConfig struct{}

type DialResult

type DialResult[I PeerInfo[I]] struct {
	// The dialer that generated this result
	DialerID string

	// The dialed peer
	Info I

	// If error is set, the peer was not dialable
	Error error

	// The above error transferred to a known error
	DialError string

	// When was the dial started
	DialStartTime time.Time

	// When did this crawl end
	DialEndTime time.Time
}

DialResult captures data that is gathered from pinging a single peer.

func (DialResult[I]) DialDuration

func (r DialResult[I]) DialDuration() time.Duration

DialDuration returns the time it took to dial the peer

func (DialResult[I]) IsSuccess

func (r DialResult[I]) IsSuccess() bool

func (DialResult[I]) LogEntry

func (r DialResult[I]) LogEntry() *log.Entry

func (DialResult[I]) PeerInfo

func (r DialResult[I]) PeerInfo() I

type DialWriter

type DialWriter[I PeerInfo[I]] struct {
	// contains filtered or unexported fields
}

DialWriter handles the insert/upsert/update operations for a particular crawl result.

func NewDialWriter

func NewDialWriter[I PeerInfo[I]](id string, dbc *db.DBClient) *DialWriter[I]

func (*DialWriter[I]) Work

func (w *DialWriter[I]) Work(ctx context.Context, task DialResult[I]) (WriteResult, error)

Work takes a crawl result (persist job) and inserts a denormalized database entry of the results.

type Driver

type Driver[I PeerInfo[I], R WorkResult[I]] interface {
	// NewWorker returns a new [Worker] that takes a [PeerInfo], performs its
	// duties by contacting that peer, and returns the resulting WorkResult.
	// In the current implementation, this could be either a "peer crawl" (when
	// you run "nebula crawl") or a "peer dial" (when you run "nebula monitor").
	NewWorker() (Worker[I, R], error)

	// NewWriter returns a new [Worker] that takes a [WorkResult], performs its
	// duties by storing that result somewhere, and returns information about
	// how that all went.
	NewWriter() (Worker[R, WriteResult], error)

	// Tasks returns a channel on which the driver should emit peer processing
	// tasks. This method will only be called once by the engine. The engine
	// will keep running until the returned channel was closed. Closing the
	// channel signals the engine that we don't anticipate to schedule any more
	// tasks. However, this doesn't mean that the engine will stop right away.
	// It will first process all remaining tasks it has in its queue. If you
	// want to prematurely stop the engine, cancel the context you passed into
	// [Engine.Run].
	Tasks() <-chan I

	// Close is called when the engine is about to shut down. This gives the
	// stack a chance to clean up internal resources. Implementation must be
	// idempotent as it may be called multiple times.
	Close()
}

A Driver is a data structure that provides the necessary implementations and tasks for the engine to operate.

type Engine

type Engine[I PeerInfo[I], R WorkResult[I]] struct {
	// contains filtered or unexported fields
}

Engine is the integral data structure for orchestrating the communication with peers and writing the processing results to disk. It maintains a pool of workers and writers that are concurrently processing peers and writing the results to disk. The engine is responsible for scheduling which peer to process next, making sure to not process the same peer twice. At the same time, it buffers the processing results and schedules results to be stored and distributes these tasks to the writers when they have capacity. The engine can be configured with the EngineConfig struct. It is generic on the peer information type (PeerInfo) and the result that the peer workers return (WorkResult).

func NewEngine

func NewEngine[I PeerInfo[I], R WorkResult[I]](driver Driver[I, R], handler Handler[I, R], cfg *EngineConfig) (*Engine[I, R], error)

NewEngine initializes a new engine. See the Engine documentation for more information.

func (*Engine[I, R]) Run

func (e *Engine[I, R]) Run(ctx context.Context) (map[string]I, error)

Run is a blocking call that starts the worker and writer pools to accept and perform tasks. It enters an indefinite loop expecting to receive tasks from the driver. In the case of a crawl operation, these should be the bootstrap peers start the crawl from. Then it sends these peers to the workers which then process that peer and send back the result which is then sent to one of the writer workers which in turn stores the result. The engine, in the meantime, keeps track of and exposes prometheus metrics. The engine will keep running as long as the tasksChan from the driver isn't closed. If the channel was closed, the engine will process all remaining peers in the queue. Each result is passed to a handler that may return additional peers to process.

type EngineConfig

type EngineConfig struct {
	// the number of internal workers. This translates to how many peers we
	// process in parallel.
	WorkerCount int

	// the number of internal writers that store the results to disk.
	WriterCount int

	// maximum number of peers to process before stopping the engine. 0 means
	// to process peers until there are no more in the work queue. If
	// [DuplicateProcessing] is true, process indefinitely.
	Limit int

	// if set to true, the engine won't keep track of which peers were already
	// processed to prevent processing a peer twice. The engine is solely driven
	// by what the driver will emit on its tasks channel.
	DuplicateProcessing bool

	// which type addresses should be dialed. Relevant for parking
	// peers during a crawl
	AddrDialType config.AddrType

	// MeterProvider is the meter provider to use when initialising metric instruments.
	MeterProvider metric.MeterProvider

	// TracerProvider is the tracer provider to use when initialising tracing
	TracerProvider trace.TracerProvider
}

The EngineConfig object configures the core Nebula Engine below.

func DefaultEngineConfig

func DefaultEngineConfig() *EngineConfig

DefaultEngineConfig returns a default engine configuration that can and should be adjusted for different networks.

func (*EngineConfig) Validate

func (cfg *EngineConfig) Validate() error

Validate verifies the engine configuration's invariants.

type Handler

type Handler[I PeerInfo[I], R WorkResult[I]] interface {
	// HandlePeerResult is called when the worker that has processed a peer
	// has emitted a new processing result. This can be a [CrawlResult] or
	// [DialResult] at the moment.
	HandlePeerResult(context.Context, Result[R]) []I

	// HandleWriteResult is called when the writer has written a [CrawlResult]
	// or [DialResult] to disk.
	HandleWriteResult(context.Context, Result[WriteResult])
}

Handler defines the interface that the engine will call every time it has received a result from any of its workers.

type PeerInfo

type PeerInfo[T any] interface {
	// ID should return the peer's/node's identifier mapped into a libp2p peer.ID.
	ID() peer.ID

	// Addrs should return all addresses that this peer is reachable at in multi address format.
	Addrs() []multiaddr.Multiaddr

	// Merge takes another peer info and merges it information into the callee
	// peer info struct. The implementation of Merge may panic if the peer IDs
	// don't match.
	Merge(other T) T
}

PeerInfo is the interface that any peer information struct must conform to.

type Pool

type Pool[T any, R any] struct {
	// contains filtered or unexported fields
}

A Pool manages a set of Workers

func NewPool

func NewPool[T any, R any](workers ...Worker[T, R]) *Pool[T, R]

func (*Pool[T, R]) Size

func (w *Pool[T, R]) Size() int

func (*Pool[T, R]) Start

func (w *Pool[T, R]) Start(ctx context.Context, tasks <-chan T) <-chan Result[R]

Start takes a channel on which tasks will be scheduled. It is guaranteed that the Pool reads from this channel as fast as possible. To stop the worker pool you need to close the channel. To wait until all Workers have finished, wait until the results channel returned from this method was closed as well.

type PriorityQueue

type PriorityQueue[T any] struct {
	// contains filtered or unexported fields
}

PriorityQueue is can take unique items and pops them according to their priority.

func NewPriorityQueue

func NewPriorityQueue[T any]() *PriorityQueue[T]

func (*PriorityQueue[T]) All

func (tq *PriorityQueue[T]) All() map[string]T

func (*PriorityQueue[T]) Drop

func (tq *PriorityQueue[T]) Drop(key string) bool

func (*PriorityQueue[T]) Find

func (tq *PriorityQueue[T]) Find(key string) (T, bool)

func (*PriorityQueue[T]) Len

func (tq *PriorityQueue[T]) Len() int

func (*PriorityQueue[T]) Peek

func (tq *PriorityQueue[T]) Peek() (T, bool)

func (*PriorityQueue[T]) Pop

func (tq *PriorityQueue[T]) Pop() (T, bool)

func (*PriorityQueue[T]) Push

func (tq *PriorityQueue[T]) Push(key string, value T, priority int) bool

func (*PriorityQueue[T]) Update

func (tq *PriorityQueue[T]) Update(key string, value T, priority int) bool

Update modifies the priority and value of an item in the queue.

func (*PriorityQueue[T]) UpdatePriority

func (tq *PriorityQueue[T]) UpdatePriority(key string, priority int) bool

UpdatePriority modifies the priority of an item in the queue.

type Result

type Result[R any] struct {
	Value R
	Error error
}

Result is a generic result object. It captures a generic value or any error that might have occurred when producing this result.

type RoutingTable

type RoutingTable[I PeerInfo[I]] struct {
	// PeerID is the peer whose neighbors (routing table entries) are in the array below.
	PeerID peer.ID
	// The peers that are in the routing table of the above peer
	Neighbors []I
	// First error that has occurred during crawling that peer
	Error error
	// Little Endian representation of at which CPLs errors occurred during
	// neighbors fetches.
	// errorBits tracks at which CPL errors have occurred.
	// 0000 0000 0000 0000 - No error
	// 0000 0000 0000 0001 - An error has occurred at CPL 0
	// 1000 0000 0000 0001 - An error has occurred at CPL 0 and 15
	ErrorBits uint16
}

RoutingTable captures the routing table information and crawl error of a particular peer

type WorkResult

type WorkResult[I PeerInfo[I]] interface {
	// PeerInfo returns information of the peer that was processed
	PeerInfo() I

	// LogEntry returns logging information that can be used by the engine
	LogEntry() *log.Entry

	// IsSuccess indicates whether this WorkResult is considered a success
	IsSuccess() bool
}

WorkResult must be implemented by the result that a Worker which processes peers returns.

type Worker

type Worker[T any, R any] interface {
	// Work instructs the Worker to process the task and produce a result.
	Work(ctx context.Context, task T) (R, error)
}

A Worker processes tasks of type T and returns results of type R or an error. Workers are used to process a single peer or store a crawl result to the database. It is the unit of concurrency in this system.

type WriteResult

type WriteResult struct {
	*db.InsertVisitResult
	WriterID string
	PeerID   peer.ID
	Duration time.Duration
	Error    error
}

WriteResult must be returned by write workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL