network

package
v0.29.6
Published: Jan 19, 2023 License: AGPL-3.0 Imports: 18 Imported by: 8

Documentation

Overview

(c) 2019 Dapper Labs - ALL RIGHTS RESERVED

Constants

This section is empty.

Variables

var (
	EmptyTargetList = errors.New("target list empty")
)

var ErrBlobNotFound = errors.New("blobservice: key not found")

Functions

func AllPeerUnreachableError

func AllPeerUnreachableError(errs ...error) bool

AllPeerUnreachableError reports whether all of the given errors are PeerUnreachableError.

func EventId

func EventId(channel channels.Channel, payload []byte) (hash.Hash, error)

EventId computes the event ID for a given channel and payload (i.e., the hash of the payload and channel). All errors returned by this function are benign and should not cause the node to crash. It errors if the hash function fails to hash the payload and channel.
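As a rough illustration of "the hash of the payload and channel", the sketch below derives an ID from both inputs. It is not the package's implementation: the hash function (SHA-256), the length-prefixed layout, and the helper name `eventID` are all assumptions chosen so the example runs with the standard library only.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// eventID is a hypothetical helper: it hashes the channel name and the payload
// together. SHA-256 and the length prefix are assumptions for illustration.
func eventID(channel string, payload []byte) []byte {
	h := sha256.New()

	// Prefix the channel with its length so that ("ab", "c") and ("a", "bc")
	// cannot produce the same hash input.
	var lenBuf [8]byte
	binary.BigEndian.PutUint64(lenBuf[:], uint64(len(channel)))
	h.Write(lenBuf[:])
	h.Write([]byte(channel))
	h.Write(payload)

	return h.Sum(nil)
}

func main() {
	a := eventID("sync-committee", []byte("hello"))
	b := eventID("sync-committee", []byte("hello"))
	c := eventID("push-blocks", []byte("hello"))

	fmt.Println(string(a) == string(b)) // same channel and payload: same ID
	fmt.Println(string(a) == string(c)) // different channel: different ID
}
```

The point of including the channel in the hash is that the same payload sent on two channels is treated as two distinct events.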

func IsPeerUnreachableError

func IsPeerUnreachableError(e error) bool

IsPeerUnreachableError reports whether the given error is a PeerUnreachableError.

func MessageType

func MessageType(decodedPayload interface{}) string

MessageType returns the type of the message payload.

func NewPeerUnreachableError

func NewPeerUnreachableError(err error) error

NewPeerUnreachableError creates a PeerUnreachableError that wraps the given error.

Types

type Adapter

type Adapter interface {
	// UnicastOnChannel sends the message in a reliable way to the given recipient.
	UnicastOnChannel(channels.Channel, interface{}, flow.Identifier) error

	// PublishOnChannel sends the message in an unreliable way to all the given recipients.
	PublishOnChannel(channels.Channel, interface{}, ...flow.Identifier) error

	// MulticastOnChannel unreliably sends the specified event over the channel to the specified number of recipients,
	// selected at random from the specified targetIDs.
	MulticastOnChannel(channels.Channel, interface{}, uint, ...flow.Identifier) error

	// UnRegisterChannel unregisters the engine for the specified channel. The engine will no longer be able to send or
	// receive messages from that channel.
	UnRegisterChannel(channel channels.Channel) error
}

Adapter is a wrapper around the Network implementation. It only exposes message dissemination functionalities. Adapter is meant to be utilized by the Conduit interface to send messages to the Network layer to be delivered to the remote targets.

type BasicResolver

type BasicResolver interface {
	LookupIPAddr(context.Context, string) ([]net.IPAddr, error)
	LookupTXT(context.Context, string) ([]string, error)
}

BasicResolver is a low-level interface for DNS resolution. Note: this is the resolver interface that libp2p expects. We keep a copy of it here for mock generation. See https://github.com/multiformats/go-multiaddr-dns/blob/master/resolve.go

type BlobGetter

type BlobGetter interface {
	// GetBlob gets the requested blob.
	GetBlob(ctx context.Context, c cid.Cid) (blobs.Blob, error)

	// GetBlobs does a batch request for the given cids, returning blobs as
	// they are found, in no particular order.
	//
	// It may not be able to find all requested blobs (or the context may
	// be canceled). In that case, it will close the channel early. It is up
	// to the consumer to detect this situation and keep track of which blobs
	// it has received and which it hasn't.
	GetBlobs(ctx context.Context, ks []cid.Cid) <-chan blobs.Blob
}

BlobGetter is the common interface shared between blobservice sessions and the blobservice.
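Because GetBlobs may close its channel before every requested blob has arrived, the caller has to reconcile what it received against what it asked for. The sketch below shows one way to do that. `Blob`, its `Key` field, and `collectBlobs` are hypothetical stand-ins (a string key replaces `cid.Cid`) so that the example is self-contained.

```go
package main

import "fmt"

// Blob is a hypothetical stand-in for blobs.Blob; Key stands in for its cid.Cid.
type Blob struct {
	Key  string
	Data []byte
}

// collectBlobs drains ch until it is closed and reports which of the
// requested keys never arrived.
func collectBlobs(requested []string, ch <-chan Blob) (map[string]Blob, []string) {
	found := make(map[string]Blob, len(requested))
	for b := range ch { // the loop ends when the producer closes the channel
		found[b.Key] = b
	}

	var missing []string
	for _, k := range requested {
		if _, ok := found[k]; !ok {
			missing = append(missing, k)
		}
	}
	return found, missing
}

func main() {
	requested := []string{"a", "b", "c"}

	// Simulate a producer that delivers out of order and closes early.
	ch := make(chan Blob, len(requested))
	ch <- Blob{Key: "c", Data: []byte("3")}
	ch <- Blob{Key: "a", Data: []byte("1")}
	close(ch)

	found, missing := collectBlobs(requested, ch)
	fmt.Println(len(found), missing) // prints: 2 [b]
}
```

A caller would typically retry or report the keys left in `missing`.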

type BlobService

type BlobService interface {
	component.Component
	BlobGetter

	// AddBlob puts a given blob to the underlying datastore
	AddBlob(ctx context.Context, b blobs.Blob) error

	// AddBlobs adds a slice of blobs at the same time using batching
	// capabilities of the underlying datastore whenever possible.
	AddBlobs(ctx context.Context, bs []blobs.Blob) error

	// DeleteBlob deletes the given blob from the blobservice.
	DeleteBlob(ctx context.Context, c cid.Cid) error

	// GetSession creates a new session that allows for controlled exchange of wantlists to decrease the bandwidth overhead.
	GetSession(ctx context.Context) BlobGetter

	// TriggerReprovide updates the BlobService's provider entries in the DHT
	TriggerReprovide(ctx context.Context) error
}

BlobService is a hybrid blob datastore. It stores data in a local datastore and may retrieve data from a remote Exchange. It uses an internal `datastore.Datastore` instance to store values.

type BlobServiceOption

type BlobServiceOption func(BlobService)

type Codec

type Codec interface {
	NewEncoder(w io.Writer) Encoder
	NewDecoder(r io.Reader) Decoder
	Encode(v interface{}) ([]byte, error)

	// Decode decodes a message.
	// Expected error returns during normal operations:
	//  - codec.ErrInvalidEncoding if message encoding is invalid.
	//  - codec.ErrUnknownMsgCode if message code byte does not match any of the configured message codes.
	//  - codec.ErrMsgUnmarshal if the codec fails to unmarshal the data to the message type denoted by the message code.
	Decode(data []byte) (interface{}, error)
}

Codec provides factory functions for encoders and decoders.

type Compressor

type Compressor interface {
	NewReader(io.Reader) (io.ReadCloser, error)
	NewWriter(io.Writer) (WriteCloseFlusher, error)
}

Compressor offers compressing and decompressing services for sending and receiving a byte slice at the network layer.
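To make the shape of this interface concrete, here is a minimal sketch of a Compressor backed by the standard library's `compress/gzip`. The interfaces are re-declared locally so the example compiles on its own; `gzipCompressor` and `roundTrip` are hypothetical names, and nothing here implies that the package itself uses gzip.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// Local copies of the documented interfaces, so the sketch is self-contained.
type WriteCloseFlusher interface {
	io.WriteCloser
	Flush() error
}

type Compressor interface {
	NewReader(io.Reader) (io.ReadCloser, error)
	NewWriter(io.Writer) (WriteCloseFlusher, error)
}

// gzipCompressor is a hypothetical Compressor built on compress/gzip.
type gzipCompressor struct{}

func (gzipCompressor) NewReader(r io.Reader) (io.ReadCloser, error) {
	zr, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	return zr, nil
}

func (gzipCompressor) NewWriter(w io.Writer) (WriteCloseFlusher, error) {
	// *gzip.Writer provides Write, Close and Flush.
	return gzip.NewWriter(w), nil
}

// roundTrip compresses data into a buffer and decompresses it again.
func roundTrip(c Compressor, data []byte) ([]byte, error) {
	var buf bytes.Buffer

	w, err := c.NewWriter(&buf)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil { // Close flushes the remaining data
		return nil, err
	}

	r, err := c.NewReader(&buf)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	out, err := roundTrip(gzipCompressor{}, []byte("hello flow"))
	fmt.Println(string(out), err) // prints: hello flow <nil>
}
```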

type Conduit

type Conduit interface {

	// Publish submits an event to the network layer for unreliable delivery
	// to subscribers of the given event on the network layer. It uses a
	// publish-subscribe layer and can thus not guarantee that the specified
	// recipients received the event.
	// The event is published on the channels of this Conduit and will be received
	// by the nodes specified as part of the targetIDs.
	// TODO: function errors must be documented.
	Publish(event interface{}, targetIDs ...flow.Identifier) error

	// Unicast sends the event in a reliable way to the given recipient.
	// It uses 1-1 direct messaging over the underlying network to deliver the event.
	// It returns an error if the unicast fails.
	// TODO: function errors must be documented.
	Unicast(event interface{}, targetID flow.Identifier) error

	// Multicast unreliably sends the specified event over the channel
	// to the specified number of recipients selected from the specified subset.
	// The recipients are selected randomly from the targetIDs.
	// TODO: function errors must be documented.
	Multicast(event interface{}, num uint, targetIDs ...flow.Identifier) error

	// Close unsubscribes from the channels of this conduit. After calling close,
	// the conduit can no longer be used to send a message.
	Close() error
}

Conduit represents the interface for engines to communicate over the peer-to-peer network. Upon registration with the network, each engine is assigned a conduit, which it can use to communicate across the network in a network-agnostic way. In the background, the network layer connects all engines with the same ID over a shared bus, accessible through the conduit.

type ConduitFactory

type ConduitFactory interface {
	// RegisterAdapter sets the Adapter component of the factory.
	// The Adapter is a wrapper around the Network layer that only exposes the set of methods
	// that are needed by a conduit.
	RegisterAdapter(Adapter) error

	// NewConduit creates a conduit on the specified channel.
	// Prior to creating any conduit, the factory requires an Adapter to be registered with it.
	NewConduit(context.Context, channels.Channel) (Conduit, error)
}

ConduitFactory is an interface type that is utilized by the Network to create conduits for the channels.

type Connection

type Connection interface {
	Send(msg interface{}) error
	Receive() (interface{}, error)
}

Connection represents an interface to read from & write to a connection.

type Decoder

type Decoder interface {
	Decode() (interface{}, error)
}

Decoder decodes from the underlying reader into the given message. Expected error returns during normal operations:

  • codec.ErrInvalidEncoding if message encoding is invalid.
  • codec.ErrUnknownMsgCode if message code byte does not match any of the configured message codes.
  • codec.ErrMsgUnmarshal if the codec fails to unmarshal the data to the message type denoted by the message code.

type Encoder

type Encoder interface {
	Encode(v interface{}) error
}

Encoder encodes the given message into the underlying writer.

type Engine

type Engine interface {
	module.ReadyDoneAware

	// SubmitLocal submits an event originating on the local node.
	// Deprecated: To asynchronously communicate a local message between components:
	// * Define a message queue on the component receiving the message
	// * Define a function (with a concrete argument type) on the component receiving
	//   the message, which adds the message to the message queue
	SubmitLocal(event interface{})

	// Submit submits the given event from the node with the given origin ID
	// for processing in a non-blocking manner. It returns instantly and logs
	// a potential processing error internally when done.
	// Deprecated: Only applicable for use by the networking layer, which should use MessageProcessor instead
	Submit(channel channels.Channel, originID flow.Identifier, event interface{})

	// ProcessLocal processes an event originating on the local node.
	// Deprecated: To synchronously process a local message:
	// * Define a function (with a concrete argument type) on the component receiving
	//   the message, which blocks until the message is processed
	ProcessLocal(event interface{}) error

	// Process processes the given event from the node with the given origin ID
	// in a blocking manner. It returns the potential processing error when
	// done.
	// Deprecated: Only applicable for use by the networking layer, which should use MessageProcessor instead
	Process(channel channels.Channel, originID flow.Identifier, event interface{}) error
}

Engine represents an isolated process running across the peer-to-peer network as part of the node business logic. It provides the network layer with the necessary interface to forward events to engines for processing.

Deprecated: Use MessageProcessor instead.

type IncomingMessageScope

type IncomingMessageScope struct {
	// contains filtered or unexported fields
}

IncomingMessageScope captures the context around an incoming message that is received by the network layer.

func NewIncomingScope

func NewIncomingScope(originId flow.Identifier, protocol ProtocolType, msg *message.Message, decodedPayload interface{}) (*IncomingMessageScope, error)

NewIncomingScope creates a new incoming message scope. All errors returned by this function are benign and should not cause the node to crash, especially since it is not safe to crash the node when receiving a message. It errors if the event ID (i.e., the hash of the payload and channel) cannot be computed, or if it fails to convert the target IDs from a byte slice to a flow.IdentifierList.

func (IncomingMessageScope) Channel

func (IncomingMessageScope) DecodedPayload

func (m IncomingMessageScope) DecodedPayload() interface{}

func (IncomingMessageScope) EventID

func (m IncomingMessageScope) EventID() []byte

func (IncomingMessageScope) OriginId

func (m IncomingMessageScope) OriginId() flow.Identifier

func (IncomingMessageScope) PayloadType

func (m IncomingMessageScope) PayloadType() string

func (IncomingMessageScope) Proto

func (IncomingMessageScope) Protocol

func (m IncomingMessageScope) Protocol() ProtocolType

func (IncomingMessageScope) Size

func (m IncomingMessageScope) Size() int

func (IncomingMessageScope) TargetIDs

type MessageProcessor

type MessageProcessor interface {
	Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
}

MessageProcessor represents a component which receives messages from the networking layer. Since these messages come from other nodes, which may be Byzantine, implementations must expect and handle arbitrary message inputs (including invalid message types, malformed messages, etc.). Because of this, node-internal messages should NEVER be submitted to a component using Process.

type MessageQueue

type MessageQueue interface {
	// Insert inserts the message into the queue
	Insert(message interface{}) error
	// Remove removes the message from the queue in priority order. If no message is found, this call blocks.
	// If two messages have the same priority, items are de-queued in insertion order
	Remove() interface{}
	// Len gives the current length of the queue
	Len() int
}

MessageQueue is the interface of the inbound message queue.
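The contract described in the comments (priority order, insertion order among equal priorities, and a blocking Remove) can be sketched with `container/heap` and a condition variable. This is an illustrative implementation, not the package's own: `PriorityQueue`, `NewPriorityQueue`, the `priorityOf` callback, and the `job` type are hypothetical, and "higher number means higher priority" is an assumption.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// item pairs a message with its priority and an insertion sequence number.
type item struct {
	message  interface{}
	priority int
	seq      uint64
}

// itemHeap pops the highest priority first; ties go to the lowest seq.
type itemHeap []item

func (h itemHeap) Len() int { return len(h) }
func (h itemHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h itemHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *itemHeap) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// PriorityQueue is a hypothetical queue with Insert, Remove and Len.
type PriorityQueue struct {
	mu         sync.Mutex
	notEmpty   *sync.Cond
	items      itemHeap
	nextSeq    uint64
	priorityOf func(interface{}) (int, error)
}

func NewPriorityQueue(priorityOf func(interface{}) (int, error)) *PriorityQueue {
	q := &PriorityQueue{priorityOf: priorityOf}
	q.notEmpty = sync.NewCond(&q.mu)
	return q
}

// Insert adds the message; it fails only if the priority cannot be computed.
func (q *PriorityQueue) Insert(message interface{}) error {
	p, err := q.priorityOf(message)
	if err != nil {
		return fmt.Errorf("could not compute priority: %w", err)
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(&q.items, item{message: message, priority: p, seq: q.nextSeq})
	q.nextSeq++
	q.notEmpty.Signal()
	return nil
}

// Remove blocks until a message is available, then returns the best one.
func (q *PriorityQueue) Remove() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.notEmpty.Wait()
	}
	return heap.Pop(&q.items).(item).message
}

func (q *PriorityQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

// job is a hypothetical message carrying its own priority.
type job struct {
	name     string
	priority int
}

func main() {
	q := NewPriorityQueue(func(m interface{}) (int, error) {
		j, ok := m.(job)
		if !ok {
			return 0, fmt.Errorf("unsupported message type %T", m)
		}
		return j.priority, nil
	})
	_ = q.Insert(job{"low", 1})
	_ = q.Insert(job{"high-1", 5})
	_ = q.Insert(job{"high-2", 5})

	for q.Len() > 0 {
		fmt.Println(q.Remove().(job).name) // prints high-1, high-2, low
	}
}
```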

type MessageValidator

type MessageValidator interface {
	// Validate validates the message and returns true if the message is to be retained and false if it needs to be dropped
	Validate(msg IncomingMessageScope) bool
}

MessageValidator validates the incoming message. Message validation happens in the middleware right before it is delivered to the network.

type Middleware

type Middleware interface {
	component.Component

	// SetOverlay sets the overlay used by the middleware. This must be called before the middleware can be Started.
	SetOverlay(Overlay)

	// SendDirect sends msg on a 1-1 direct connection to the target ID. It models a guaranteed delivery asynchronous
	// direct one-to-one connection on the underlying network. No intermediate node on the overlay is utilized
	// as the router.
	//
	// SendDirect should be used whenever guaranteed delivery to a specific target is required. Otherwise, Publish is
	// a more efficient candidate.
	// All errors returned from this function can be considered benign.
	SendDirect(msg *OutgoingMessageScope) error

	// Publish publishes a message on the channel. It models a distributed broadcast where the message is meant for all or
	// many nodes subscribing to the channel. It does not guarantee delivery, though, and operates on a best-effort
	// basis.
	// All errors returned from this function can be considered benign.
	Publish(msg *OutgoingMessageScope) error

	// Subscribe subscribes the middleware to a channel.
	// No errors are expected during normal operation.
	Subscribe(channel channels.Channel) error

	// Unsubscribe unsubscribes the middleware from a channel.
	// All errors returned from this function can be considered benign.
	Unsubscribe(channel channels.Channel) error

	// UpdateNodeAddresses fetches and updates the addresses of all the authorized participants
	// in the Flow protocol.
	UpdateNodeAddresses()

	// NewBlobService creates a new BlobService for the given channel.
	NewBlobService(channel channels.Channel, store datastore.Batching, opts ...BlobServiceOption) BlobService

	// NewPingService creates a new PingService for the given ping protocol ID.
	NewPingService(pingProtocol protocol.ID, provider PingInfoProvider) PingService

	IsConnected(nodeID flow.Identifier) (bool, error)
}

Middleware represents the middleware layer, which manages the connections to our direct neighbours on the network. It handles the creation & teardown of connections, as well as reading & writing to/from the connections.

type Network

type Network interface {
	component.Component
	// Register will subscribe to the channel with the given engine and
	// the engine will be notified with incoming messages on the channel.
	// The returned Conduit can be used to send messages to engines on other nodes subscribed to the same channel.
	// On a single node, only one engine can be subscribed to a channel at any given time.
	Register(channel channels.Channel, messageProcessor MessageProcessor) (Conduit, error)

	// RegisterBlobService registers a BlobService on the given channel, using the given datastore to retrieve values.
	// The returned BlobService can be used to request blocks from the network.
	// TODO: We should return a function that can be called to unregister / close the BlobService
	RegisterBlobService(channel channels.Channel, store datastore.Batching, opts ...BlobServiceOption) (BlobService, error)

	// RegisterPingService registers a ping protocol handler for the given protocol ID
	RegisterPingService(pingProtocolID protocol.ID, pingInfoProvider PingInfoProvider) (PingService, error)
}

Network represents the network layer of the node. It allows processes that work across the peer-to-peer network to register themselves as an engine with a unique engine ID. The returned conduit allows the process to communicate to the same engine on other nodes across the network in a network-agnostic way.

type OutgoingMessageScope

type OutgoingMessageScope struct {
	// contains filtered or unexported fields
}

OutgoingMessageScope captures the context around an outgoing message that is about to be sent.

func NewOutgoingScope

func NewOutgoingScope(
	targetIds flow.IdentifierList,
	channelId channels.Channel,
	payload interface{},
	encoder func(interface{}) ([]byte, error),
	protocolType ProtocolType) (*OutgoingMessageScope, error)

NewOutgoingScope creates a new outgoing message scope. All errors returned by this function are benign and should not cause the node to crash. It errors if the encoder fails to encode the payload into a protobuf message, or if the number of target IDs does not match the protocol type (i.e., unicast messages should have exactly one target ID, while pubsub messages should have at least one target ID).
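The target-count rule stated above (exactly one target for unicast, at least one for pubsub) can be sketched as a small check. `validateTargets` is a hypothetical helper written for illustration; the actual validation inside NewOutgoingScope may be structured differently.

```go
package main

import "fmt"

// ProtocolType and its constants are re-declared locally so the sketch is
// self-contained; they mirror the documented declarations.
type ProtocolType string

const (
	ProtocolTypeUnicast ProtocolType = "unicast"
	ProtocolTypePubSub  ProtocolType = "pubsub"
)

// validateTargets is a hypothetical helper that applies the documented rule:
// unicast needs exactly one target, pubsub needs at least one.
func validateTargets(protocolType ProtocolType, numTargets int) error {
	switch protocolType {
	case ProtocolTypeUnicast:
		if numTargets != 1 {
			return fmt.Errorf("unicast requires exactly one target, got %d", numTargets)
		}
	case ProtocolTypePubSub:
		if numTargets < 1 {
			return fmt.Errorf("pubsub requires at least one target, got %d", numTargets)
		}
	default:
		return fmt.Errorf("unknown protocol type %q", protocolType)
	}
	return nil
}

func main() {
	fmt.Println(validateTargets(ProtocolTypeUnicast, 1)) // prints: <nil>
	fmt.Println(validateTargets(ProtocolTypeUnicast, 2)) // prints the unicast error
	fmt.Println(validateTargets(ProtocolTypePubSub, 0))  // prints the pubsub error
}
```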

func (OutgoingMessageScope) Channel

func (OutgoingMessageScope) PayloadType

func (o OutgoingMessageScope) PayloadType() string

func (OutgoingMessageScope) Proto

func (OutgoingMessageScope) Size

func (o OutgoingMessageScope) Size() int

func (OutgoingMessageScope) TargetIds

type Overlay

type Overlay interface {
	// Topology returns an identity list of nodes which this node should be directly connected to as peers
	Topology() flow.IdentityList

	// Identities returns a list of all Flow identities on the network
	Identities() flow.IdentityList

	// Identity returns the Identity associated with the given peer ID, if it exists
	Identity(peer.ID) (*flow.Identity, bool)

	Receive(*IncomingMessageScope) error
}

Overlay represents the interface that middleware uses to interact with the overlay network layer.

type PeerUnreachableError

type PeerUnreachableError struct {
	Err error
}

PeerUnreachableError is the error returned when submitting an event to a target fails because the target peer is unreachable.

func (PeerUnreachableError) Error

func (e PeerUnreachableError) Error() string

func (PeerUnreachableError) Unwrap

func (e PeerUnreachableError) Unwrap() error

Unwrap returns the wrapped error value.

type PingInfoProvider

type PingInfoProvider interface {
	SoftwareVersion() string
	SealedBlockHeight() uint64
	HotstuffView() uint64
}

PingInfoProvider is the interface used by the PingService to respond to an incoming PingRequest with a PingResponse populated with the necessary details.

type PingService

type PingService interface {
	Ping(ctx context.Context, peerID peer.ID) (message.PingResponse, time.Duration, error)
}

type ProtocolType

type ProtocolType string

ProtocolType defines the type of the protocol a message is sent over. Currently, we have two types of protocols:

  • unicast: a message is sent to a single node through a direct connection.
  • pubsub: a message is sent to a set of nodes through a pubsub channel.

const (
	// ProtocolTypeUnicast is the protocol type for unicast messages.
	ProtocolTypeUnicast ProtocolType = "unicast"

	// ProtocolTypePubSub is the protocol type for pubsub messages.
	ProtocolTypePubSub ProtocolType = "pubsub"
)

func (ProtocolType) String

func (m ProtocolType) String() string

type SubscriptionManager

type SubscriptionManager interface {
	// Register registers an engine on the channel into the subscription manager.
	Register(channel channels.Channel, engine MessageProcessor) error

	// Unregister removes the engine associated with a channel.
	Unregister(channel channels.Channel) error

	// GetEngine returns the engine associated with a channel.
	GetEngine(channel channels.Channel) (MessageProcessor, error)

	// Channels returns all the channels registered in this subscription manager.
	Channels() channels.ChannelList
}
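A minimal in-memory sketch of this interface is shown below, assuming a map guarded by a mutex. `Channel` and the `MessageProcessor` signature are local stand-ins, `memSubscriptions` and `noopEngine` are hypothetical, and the choice to reject duplicate registrations and unknown channels with errors is an assumption (it follows the note elsewhere on this page that only one engine can be subscribed to a channel on a node).

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Channel is a hypothetical stand-in for channels.Channel.
type Channel string

// MessageProcessor mirrors the documented interface with stand-in types.
type MessageProcessor interface {
	Process(channel Channel, originID [32]byte, message interface{}) error
}

// noopEngine is a hypothetical engine that accepts everything.
type noopEngine struct{}

func (noopEngine) Process(Channel, [32]byte, interface{}) error { return nil }

// memSubscriptions is a hypothetical map-backed subscription manager.
type memSubscriptions struct {
	mu      sync.RWMutex
	engines map[Channel]MessageProcessor
}

func newMemSubscriptions() *memSubscriptions {
	return &memSubscriptions{engines: make(map[Channel]MessageProcessor)}
}

// Register fails if the channel already has an engine (assumed behavior).
func (s *memSubscriptions) Register(channel Channel, engine MessageProcessor) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.engines[channel]; ok {
		return fmt.Errorf("channel %q already has a registered engine", channel)
	}
	s.engines[channel] = engine
	return nil
}

// Unregister fails if no engine is registered on the channel (assumed behavior).
func (s *memSubscriptions) Unregister(channel Channel) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.engines[channel]; !ok {
		return fmt.Errorf("no engine registered on channel %q", channel)
	}
	delete(s.engines, channel)
	return nil
}

func (s *memSubscriptions) GetEngine(channel Channel) (MessageProcessor, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	engine, ok := s.engines[channel]
	if !ok {
		return nil, fmt.Errorf("no engine registered on channel %q", channel)
	}
	return engine, nil
}

// Channels returns the registered channels in sorted order for stable output.
func (s *memSubscriptions) Channels() []Channel {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]Channel, 0, len(s.engines))
	for c := range s.engines {
		out = append(out, c)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	subs := newMemSubscriptions()
	fmt.Println(subs.Register("sync", noopEngine{}))        // prints: <nil>
	fmt.Println(subs.Register("sync", noopEngine{}) != nil) // prints: true
	fmt.Println(subs.Register("blocks", noopEngine{}))      // prints: <nil>
	fmt.Println(subs.Channels())                            // prints: [blocks sync]
}
```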

type Topology

type Topology interface {
	// Fanout receives the IdentityList of the entire network and constructs the fanout IdentityList of the node.
	// A node directly communicates with its fanout IdentityList on epidemic dissemination
	// of the messages (i.e., publish and multicast).
	//
	// Fanout is not concurrency safe. It is the responsibility of the caller to lock for it (if needed).
	Fanout(ids flow.IdentityList) flow.IdentityList
}

Topology provides a subset of nodes to which a given node should directly connect in order to form a connected mesh for epidemic message dissemination (e.g., publisher and subscriber model).

type WriteCloseFlusher

type WriteCloseFlusher interface {
	io.WriteCloser
	Flush() error
}

Directories

Path Synopsis
internal
Package mocknetwork is a generated GoMock package.
p2p
dht
dns
p2pnode
Package p2pnode encapsulates the libp2p library
