differ

package
v0.0.0-...-4dfe32f
Warning

This package is not in the latest version of its module.

Published: Oct 26, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package differ contains the core of the CCX Notification Service. The differ itself is implemented here, together with the storage and comparator implementations.

Index

Constants

View Source
const (
	// ExitStatusOK means that the tool finished with success
	ExitStatusOK = iota
	// ExitStatusConfiguration is an error code related to program configuration
	ExitStatusConfiguration
	// ExitStatusError is a general error code
	ExitStatusError
	// ExitStatusStorageError is returned in case of any storage-related error
	ExitStatusStorageError
	// ExitStatusFetchContentError is returned in case content cannot be fetched correctly
	ExitStatusFetchContentError
	// ExitStatusKafkaBrokerError is for Kafka broker connection establishment errors
	ExitStatusKafkaBrokerError
	// ExitStatusKafkaProducerError is for Kafka event production failures
	ExitStatusKafkaProducerError
	// ExitStatusKafkaConnectionNotClosedError is raised when the connection cannot be closed
	ExitStatusKafkaConnectionNotClosedError
	// ExitStatusCleanerError is raised when the clean operation is not successful
	ExitStatusCleanerError
	// ExitStatusMetricsError is raised when Prometheus metrics cannot be pushed
	ExitStatusMetricsError
	// ExitStatusEventFilterError is raised when the event filter is not set correctly
	ExitStatusEventFilterError
	// ExitStatusServiceLogError is raised when the Service Log notifier cannot be initialized
	ExitStatusServiceLogError
)

Exit codes
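
Because the constant block starts with iota, ExitStatusOK is 0 and each subsequent code is one higher (ExitStatusConfiguration is 1, ExitStatusError is 2, and so on). The following minimal sketch, written outside this package, reproduces the iota pattern and forwards a code to os.Exit; the doCleanup helper is hypothetical and used only for illustration.

package main

import (
	"errors"
	"fmt"
	"os"
)

// The iota pattern mirrored outside the package: ExitStatusOK is 0,
// ExitStatusConfiguration is 1, ExitStatusError is 2.
const (
	ExitStatusOK = iota
	ExitStatusConfiguration
	ExitStatusError
)

// doCleanup is a hypothetical operation used only to show how an exit
// status would be selected and reported.
func doCleanup() error { return errors.New("cleanup failed") }

func main() {
	if err := doCleanup(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(ExitStatusError) // process exits with code 2
	}
	os.Exit(ExitStatusOK)
}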

View Source
const (
	// TotalRiskLow is the numerical representation of 'Low' total risk
	TotalRiskLow = 1
	// TotalRiskModerate is the numerical representation of 'Moderate' total risk
	TotalRiskModerate = iota + 1
	// TotalRiskImportant is the numerical representation of 'Important' total risk
	TotalRiskImportant
	// TotalRiskCritical is the numerical representation of 'Critical' total risk
	TotalRiskCritical
	// TotalRiskMax is the highest total risk handled
	TotalRiskMax = TotalRiskCritical
	// TotalRiskMin is the lowest total risk handled
	TotalRiskMin = TotalRiskLow
)

Total risk values

View Source
const (
	AccountNumberAttribute = "account number"

	ReportNotFoundError = "report for rule ID %v and error key %v has not been found"
)

Messages

View Source
const (
	DefaultTotalRiskThreshold = 2
	DefaultEventFilter        = "totalRisk >= totalRiskThreshold"
)

Constants used to filter events
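
The default filter expression compares an event's total risk against DefaultTotalRiskThreshold, so with the defaults only events with total risk 2 ('Moderate') or higher pass. The sketch below is a simplified stand-in that hard-codes the same comparison instead of evaluating the configured expression string as the service does.

package main

import "fmt"

// defaultTotalRiskThreshold mirrors DefaultTotalRiskThreshold.
const defaultTotalRiskThreshold = 2

// passesFilter hard-codes the default filter
// "totalRisk >= totalRiskThreshold"; the real service evaluates the
// configured expression string, so this is only a stand-in.
func passesFilter(totalRisk int) bool {
	return totalRisk >= defaultTotalRiskThreshold
}

func main() {
	for _, risk := range []int{1, 2, 4} {
		fmt.Printf("totalRisk=%d -> notify=%v\n", risk, passesFilter(risk))
	}
}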

View Source
const (
	ServiceLogSeverityInfo     = "Info"
	ServiceLogSeverityWarning  = "Warning"
	ServiceLogSeverityMajor    = "Major"
	ServiceLogSeverityCritical = "Critical"
)

Constants used for creating Service Log entries. Note that Service Log has a length limit on text fields and returns an error status code when this limit is exceeded.
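
For illustration, numeric total risk values can be mapped to these severity strings as sketched below. The exact mapping used by the service is an assumption here; the NoSeverityTotalRisk metric listed later suggests that some total risk values have no Service Log equivalent and must be handled separately.

package main

import "fmt"

// severityForTotalRisk sketches one possible mapping from total risk
// (TotalRiskLow..TotalRiskCritical) to Service Log severities. The exact
// mapping used by the service is an assumption.
func severityForTotalRisk(totalRisk int) (string, bool) {
	switch totalRisk {
	case 1:
		return "Info", true
	case 2:
		return "Warning", true
	case 3:
		return "Major", true
	case 4:
		return "Critical", true
	default:
		// No equivalent severity; the service counts such cases
		// via the NoSeverityTotalRisk metric.
		return "", false
	}
}

func main() {
	if severity, ok := severityForTotalRisk(3); ok {
		fmt.Println("severity:", severity)
	}
}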

View Source
const (
	FetchContentErrorsName            = "fetch_content_errors"
	ReadClusterListErrorsName         = "read_cluster_list_errors"
	ReadReportedErrorsName            = "read_reported_errors"
	ProducerSetupErrorsName           = "producer_setup_errors"
	StorageSetupErrorsName            = "storage_setup_errors"
	ReadReportForClusterErrorsName    = "read_report_for_cluster_errors"
	DeserializeReportErrorsName       = "deserialize_report_errors"
	ReportWithHighImpactName          = "report_with_high_impact"
	NotificationNotSentSameStateName  = "notification_not_sent_same_state"
	NotificationNotSentErrorStateName = "notification_not_sent_error_state"
	NotificationSentName              = "notification_sent"
	NoSeverityTotalRiskName           = "total_risk_no_severity"
)

Metrics names

View Source
const (
	FetchContentErrorsHelp            = "The total number of errors during fetch from content service"
	ReadClusterListErrorsHelp         = "The total number of errors when reading cluster list from new_reports table"
	ReadReportedErrorsHelp            = "The total number of errors when reading previously reported reports for given clusters from reported table"
	ProducerSetupErrorsHelp           = "The total number of errors when setting up Kafka producer"
	StorageSetupErrorsHelp            = "The total number of errors when setting up storage connection"
	ReadReportForClusterErrorsHelp    = "The total number of errors when getting latest report for a given cluster ID"
	DeserializeReportErrorsHelp       = "The total number of errors when deserializing a report retrieved from the new_reports table"
	ReportWithHighImpactHelp          = "The total number of reports with total risk higher than the configured threshold"
	NotificationNotSentSameStateHelp  = "The total number of notifications not sent because we parsed the same report"
	NotificationNotSentErrorStateHelp = "The total number of notifications not sent because of a Kafka producer error"
	NotificationSentHelp              = "The total number of notifications sent"
	NoSeverityTotalRiskHelp           = "The total number of times we handled a total risk that does not have an equivalent service log severity level"
)

Metrics help messages

View Source
const (
	OrgIDMessage         = "Organization ID"
	ClusterNameMessage   = "Cluster name"
	TimestampMessage     = "Timestamp (notified_at/updated_at)"
	AccountNumberMessage = "Account number"
	UpdatedAtMessage     = "Updated at"
	AgeMessage           = "Age"
	MaxAgeAttribute      = "max age"
	DeleteStatement      = "delete statement"
)

Other messages

View Source
const (
	QueryRecordExistsInReadErrors = `
		SELECT exists(SELECT 1 FROM read_errors WHERE org_id=$1 and cluster=$2 and updated_at=$3);
`

	InsertReadErrorsStatement = `
		INSERT INTO read_errors(org_id, cluster, updated_at, created_at, error_text)
		VALUES ($1, $2, $3, $4, $5);
	`

	ReadReportForClusterQuery = `` /* 134-byte string literal not displayed */

	ReadReportForClusterAtOffsetQuery = `
		SELECT report
		   FROM new_reports
		  WHERE org_id = $1 AND cluster = $2 AND kafka_offset = $3;
       `

	ReadReportForClusterAtTimeQuery = `
		SELECT report
		   FROM new_reports
		  WHERE org_id = $1 AND cluster = $2 AND updated_at = $3;
       `
)

SQL statements
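
The statements use PostgreSQL-style positional placeholders ($1, $2, ...). A minimal sketch of running one of them through the standard database/sql package follows; the driver import, connection string, and argument values are placeholders, not something prescribed by this package.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL driver works; this one is an assumption
)

const readReportForClusterAtOffsetQuery = `
	SELECT report
	   FROM new_reports
	  WHERE org_id = $1 AND cluster = $2 AND kafka_offset = $3;
`

func main() {
	db, err := sql.Open("postgres", "postgres://user:password@localhost/notifications?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var report string
	err = db.QueryRow(readReportForClusterAtOffsetQuery, 42, "cluster-uuid", 1234).Scan(&report)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("report:", report)
}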

Variables

DeserializeReportErrors shows the number of errors when deserializing a report retrieved from the new_reports table

FetchContentErrors shows the number of errors during fetch from the content service

NoSeverityTotalRisk shows how many times a total risk not mapped to a service log severity is received

NotificationNotSentErrorState shows the number of notifications not sent because of a Kafka producer error

NotificationNotSentSameState shows the number of notifications not sent because we parsed the same report

NotificationSent shows the number of notifications sent to the configured Kafka topic

ProducerSetupErrors shows the number of errors when setting up the Kafka producer

ReadClusterListErrors shows the number of errors when reading the cluster list from the new_reports table

ReadReportForClusterErrors shows the number of errors when getting the latest report for a given cluster

ReadReportedErrors shows the number of errors when getting previously notified reports from the reported table

ReportWithHighImpact shows the number of reports with total risk higher than the configured threshold

StorageSetupErrors shows the number of errors when setting up storage

Functions

func AddMetricsWithNamespaceAndSubsystem

func AddMetricsWithNamespaceAndSubsystem(namespace, subsystem string)

AddMetricsWithNamespaceAndSubsystem registers the desired metrics using the given namespace and subsystem
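
For illustration, this is roughly what registering one of the counters listed above looks like with the Prometheus client library. The namespace and subsystem values are placeholders for whatever the caller passes to AddMetricsWithNamespaceAndSubsystem; this is not the package's actual registration code.

package main

import "github.com/prometheus/client_golang/prometheus"

// fetchContentErrors pairs FetchContentErrorsName with
// FetchContentErrorsHelp; namespace and subsystem are placeholders.
var fetchContentErrors = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "ccx_notification_service",
	Subsystem: "differ",
	Name:      "fetch_content_errors",
	Help:      "The total number of errors during fetch from content service",
})

func main() {
	prometheus.MustRegister(fetchContentErrors)
	fetchContentErrors.Inc()
}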

func IssueNotInReport

func IssueNotInReport(oldReport types.Report, issue *types.EvaluatedReportItem) bool

IssueNotInReport searches for a specific issue in the given OCP report. It returns a boolean flag indicating whether the report does not contain the issue, in which case the user needs to be informed about it.

func IssuesEqual

func IssuesEqual(issue1, issue2 *types.EvaluatedReportItem) bool

IssuesEqual compares two issues from reports

func PerformCleanupOnStartup

func PerformCleanupOnStartup(storage *DBStorage, cliFlags types.CliFlags) error

PerformCleanupOnStartup function cleans up the database before the differ is started

func PerformCleanupOperation

func PerformCleanupOperation(storage *DBStorage, cliFlags types.CliFlags) error

PerformCleanupOperation function performs selected cleanup operation

func PushCollectedMetrics

func PushCollectedMetrics(metricsConf *conf.MetricsConfiguration) error

PushCollectedMetrics function pushes the metrics to the configured Prometheus push gateway

func PushMetricsInLoop

func PushMetricsInLoop(ctx context.Context, metricsConf *conf.MetricsConfiguration)

PushMetricsInLoop pushes the metrics in a loop until the context is done
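
A minimal sketch of the caller side: the loop runs in its own goroutine and stops when the context is cancelled. The module paths are assumptions, and the empty MetricsConfiguration stands in for the values normally taken from the loaded service configuration.

package main

import (
	"context"
	"time"

	"github.com/RedHatInsights/ccx-notification-service/conf"   // module path is an assumption
	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
)

func main() {
	// In the real service this comes from the loaded configuration;
	// an empty value is used here only to keep the sketch short.
	var metricsConf conf.MetricsConfiguration

	ctx, cancel := context.WithCancel(context.Background())
	go differ.PushMetricsInLoop(ctx, &metricsConf)

	time.Sleep(10 * time.Second) // stand-in for the actual work

	cancel() // cancelling the context ends the push loop
}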

func Run

func Run()

Run function is the entry point to the differ
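
In practice the service's main package can simply delegate to Run, as in the minimal sketch below; the module path is an assumption.

package main

import "github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption

func main() {
	// Run is expected to handle configuration, CLI flags, and process
	// termination with one of the ExitStatus* codes on its own.
	differ.Run()
}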

func ShowAuthors

func ShowAuthors()

ShowAuthors function displays information about authors.

func ShowConfiguration

func ShowConfiguration(config *conf.ConfigStruct)

ShowConfiguration function displays actual configuration.

func ShowVersion

func ShowVersion()

ShowVersion function displays version information.

Types

type ClusterFilterStatistic

type ClusterFilterStatistic struct {
	Input    int
	Allowed  int
	Blocked  int
	Filtered int
}

ClusterFilterStatistic is a structure containing elementary statistic about clusters being filtered by filterClusterList function. It can be used for logging and debugging purposes.

type DBStorage

type DBStorage struct {
	// contains filtered or unexported fields
}

DBStorage is an implementation of the Storage interface that uses an SQL database such as SQLite, PostgreSQL, MariaDB, or RDS. The implementation is based on the standard sql package. The connection can be configured via the Configuration structure. SQLQueriesLog is the log for SQL queries; the default is nil, which means nothing is logged.

func NewFromConnection

func NewFromConnection(connection *sql.DB, dbDriverType types.DBDriver) *DBStorage

NewFromConnection function creates and initializes a new instance of the Storage interface from a prepared connection
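
A minimal sketch of wrapping an already-open connection. The module paths and the PostgreSQL driver import are assumptions, and the DBDriver value is left as a zero-value placeholder because the concrete constants live in the types package and are not shown in this documentation.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL driver; this import is an assumption

	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
	"github.com/RedHatInsights/ccx-notification-service/types"  // module path is an assumption
)

func main() {
	connection, err := sql.Open("postgres", "postgres://user:password@localhost/notifications?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder: the real code would use the appropriate DBDriver
	// constant defined in the types package.
	var driver types.DBDriver

	storage := differ.NewFromConnection(connection, driver)
	defer storage.Close()
}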

func NewStorage

func NewStorage(configuration *conf.StorageConfiguration) (*DBStorage, error)

NewStorage function creates and initializes a new instance of the Storage interface

func (DBStorage) Cleanup

func (storage DBStorage) Cleanup(maxAge, statement string) (int, error)

Cleanup method deletes all reports older than the specified relative time

func (DBStorage) CleanupNewReports

func (storage DBStorage) CleanupNewReports(maxAge string) (int, error)

CleanupNewReports method deletes all reports from `new_reports` table older than the specified relative time

func (DBStorage) CleanupOldReports

func (storage DBStorage) CleanupOldReports(maxAge string) (int, error)

CleanupOldReports method deletes all reports from `reported` table older than the specified relative time
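
Both cleanup methods accept a relative age and return the number of deleted rows. Below is a hedged sketch of the usual print-then-delete flow, written against the Storage interface documented later on this page; the "90 days" value and the relative-time syntax it implies are assumptions, as are the module path and helper name.

package example

import (
	"log"

	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
)

// cleanupOldReports is a hypothetical helper: it shows the rows that
// would be removed and then removes them. The max age value is a
// placeholder.
func cleanupOldReports(storage differ.Storage) error {
	const maxAge = "90 days"

	if err := storage.PrintOldReportsForCleanup(maxAge); err != nil {
		return err
	}

	deleted, err := storage.CleanupOldReports(maxAge)
	if err != nil {
		return err
	}
	log.Printf("deleted %d rows from the reported table", deleted)
	return nil
}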

func (DBStorage) Close

func (storage DBStorage) Close() error

Close method closes the connection to database. Needs to be called at the end of application lifecycle.

func (DBStorage) DeleteRowFromNewReports

func (storage DBStorage) DeleteRowFromNewReports(
	orgID types.OrgID,
	clusterName types.ClusterName,
	updatedAt types.Timestamp) (int, error)

DeleteRowFromNewReports deletes one selected row from `new_reports` table. Number of deleted rows (zero or one) is returned.

func (DBStorage) DeleteRowFromReported

func (storage DBStorage) DeleteRowFromReported(
	orgID types.OrgID,
	clusterName types.ClusterName,
	notifiedAt types.Timestamp) (int, error)

DeleteRowFromReported deletes one selected row from `reported` table. Number of deleted rows (zero or one) is returned.

func (DBStorage) PrintNewReports

func (storage DBStorage) PrintNewReports(maxAge, query, tableName string) error

PrintNewReports method prints all reports from the selected table older than the specified relative time

func (DBStorage) PrintNewReportsForCleanup

func (storage DBStorage) PrintNewReportsForCleanup(maxAge string) error

PrintNewReportsForCleanup method prints all reports from `new_reports` table older than the specified relative time

func (DBStorage) PrintOldReportsForCleanup

func (storage DBStorage) PrintOldReportsForCleanup(maxAge string) error

PrintOldReportsForCleanup method prints all reports from `reported` table older than the specified relative time

func (DBStorage) ReadClusterList

func (storage DBStorage) ReadClusterList() ([]types.ClusterEntry, error)

ReadClusterList method creates a list of clusters from all the rows in the new_reports table.

func (DBStorage) ReadErrorExists

func (storage DBStorage) ReadErrorExists(
	orgID types.OrgID,
	clusterName types.ClusterName,
	lastCheckedTime time.Time,
) (bool, error)

ReadErrorExists method checks whether the read_errors table contains the given combination org_id+cluster_name+updated_at

func (DBStorage) ReadLastNotifiedRecordForClusterList

func (storage DBStorage) ReadLastNotifiedRecordForClusterList(
	clusterEntries []types.ClusterEntry, timeOffset string, eventTarget types.EventTarget,
) (types.NotifiedRecordsPerCluster, error)

ReadLastNotifiedRecordForClusterList method returns the last notification with state = 'sent' for the given event target, organization IDs, and clusters.

func (DBStorage) ReadNotificationTypes

func (storage DBStorage) ReadNotificationTypes() ([]types.NotificationType, error)

ReadNotificationTypes method reads all notification types from the database.

func (DBStorage) ReadReportForCluster

func (storage DBStorage) ReadReportForCluster(
	orgID types.OrgID, clusterName types.ClusterName,
) (types.ClusterReport, types.Timestamp, error)

ReadReportForCluster reads the latest result (health status) for the selected cluster

func (DBStorage) ReadReportForClusterAtOffset

func (storage DBStorage) ReadReportForClusterAtOffset(
	orgID types.OrgID, clusterName types.ClusterName,
	kafkaOffset types.KafkaOffset,
) (types.ClusterReport, error)

ReadReportForClusterAtOffset reads the result (health status) for the selected cluster and the given Kafka offset. More than one report may be stored for a cluster, so the Kafka offset is needed to select exactly one of them.

See also: ReadReportForClusterAtTime

func (DBStorage) ReadReportForClusterAtTime

func (storage DBStorage) ReadReportForClusterAtTime(
	orgID types.OrgID, clusterName types.ClusterName,
	updatedAt types.Timestamp,
) (types.ClusterReport, error)

ReadReportForClusterAtTime reads the result (health status) for the selected cluster and the given timestamp. More than one report may be stored for a cluster, so the timestamp is needed to select exactly one of them.

See also: ReadReportForClusterAtOffset
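
The two lookups differ only in the key that disambiguates multiple stored reports: the Kafka offset versus the updated_at timestamp. A hedged sketch against the documented signatures follows; the helper and its parameters are hypothetical, and the module paths are assumptions.

package example

import (
	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
	"github.com/RedHatInsights/ccx-notification-service/types"  // module path is an assumption
)

// readBothVariants is a hypothetical helper showing the two ways to pin
// down a single report for a cluster: by Kafka offset or by timestamp.
func readBothVariants(
	storage differ.Storage,
	orgID types.OrgID,
	cluster types.ClusterName,
	offset types.KafkaOffset,
	updatedAt types.Timestamp,
) (byOffset, byTime types.ClusterReport, err error) {
	byOffset, err = storage.ReadReportForClusterAtOffset(orgID, cluster, offset)
	if err != nil {
		return
	}
	byTime, err = storage.ReadReportForClusterAtTime(orgID, cluster, updatedAt)
	return
}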

func (DBStorage) ReadStates

func (storage DBStorage) ReadStates() ([]types.State, error)

ReadStates method reads all possible notification states from the database.

func (DBStorage) WriteNotificationRecord

func (storage DBStorage) WriteNotificationRecord(
	notificationRecord *types.NotificationRecord) error

WriteNotificationRecord method writes a report (with given state and notification type) into the database table `reported`. Data for several columns are passed via NotificationRecord structure.

See also: WriteNotificationRecordForCluster, WriteNotificationRecordImpl

func (DBStorage) WriteNotificationRecordForCluster

func (storage DBStorage) WriteNotificationRecordForCluster(
	clusterEntry types.ClusterEntry,
	notificationTypeID types.NotificationTypeID,
	stateID types.StateID,
	report types.ClusterReport,
	notifiedAt types.Timestamp,
	errorLog string,
	eventTarget types.EventTarget) error

WriteNotificationRecordForCluster method writes a report (with given state and notification type) into the database table `reported`. Data for several columns are passed via ClusterEntry structure (as returned by ReadReportForClusterAtTime and ReadReportForClusterAtOffset methods).

See also: WriteNotificationRecord, WriteNotificationRecordImpl

func (DBStorage) WriteNotificationRecordImpl

func (storage DBStorage) WriteNotificationRecordImpl(
	orgID types.OrgID,
	accountNumber types.AccountNumber,
	clusterName types.ClusterName,
	notificationTypeID types.NotificationTypeID,
	stateID types.StateID,
	report types.ClusterReport,
	updatedAt types.Timestamp,
	notifiedAt types.Timestamp,
	errorLog string,
	eventTarget types.EventTarget) error

WriteNotificationRecordImpl method writes a report (with given state and notification type) into the database table `reported`. Data for all columns are passed explicitly.

See also: WriteNotificationRecord, WriteNotificationRecordForCluster

func (DBStorage) WriteReadError

func (storage DBStorage) WriteReadError(
	orgID types.OrgID,
	clusterName types.ClusterName,
	lastCheckedTime time.Time,
	e error,
) error

WriteReadError method writes information about a read error into the read_errors table.

type Differ

type Differ struct {
	Storage            Storage
	Notifier           producer.Producer
	NotificationType   types.EventType
	Target             types.EventTarget
	PreviouslyReported types.NotifiedRecordsPerCluster
	CoolDown           string
	Thresholds         EventThresholds
	Filter             string
	FilterByTag        bool
	TagsSet            types.TagsSet
}

Differ is the struct that holds all the dependencies and configuration of this service

func New

func New(config *conf.ConfigStruct, storage Storage) *Differ

New constructs a new Differ instance

func (*Differ) ProcessClusters

func (d *Differ) ProcessClusters(config *conf.ConfigStruct, ruleContent types.RulesMap,
	clusters []types.ClusterEntry)

ProcessClusters function creates desired notification messages for all the clusters obtained from the database

func (*Differ) ProduceEntriesToServiceLog

func (d *Differ) ProduceEntriesToServiceLog(configuration *conf.ConfigStruct, cluster types.ClusterEntry,
	rules types.Rules, ruleContent types.RulesMap, reports types.ReportContent) (totalMessages int, err error)

ProduceEntriesToServiceLog sends an entry to the service log integration for each issue found in the given reports

func (*Differ) SetupKafkaProducer

func (d *Differ) SetupKafkaProducer(config *conf.ConfigStruct)

SetupKafkaProducer function creates a Kafka producer using the provided configuration

func (*Differ) ShouldNotify

func (d *Differ) ShouldNotify(cluster types.ClusterEntry, issue *types.EvaluatedReportItem) bool

ShouldNotify checks whether an issue has already been sent in a previous notification event
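
A hedged sketch of how a caller might use ShouldNotify to keep only the issues that still need a notification. It assumes ShouldNotify returns true when a notification should be produced; the helper, its loop, and the module paths are assumptions.

package example

import (
	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
	"github.com/RedHatInsights/ccx-notification-service/types"  // module path is an assumption
)

// issuesToNotify is a hypothetical helper that keeps only the issues for
// which ShouldNotify indicates a notification should be produced
// (assumption: true means "notify").
func issuesToNotify(
	d *differ.Differ,
	cluster types.ClusterEntry,
	issues []*types.EvaluatedReportItem,
) []*types.EvaluatedReportItem {
	var pending []*types.EvaluatedReportItem
	for _, issue := range issues {
		if d.ShouldNotify(cluster, issue) {
			pending = append(pending, issue)
		}
	}
	return pending
}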

type EventThresholds

type EventThresholds struct {
	TotalRisk  int
	Likelihood int
	Impact     int
	Severity   int
}

EventThresholds structure contains all threshold values for event filter evaluator

type EventValue

type EventValue struct {
	TotalRisk  int
	Likelihood int
	Impact     int
	Severity   int
}

EventValue structure contains all event values for event filter evaluator

type NotificationURLs

type NotificationURLs struct {
	ClusterDetails  string
	RuleDetails     string
	InsightsAdvisor string
}

NotificationURLs structure contains all the URLs that are inserted in the notifications

type PushGatewayClient

type PushGatewayClient struct {
	AuthToken string
	// contains filtered or unexported fields
}

PushGatewayClient is a simple wrapper over http.Client so that Prometheus can make HTTP requests with the given authentication header

func (*PushGatewayClient) Do

func (pgc *PushGatewayClient) Do(request *http.Request) (*http.Response, error)

Do is a simple wrapper over the http.Client.Do method that includes the authentication header configured in the PushGatewayClient instance
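
Because Do has the same shape as http.Client.Do, a PushGatewayClient can be handed to the Prometheus Pushgateway pusher as its HTTP client. A hedged sketch using client_golang's push package follows; constructing the client from a struct literal is shown only for illustration, since its unexported fields are normally initialized inside this package, and the job name and module path are assumptions.

package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"

	"github.com/RedHatInsights/ccx-notification-service/differ" // module path is an assumption
)

// pushOnce sketches wiring the authenticated client into the pusher.
// In the real service the client is created and configured internally.
func pushOnce(gatewayURL, authToken string, gatherer prometheus.Gatherer) error {
	client := &differ.PushGatewayClient{AuthToken: authToken}

	pusher := push.New(gatewayURL, "ccx-notification-service") // job name is a placeholder
	return pusher.Client(client).Gatherer(gatherer).Push()
}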

type Storage

type Storage interface {
	Close() error
	ReadReportForCluster(
		orgID types.OrgID, clusterName types.ClusterName) (types.ClusterReport, types.Timestamp, error,
	)
	ReadClusterList() ([]types.ClusterEntry, error)
	ReadNotificationTypes() ([]types.NotificationType, error)
	ReadStates() ([]types.State, error)
	ReadReportForClusterAtTime(
		orgID types.OrgID, clusterName types.ClusterName,
		updatedAt types.Timestamp) (types.ClusterReport, error,
	)
	ReadReportForClusterAtOffset(
		orgID types.OrgID, clusterName types.ClusterName,
		offset types.KafkaOffset) (types.ClusterReport, error,
	)
	ReadLastNotifiedRecordForClusterList(
		clusterEntries []types.ClusterEntry, timeOffset string, eventTarget types.EventTarget) (types.NotifiedRecordsPerCluster, error)
	WriteNotificationRecord(
		notificationRecord *types.NotificationRecord) error
	WriteNotificationRecordForCluster(
		clusterEntry types.ClusterEntry,
		notificationTypeID types.NotificationTypeID,
		stateID types.StateID,
		report types.ClusterReport,
		notifiedAt types.Timestamp,
		errorLog string,
		eventTarget types.EventTarget) error
	WriteNotificationRecordImpl(
		orgID types.OrgID,
		accountNumber types.AccountNumber,
		clusterName types.ClusterName,
		notificationTypeID types.NotificationTypeID,
		stateID types.StateID,
		report types.ClusterReport,
		updatedAt types.Timestamp,
		notifiedAt types.Timestamp,
		errorLog string,
		eventType types.EventTarget) error
	ReadErrorExists(
		orgID types.OrgID,
		clusterName types.ClusterName,
		lastCheckedTime time.Time,
	) (bool, error)
	WriteReadError(
		orgID types.OrgID,
		clusterName types.ClusterName,
		lastCheckedTime time.Time,
		e error) error
	DeleteRowFromNewReports(
		orgID types.OrgID,
		clusterName types.ClusterName,
		updatedAt types.Timestamp) (int, error)
	DeleteRowFromReported(
		orgID types.OrgID,
		clusterName types.ClusterName,
		notifiedAt types.Timestamp) (int, error)
	PrintNewReportsForCleanup(maxAge string) error
	CleanupNewReports(maxAge string) (int, error)
	PrintOldReportsForCleanup(maxAge string) error
	CleanupOldReports(maxAge string) (int, error)
}

Storage represents an interface to almost any database or storage system
