topology

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 10, 2021 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package topology is the preferred method for creating and supervising system traces when using the Swoll API on modern container management and orchestration systems such as Kubernetes.

To better understand what this package does, it helps to first learn a little about how Swoll creates, captures, filters, and emits data from the kernel back into our code.

The Swoll BPF has a very simple userspace-configurable filtering mechanism which allows us to either white-list or black-list the syscalls we want to monitor. Optionally, each call we want to monitor can also be associated with a specific kernel namespace. So, for example, a user can request to only see events which made the system call "open" in the kernel PID-Namespace `31337`. Any events that do not match this specific rule will be silently dropped by the kernel.

Furthermore, each filter can optionally maintain a basic sample-rate configuration, giving the developer the option to gain insight into high-load system-calls such as `sys_read` without impacting performance too much.
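
As a rough illustration, each configured filter entry can be thought of as carrying three pieces of information: the syscall, an optional kernel namespace, and an optional sample-rate. The following is a hypothetical sketch for illustration only, not the package's internal representation:

// FilterRule is a hypothetical illustration of a single userspace-configured
// filter entry; it is not how the Swoll BPF stores filters internally.
type FilterRule struct {
	Syscall    string // e.g. "openat"
	PidNS      int    // kernel PID-namespace to scope to; 0 means "any namespace"
	SampleRate int    // emit roughly 1 of every SampleRate events; 0 or 1 means "emit all"
}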

Since each container within a `Pod` gets its own unique (or derived, if shared) namespace, Swoll exploits the ns+syscall filter feature described above, maintaining the relation between Kubernetes and the container-runtime by dynamically updating and tuning the filters in real-time.

In short (using Kubernetes as an example), when we request Swoll to monitor syscall events for the Pod "foobar", we connect to the kube-api-server, watch for Pod events that match "foobar", and, when one matches, use the Container Runtime Interface to find process details for that Pod. Once we have obtained the init PID from the CRI, we can derive the PID namespace we need to use to set the filter in the kernel.
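
For illustration, the sketch below (independent of this package; the helper name pidNamespace is hypothetical) shows one way a kernel PID-namespace identifier can be derived from a process ID by reading the /proc filesystem:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// pidNamespace reads the /proc/<pid>/ns/pid symlink, which points at a name
// such as "pid:[4026531836]", and returns the namespace inode number. Given a
// container's init PID (for example, as reported by the CRI), this is the
// value a PID-namespace scoped kernel filter would be keyed on.
func pidNamespace(pid int) (int, error) {
	link, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
	if err != nil {
		return 0, err
	}
	start := strings.Index(link, "[")
	end := strings.Index(link, "]")
	if start < 0 || end < 0 || end <= start+1 {
		return 0, fmt.Errorf("unexpected ns link format: %q", link)
	}
	return strconv.Atoi(link[start+1 : end])
}

func main() {
	ns, err := pidNamespace(os.Getpid())
	if err != nil {
		panic(err)
	}
	fmt.Println("pid-namespace:", ns)
}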

In theory this sounds simple, but in practice things are not as easy. Swoll strives to run as lean-and-mean as possible; the goal is "One BPF Context To Mon Them All", without sacrificing performance for flexibility or vice-versa.

The Topology API is exactly that. It "observes" events from Kubernetes and the CRI (see: topology.Observer) and runs one or more v1alpha1.Trace specifications as a topology.Job, which in turn dynamically updates, de-duplicates, and prunes the kernel filters inside a single BPF context, better known as the topology.Hub.

Index

Examples

Constants

This section is empty.

Variables

var ErrBadNamespace = errors.New("invalid kernel pid-namespace")

ErrBadNamespace is the error returned to indicate the observer was unable to resolve the PID-Namespace of the container

var ErrContainerNotFound = errors.New("container not found")

ErrContainerNotFound is the error returned to indicate the container was unable to be resolved

var ErrNilContainer = errors.New("nil container")

ErrNilContainer is the error returned to indicate the observer sent an empty container message

var ErrNilEvent = errors.New("nil event")

ErrNilEvent is the error returned to indicate the observer sent an empty message

var ErrUnknownType = errors.New("unknown event-type")

ErrUnknownType is the error returned to indicate a malformed observer event

Functions

This section is empty.

Types

type EventType

type EventType int

These are the two states an observer event can be in.

const (
	EventTypeStart EventType = iota // container started
	EventTypeStop                   // container stopped
)

type Hub added in v0.1.3

type Hub struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Hub maintains all of the underlying kernel-filters, job request and output routing, metric-rules, de-duplication, and garbage-collection, using information it has obtained via the underlying Observer.

func NewHub added in v0.1.3

func NewHub(bpf *bytes.Reader, observer Observer) (*Hub, error)

NewHub creates and initializes a Hub context and the underlying BPF, primes the kernel filter, and sets up the in-kernel metrics.

obs, _ := topology.NewKubernetes()
hub, _ := topology.NewHub(assets.LoadBPFReader(), obs)

func (*Hub) AttachPath added in v0.1.3

func (h *Hub) AttachPath(name string, paths []string, cb func(string, *event.TraceEvent)) pubsub.Unsubscriber

AttachPath taps the caller into a subset of the data being sent to a running Job. Whenever an event is sent to a job, the Hub will also broadcast a copy of this event to a prefix-hash like so:

hash("kube-namespace/", "kube-pod/", "kube-container/", "syscall-name/")

Monitor ns/pod/container/syscall

hub.AttachPath("<name>", []string{"<namespace>", "<pod>", "<container>", "syscall"}, cb)

Monitor all syscalls and containers in pod:

hub.AttachPath("<name>", []string{"<namespace>", "<pod>"}, cb)
Example

In this example we use AttachPath to "subscribe" to a subset of the events being sent to a running Job's output.

package main

import (
	"github.com/criticalstack/swoll/pkg/event"
	"github.com/criticalstack/swoll/pkg/topology"
)

func main() {
	var hub *topology.Hub
	// Assumes there is a running job that matches namespace=kube-system,
	// pod=foo-pod, and a container named "boo"
	unsub := hub.AttachPath("example", []string{"kube-system", "foo-pod", "boo"},
		func(name string, ev *event.TraceEvent) {})
	defer unsub()
}
Output:

func (*Hub) AttachTrace added in v0.1.3

func (h *Hub) AttachTrace(t *v1alpha1.Trace, cb func(n string, ev *event.TraceEvent)) pubsub.Unsubscriber

AttachTrace taps the caller into the events for a running Trace

Example

A simple example showing how to use the AttachTrace method; it assumes the topology.Hub is already running with an Observer.

package main

import (
	"context"

	"github.com/criticalstack/swoll/api/v1alpha1"
	"github.com/criticalstack/swoll/pkg/event"
	"github.com/criticalstack/swoll/pkg/topology"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var hub *topology.Hub

	trace := &v1alpha1.Trace{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "kube-system",
		},
		Spec: v1alpha1.TraceSpec{
			Syscalls: []string{"execve", "openat"},
		},
		Status: v1alpha1.TraceStatus{
			JobID: "foo-bar",
		},
	}

	go hub.RunTrace(context.TODO(), trace)
	hub.AttachTrace(trace, func(name string, ev *event.TraceEvent) {})
}
Output:

func (*Hub) DeleteTrace added in v0.1.3

func (h *Hub) DeleteTrace(t *v1alpha1.Trace) error

DeleteTrace will stop all running jobs associated with this specification. All kernel-filters that were added to create this job are removed if no other jobs share the same rules.

func (*Hub) MustRun added in v0.1.3

func (h *Hub) MustRun(ctx context.Context)

MustRun is a fail-wrapper around Run

func (*Hub) MustRunJob added in v0.1.3

func (h *Hub) MustRunJob(ctx context.Context, job *Job)

MustRunJob is a fail-wrapper around RunJob

func (*Hub) Probe added in v0.1.3

func (h *Hub) Probe() *kernel.Probe

Probe returns the Hub's current kernel.Probe context

func (*Hub) PushJob added in v0.1.3

func (h *Hub) PushJob(job *Job, ns, nr int)

PushJob inserts a namespace+syscall-nr specific job into a list in two buckets: "nsmap", a mapping of pid-namespace+syscall to a list of jobs, and "idmap", a mapping of a job-ID to individual job contexts.

This is done to solve potential job duplication issues with overlapping rules. For example if we have two rules:

rule-A = syscall_A, syscall_B
rule-B = syscall_A, syscall_C

If "rule-B" is deleted, we don't want the kernel filter for "syscall_A" removed, since it is still needed by "rule-A".
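
A minimal sketch of that bookkeeping with hypothetical types (the names nsmap and idmap follow the wording above, but this is not the package's actual implementation):

// nsKey identifies a single kernel filter: a pid-namespace plus a syscall number.
type nsKey struct {
	ns int // kernel pid-namespace
	nr int // syscall number
}

// jobIndex holds the two buckets PushJob describes.
type jobIndex struct {
	nsmap map[nsKey][]string // (pid-namespace, syscall) -> IDs of jobs sharing that filter
	idmap map[string][]nsKey // job-ID -> filters installed on behalf of that job
}

func (idx *jobIndex) push(jobID string, ns, nr int) {
	k := nsKey{ns: ns, nr: nr}
	idx.nsmap[k] = append(idx.nsmap[k], jobID)
	idx.idmap[jobID] = append(idx.idmap[jobID], k)
}

// When a job is deleted, a filter is only removed from the kernel if its
// nsmap bucket becomes empty, so the shared "syscall_A" filter above survives
// the deletion of "rule-B".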

func (*Hub) Run added in v0.1.3

func (h *Hub) Run(ctx context.Context) error

Run runs the main Hub event-loop. It maintains the filters and metric rules that run in the BPF, resolves and routes system-call events to all the job output queues, accepts Trace specs to run, and keeps the BPF running light.

Example

Running the Hub

package main

import (
	"bytes"
	"context"

	"github.com/criticalstack/swoll/pkg/topology"
)

func main() {
	obs, err := topology.NewKubernetes()
	if err != nil {
		panic(err)
	}

	var bpf *bytes.Reader

	hub, err := topology.NewHub(bpf, obs)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	go hub.Run(ctx)
	<-ctx.Done()
}
Output:

func (*Hub) RunJob added in v0.1.3

func (h *Hub) RunJob(ctx context.Context, job *Job) error

RunJob will schedule an already-allocated trace-job to be run inside the Hub.

func (*Hub) RunTrace added in v0.1.3

func (h *Hub) RunTrace(ctx context.Context, t *v1alpha1.Trace) error

RunTrace will create and schedule a trace-job to be run inside the Hub.

Example

A short example showing how to use the RunTrace call

package main

import (
	"bytes"
	"context"

	"github.com/criticalstack/swoll/api/v1alpha1"
	"github.com/criticalstack/swoll/pkg/topology"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var (
		bpf      *bytes.Reader
		observer topology.Observer
	)

	hub, err := topology.NewHub(bpf, observer)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()

	// Run the Hub.
	go hub.MustRun(ctx)

	// Monitor execve and openat in the kubernetes-namespace 'kube-system' and
	// name the job "foo-bar".
	go hub.RunTrace(ctx, &v1alpha1.Trace{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "kube-system",
		},
		Spec: v1alpha1.TraceSpec{
			Syscalls: []string{"execve", "openat"},
		},
		Status: v1alpha1.TraceStatus{
			JobID: "foo-bar",
		},
	})

	// The trace is now running inside the Hub; attach to it to receive events.
	<-ctx.Done()

}
Output:

func (*Hub) Topology added in v0.1.3

func (h *Hub) Topology() *Topology

Topology returns this Hub's current underlying topology context

func (*Hub) WriteEvent added in v0.1.3

func (h *Hub) WriteEvent(ev *event.TraceEvent)

WriteEvent writes a single TraceEvent to all subscribers using the path-subscriptions

type Job added in v0.1.3

type Job struct {
	*v1alpha1.Trace
	// contains filtered or unexported fields
}

Job stores the trace specification and a running list of hosts which have matched this job.

func NewJob added in v0.1.3

func NewJob(t *v1alpha1.Trace) *Job

NewJob returns a Job for the trace

func (*Job) AddContainer added in v0.1.3

func (j *Job) AddContainer(pod, name string)

AddContainer tells the job to monitor a very specific container in a specific pod.

func (*Job) Duration added in v0.1.3

func (j *Job) Duration() time.Duration

Duration returns how long this job has been running as seen by kube.

func (*Job) JobID added in v0.1.3

func (j *Job) JobID() string

JobID returns the raw job-id associated with this job

func (*Job) MonitoredHosts added in v0.1.3

func (j *Job) MonitoredHosts(all bool) []string

MonitoredHosts returns a list of hosts that have been monitored by this job. If `all` is `false`, only containers that are currently being monitored are returned; otherwise it returns every host that has ever matched this job.

func (*Job) RemoveContainer added in v0.1.3

func (j *Job) RemoveContainer(pod, name string)

RemoveContainer removes the watch for a specific container in a specific pod

func (*Job) Run added in v0.1.3

func (j *Job) Run(ctx context.Context, h *Hub) error

Run will run a job inside the Hub. The primary goal of this function is to read topology events using the LabelMatch, and for each pod that matches, create the kernel filter if needed, and append the JobContext to the list of running jobs in the Hub.

func (*Job) TraceSpec added in v0.1.3

func (j *Job) TraceSpec() *v1alpha1.TraceSpec

TraceSpec returns the `TraceSpec` defined for this `Job`

func (*Job) TraceStatus added in v0.1.3

func (j *Job) TraceStatus() *v1alpha1.TraceStatus

TraceStatus returns the status of this Job

func (*Job) WriteEvent added in v0.1.3

func (j *Job) WriteEvent(h *Hub, ev *event.TraceEvent)

WriteEvent writes event `ev` to all listeners of this `Job`

type JobContext added in v0.1.3

type JobContext struct {
	*Job
	// contains filtered or unexported fields
}

JobContext contains information about the filters that were created in order to run a Job. Since multiple jobs can have shared resources (like kernel-filters), all possible rules are created and set.

For example, say we have two jobs: "job-A", and "job-B".

job-A monitors pods that match the label: app=nginx for the syscalls: "open", and "close"
job-B monitors pods that match the label: type=webserver for just the syscall "open"

If a pod was created with both the labels above (app=nginx,type=webserver), and we were to blindly delete "job-B", any filters that were added that matched both rules would be removed.

Thus every filter is accounted for and treated much like a reference counter; a filter is only removed from the kernel when no remaining rules require it.
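
A minimal sketch of that reference-counting behaviour with hypothetical types; a filter is installed on the first reference and only removed once the last rule that needed it is gone:

// filterKey identifies a kernel filter by pid-namespace and syscall number.
type filterKey struct {
	ns int
	nr int
}

// filterRefs counts how many active rules need each filter.
type filterRefs map[filterKey]int

// acquire registers another rule needing this filter; it reports whether the
// filter is new and must actually be installed in the kernel.
func (f filterRefs) acquire(k filterKey) bool {
	f[k]++
	return f[k] == 1
}

// release drops one reference; it reports whether no rule needs the filter
// anymore and it is now safe to remove it from the kernel.
func (f filterRefs) release(k filterKey) bool {
	if f[k] <= 1 {
		delete(f, k)
		return true
	}
	f[k]--
	return false
}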

type JobList added in v0.1.3

type JobList struct {
	*list.List
}

JobList is a wrapper around a simple linked-list for groups of JobContexts

type Kubernetes

type Kubernetes struct {
	// contains filtered or unexported fields
}

Kubernetes satisfies the Observer interface for the topology. Using a combination of the kube-apiserver and the container-runtime interface, it will emit container start and stop messages to the caller.

func NewKubernetes

func NewKubernetes(opts ...KubernetesOption) (*Kubernetes, error)

NewKubernetes creates an Observer for Kubernetes

Example
package main

import (
	"fmt"

	"github.com/criticalstack/swoll/pkg/topology"
)

func main() {
	observer, err := topology.NewKubernetes(
		topology.WithKubernetesConfig("/root/.kube/config"),
		topology.WithKubernetesNamespace("kube-system"),
		topology.WithKubernetesCRI("/run/containerd/containerd.sock"),
		topology.WithKubernetesLabelSelector("app=nginx"),
		topology.WithKubernetesFieldSelector("status.phase=Running"))
	if err != nil {
		panic(err)
	}

	fmt.Println(observer)
}
Output:

func (*Kubernetes) Close

func (k *Kubernetes) Close() error

Close frees up all the running resources of this Kubernetes observer

func (*Kubernetes) Connect

func (k *Kubernetes) Connect(ctx context.Context) error

Connect establishes the connections between the kube-apiserver and the container-runtime-interface.

func (*Kubernetes) Containers

func (k *Kubernetes) Containers(ctx context.Context) ([]*types.Container, error)

Containers returns a list of all currently running containers

func (*Kubernetes) Copy added in v0.1.3

func (k *Kubernetes) Copy(opts ...interface{}) (Observer, error)

Copy will copy all underlying data minus the client communication sockets.

func (*Kubernetes) Run

func (k *Kubernetes) Run(ctx context.Context, out chan<- *ObservationEvent)

Run watches and maintains a cache of all running containers for kubernetes, sending events as an Observer to the topology.

type KubernetesOption

type KubernetesOption func(*Kubernetes) error

func WithKubernetesCRI

func WithKubernetesCRI(criSocket string) KubernetesOption

WithKubernetesCRI sets the fully-qualified path to the container-runtime-interface UNIX socket. This file must exist on the host that runs this code.

func WithKubernetesConfig

func WithKubernetesConfig(kubeConfig string) KubernetesOption

WithKubernetesConfig sets the Kubernetes configuration file to use. By default, the observer will attempt to use the in-cluster Kubernetes configuration settings.

func WithKubernetesFieldSelector

func WithKubernetesFieldSelector(f string) KubernetesOption

WithKubernetesFieldSelector will only match hosts that match this field-selector.

func WithKubernetesLabelSelector

func WithKubernetesLabelSelector(l string) KubernetesOption

WithKubernetesLabelSelector will only match hosts that match this label.

func WithKubernetesNamespace

func WithKubernetesNamespace(namespace string) KubernetesOption

WithKubernetesNamespace will limit the observation to a specific kubernetes namespace

func WithKubernetesProcRoot

func WithKubernetesProcRoot(path string) KubernetesOption

WithKubernetesProcRoot will look for the ProcFS mount inside this path. Useful if the containers you are monitoring are mounted under a different path. Defaults to "/".

type ObservationEvent

type ObservationEvent struct {
	Type      EventType
	Container *types.Container
}

type Observer

type Observer interface {
	Connect(ctx context.Context) error
	Containers(ctx context.Context) ([]*types.Container, error)
	Run(ctx context.Context, out chan<- *ObservationEvent)
	Copy(opts ...interface{}) (Observer, error)
	Close() error
}
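
Since Observer is an interface, sources other than Kubernetes can feed container events into the Topology and Hub. The sketch below is a hypothetical, trivial Observer (the staticObserver type is invented here for illustration, and the import path for types.Container is assumed to be github.com/criticalstack/swoll/pkg/types):

package main

import (
	"context"

	"github.com/criticalstack/swoll/pkg/topology"
	"github.com/criticalstack/swoll/pkg/types" // assumed import path for types.Container
)

// staticObserver reports a fixed set of containers as "started" and then
// idles; it exists only to illustrate the shape of the Observer interface.
type staticObserver struct {
	containers []*types.Container
}

var _ topology.Observer = (*staticObserver)(nil)

func (s *staticObserver) Connect(ctx context.Context) error { return nil }

func (s *staticObserver) Containers(ctx context.Context) ([]*types.Container, error) {
	return s.containers, nil
}

func (s *staticObserver) Run(ctx context.Context, out chan<- *topology.ObservationEvent) {
	for _, c := range s.containers {
		out <- &topology.ObservationEvent{Type: topology.EventTypeStart, Container: c}
	}
	<-ctx.Done()
}

func (s *staticObserver) Copy(opts ...interface{}) (topology.Observer, error) {
	return &staticObserver{containers: s.containers}, nil
}

func (s *staticObserver) Close() error { return nil }

func main() {
	// Such an observer could be handed to topology.NewTopology or
	// topology.NewHub in place of the Kubernetes observer.
	_ = &staticObserver{}
}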

type OnEventCallback

type OnEventCallback func(t EventType, container *types.Container)

type Topology

type Topology struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopology

func NewTopology(obs Observer) *Topology

func (*Topology) Close

func (t *Topology) Close() error

func (*Topology) Connect

func (t *Topology) Connect(ctx context.Context) error

func (*Topology) Containers

func (t *Topology) Containers(ctx context.Context) ([]*types.Container, error)

func (*Topology) LookupContainer

func (t *Topology) LookupContainer(ctx context.Context, pidns int) (*types.Container, error)

func (*Topology) Run

func (t *Topology) Run(ctx context.Context, cb OnEventCallback)
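
None of the Topology methods above have examples, so the following is a hedged usage sketch that drives a Topology directly with the Kubernetes observer (the import path for types.Container is assumed to be github.com/criticalstack/swoll/pkg/types):

package main

import (
	"context"
	"fmt"

	"github.com/criticalstack/swoll/pkg/topology"
	"github.com/criticalstack/swoll/pkg/types" // assumed import path for types.Container
)

func main() {
	obs, err := topology.NewKubernetes()
	if err != nil {
		panic(err)
	}

	topo := topology.NewTopology(obs)

	ctx := context.Background()
	if err := topo.Connect(ctx); err != nil {
		panic(err)
	}
	defer topo.Close()

	// React to container start/stop events as they are observed.
	go topo.Run(ctx, func(t topology.EventType, container *types.Container) {
		fmt.Println("event:", t, container)
	})

	<-ctx.Done()
}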
