xds

package
v0.0.0-...-6c23f14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EndpointAddress

type EndpointAddress struct {
	// contains filtered or unexported fields
}

EndpointAddress represents a socket ipAddress, with an IP address (e.g., "0.0.0.0" or "[::]") and a port.

type FixedHash

type FixedHash struct{}

FixedHash uses a fixed value as the node hash.

func (FixedHash) ID

func (FixedHash) ID(_ *corev3.Node) string

type GRPCApplication

type GRPCApplication struct {
	Name       string
	PathPrefix string
	Port       uint32
	Endpoints  []GRPCApplicationEndpoints
}

func NewGRPCApplication

func NewGRPCApplication(name string, port uint32, endpoints []GRPCApplicationEndpoints) GRPCApplication

type GRPCApplicationCache

type GRPCApplicationCache struct {
	// contains filtered or unexported fields
}

func NewGRPCApplicationCache

func NewGRPCApplicationCache() *GRPCApplicationCache

func (*GRPCApplicationCache) Get

func (*GRPCApplicationCache) Set

func (c *GRPCApplicationCache) Set(apps []GRPCApplication)

type GRPCApplicationEndpoints

type GRPCApplicationEndpoints struct {
	Node      string
	Zone      string
	Addresses []string
}

func NewGRPCApplicationEndpoints

func NewGRPCApplicationEndpoints(node string, zone string, addresses []string) GRPCApplicationEndpoints

type ServerListenerCache

type ServerListenerCache struct {
	// contains filtered or unexported fields
}

func NewServerListenerCache

func NewServerListenerCache() *ServerListenerCache

func (*ServerListenerCache) Add

func (c *ServerListenerCache) Add(nodeHash string, newAddresses []EndpointAddress) bool

Add returns true if at least one of the new server listener addresses did not already exist in the cache for the provided `nodeHash` cache key.

func (*ServerListenerCache) Get

func (c *ServerListenerCache) Get(nodeHash string) []EndpointAddress

type SnapshotBuilder

type SnapshotBuilder struct {
	// contains filtered or unexported fields
}

SnapshotBuilder builds xDS resource snapshots for the cache.

func NewSnapshotBuilder

func NewSnapshotBuilder() *SnapshotBuilder

NewSnapshotBuilder initializes the builder.

func (*SnapshotBuilder) AddGRPCApplications

func (b *SnapshotBuilder) AddGRPCApplications(apps []GRPCApplication) (*SnapshotBuilder, error)

AddGRPCApplications adds the provided application configurations to the xDS resource snapshot.

TODO: There can be more than one EndpointSlice for a k8s Service. Check if there's already an application with the same name and merge, instead of just blindly overwriting.

func (*SnapshotBuilder) AddServerListenerAddresses

func (b *SnapshotBuilder) AddServerListenerAddresses(addresses []EndpointAddress) *SnapshotBuilder

AddServerListenerAddresses adds server listeners and associated route configurations with the provided IP addresses and ports to the snapshot.

func (*SnapshotBuilder) AddSnapshot

func (b *SnapshotBuilder) AddSnapshot(snapshot cachev3.ResourceSnapshot) *SnapshotBuilder

AddSnapshot adds Listener, RouteConfiguration, Cluster, and ClusterLoadAssignment resources from the provided snapshot to the builder.

func (*SnapshotBuilder) Build

Build adds the server listeners and route configuration for the node hash, and then builds the snapshot.

type SnapshotCache

type SnapshotCache struct {
	// contains filtered or unexported fields
}

SnapshotCache stores snapshots of xDS resources in a delegate cache.

It handles server listener requests by intercepting Listener stream creation, see `CreateWatch()`. Server listeners addresses from these requests are kept in a map, keyed by the node hash, and with a set of addresses per node hash.

It also handles propagating snapshots to all node hashes in the cache.

func NewSnapshotCache

func NewSnapshotCache(ctx context.Context, allowPartialRequests bool, hash cachev3.NodeHash) *SnapshotCache

NewSnapshotCache creates an xDS resource cache for the provided node hash function.

If `allowPartialRequests` is true, the DiscoveryServer will respond to requests for a resource type even if some resources in the snapshot are not named in the request.

func (*SnapshotCache) CreateDeltaWatch

func (c *SnapshotCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, responses chan cachev3.DeltaResponse) (cancel func())

CreateDeltaWatch just delegates, since gRPC does not support delta/incremental xDS currently. TODO: Handle request for gRPC server Listeners once gRPC implementation support delta/incremental xDS.

func (*SnapshotCache) CreateWatch

func (c *SnapshotCache) CreateWatch(request *cachev3.Request, state stream.StreamState, responses chan cachev3.Response) (cancel func())

CreateWatch intercepts stream creation before delegating, and if it is a new Listener stream, does the following:

  • Extracts addresses and ports of any server listeners in the request and adds them to the set of known server listener socket addresses for the node hash.
  • If there is no existing snapshot, or if the request contained new and previously unseen server listener addresses the node hash, creates a new snapshot for that node hash, with the server listeners and associated route configuration.

This solves (in a slightly hacky way) bootstrapping of xDS-enabled gRPC servers.

func (*SnapshotCache) Fetch

func (c *SnapshotCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev3.Response, error)

func (*SnapshotCache) UpdateResources

func (c *SnapshotCache) UpdateResources(ctx context.Context, apps []GRPCApplication) error

UpdateResources creates a new snapshot for each node hash in the cache, based on the provided gRPC application configuration, with the addition of server listeners and their associated route configurations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL