ingestion

package module
v0.1.0
Published: Jun 10, 2020 License: Apache-2.0 Imports: 2 Imported by: 0

README

prometheus-kafka-druid-ingestion

At noris network we send Prometheus data to Kafka via prometheus-kafka-adapter and then use Apache Druid's Kafka Ingestion for storing those metrics.

This repo creates an Apache Druid ingestion spec from prometheus-kafka-adapter data.

Background

Prometheus data can be sent via remote_write to prometheus-kafka-adapter, which in turn sends it to Kafka. The prometheus-kafka-adapter message will have the following structure:

{
  "timestamp": "1970-01-01T00:00:00Z",
  "value": "9876543210",
  "name": "up",

  "labels": {
    "__name__": "up",
    "label1": "value1",
    "label2": "value2"
  }
}

This data can be sent to Apache Druid for long-term storage using Druid's Kafka Ingestion feature. For the data to be consumed and saved, we need to create an ingestion spec that defines the final schema in the database and the Kafka brokers to consume the metrics from.

Install

From Source

Clone the repo and cd into it, then run make:

make build

The binary will be available in bin/generate-ingestion.

Usage

$ ./bin/generate-ingestion -h

Usage:
  generate-ingestion [flags]

Flags:
  -a, --address string             The address of the Prometheus server to send the query to (default "http://prometheus:9090")
  -d, --druid-data-source string   The druid data source (default "prometheus")
  -f, --file string                The file to save the ingestion spec to
  -h, --help                       help for generate-ingestion
      --ingest-via-ssl             Enables data ingestion from Kafka to Druid via SSL (default true)
  -b, --kafka-brokers string       The Kafka brokers for druid to ingest data from (default "kafka01:9092,kafka02:9092,kafka03:9092")
  -t, --kafka-topic string         The Kafka topic for druid to ingest data from (default "prometheus")
  -q, --query string               The query to send to the Prometheus server (default "{__name__=~\"job:.+\"}")
      --tls-skip-verify            Skip TLS certificate verification
  -o, --toStdout                   Prints the JSON ingestion spec to STDOUT (default true)

Executing the binary sends the query specified with the -q / --query flag to the Prometheus server given by -a / --address. It then extracts the unique labels of all returned time series and builds an opinionated ingestion spec from those labels.

If all recording rules with the prefix job: are sent to prometheus-kafka-adapter via remote_write, the PromQL query {__name__=~"job:.+"} would retrieve those series.

By default Druid will consume messages from Kafka via SSL, meaning the following block will be populated in the spec:

            # ...
            "security.protocol": "SSL",
            "ssl.truststore.type": "PKCS12",
            "ssl.enabled.protocols": "TLSv1.2",
            "ssl.truststore.location": "/var/private/ssl/truststore.p12",
            "ssl.truststore.password": {
                "type": "environment",
                "variable": "DRUID_TRUSTSTORE_PASSWORD"
            },
            "ssl.keystore.location": "/var/private/ssl/keystore.p12",
            "ssl.keystore.password": {
                "type": "environment",
                "variable": "DRUID_KEYSTORE_PASSWORD"
            }
        },

This behaviour can be disabled with the --ingest-via-ssl=false flag.

By default the ingestion spec is written to stdout, but it can also be saved to a file with the -f flag:

$ generate-ingestion -f ingestion.json
{
    "type": "kafka",
    "dataSchema": {
        "dataSource": "prometheus",
        "parser": {
            "type": "string",
            "parseSpec": {
                "format": "json",
                "timestampSpec": {
                    "column": "timestamp",
                    "format": "iso"
                },
                "flattenSpec": {
                    "fields": [
                        {
                            "type": "path",
                            "name": "clustername",
                            "expr": "$.labels.clustername"
                        },
                        {
                            "type": "path",
                            "name": "source",
                            "expr": "$.labels.source"
                        },
                        {
                            "type": "root",
                            "name": "name",
                            "expr": "name"
                        },
                        {
                            "type": "root",
                            "name": "value",
                            "expr": "value"
                        }
                    ]
                },
                "dimensionsSpec": {
                    "dimensions": [
                        "name",
                        "clustername",
                        "source"
                    ]
                }
            }
        },
        "metricsSpec": [
            {
                "name": "count",
                "type": "count"
            },
            {
                "name": "value",
                "type": "doubleMax",
                "fieldName": "value"
            }
        ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "MINUTE"
        }
    },
    "ioConfig": {
        "topic": "prometheus",
        "consumerProperties": {
            "bootstrap.servers": "kafka01:9092,kafka02:9092,kafka03:9092",
            "security.protocol": "SSL",
            "ssl.truststore.type": "PKCS12",
            "ssl.enabled.protocols": "TLSv1.2",
            "ssl.truststore.location": "/var/private/ssl/truststore.p12",
            "ssl.truststore.password": {
                "type": "environment",
                "variable": "DRUID_TRUSTSTORE_PASSWORD"
            },
            "ssl.keystore.location": "/var/private/ssl/keystore.p12",
            "ssl.keystore.password": {
                "type": "environment",
                "variable": "DRUID_KEYSTORE_PASSWORD"
            }
        },
        "taskDuration": "PT10M",
        "useEarliestOffset": true
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataSchema

type DataSchema struct {
	DataSource      string          `json:"dataSource"`
	Parser          Parser          `json:"parser"`
	MetricsSpec     []Metric        `json:"metricsSpec"`
	GranularitySpec GranularitySpec `json:"granularitySpec"`
}

DataSchema represents the Druid dataSchema spec. Right now only the legacy spec is supported.

type DimensionsSpec

type DimensionsSpec struct {
	Dimensions LabelSet `json:"dimensions"`
}

DimensionsSpec is responsible for configuring Druid's dimensions. They're a set of columns in Druid's data model that can be used for grouping, filtering or applying aggregations.

type Field

type Field struct {
	Type string `json:"type"`
	Name string `json:"name"`
	Expr string `json:"expr"`
}

Field defines a piece of data.

type FieldList

type FieldList []Field

FieldList is a list of Fields.

type FlattenSpec

type FlattenSpec struct {
	Fields FieldList `json:"fields"`
}

FlattenSpec is responsible for bridging the gap between potentially nested input data (such as JSON, Avro, etc.) and Druid's flat data model.

type GranularitySpec

type GranularitySpec struct {
	Type               string `json:"type"`
	SegmentGranularity string `json:"segmentGranularity"`
	QueryGranularity   string `json:"queryGranularity"`
}

GranularitySpec allows for configuring operations such as data segment partitioning, truncating timestamps, time chunk segmentation or roll-up.

type IOConfig

type IOConfig struct {
	Topic              string                  `json:"topic"`
	ConsumerProperties KafkaConsumerProperties `json:"consumerProperties"`
	TaskDuration       string                  `json:"taskDuration"`
	UseEarliestOffset  bool                    `json:"useEarliestOffset"`
}

IOConfig influences how data is read into Druid from a source system. Right now only Kafka is supported.

type KafkaConsumerProperties

type KafkaConsumerProperties struct {
	BootstrapServers      string            `json:"bootstrap.servers"`
	SecurityProtocol      *string           `json:"security.protocol,omitempty"`
	SSLTruststoreType     *string           `json:"ssl.truststore.type,omitempty"`
	SSLEnabledProtocols   *string           `json:"ssl.enabled.protocols,omitempty"`
	SSLTruststoreLocation *string           `json:"ssl.truststore.location,omitempty"`
	SSLTruststorePassword *PasswordProvider `json:"ssl.truststore.password,omitempty"`
	SSLKeystoreLocation   *string           `json:"ssl.keystore.location,omitempty"`
	SSLKeystorePassword   *PasswordProvider `json:"ssl.keystore.password,omitempty"`
}

KafkaConsumerProperties is a set of properties that is passed to the Kafka consumer.

type KafkaIngestionSpec

type KafkaIngestionSpec struct {
	Type       string     `json:"type"`
	DataSchema DataSchema `json:"dataSchema"`
	IOConfig   IOConfig   `json:"ioConfig"`
}

KafkaIngestionSpec is the root-level type defining an ingestion spec used by Apache Druid.

func NewKafkaIngestionSpec

func NewKafkaIngestionSpec(options ...KafkaIngestionSpecOptions) *KafkaIngestionSpec

NewKafkaIngestionSpec returns a default KafkaIngestionSpec and applies any options passed to it.
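
A minimal usage sketch: build a spec with a few options and print it as JSON. The import path below is assumed from the repository name and may need adjusting.

package main

import (
	"encoding/json"
	"fmt"

	// assumed import path; adjust to the actual module path
	ingestion "github.com/noris-network/prometheus-kafka-druid-ingestion/pkg/ingestion"
)

func main() {
	// Build a spec equivalent to the CLI defaults shown in the README.
	spec := ingestion.NewKafkaIngestionSpec(
		ingestion.SetDataSource("prometheus"),
		ingestion.SetTopic("prometheus"),
		ingestion.SetBrokers("kafka01:9092,kafka02:9092,kafka03:9092"),
		ingestion.ApplySSLConfig(),
	)

	out, err := json.MarshalIndent(spec, "", "    ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}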

type KafkaIngestionSpecOptions

type KafkaIngestionSpecOptions func(*KafkaIngestionSpec)

KafkaIngestionSpecOptions allows for configuring a KafkaIngestionSpec.
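
Because KafkaIngestionSpecOptions is a plain function type, callers can also define their own options next to the ones provided by this package. A small sketch with a hypothetical option that only touches fields documented on this page:

// WithDailySegments is a hypothetical custom option following the same
// functional-options pattern; it switches the spec to daily segments.
func WithDailySegments() ingestion.KafkaIngestionSpecOptions {
	return func(spec *ingestion.KafkaIngestionSpec) {
		spec.DataSchema.GranularitySpec.SegmentGranularity = "DAY"
	}
}

Such an option can be passed to NewKafkaIngestionSpec alongside the provided ones, e.g. ingestion.NewKafkaIngestionSpec(WithDailySegments()).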

func ApplySSLConfig

func ApplySSLConfig() KafkaIngestionSpecOptions

ApplySSLConfig adds an opinionated SSL config that is used for communicating with Kafka securely.

func SetBrokers

func SetBrokers(brokers string) KafkaIngestionSpecOptions

SetBrokers sets the addresses of Kafka brokers. E.g. 'kafka01:9092,kafka02:9092,kafka03:9092'.

func SetDataSource

func SetDataSource(ds string) KafkaIngestionSpecOptions

SetDataSource sets the name of the dataSource used in Druid.

func SetLabels

func SetLabels(labels LabelSet) KafkaIngestionSpecOptions

SetLabels configures the ingestion spec with a LabelSet. This sets the FieldList under FlattenSpec, as well as the Dimensions.
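
A minimal sketch, assuming the label names from the README example:

spec := ingestion.NewKafkaIngestionSpec(
	ingestion.SetLabels(ingestion.LabelSet{"clustername", "source"}),
)
// spec.DataSchema.Parser.ParseSpec now carries the matching FlattenSpec
// fields and DimensionsSpec dimensions.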

func SetTopic

func SetTopic(topic string) KafkaIngestionSpecOptions

SetTopic sets the Kafka topic to consume data from.

type LabelSet

type LabelSet []string

LabelSet is a unique set of Prometheus labels.

func ExtractUniqueLabels

func ExtractUniqueLabels(result model.Value) (LabelSet, error)

ExtractUniqueLabels extracts unique labels from a Prometheus query result.
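
A sketch of feeding a Prometheus query result into ExtractUniqueLabels using the official client (github.com/prometheus/client_golang/api). The Prometheus address matches the CLI default, and the ingestion import path is assumed.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"

	// assumed import path; adjust to the actual module path
	ingestion "github.com/noris-network/prometheus-kafka-druid-ingestion/pkg/ingestion"
)

func main() {
	client, err := api.NewClient(api.Config{Address: "http://prometheus:9090"})
	if err != nil {
		panic(err)
	}

	// Send the same default query the CLI uses; warnings are ignored here.
	result, _, err := v1.NewAPI(client).Query(context.Background(), `{__name__=~"job:.+"}`, time.Now())
	if err != nil {
		panic(err)
	}

	// result satisfies model.Value, which ExtractUniqueLabels expects.
	labels, err := ingestion.ExtractUniqueLabels(result)
	if err != nil {
		panic(err)
	}
	fmt.Println(labels)
}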

func (LabelSet) ToDimensions

func (labels LabelSet) ToDimensions() []string

ToDimensions converts a LabelSet to a slice of strings that can be used for Druid's dimensions. It also adds the dimension 'name' to the slice.

func (LabelSet) ToFieldList

func (labels LabelSet) ToFieldList() FieldList

ToFieldList converts a LabelSet to a FieldList.
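
A short sketch of both helpers, assuming the labels from the README example:

labels := ingestion.LabelSet{"clustername", "source"}

dims := labels.ToDimensions()  // dimensions for the spec, with the extra "name" dimension added
fields := labels.ToFieldList() // Fields matching the flattenSpec entries shown in the README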

type Metric

type Metric struct {
	Name      string `json:"name"`
	Type      string `json:"type"`
	FieldName string `json:"fieldName,omitempty"`
}

Metric is a Druid aggregator that is applied at ingestion time.

type ParseSpec

type ParseSpec struct {
	Format         string         `json:"format"`
	TimeStampSpec  TimestampSpec  `json:"timestampSpec"`
	FlattenSpec    FlattenSpec    `json:"flattenSpec"`
	DimensionsSpec DimensionsSpec `json:"dimensionsSpec"`
}

ParseSpec represents the parseSpec object under Parser.

type Parser

type Parser struct {
	Type      string    `json:"type"`
	ParseSpec ParseSpec `json:"parseSpec"`
}

Parser is responsible for configuring a wide variety of items related to parsing input records.

type PasswordProvider

type PasswordProvider struct {
	Type     string `json:"type"`
	Variable string `json:"variable"`
}

PasswordProvider allows Druid to configure secrets via environment variables.

type TimestampSpec

type TimestampSpec struct {
	Column string `json:"column"`
	Format string `json:"format"`
}

TimestampSpec is responsible for configuring the primary timestamp.

