sinks

package
v0.0.0-...-599b752 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func GetString

func GetString(event *kube.EnhancedEvent, text string) (string, error)

Types

type Avro

type Avro struct {
	SchemaID string `yaml:"schemaID"`
	Schema   string `yaml:"schema"`
	// contains filtered or unexported fields
}

type BatchSink

type BatchSink interface {
	Sink
	SendBatch([]*kube.EnhancedEvent) error
}

BatchSink is an extension of Sink that can handle batches of events. NOTE: Currently no provider implements it, nor can the receivers handle it.

type BigQueryConfig

type BigQueryConfig struct {
	// BigQuery table config
	Location string `yaml:"location"`
	Project  string `yaml:"project"`
	Dataset  string `yaml:"dataset"`
	Table    string `yaml:"table"`

	// Path to a JSON file that contains your service account key.
	CredentialsPath string `yaml:"credentials_path"`

	// Batching config
	BatchSize       int `yaml:"batch_size"`
	MaxRetries      int `yaml:"max_retries"`
	IntervalSeconds int `yaml:"interval_seconds"`
	TimeoutSeconds  int `yaml:"timeout_seconds"`
}

type BigQuerySink

type BigQuerySink struct {
	// contains filtered or unexported fields
}

func NewBigQuerySink

func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error)

func (*BigQuerySink) Close

func (e *BigQuerySink) Close()

func (*BigQuerySink) Send

func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type Elasticsearch

type Elasticsearch struct {
	// contains filtered or unexported fields
}

func NewElasticsearch

func NewElasticsearch(cfg *ElasticsearchConfig) (*Elasticsearch, error)

func (*Elasticsearch) Close

func (e *Elasticsearch) Close()

func (*Elasticsearch) Send

type ElasticsearchConfig

type ElasticsearchConfig struct {
	// Connection specific
	Hosts    []string `yaml:"hosts"`
	Username string   `yaml:"username"`
	Password string   `yaml:"password"`
	CloudID  string   `yaml:"cloudID"`
	APIKey   string   `yaml:"apiKey"`
	// Indexing preferences
	UseEventID bool `yaml:"useEventID"`
	// DeDot all labels and annotations of both the event and the involvedObject.
	DeDot       bool                   `yaml:"deDot"`
	Index       string                 `yaml:"index"`
	IndexFormat string                 `yaml:"indexFormat"`
	Type        string                 `yaml:"type"`
	TLS         TLS                    `yaml:"tls"`
	Layout      map[string]interface{} `yaml:"layout"`
}

type EventBridgeConfig

type EventBridgeConfig struct {
	DetailType   string                 `yaml:"detailType"`
	Details      map[string]interface{} `yaml:"details"`
	Source       string                 `yaml:"source"`
	EventBusName string                 `yaml:"eventBusName"`
	Region       string                 `yaml:"region"`
}

type EventBridgeSink

type EventBridgeSink struct {
	// contains filtered or unexported fields
}

func (*EventBridgeSink) Close

func (s *EventBridgeSink) Close()

func (*EventBridgeSink) Send

type File

type File struct {
	// contains filtered or unexported fields
}

func NewFileSink

func NewFileSink(config *FileConfig) (*File, error)

func (*File) Close

func (f *File) Close()

func (*File) Send

func (f *File) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type FileConfig

type FileConfig struct {
	Path       string                 `yaml:"path"`
	Layout     map[string]interface{} `yaml:"layout"`
	MaxSize    int                    `yaml:"maxsize"`
	MaxAge     int                    `yaml:"maxage"`
	MaxBackups int                    `yaml:"maxbackups"`
}

func (*FileConfig) Validate

func (f *FileConfig) Validate() error

type FirehoseConfig

type FirehoseConfig struct {
	DeliveryStreamName string                 `yaml:"deliveryStreamName"`
	Region             string                 `yaml:"region"`
	Layout             map[string]interface{} `yaml:"layout"`
}

type FirehoseSink

type FirehoseSink struct {
	// contains filtered or unexported fields
}

func (*FirehoseSink) Close

func (f *FirehoseSink) Close()

func (*FirehoseSink) Send

func (f *FirehoseSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type InMemory

type InMemory struct {
	Events []*kube.EnhancedEvent
	Config *InMemoryConfig
}

func (*InMemory) Close

func (i *InMemory) Close()

func (*InMemory) Send

func (i *InMemory) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type InMemoryConfig

type InMemoryConfig struct {
	Ref *InMemory
}

type KafkaConfig

type KafkaConfig struct {
	Topic            string                 `yaml:"topic"`
	Brokers          []string               `yaml:"brokers"`
	Layout           map[string]interface{} `yaml:"layout"`
	ClientId         string                 `yaml:"clientId"`
	CompressionCodec string                 `yaml:"compressionCodec" default:"none"`
	TLS              struct {
		Enable             bool   `yaml:"enable"`
		CaFile             string `yaml:"caFile"`
		CertFile           string `yaml:"certFile"`
		KeyFile            string `yaml:"keyFile"`
		InsecureSkipVerify bool   `yaml:"insecureSkipVerify"`
	} `yaml:"tls"`
	SASL struct {
		Enable   bool   `yaml:"enable"`
		Username string `yaml:"username"`
		Password string `yaml:"password"`
	} `yaml:"sasl"`
	KafkaEncode Avro `yaml:"avro"`
}

KafkaConfig is the Kafka producer configuration

type KafkaEncoder

type KafkaEncoder interface {
	// contains filtered or unexported methods
}

KafkaEncoder is an interface type for adding an encoder to the Kafka data pipeline.

func NewAvroEncoder

func NewAvroEncoder(schemaID, schema string) (KafkaEncoder, error)

NewAvroEncoder creates an encoder which will be used to Avro-encode all events prior to sending them to Kafka.

It's only used by the Kafka sink.

type KafkaSink

type KafkaSink struct {
	// contains filtered or unexported fields
}

KafkaSink is a sink that sends events to a Kafka topic

func (*KafkaSink) Close

func (k *KafkaSink) Close()

Close the Kafka producer

func (*KafkaSink) Send

func (k *KafkaSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

Send an event to Kafka synchronously

type KinesisConfig

type KinesisConfig struct {
	StreamName string                 `yaml:"streamName"`
	Region     string                 `yaml:"region"`
	Layout     map[string]interface{} `yaml:"layout"`
}

type KinesisSink

type KinesisSink struct {
	// contains filtered or unexported fields
}

func (*KinesisSink) Close

func (k *KinesisSink) Close()

func (*KinesisSink) Send

func (k *KinesisSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type OpsCenterConfig

type OpsCenterConfig struct {
	Category        string            `yaml:"category"`
	Description     string            `yaml:"description"`
	Notifications   []string          `yaml:"notifications"`
	OperationalData map[string]string `yaml:"operationalData"`
	Priority        string            `yaml:"priority"`
	Region          string            `yaml:"region"`
	RelatedOpsItems []string          `yaml:"relatedOpsItems"`
	Severity        string            `yaml:"severity"`
	Source          string            `yaml:"source"`
	Tags            map[string]string `yaml:"tags"`
	Title           string            `yaml:"title"`
}

OpsCenterConfig is the configuration of the Sink.

type OpsCenterSink

type OpsCenterSink struct {
	// contains filtered or unexported fields
}

OpsCenterSink is an AWS OpsCenter notification path.

func (*OpsCenterSink) Close

func (s *OpsCenterSink) Close()

Close ...

func (*OpsCenterSink) Send

Send ...

type OpsgenieConfig

type OpsgenieConfig struct {
	ApiKey      string            `yaml:"apiKey"`
	URL         client.ApiUrl     `yaml:"URL"`
	Priority    string            `yaml:"priority"`
	Message     string            `yaml:"message"`
	Alias       string            `yaml:"alias"`
	Description string            `yaml:"description"`
	Tags        []string          `yaml:"tags"`
	Details     map[string]string `yaml:"details"`
}

type OpsgenieSink

type OpsgenieSink struct {
	// contains filtered or unexported fields
}

func (*OpsgenieSink) Close

func (o *OpsgenieSink) Close()

func (*OpsgenieSink) Send

func (o *OpsgenieSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

func NewPipeSink

func NewPipeSink(config *PipeConfig) (*Pipe, error)

func (*Pipe) Close

func (f *Pipe) Close()

func (*Pipe) Send

func (f *Pipe) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type PipeConfig

type PipeConfig struct {
	Path   string                 `yaml:"path"`
	Layout map[string]interface{} `yaml:"layout"`
}

func (*PipeConfig) Validate

func (f *PipeConfig) Validate() error

type PubsubConfig

type PubsubConfig struct {
	GcloudProjectId string `yaml:"gcloud_project_id"`
	Topic           string `yaml:"topic"`
	CreateTopic     bool   `yaml:"create_topic"`
}

type PubsubSink

type PubsubSink struct {
	// contains filtered or unexported fields
}

func (*PubsubSink) Close

func (ps *PubsubSink) Close()

func (*PubsubSink) Send

func (ps *PubsubSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type ReceiverConfig

type ReceiverConfig struct {
	Name          string               `yaml:"name"`
	InMemory      *InMemoryConfig      `yaml:"inMemory"`
	Webhook       *WebhookConfig       `yaml:"webhook"`
	File          *FileConfig          `yaml:"file"`
	Syslog        *SyslogConfig        `yaml:"syslog"`
	Stdout        *StdoutConfig        `yaml:"stdout"`
	Elasticsearch *ElasticsearchConfig `yaml:"elasticsearch"`
	Kinesis       *KinesisConfig       `yaml:"kinesis"`
	Firehose      *FirehoseConfig      `yaml:"firehose"`
	Opsgenie      *OpsgenieConfig      `yaml:"opsgenie"`
	SQS           *SQSConfig           `yaml:"sqs"`
	SNS           *SNSConfig           `yaml:"sns"`
	Slack         *SlackConfig         `yaml:"slack"`
	Kafka         *KafkaConfig         `yaml:"kafka"`
	Pubsub        *PubsubConfig        `yaml:"pubsub"`
	Opscenter     *OpsCenterConfig     `yaml:"opscenter"`
	Teams         *TeamsConfig         `yaml:"teams"`
	BigQuery      *BigQueryConfig      `yaml:"bigquery"`
	EventBridge   *EventBridgeConfig   `yaml:"eventbridge"`
	Pipe          *PipeConfig          `yaml:"pipe"`
}

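ReceiverConfig is the configuration of a receiver: a name plus one configuration field per sink type (webhook, file, Kafka, and so on).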

func (*ReceiverConfig) GetSink

func (r *ReceiverConfig) GetSink() (Sink, error)

func (*ReceiverConfig) Validate

func (r *ReceiverConfig) Validate() error

type SNSConfig

type SNSConfig struct {
	TopicARN string                 `yaml:"topicARN"`
	Region   string                 `yaml:"region"`
	Layout   map[string]interface{} `yaml:"layout"`
}

type SNSSink

type SNSSink struct {
	// contains filtered or unexported fields
}

func (*SNSSink) Close

func (s *SNSSink) Close()

func (*SNSSink) Send

func (s *SNSSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type SQSConfig

type SQSConfig struct {
	QueueName string                 `yaml:"queueName"`
	Region    string                 `yaml:"region"`
	Layout    map[string]interface{} `yaml:"layout"`
}

type SQSSink

type SQSSink struct {
	// contains filtered or unexported fields
}

func (*SQSSink) Close

func (s *SQSSink) Close()

func (*SQSSink) Send

func (s *SQSSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type Sink

type Sink interface {
	Send(ctx context.Context, ev *kube.EnhancedEvent) error
	Close()
}

Sink is the interface that third-party providers should implement. An implementation should just take the event, transform it according to its configuration, and submit it. Error handling (retries, etc.) should be handled inside the implementation for now.

func NewEventBridgeSink

func NewEventBridgeSink(cfg *EventBridgeConfig) (Sink, error)

func NewFirehoseSink

func NewFirehoseSink(cfg *FirehoseConfig) (Sink, error)

func NewKafkaSink

func NewKafkaSink(cfg *KafkaConfig) (Sink, error)

func NewKinesisSink

func NewKinesisSink(cfg *KinesisConfig) (Sink, error)

func NewOpsCenterSink

func NewOpsCenterSink(cfg *OpsCenterConfig) (Sink, error)

NewOpsCenterSink returns a new OpsCenterSink.

func NewOpsgenieSink

func NewOpsgenieSink(config *OpsgenieConfig) (Sink, error)

func NewPubsubSink

func NewPubsubSink(cfg *PubsubConfig) (Sink, error)

func NewSNSSink

func NewSNSSink(cfg *SNSConfig) (Sink, error)

func NewSQSSink

func NewSQSSink(cfg *SQSConfig) (Sink, error)

func NewSlackSink

func NewSlackSink(cfg *SlackConfig) (Sink, error)

func NewSyslogSink

func NewSyslogSink(config *SyslogConfig) (Sink, error)

func NewTeamsSink

func NewTeamsSink(cfg *TeamsConfig) (Sink, error)

func NewWebhook

func NewWebhook(cfg *WebhookConfig) (Sink, error)

type SlackConfig

type SlackConfig struct {
	Token      string            `yaml:"token"`
	Channel    string            `yaml:"channel"`
	Message    string            `yaml:"message"`
	Color      string            `yaml:"color"`
	Footer     string            `yaml:"footer"`
	Title      string            `yaml:"title"`
	AuthorName string            `yaml:"author_name"`
	Fields     map[string]string `yaml:"fields"`
}

type SlackSink

type SlackSink struct {
	// contains filtered or unexported fields
}

func (*SlackSink) Close

func (s *SlackSink) Close()

func (*SlackSink) Send

func (s *SlackSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type Stdout

type Stdout struct {
	// contains filtered or unexported fields
}

func NewStdoutSink

func NewStdoutSink(config *StdoutConfig) (*Stdout, error)

func (*Stdout) Close

func (f *Stdout) Close()

func (*Stdout) Send

func (f *Stdout) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type StdoutConfig

type StdoutConfig struct {
	Layout map[string]interface{} `yaml:"layout"`
}

func (*StdoutConfig) Validate

func (f *StdoutConfig) Validate() error

type SyslogConfig

type SyslogConfig struct {
	Network string `yaml:"network"`
	Address string `yaml:"address"`
	Tag     string `yaml:"tag"`
}

type SyslogSink

type SyslogSink struct {
	// contains filtered or unexported fields
}

func (*SyslogSink) Close

func (w *SyslogSink) Close()

func (*SyslogSink) Send

func (w *SyslogSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type TLS

type TLS struct {
	InsecureSkipVerify bool   `yaml:"insecureSkipVerify"`
	ServerName         string `yaml:"serverName"`
	CaFile             string `yaml:"caFile"`
	KeyFile            string `yaml:"keyFile"`
	CertFile           string `yaml:"certFile"`
}

type Teams

type Teams struct {
	// contains filtered or unexported fields
}

func (*Teams) Close

func (w *Teams) Close()

func (*Teams) Send

func (w *Teams) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type TeamsConfig

type TeamsConfig struct {
	Endpoint string                 `yaml:"endpoint"`
	Layout   map[string]interface{} `yaml:"layout"`
	Headers  map[string]string      `yaml:"headers"`
}

type Webhook

type Webhook struct {
	// contains filtered or unexported fields
}

func (*Webhook) Close

func (w *Webhook) Close()

func (*Webhook) Send

func (w *Webhook) Send(ctx context.Context, ev *kube.EnhancedEvent) error

type WebhookConfig

type WebhookConfig struct {
	Endpoint string                 `yaml:"endpoint"`
	TLS      TLS                    `yaml:"tls"`
	Layout   map[string]interface{} `yaml:"layout"`
	Headers  map[string]string      `yaml:"headers"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL