userspacelin

v0.0.0-...-c7476d4
Published: Jan 19, 2024 License: Apache-2.0 Imports: 37 Imported by: 1

Documentation

Constants

This section is empty.

Variables

var (
	ErrMissingServiceEntry = errors.New("missing service entry")
	ErrMissingEndpoints    = errors.New("missing endpoints")
)
var EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}

EndpointDialTimeouts lists how long to wait for a connection to a backend on each successive attempt: 250 ms, 500 ms, 1 s, then 2 s.

var (
	// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
	// the loopback address. May be checked for by callers of NewProxier to know whether
	// the caller provided invalid input.
	ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
)

Functions

func CleanupLeftovers

func CleanupLeftovers(ipt iptablesutil.Interface) (encounteredError bool)

CleanupLeftovers removes all iptables rules and chains created by the Proxier. It returns true if an error was encountered. Errors are logged.

func GetLocalAddrSet

func GetLocalAddrSet() utilnet.IPSet

GetLocalAddrSet returns a local IPSet. If it fails to get the local addresses, it assumes there are no local IPs.

func GetLocalAddrs

func GetLocalAddrs() ([]net.IP, error)

GetLocalAddrs returns a list of all network addresses on the local system.

func ProxyTCP

func ProxyTCP(in, out *net.TCPConn)

ProxyTCP proxies data bi-directionally between in and out.

func ShouldSkipService

func ShouldSkipService(service *localv1.Service) bool

ShouldSkipService checks whether a given service should be skipped when proxying.

func ShuffleStrings

func ShuffleStrings(s []string) []string

ShuffleStrings copies the strings from the specified slice into a new slice in random order and returns the new slice. The input slice is not modified.
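The copy-then-shuffle behavior can be sketched with rand.Perm. The function name shuffleCopy is hypothetical and this is only an illustration of the documented contract (a new slice, same elements, random order), not the package's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// shuffleCopy is a hypothetical stand-in for ShuffleStrings. It leaves the
// input untouched and returns a new slice holding the same strings in a
// random order.
func shuffleCopy(s []string) []string {
	if s == nil {
		return nil
	}
	shuffled := make([]string, len(s))
	// rand.Perm returns a random permutation of the indexes 0..len(s)-1,
	// so every input element lands in exactly one output slot.
	for i, j := range rand.Perm(len(s)) {
		shuffled[j] = s[i]
	}
	return shuffled
}

func main() {
	in := []string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}
	out := shuffleCopy(in)
	fmt.Println(in)  // original order is preserved
	fmt.Println(out) // same elements, random order
}
```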

func ToCIDR

func ToCIDR(ip net.IP) string

ToCIDR returns a host address of the form <ip-address>/32 for IPv4 and <ip-address>/128 for IPv6

func TryConnectEndpoints

func TryConnectEndpoints(service iptables.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error)

TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling through until it is able to successfully connect, or it has tried with all timeouts in EndpointDialTimeouts.

Types

type Backend

type Backend struct {
	localsink.Config
	// contains filtered or unexported fields
}

func New

func New() *Backend

func (*Backend) BindFlags

func (s *Backend) BindFlags(flags *pflag.FlagSet)

func (*Backend) DeleteEndpoint

func (s *Backend) DeleteEndpoint(namespace, serviceName, epKey string)

func (*Backend) DeleteService

func (s *Backend) DeleteService(namespace, name string)

func (*Backend) Reset

func (s *Backend) Reset()

func (*Backend) SetEndpoint

func (s *Backend) SetEndpoint(namespace, serviceName, epKey string, endpoint *localv1.Endpoint)

The name of the endpoint is the same as the service name.

func (*Backend) SetService

func (s *Backend) SetService(svc *localv1.Service)

func (*Backend) Setup

func (s *Backend) Setup()

func (*Backend) Sink

func (s *Backend) Sink() localsink.Sink

func (*Backend) Sync

func (s *Backend) Sync()

type BoundedFrequencyRunner

type BoundedFrequencyRunner struct {
	// contains filtered or unexported fields
}

BoundedFrequencyRunner manages runs of a user-provided function. See NewBoundedFrequencyRunner for examples.

func (*BoundedFrequencyRunner) Loop

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{})

Loop handles the periodic timer and run requests. This is expected to be called as a goroutine.

func (*BoundedFrequencyRunner) RetryAfter

func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration)

RetryAfter ensures that the function will run again after no later than interval. This can be called from inside a run of the BoundedFrequencyRunner's function, or asynchronously.

func (*BoundedFrequencyRunner) Run

func (bfr *BoundedFrequencyRunner) Run()

Run the function as soon as possible. If this is called while Loop is not running, the call may be deferred indefinitely. If there is already a queued request to call the underlying function, it may be dropped - it is just guaranteed that we will try calling the underlying function as soon as possible starting from now.
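The Run and Loop contract (requests are only served while Loop runs, and a request that arrives while another is queued may be dropped) can be illustrated with a buffered channel of capacity one. The type coalescingRunner below is hypothetical and far simpler than BoundedFrequencyRunner: it has no minimum or maximum interval and no RetryAfter, and only shows the coalescing idea.

```go
package main

import "fmt"

// coalescingRunner is a hypothetical, much-reduced illustration of the
// Run/Loop contract. It performs no rate limiting.
type coalescingRunner struct {
	fn  func()
	run chan struct{} // capacity 1: at most one request is pending
}

func newCoalescingRunner(fn func()) *coalescingRunner {
	return &coalescingRunner{fn: fn, run: make(chan struct{}, 1)}
}

// Run queues a request without blocking. If a request is already queued,
// the new one is dropped, because the pending run will cover it.
func (r *coalescingRunner) Run() {
	select {
	case r.run <- struct{}{}:
	default:
	}
}

// Loop serves queued requests until stop is closed. Requests made while
// Loop is not running simply wait in the channel.
func (r *coalescingRunner) Loop(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-r.run:
			r.fn()
		}
	}
}

func main() {
	done := make(chan struct{}, 10)
	r := newCoalescingRunner(func() {
		fmt.Println("sync")
		done <- struct{}{}
	})

	// Three requests before Loop starts collapse into a single pending run.
	r.Run()
	r.Run()
	r.Run()

	stop := make(chan struct{})
	go r.Loop(stop)
	<-done
	close(stop)
}
```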

type ClientCache

type ClientCache struct {
	Mu      sync.Mutex
	Clients map[string]net.Conn // addr string -> connection
}

ClientCache holds all the known UDP clients that have not timed out.

type Interface

type Interface interface {
	// EnsureChain checks if the specified chain exists and, if not, creates it.  If the chain existed, return true.
	EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error)
	// FlushChain clears the specified chain.  If the chain did not exist, return error.
	FlushChain(table iptables.Table, chain iptables.Chain) error
	// DeleteChain deletes the specified chain.  If the chain did not exist, return error.
	DeleteChain(table iptables.Table, chain iptables.Chain) error
	// ChainExists tests whether the specified chain exists, returning an error if it
	// does not, or if it is unable to check.
	ChainExists(table iptables.Table, chain iptables.Chain) (bool, error)
	// EnsureRule checks if the specified rule is present and, if not, creates it.  If the rule existed, return true.
	EnsureRule(position iptables.RulePosition, table iptables.Table, chain iptables.Chain, args ...string) (bool, error)
	// DeleteRule checks if the specified rule is present and, if so, deletes it.
	DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error
	// IsIPv6 returns true if this is managing ipv6 tables.
	IsIPv6() bool
	// Protocol returns the IP family this instance is managing.
	Protocol() iptables.Protocol
	// SaveInto calls `iptables-save` for table and stores result in a given buffer.
	SaveInto(table iptables.Table, buffer *bytes.Buffer) error
	// Restore runs `iptables-restore` passing data through []byte.
	// table is the Table to restore
	// data should be formatted like the output of SaveInto()
	// flush sets the presence of the "--noflush" flag. see: FlushFlag
	// counters sets the "--counters" flag. see: RestoreCountersFlag
	Restore(table iptables.Table, data []byte, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error
	// RestoreAll is the same as Restore except that no table is specified.
	RestoreAll(data []byte, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error
	// Monitor detects when the given iptables tables have been flushed by an external
	// tool (e.g. a firewall reload) by creating canary chains and polling to see if
	// they have been deleted. (Specifically, it polls tables[0] every interval until
	// the canary has been deleted from there, then waits a short additional time for
	// the canaries to be deleted from the remaining tables as well. You can optimize
	// the polling by listing a relatively empty table in tables[0]). When a flush is
	// detected, this calls the reloadFunc so the caller can reload their own iptables
	// rules. If it is unable to create the canary chains (either initially or after
	// a reload) it will log an error and stop monitoring.
	// (This function should be called from a goroutine.)
	Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
	// HasRandomFully reveals whether `-j MASQUERADE` takes the
	// `--random-fully` option.  This is helpful to work around a
	// Linux kernel bug that sometimes causes multiple flows to get
	// mapped to the same IP:PORT and consequently some suffer packet
	// drops.
	HasRandomFully() bool
}

Interface is an injectable interface for running iptables commands. Implementations must be goroutine-safe.

type LoadBalancer

type LoadBalancer interface {
	// NextEndpoint returns the endpoint to handle a request for the given
	// service-port and source address.
	NextEndpoint(service iptables.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
	NewService(service iptables.ServicePortName, affinityClientIP *localv1.ClientIPAffinity, stickyMaxAgeSeconds int) error
	DeleteService(service iptables.ServicePortName)
	CleanupStaleStickySessions(service iptables.ServicePortName)
	ServiceHasEndpoints(service iptables.ServicePortName) bool

	// For userspace, because we don't have an EndpointChangeTracker which can auto lookup services behind the scenes,
	// we need to send this explicitly.
	OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)
	OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)
	OnEndpointsSynced()
}

LoadBalancer is an interface for distributing incoming requests to service endpoints.

type LoadBalancerRR

type LoadBalancerRR struct {
	// contains filtered or unexported fields
}

LoadBalancerRR is a round-robin load balancer.

func NewLoadBalancerRR

func NewLoadBalancerRR() *LoadBalancerRR

NewLoadBalancerRR returns a new LoadBalancerRR.

func (*LoadBalancerRR) CleanupStaleStickySessions

func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort iptables.ServicePortName)

func (*LoadBalancerRR) DeleteService

func (lb *LoadBalancerRR) DeleteService(svcPort iptables.ServicePortName)

func (*LoadBalancerRR) NewService

func (lb *LoadBalancerRR) NewService(svcPort iptables.ServicePortName, affinityType *localv1.ClientIPAffinity, ttlSeconds int) error

func (*LoadBalancerRR) NextEndpoint

func (lb *LoadBalancerRR) NextEndpoint(svcPort iptables.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)

NextEndpoint returns a service endpoint. The service endpoint is chosen using the round-robin algorithm.
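The rotation itself can be sketched as an index that advances on every call and wraps at the end of the endpoint list. The roundRobin type below is hypothetical; it tracks a single service and ignores session affinity, both of which the real LoadBalancerRR handles through its service-port and sessionAffinityReset parameters.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errMissingEndpoints mirrors the package-level ErrMissingEndpoints.
var errMissingEndpoints = errors.New("missing endpoints")

// roundRobin is a hypothetical single-service sketch of the rotation.
type roundRobin struct {
	mu        sync.Mutex
	endpoints []string
	index     int // position of the endpoint to hand out next
}

// next returns the current endpoint and advances the index, wrapping around
// at the end of the list.
func (rr *roundRobin) next() (string, error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if len(rr.endpoints) == 0 {
		return "", errMissingEndpoints
	}
	ep := rr.endpoints[rr.index]
	rr.index = (rr.index + 1) % len(rr.endpoints)
	return ep, nil
}

func main() {
	rr := &roundRobin{endpoints: []string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}}
	for i := 0; i < 4; i++ {
		ep, _ := rr.next()
		fmt.Println(ep) // the fourth call wraps back to 10.0.0.1:80
	}
}
```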

func (*LoadBalancerRR) OnEndpointsAdd

func (lb *LoadBalancerRR) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)

func (*LoadBalancerRR) OnEndpointsDelete

func (lb *LoadBalancerRR) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)

func (*LoadBalancerRR) OnEndpointsSynced

func (lb *LoadBalancerRR) OnEndpointsSynced()

func (*LoadBalancerRR) ServiceHasEndpoints

func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort iptables.ServicePortName) bool

ServiceHasEndpoints checks whether a service entry has endpoints.

type PortAllocator

type PortAllocator interface {
	AllocateNext() (int, error)
	Release(int)
}

type ProxySocket

type ProxySocket interface {
	// Addr gets the net.Addr for a ProxySocket.
	Addr() net.Addr
	// Close stops the ProxySocket from accepting incoming connections.
	// Each implementation should comment on the impact of calling Close
	// while sessions are active.
	Close() error
	// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
	ProxyLoop(service iptables.ServicePortName, info *ServiceInfo, loadBalancer LoadBalancer)
	// ListenPort returns the host port that the ProxySocket is listening on
	ListenPort() int
}

Abstraction over TCP/UDP sockets which are proxied.

type ProxySocketFunc

type ProxySocketFunc func(protocol localv1.Protocol, ip net.IP, port int) (ProxySocket, error)

ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port

type ServiceInfo

type ServiceInfo struct {
	// Timeout is the read/write timeout (used for UDP connections)
	Timeout time.Duration
	// ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
	ActiveClients *ClientCache
	// contains filtered or unexported fields
}

ServiceInfo contains information and state for a particular proxied service

func (*ServiceInfo) IsAlive

func (info *ServiceInfo) IsAlive() bool

func (*ServiceInfo) IsFinished

func (info *ServiceInfo) IsFinished() bool

func (*ServiceInfo) IsStarted

func (info *ServiceInfo) IsStarted() bool

type ServicePortName

type ServicePortName struct {
	types.NamespacedName
	Port     string
	Protocol localv1.Protocol
	PortName string // FYI Jay added this, because we needed it for the BuildPortsToEndpointsMap function by KPNG
}

ServicePortName carries a namespace + name + portname. This is the unique identifier for a load-balanced service.

type UserspaceLinux

type UserspaceLinux struct {
	// contains filtered or unexported fields
}

UserspaceLinux is the Proxier: a simple proxy for TCP connections between a localhost:lport and services that provide the actual implementations.

func NewCustomProxier

func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptablesutil.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*UserspaceLinux, error)

NewCustomProxier functions similarly to NewProxier, returning a new Proxier for the given LoadBalancer and address. Instead of constructing the default ProxySockets, however, the new proxier uses the provided ProxySocket constructor.

func NewUserspaceLinux

func NewUserspaceLinux(loadBalancer LoadBalancer, listenIP net.IP, iptables iptablesutil.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*UserspaceLinux, error)

func (*UserspaceLinux) OnEndpointsAdd

func (proxier *UserspaceLinux) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)

OnEndpointsAdd is called whenever creation of new endpoints object is observed.

func (*UserspaceLinux) OnEndpointsDelete

func (proxier *UserspaceLinux) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)

OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.

func (*UserspaceLinux) OnEndpointsSynced

func (proxier *UserspaceLinux) OnEndpointsSynced()

OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.

func (*UserspaceLinux) OnEndpointsUpdate

func (proxier *UserspaceLinux) OnEndpointsUpdate(oldEndpoints, endpoints *localv1.Endpoint)

OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.

func (*UserspaceLinux) OnServiceAdd

func (proxier *UserspaceLinux) OnServiceAdd(service *localv1.Service)

OnServiceAdd is called whenever creation of new service object is observed.

func (*UserspaceLinux) OnServiceDelete

func (proxier *UserspaceLinux) OnServiceDelete(service *localv1.Service)

OnServiceDelete is called whenever deletion of an existing service object is observed.

func (*UserspaceLinux) OnServiceSynced

func (proxier *UserspaceLinux) OnServiceSynced()

OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.

func (*UserspaceLinux) OnServiceUpdate

func (proxier *UserspaceLinux) OnServiceUpdate(oldService, service *localv1.Service)

OnServiceUpdate is called whenever modification of an existing service object is observed.

func (*UserspaceLinux) Sync

func (proxier *UserspaceLinux) Sync()

Sync is called to synchronize the proxier state to iptables as soon as possible.

func (*UserspaceLinux) SyncLoop

func (proxier *UserspaceLinux) SyncLoop()

SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.

type UserspaceServiceChangeTracker

type UserspaceServiceChangeTracker struct {
	// contains filtered or unexported fields
}

func (*UserspaceServiceChangeTracker) Delete

func (sct *UserspaceServiceChangeTracker) Delete(namespace, name string) bool

func (*UserspaceServiceChangeTracker) Update

func (sct *UserspaceServiceChangeTracker) Update(current *localv1.Service) bool

Update updates the given service's change map based on the <previous, current> service pair. It returns true if items changed; otherwise it returns false. Update can be used to add, update, or delete items of ServiceChangeMap. For example:

  • Add item: pass <nil, service> as the <previous, current> pair.

  • Update item: pass <oldService, service> as the <previous, current> pair.

  • Delete item: pass <service, nil> as the <previous, current> pair.
