sinks

package
v0.3.4
Published: Feb 19, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	LabelPodId = LabelDescriptor{
		Key:         "pod_id",
		Description: "The unique ID of the pod",
	}

	LabelPodName = LabelDescriptor{
		Key:         "pod_name",
		Description: "The name of the pod",
	}

	LabelNamespaceName = LabelDescriptor{
		Key:         "namespace_name",
		Description: "The name of the namespace",
	}

	LabelHostname = LabelDescriptor{
		Key:         "hostname",
		Description: "Hostname where the container ran",
	}
)

Functions

This section is empty.

Types

type EventData

type EventData struct {
	Verb     string    `json:"verb"`
	Event    *v1.Event `json:"event"`
	OldEvent *v1.Event `json:"old_event,omitempty"`
}

EventData encodes an eventrouter event and, if present, the previous event, with a verb indicating whether the event was created or updated.

func NewEventData

func NewEventData(eNew *v1.Event, eOld *v1.Event) EventData

NewEventData constructs an EventData struct from a new and an old event, setting the verb accordingly.
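The following is a minimal sketch of how a constructor like this can choose the verb, not the package's actual code. Event is a hypothetical stand-in for v1.Event, and the verb strings "ADDED" and "UPDATED" are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical stand-in for k8s.io/api/core/v1.Event.
type Event struct {
	Reason  string `json:"reason"`
	Message string `json:"message"`
}

// EventData mirrors the documented struct shape, using the stand-in Event.
type EventData struct {
	Verb     string `json:"verb"`
	Event    *Event `json:"event"`
	OldEvent *Event `json:"old_event,omitempty"`
}

// newEventData picks the verb from whether a previous event exists.
// The verb strings are assumptions, not confirmed by the documentation.
func newEventData(eNew, eOld *Event) EventData {
	if eOld == nil {
		return EventData{Verb: "ADDED", Event: eNew}
	}
	return EventData{Verb: "UPDATED", Event: eNew, OldEvent: eOld}
}

func main() {
	created := newEventData(&Event{Reason: "Scheduled"}, nil)
	out, err := json.Marshal(created)
	if err != nil {
		panic(err)
	}
	// old_event is omitted from the JSON because OldEvent is nil.
	fmt.Println(string(out))
}
```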

func (*EventData) WriteFlattenedJSON

func (e *EventData) WriteFlattenedJSON(w io.Writer) (int64, error)

WriteFlattenedJSON writes the JSON to the given io.Writer in the following format:

1) Flattens the JSON into non-nested key:value pairs
2) Converts the keys to snake case

E.g.: {"event_involved_object_kind":"pod", "event_metadata_namespace":"kube-system"}
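The flattening described above can be sketched with the standard library alone. This is an illustration of the technique, not the package's implementation: the helper names flatten, flattenJSON, and toSnake are hypothetical, arrays are kept as leaf values, and runs of capitals (for example "UID") are not treated specially.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"unicode"
)

// toSnake converts a camelCase key such as "involvedObject" to "involved_object".
func toSnake(s string) string {
	var b strings.Builder
	for i, r := range s {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('_')
			}
			b.WriteRune(unicode.ToLower(r))
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

// flatten walks nested JSON objects and stores each leaf value in out
// under an underscore-joined, snake_case key.
func flatten(prefix string, v interface{}, out map[string]interface{}) {
	obj, ok := v.(map[string]interface{})
	if !ok {
		out[prefix] = v
		return
	}
	for k, child := range obj {
		key := toSnake(k)
		if prefix != "" {
			key = prefix + "_" + key
		}
		flatten(key, child, out)
	}
}

// flattenJSON decodes raw JSON and returns the flattened key/value map.
func flattenJSON(raw []byte) (map[string]interface{}, error) {
	var v interface{}
	if err := json.Unmarshal(raw, &v); err != nil {
		return nil, err
	}
	out := make(map[string]interface{})
	flatten("", v, out)
	return out, nil
}

func main() {
	raw := []byte(`{"event":{"involvedObject":{"kind":"pod"},"metadata":{"namespace":"kube-system"}}}`)
	flat, err := flattenJSON(raw)
	if err != nil {
		panic(err)
	}
	out, err := json.Marshal(flat)
	if err != nil {
		panic(err)
	}
	// Prints {"event_involved_object_kind":"pod","event_metadata_namespace":"kube-system"}
	fmt.Println(string(out))
}
```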

func (*EventData) WriteRFC5424

func (e *EventData) WriteRFC5424(w io.Writer) (int64, error)

WriteRFC5424 writes the current event data to the given io.Writer using RFC5424 (syslog over TCP) syntax.
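For orientation, an RFC 5424 line has the shape `<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG`, where PRI is facility*8 + severity and "-" is the nil value. The sketch below only illustrates that syntax. The helper names, the chosen field values, and the trailing newline used as a TCP frame delimiter are assumptions, and the fields that WriteRFC5424 actually populates may differ.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

// sampleTime is a fixed timestamp used only for this illustration.
var sampleTime = time.Date(2024, time.February, 19, 12, 0, 0, 0, time.UTC)

// formatRFC5424 builds one syslog line with nil values ("-") for
// PROCID, MSGID, and STRUCTURED-DATA.
func formatRFC5424(facility, severity int, ts time.Time, host, app, msg string) string {
	pri := facility*8 + severity
	return fmt.Sprintf("<%d>1 %s %s %s - - - %s",
		pri, ts.UTC().Format(time.RFC3339), host, app, msg)
}

// writeLine writes the line plus a newline and reports the byte count as
// int64, matching the (int64, error) return shape used by the documented method.
func writeLine(w io.Writer, line string) (int64, error) {
	n, err := io.WriteString(w, line+"\n")
	return int64(n), err
}

func main() {
	// Facility 1 (user-level) and severity 6 (informational) give PRI 14.
	line := formatRFC5424(1, 6, sampleTime, "node-1", "eventrouter", "Pod scheduled")
	if _, err := writeLine(os.Stdout, line); err != nil {
		panic(err)
	}
}
```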

type EventHubSink

type EventHubSink struct {
	// contains filtered or unexported fields
}

EventHubSink sends events to an Azure Event Hub.

func NewEventHubSink

func NewEventHubSink(connString string, overflow bool, bufferSize int) (*EventHubSink, error)

NewEventHubSink constructs a new EventHubSink given an Event Hub connection string and buffering options.

```
export EVENTHUB_RESOURCE_GROUP=eventrouter
export EVENTHUB_NAMESPACE=eventrouter-ns
export EVENTHUB_NAME=eventrouter
export EVENTHUB_REGION=westus2
export EVENTHUB_RULE_NAME=eventrouter-send

az group create -g ${EVENTHUB_RESOURCE_GROUP} -l ${EVENTHUB_REGION}
az eventhubs namespace create -g ${EVENTHUB_RESOURCE_GROUP} -n ${EVENTHUB_NAMESPACE} -l ${EVENTHUB_REGION}
az eventhubs eventhub create -g ${EVENTHUB_RESOURCE_GROUP} --namespace-name ${EVENTHUB_NAMESPACE} -n ${EVENTHUB_NAME}
az eventhubs eventhub authorization-rule create -g ${EVENTHUB_RESOURCE_GROUP} --namespace-name ${EVENTHUB_NAMESPACE} --eventhub-name ${EVENTHUB_NAME} -n ${EVENTHUB_RULE_NAME} --rights Send
export EVENTHUB_CONNECTION_STRING=$(az eventhubs eventhub authorization-rule keys list -g ${EVENTHUB_RESOURCE_GROUP} --namespace-name ${EVENTHUB_NAMESPACE} --eventhub-name ${EVENTHUB_NAME} -n ${EVENTHUB_RULE_NAME} | jq -r '.primaryConnectionString')

cat yaml/eventrouter-azure.yaml | envsubst | kubectl apply -f -
```

connString expects the Azure Event Hub connection string format:

`Endpoint=sb://YOUR_ENDPOINT.servicebus.windows.net/;SharedAccessKeyName=YOUR_ACCESS_KEY_NAME;SharedAccessKey=YOUR_ACCESS_KEY;EntityPath=YOUR_EVENT_HUB_NAME`
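A connection string in this format can be checked before it is handed to the sink. The sketch below is a hypothetical pre-flight validator, not part of the package: parseConnString is an invented name, and treating all four keys as required is an assumption based on the format shown above. It splits on the first "=" only, because the access key is typically base64 and may end in "=" padding.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnString splits "Key=Value;Key=Value" segments into a map and
// checks that the four keys from the documented format are present.
func parseConnString(s string) (map[string]string, error) {
	fields := make(map[string]string)
	for _, part := range strings.Split(s, ";") {
		if part == "" {
			continue // tolerate a trailing semicolon
		}
		// Split on the first "=" only so "=" inside the value survives.
		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed segment %q", part)
		}
		fields[kv[0]] = kv[1]
	}
	for _, key := range []string{"Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath"} {
		if fields[key] == "" {
			return nil, fmt.Errorf("missing %s", key)
		}
	}
	return fields, nil
}

func main() {
	// Placeholder values only; no real endpoint or key is implied.
	conn := "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=send;SharedAccessKey=abc123==;EntityPath=eventrouter"
	fields, err := parseConnString(conn)
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["Endpoint"], fields["EntityPath"])
}
```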

func (*EventHubSink) Run

func (h *EventHubSink) Run(stopCh <-chan bool)

Run sits in a loop, waiting for events to arrive on h.eventCh and forwarding them to the event hub. If multiple events have arrived between loop iterations, it puts all of them in one request instead of making a single request per event.

func (*EventHubSink) UpdateEvents

func (h *EventHubSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface. It writes the event data to the event OverflowingChannel, which should never block. Messages that are buffered beyond the bufferSize specified for this EventHubSink are discarded.
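The never-block, discard-on-overflow behavior can be illustrated with a plain buffered channel and a select with a default case. This is a simplified sketch of the idea and not the OverflowingChannel type the sink uses: dropQueue and its methods are hypothetical, and the dropped counter assumes a single producer goroutine.

```go
package main

import "fmt"

// dropQueue is a hypothetical bounded queue that discards new messages
// once its buffer is full instead of blocking the caller.
type dropQueue struct {
	ch      chan string
	dropped int // not synchronized; assumes one producer
}

func newDropQueue(size int) *dropQueue {
	return &dropQueue{ch: make(chan string, size)}
}

// offer never blocks: it enqueues when there is room and discards otherwise.
func (q *dropQueue) offer(msg string) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		q.dropped++
		return false
	}
}

func main() {
	q := newDropQueue(2)
	for _, m := range []string{"a", "b", "c"} {
		fmt.Println(m, q.offer(m))
	}
	fmt.Println("buffered:", len(q.ch), "dropped:", q.dropped)
}
```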

type EventSinkInterface

type EventSinkInterface interface {
	UpdateEvents(eNew *v1.Event, eOld *v1.Event)
}

EventSinkInterface is the interface used to shunt events
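Any type with an UpdateEvents method of this shape can act as a sink. The sketch below shows a hypothetical counting sink. To stay self-contained it declares a stand-in Event type and a local EventSink interface instead of importing v1.Event and this package, so the names are assumptions. Treating a nil old event as a newly created event is also an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for k8s.io/api/core/v1.Event.
type Event struct {
	Reason string
}

// EventSink mirrors the shape of EventSinkInterface using the stand-in type.
type EventSink interface {
	UpdateEvents(eNew *Event, eOld *Event)
}

// countingSink counts created and updated events. The mutex makes it
// safe to call from multiple goroutines.
type countingSink struct {
	mu      sync.Mutex
	added   int
	updated int
}

// UpdateEvents treats a nil old event as a creation and anything else as an update.
func (c *countingSink) UpdateEvents(eNew *Event, eOld *Event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if eOld == nil {
		c.added++
		return
	}
	c.updated++
}

// Compile-time check that countingSink satisfies the interface.
var _ EventSink = (*countingSink)(nil)

func main() {
	var sink EventSink = &countingSink{}
	sink.UpdateEvents(&Event{Reason: "Scheduled"}, nil)
	sink.UpdateEvents(&Event{Reason: "Pulled"}, &Event{Reason: "Pulling"})
	c := sink.(*countingSink)
	fmt.Println("added:", c.added, "updated:", c.updated)
}
```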

func ManufactureSink

func ManufactureSink() (e EventSinkInterface)

ManufactureSink will manufacture a sink according to viper configs.

TODO: Determine if it should return an array of sinks

TODO: remove gocyclo:ignore

func NewGlogSink

func NewGlogSink() EventSinkInterface

NewGlogSink will create a new GlogSink, returned as an EventSinkInterface.

func NewInfuxdbSink

func NewInfuxdbSink(cfg InfluxdbConfig) (EventSinkInterface, error)

NewInfuxdbSink returns a thread-safe implementation of EventSinkInterface for InfluxDB.

func NewKafkaSink

func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser string, saslPwd string) (EventSinkInterface, error)

NewKafkaSink will create a new KafkaSink with default options, returned as an EventSinkInterface.

func NewRocksetSink

func NewRocksetSink(rocksetAPIKey string, rocksetCollectionName string, rocksetWorkspaceName string) EventSinkInterface

NewRocksetSink will create a new RocksetSink with default options, returned as an EventSinkInterface

func NewStdoutSink

func NewStdoutSink(namespace string) EventSinkInterface

NewStdoutSink will create a new StdoutSink with default options, returned as an EventSinkInterface

type GlogSink

type GlogSink struct {
}

GlogSink is the most basic sink. It is useful when you already have an ELK/EFK stack.

func (*GlogSink) UpdateEvents

func (gs *GlogSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface

type HTTPSink

type HTTPSink struct {
	SinkURL string
	// contains filtered or unexported fields
}

HTTPSink wraps an HTTP endpoint that messages should be sent to.

func NewHTTPSink

func NewHTTPSink(sinkURL string, overflow bool, bufferSize int) *HTTPSink

NewHTTPSink constructs a new HTTPSink given a sink URL and buffering options.

func (*HTTPSink) Run

func (h *HTTPSink) Run(stopCh <-chan bool)

Run sits in a loop, waiting for events to arrive on h.eventCh and forwarding them to the HTTP sink. If multiple events have arrived between loop iterations, it puts all of them in one request instead of making a single request per event.
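The batching behavior can be sketched as a loop that blocks for the first event and then drains whatever else is already queued without blocking. This is an illustration under assumptions, not the sink's code: events are plain strings, and the names run, drain, and send are hypothetical.

```go
package main

import "fmt"

// drain returns the first event plus everything already queued on ch,
// without blocking once the queue is empty.
func drain(first string, ch <-chan string) []string {
	batch := []string{first}
	for {
		select {
		case e, ok := <-ch:
			if !ok {
				return batch // channel closed
			}
			batch = append(batch, e)
		default:
			return batch // nothing else queued right now
		}
	}
}

// run forwards one batch per loop iteration until stopCh fires or ch closes.
func run(ch <-chan string, stopCh <-chan bool, send func([]string)) {
	for {
		select {
		case e, ok := <-ch:
			if !ok {
				return
			}
			send(drain(e, ch))
		case <-stopCh:
			return
		}
	}
}

func main() {
	events := make(chan string, 8)
	stop := make(chan bool)
	sent := make(chan []string)

	// Three events are queued before the loop starts, so they go out together.
	events <- "a"
	events <- "b"
	events <- "c"

	go run(events, stop, func(batch []string) { sent <- batch })

	fmt.Println(len(<-sent)) // prints 3
	stop <- true
}
```

Sending one request per batch keeps the request count low under bursts, at the cost of a variable payload size.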

func (*HTTPSink) UpdateEvents

func (h *HTTPSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface. It writes the event data to the event OverflowingChannel, which should never block. Messages that are buffered beyond the bufferSize specified for this HTTPSink are discarded.

type InfluxDBSink

type InfluxDBSink struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*InfluxDBSink) UpdateEvents

func (sink *InfluxDBSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

type InfluxdbConfig

type InfluxdbConfig struct {
	User                  string
	Password              string
	Secure                bool
	Host                  string
	DbName                string
	WithFields            bool
	InsecureSsl           bool
	RetentionPolicy       string
	ClusterName           string
	DisableCounterMetrics bool
	Concurrency           int
}

type KafkaSink

type KafkaSink struct {
	Topic string
	// contains filtered or unexported fields
}

KafkaSink implements the EventSinkInterface

func (*KafkaSink) UpdateEvents

func (ks *KafkaSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements EventSinkInterface.UpdateEvents

type LabelDescriptor

type LabelDescriptor struct {
	// Key to use for the label.
	Key string `json:"key,omitempty"`

	// Description of the label.
	Description string `json:"description,omitempty"`
}

type RocksetSink

type RocksetSink struct {
	// contains filtered or unexported fields
}

RocksetSink is a sink that uploads the Kubernetes events as JSON objects and converts them to documents inside a Rockset collection.

Rockset can later be used with many different connectors, such as Tableau or Redash, to work with this data.

func (*RocksetSink) UpdateEvents

func (rs *RocksetSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface

type S3Sink

type S3Sink struct {
	// contains filtered or unexported fields
}

S3Sink is the sink that uploads the Kubernetes events as JSON objects stored in a file. The sink uploads the file to S3 when any of the criteria below is fulfilled:

1) Time (uploadInterval): the specified time has passed since the last upload
2) [TODO] Data size: the total data to be uploaded becomes greater than N bytes

S3 is cheap, so the sink can be used to store event data. The data in S3 can later be used with Redshift and visualization tools.
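The time-based criterion can be sketched as a buffer that hands back its contents once the interval has elapsed since the last upload. This is a hedged illustration, not the sink's implementation: intervalBuffer is a hypothetical type, the clock is passed in as Unix seconds to keep the logic testable, and expressing the interval in seconds is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// intervalBuffer is a hypothetical buffer that releases its records once
// intervalSec seconds have passed since the last upload.
type intervalBuffer struct {
	intervalSec int64
	lastUpload  int64 // Unix seconds of the previous upload
	pending     []string
}

// add buffers one record. If the interval has elapsed it returns every
// pending record for upload and resets the buffer; otherwise it returns nil.
func (b *intervalBuffer) add(rec string, now int64) []string {
	b.pending = append(b.pending, rec)
	if now-b.lastUpload < b.intervalSec {
		return nil
	}
	out := b.pending
	b.pending = nil
	b.lastUpload = now
	return out
}

func main() {
	start := time.Now().Unix()
	buf := &intervalBuffer{intervalSec: 120, lastUpload: start}

	fmt.Println(buf.add(`{"verb":"ADDED"}`, start+30))         // interval not reached, nothing to upload
	fmt.Println(len(buf.add(`{"verb":"UPDATED"}`, start+120))) // interval reached, both records released
}
```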

func NewS3Sink

func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string) (*S3Sink, error)

NewS3Sink is the factory method constructing a new S3Sink

func (*S3Sink) Run

func (s *S3Sink) Run(stopCh <-chan bool)

Run sits in a loop, waiting for events to arrive on s.eventCh and forwarding them to the S3 sink. If multiple events have arrived between loop iterations, it puts all of them in one request instead of making a single request per event.

func (*S3Sink) UpdateEvents

func (s *S3Sink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface. It writes the event data to the event OverflowingChannel, which should never block. Messages that are buffered beyond the bufferSize specified for this S3Sink are discarded.

type StdoutSink

type StdoutSink struct {
	// contains filtered or unexported fields
}

StdoutSink is the other basic sink. By default, Fluentd/Elasticsearch won't index glog-formatted lines. By logging raw JSON to stdout, we get automated indexing, which can be queried in Kibana.

func (*StdoutSink) UpdateEvents

func (gs *StdoutSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event)

UpdateEvents implements the EventSinkInterface
