daemon

package
v0.0.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2021 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const ReadinessGateType = "katalog-sync.wish.com/synced" // name of readiness gate

ReadinessGate

Variables

View Source
var (
	ConsulSyncSourceName  = "external-sync-source"
	ConsulSyncSourceValue = "katalog-sync"
	ConsulK8sLinkName     = "external-k8s-link"
	ConsulK8sNamespace    = "external-k8s-namespace"
	ConsulK8sPod          = "external-k8s-pod"
)
View Source
var (
	// Annotation names
	ConsulServiceNames          = "katalog-sync.wish.com/service-names"     // comma-separated list of service names
	ConsulServicePort           = "katalog-sync.wish.com/service-port"      // port to use for consul entry
	ConsulServicePortOverride   = "katalog-sync.wish.com/service-port-"     // port override to use for a specific service name
	ConsulServiceTags           = "katalog-sync.wish.com/service-tags"      // tags for the service
	ConsulServiceTagsOverride   = "katalog-sync.wish.com/service-tags-"     // tags override to use for a specific service name
	ConsulServiceMeta           = "katalog-sync.wish.com/service-meta"      // meta for the service
	ConsulServiceMetaOverride   = "katalog-sync.wish.com/service-meta-"     // meta override to use for a specific service name
	ConsulServiceHealth         = "katalog-sync.wish.com/service-health"    // health status for the service (passing/warning/critical)
	ConsulServiceHealthOverride = "katalog-sync.wish.com/service-health-"   // health status override
	SidecarName                 = "katalog-sync.wish.com/sidecar"           // Name of sidecar container, only to be set if it exists
	SyncInterval                = "katalog-sync.wish.com/sync-interval"     // How frequently we want to sync this service
	ConsulServiceCheckTTL       = "katalog-sync.wish.com/service-check-ttl" // TTL for the service checks we put in consul
	ContainerExclusion          = "katalog-sync.wish.com/container-exclude" // comma-separated list of containers to exclude from ready check
)

Functions

func ParseMap added in v0.0.5

func ParseMap(s string) map[string]string

Types

type ConsulAgent

type ConsulAgent interface {
	UpdateTTL(checkID, output, status string) error
	Services() (map[string]*consulApi.AgentService, error)
	ServiceDeregister(serviceID string) error
	ServiceRegister(service *consulApi.AgentServiceRegistration) error
}

ConsulAgent encapsulates the interface for interacting with the local agent and service API

type ConsulCatalog added in v0.0.2

type ConsulCatalog interface {
	Services() (map[string]*consulApi.AgentService, error)
}

ConsulCatalog encapsulates the interface for interacting with the Catalog API

type Daemon

type Daemon struct {
	// contains filtered or unexported fields
}

Daemon is responsible for syncing state from k8s -> consul

func NewDaemon

func NewDaemon(c DaemonConfig, k8sClient Kubelet, consulClient *consulApi.Client) *Daemon

NewDaemon is a helper function to return a new *Daemon

func (*Daemon) ConsulNodeDoUntil added in v0.0.2

func (d *Daemon) ConsulNodeDoUntil(ctx context.Context, nodeName string, opts *consulApi.QueryOptions, f consulNodeFunc) error

ConsulNodeDoUntil is a helper to wait until a change has propogated into the CatalogAPI

func (*Daemon) Deregister

Deregister handles a sidecar request for deregistration. This will block until (2) the service has been removed from the agent services API (3) the entry has been removed from the catalog API (meaning it synced to the cluster)

func (*Daemon) Register

Register handles a sidecar request for registration. This will block until (1) the pod excluding the sidecar container is ready (2) the service has been pushed to the agent services API (3) the entry shows up in the catalog API (meaning it synced to the cluster)

func (*Daemon) Run

func (d *Daemon) Run() error

TODO: refactor into a start/stop/run job (so initial sync is done on start, and the rest in background goroutine)

type DaemonConfig

type DaemonConfig struct {
	MinSyncInterval     time.Duration `long:"min-sync-interval" env:"MIN_SYNC_INTERVAL" description:"minimum duration allowed for sync" default:"500ms"`
	MaxSyncInterval     time.Duration `long:"max-sync-interval" env:"MAX_SYNC_INTERVAL" description:"maximum duration allowed for sync" default:"5s"`
	DefaultSyncInterval time.Duration `long:"default-sync-interval" env:"DEFAULT_SYNC_INTERVAL" default:"1s"`
	DefaultCheckTTL     time.Duration `long:"default-check-ttl" env:"DEFAULT_CHECK_TTL" default:"10s"`
	SyncTTLBuffer       time.Duration `` /* 143-byte string literal not displayed */
}

DaemonConfig contains the configuration options for a katalog-sync-daemon

type Kubelet

type Kubelet interface {
	GetPodList() (*k8sApi.PodList, error)
}

Kubelet encapsulates the interface for kubelet interaction

type KubeletClient

type KubeletClient struct {
	// contains filtered or unexported fields
}

KubeletClient is an HTTP client for kubelet that implements the Kubelet interface

func NewKubeletClient

func NewKubeletClient(c KubeletClientConfig) (*KubeletClient, error)

NewKubeletClient returns a new KubeletClient based on the given config

func (*KubeletClient) GetPodList

func (k *KubeletClient) GetPodList() (*k8sApi.PodList, error)

GetPodList returns the list of pods the kubelet is managing

type KubeletClientConfig

type KubeletClientConfig struct {
	APIEndpoint        string `long:"kubelet-api" env:"KUBELET_API" description:"kubelet API endpoint" default:"http://localhost:10255/pods"`
	InsecureSkipVerify bool   `` /* 146-byte string literal not displayed */
}

KubeletClientConfig holds the config options for connecting to the kubelet API

type Pod

type Pod struct {
	corev1.Pod
	*SidecarState
	// map servicename -> sync status
	SyncStatuses
	OutstandingReadinessGate bool // Do we have a ReadinessGate to set
	InitialSyncDone          bool // Ready and in consul

	CheckTTL     time.Duration
	SyncInterval time.Duration
	Ctx          context.Context
	Cancel       context.CancelFunc
	// contains filtered or unexported fields
}

Pod is our representation of a pod in k8s

func NewPod

func NewPod(pod corev1.Pod, dc *DaemonConfig) (*Pod, error)

NewPod returns a daemon pod based on a config and a k8s pod

func (*Pod) ContainerExclusion added in v0.0.9

func (p *Pod) ContainerExclusion() map[string]struct{}

ContainerExclusion returns the containers that should be excluded from a readiness check

func (*Pod) GetPort

func (p *Pod) GetPort(n string) int

GetPort returns the port for a given service for this pod This first checks the service-specific port, and falls back to the service-level port

func (*Pod) GetServiceHealth added in v0.0.18

func (p *Pod) GetServiceHealth(n string, defaultVal string) string

GetServiceHealth returns the service health specified in annotation, or defaultVal if not specified.

func (*Pod) GetServiceID

func (p *Pod) GetServiceID(serviceName string) string

GetServiceID returns an identifier that addresses this pod.

func (*Pod) GetServiceMeta added in v0.0.5

func (p *Pod) GetServiceMeta(n string) map[string]string

GetServiceMeta returns a map of metadata to be added to the ServiceMetadata

func (*Pod) GetServiceNames

func (p *Pod) GetServiceNames() []string

GetServiceNames returns the list of service names defined in the k8s annotations

func (*Pod) GetTags

func (p *Pod) GetTags(n string) []string

GetTags returns the tags for a given service for this pod This first checks the service-specific tags, and falls back to the service-level tags

func (*Pod) HandleReadinessGate added in v0.0.17

func (p *Pod) HandleReadinessGate() error

func (*Pod) HasChange

func (p *Pod) HasChange(service *consulApi.AgentService) bool

HasChange will return whether a change has been made that needs a full resync if not then a simple TTL update will suffice

func (*Pod) HasServiceName

func (p *Pod) HasServiceName(n string) bool

HasServiceName returns whether a given name is one of the annotated service names for this pod

func (*Pod) Ready

func (p *Pod) Ready() (bool, map[string]bool)

Ready checks the readiness of the containers in the pod

func (*Pod) UpdatePod

func (p *Pod) UpdatePod(k8sPod corev1.Pod)

UpdatePod updates the k8s pod

func (*Pod) WaitChanges added in v0.0.20

func (p *Pod) WaitChanges() chan struct{}

type SidecarState

type SidecarState struct {
	SidecarName string // name of the sidecar container
	Ready       bool
}

State from our sidecar service

type SyncStatus

type SyncStatus struct {
	LastUpdated time.Time
	LastError   error
}

SyncStatus encapsulates the result of the last sync attempt

func (*SyncStatus) SetError

func (s *SyncStatus) SetError(e error)

SetError sets the error and LastUpdated time for the status

type SyncStatuses

type SyncStatuses map[string]*SyncStatus

SyncStatuses is a map of SyncStatus for each service defined in a pod (serviceName -> *SyncStatus)

func (SyncStatuses) GetError

func (s SyncStatuses) GetError() error

GetError returns the first error found in the set of SyncStatuses

func (SyncStatuses) GetStatus

func (s SyncStatuses) GetStatus(n string) *SyncStatus

GetStatus returns the SyncStatus for the given serviceName

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL