Documentation ¶
Overview ¶
Package consumer contains the interface for any consumer that is able to process messages. It also contains implementations of various Kafka consumers.
For more information please see: https://redhatinsights.github.io/insights-data-schemas/external-pipeline/ccx_data_pipeline.html
Consumed messages are expected to be generated by ccx-data-pipeline, based on the OCP rules framework. The report generated by the framework is enhanced with additional context information taken from different sources, such as the organization ID, account number, unique cluster name, and the LastChecked timestamp (taken from the incoming Kafka record containing the URL to the archive).
Consumed messages are also expected to contain one INFO rule hit that contains the cluster version. That rule hit is produced by a special rule used only in the external data pipeline: "version_info|CLUSTER_VERSION_INFO"
Index ¶
- Variables
- func DecompressMessage(messageValue []byte) ([]byte, error)
- func IsMessageInGzipFormat(messageValue []byte) bool
- type AttributeChecker
- type Consumer
- type DVORulesProcessor
- type DvoMetrics
- type JSONAttributeChecker
- type KafkaConsumer
- func NewDVORulesConsumer(brokerCfg broker.Configuration, storage storage.DVORecommendationsStorage) (*KafkaConsumer, error)
- func NewKafkaConsumer(brokerCfg broker.Configuration, storage storage.Storage, ...) (*KafkaConsumer, error)
- func NewKafkaConsumerWithSaramaConfig(brokerCfg broker.Configuration, storage storage.Storage, ...) (*KafkaConsumer, error)
- func NewOCPRulesConsumer(brokerCfg broker.Configuration, storage storage.OCPRecommendationsStorage) (*KafkaConsumer, error)
- func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
- func (consumer *KafkaConsumer) Close() error
- func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64
- func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64
- func (consumer *KafkaConsumer) HandleMessage(msg *sarama.ConsumerMessage) error
- func (consumer *KafkaConsumer) Serve()
- func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
- type MessageProcessor
- type OCPRulesProcessor
- type Report
Constants ¶
This section is empty.
Variables ¶
var DefaultSaramaConfig *sarama.Config
DefaultSaramaConfig is the config that is used by default. It can be set to use a specific version of the protocol, for example, which is useful for testing.
Functions ¶
func DecompressMessage ¶ added in v1.4.0
DecompressMessage tries to decompress the message if it is compressed using any supported method (only GZIP at the moment).
func IsMessageInGzipFormat ¶ added in v1.4.0
IsMessageInGzipFormat checks whether the message is in gzip format. It returns true if it is and false otherwise.
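The two functions above can be illustrated with a minimal sketch that uses only the standard library. It is not the package's actual implementation: the names isGzip, decompressIfGzip, and gzipBytes are hypothetical, the JSON payload is a placeholder, and returning non-gzip input unchanged is an assumption based on the description of DecompressMessage. Detection relies on the two gzip magic bytes (0x1f, 0x8b) from RFC 1952.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// isGzip reports whether data starts with the gzip magic bytes 0x1f 0x8b.
// Hypothetical stand-in for IsMessageInGzipFormat.
func isGzip(data []byte) bool {
	return len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b
}

// decompressIfGzip returns data unchanged when it is not gzip-compressed
// (an assumption), otherwise it returns the decompressed payload.
// Hypothetical stand-in for DecompressMessage.
func decompressIfGzip(data []byte) ([]byte, error) {
	if !isGzip(data) {
		return data, nil
	}
	reader, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	return io.ReadAll(reader)
}

// gzipBytes compresses data; it exists only to build input for the demo.
func gzipBytes(data []byte) []byte {
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	_, _ = writer.Write(data)
	_ = writer.Close()
	return buf.Bytes()
}

func main() {
	// Placeholder payload, not a real message from the pipeline.
	plain := []byte(`{"example":"payload"}`)
	for _, input := range [][]byte{plain, gzipBytes(plain)} {
		out, err := decompressIfGzip(input)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("gzip=%v payload=%s\n", isGzip(input), out)
	}
}
```

Checking the magic bytes first keeps the uncompressed path free of any decompression overhead, so a producer can switch compression on or off without the consumer needing a configuration change.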
Types ¶
type AttributeChecker ¶ added in v1.4.0
type AttributeChecker interface {
IsEmpty() bool
}
AttributeChecker is an interface for checking if an attribute is empty.
type Consumer ¶
type Consumer interface {
	Serve()
	Close() error
	HandleMessage(msg *sarama.ConsumerMessage) error
}
Consumer represents any consumer of insights-rules messages
type DVORulesProcessor ¶ added in v1.4.0
type DVORulesProcessor struct { }
DVORulesProcessor satisfies the MessageProcessor interface
type DvoMetrics ¶ added in v1.4.0
type DvoMetrics map[string]*json.RawMessage
DvoMetrics represents DVO workload recommendations received as part of the incoming message
type JSONAttributeChecker ¶ added in v1.4.0
type JSONAttributeChecker struct {
// contains filtered or unexported fields
}
JSONAttributeChecker is an implementation of AttributeChecker for JSON data.
func (*JSONAttributeChecker) IsEmpty ¶ added in v1.4.0
func (j *JSONAttributeChecker) IsEmpty() bool
IsEmpty returns whether the JSON data is empty
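The fields of JSONAttributeChecker are unexported, so its exact notion of "empty" is not visible here. As a rough sketch of how such a check could work, assume the checker wraps a *json.RawMessage and treats a nil pointer, null, and compact empty containers as empty. The names emptinessChecker, rawJSONChecker, and newChecker are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// emptinessChecker mirrors the shape of AttributeChecker (hypothetical copy).
type emptinessChecker interface {
	IsEmpty() bool
}

// rawJSONChecker is a hypothetical checker wrapping raw JSON data.
type rawJSONChecker struct {
	data *json.RawMessage
}

// IsEmpty treats a nil pointer, blank input, null, and the compact forms
// {}, [] and "" as empty. Containers with inner whitespace such as "{ }"
// are not recognised by this simple sketch.
func (c *rawJSONChecker) IsEmpty() bool {
	if c.data == nil {
		return true
	}
	switch string(bytes.TrimSpace(*c.data)) {
	case "", "null", "{}", "[]", `""`:
		return true
	}
	return false
}

// newChecker builds a checker from a JSON literal, for demonstration only.
func newChecker(literal string) *rawJSONChecker {
	raw := json.RawMessage(literal)
	return &rawJSONChecker{data: &raw}
}

func main() {
	for _, literal := range []string{`{}`, `null`, `{"a": 1}`, `[1, 2]`} {
		var checker emptinessChecker = newChecker(literal)
		fmt.Printf("%-10s empty=%v\n", literal, checker.IsEmpty())
	}
}
```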
type KafkaConsumer ¶
type KafkaConsumer struct {
	Configuration    broker.Configuration
	ConsumerGroup    sarama.ConsumerGroup
	Storage          storage.Storage
	MessageProcessor MessageProcessor
	// contains filtered or unexported fields
}
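KafkaConsumer is an implementation of the Consumer interface. Example:
// processor is any MessageProcessor, as required by the NewKafkaConsumer signature
kafkaConsumer, err := consumer.NewKafkaConsumer(brokerCfg, storage, processor)
if err != nil {
	panic(err)
}

// Serve blocks until the consumer stops
kafkaConsumer.Serve()

// release all resources used by the consumer
err = kafkaConsumer.Close()
if err != nil {
	panic(err)
}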
func NewDVORulesConsumer ¶ added in v1.4.0
func NewDVORulesConsumer(brokerCfg broker.Configuration, storage storage.DVORecommendationsStorage) (*KafkaConsumer, error)
NewDVORulesConsumer constructs a new implementation of the Consumer interface
func NewKafkaConsumer ¶ added in v1.4.0
func NewKafkaConsumer(brokerCfg broker.Configuration, storage storage.Storage, processor MessageProcessor) (*KafkaConsumer, error)
NewKafkaConsumer constructs a new implementation of the Consumer interface
func NewKafkaConsumerWithSaramaConfig ¶ added in v1.4.0
func NewKafkaConsumerWithSaramaConfig(
	brokerCfg broker.Configuration,
	storage storage.Storage,
	saramaConfig *sarama.Config,
	processor MessageProcessor,
) (*KafkaConsumer, error)
NewKafkaConsumerWithSaramaConfig constructs a new implementation of the Consumer interface with a custom sarama config
func NewOCPRulesConsumer ¶ added in v1.4.0
func NewOCPRulesConsumer(brokerCfg broker.Configuration, storage storage.OCPRecommendationsStorage) (*KafkaConsumer, error)
NewOCPRulesConsumer constructs a new implementation of the Consumer interface
func (*KafkaConsumer) Cleanup ¶
func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*KafkaConsumer) Close ¶
func (consumer *KafkaConsumer) Close() error
Close closes all resources used by the consumer
func (*KafkaConsumer) ConsumeClaim ¶
func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().
func (*KafkaConsumer) GetNumberOfErrorsConsumingMessages ¶
func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64
GetNumberOfErrorsConsumingMessages returns the number of errors encountered while consuming messages since the KafkaConsumer object was created
func (*KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages ¶
func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64
GetNumberOfSuccessfullyConsumedMessages returns the number of successfully consumed messages since the KafkaConsumer object was created
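The relationship between the consume loop and the two counters can be sketched without any Kafka dependency. The following is only an illustration under stated assumptions: a plain Go channel stands in for the claim's Messages() channel, and the types message and countingConsumer, the handle callback, and runDemo are all hypothetical. How KafkaConsumer actually maintains its counters is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Offset int64
	Value  []byte
}

// countingConsumer keeps success and error counters that can be read
// safely while another goroutine is consuming.
type countingConsumer struct {
	consumed uint64
	failed   uint64
	handle   func(message) error
}

// consume processes messages until the channel is closed, updating
// exactly one of the two counters per message.
func (c *countingConsumer) consume(messages <-chan message) {
	for msg := range messages {
		if err := c.handle(msg); err != nil {
			atomic.AddUint64(&c.failed, 1)
			continue
		}
		atomic.AddUint64(&c.consumed, 1)
	}
}

// Consumed returns the number of successfully handled messages.
func (c *countingConsumer) Consumed() uint64 { return atomic.LoadUint64(&c.consumed) }

// Failed returns the number of messages whose handling returned an error.
func (c *countingConsumer) Failed() uint64 { return atomic.LoadUint64(&c.failed) }

// runDemo feeds three messages through the loop; the second has no payload.
func runDemo() (uint64, uint64) {
	c := &countingConsumer{handle: func(m message) error {
		if len(m.Value) == 0 {
			return errors.New("empty payload")
		}
		return nil
	}}
	messages := make(chan message, 3)
	messages <- message{Offset: 7, Value: []byte("{}")}
	messages <- message{Offset: 8}
	messages <- message{Offset: 9, Value: []byte("{}")}
	close(messages)
	c.consume(messages)
	return c.Consumed(), c.Failed()
}

func main() {
	consumed, failed := runDemo()
	fmt.Printf("consumed=%d errors=%d\n", consumed, failed)
}
```

Atomic operations are used here so that a metrics or health endpoint could read the counters from a different goroutine than the one running the loop.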
func (*KafkaConsumer) HandleMessage ¶
func (consumer *KafkaConsumer) HandleMessage(msg *sarama.ConsumerMessage) error
HandleMessage handles the message and does all logging, metrics, etc.
A log message is written for every step made during processing, but in order to reduce the number of messages sent to ElasticSearch, most messages are produced only when the log level is set to DEBUG.
A typical example of the log messages produced during processing without the DEBUG log level:
1:26PM INF started processing message message_timestamp=2023-07-26T13:26:54+02:00 offset=7 partition=0 topic=ccx.ocp.results
1:26PM INF Consumed group=aggregator offset=7 topic=ccx.ocp.results
1:26PM INF Read cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
1:26PM WRN Received data with unexpected version. cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 topic=ccx.ocp.results version=2
1:26PM INF Stored info report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
1:26PM WRN request ID is missing, null or empty Operation=TrackPayload
1:26PM INF Message consumed duration=3 offset=7
When log level is set to DEBUG, many log messages useful for debugging are generated as well:
2:53PM INF started processing message message_timestamp=2023-07-26T14:53:32+02:00 offset=8 partition=0 topic=ccx.ocp.results
2:53PM INF Consumed group=aggregator offset=8 topic=ccx.ocp.results
2:53PM INF Read cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM WRN Received data with unexpected version. cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 topic=ccx.ocp.results version=2
2:53PM DBG Organization allow listing disabled cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Marshalled cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Time ok cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Stored report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Stored recommendations cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG rule hits for 11789772.5d5892d3-1f74-4ccf-91af-548dfc9767aa (request ID missing):
	rule: ccx_rules_ocp.external.rules.nodes_requirements_check.report; error key: NODES_MINIMUM_REQUIREMENTS_NOT_MET
	rule: ccx_rules_ocp.external.bug_rules.bug_1766907.report; error key: BUGZILLA_BUG_1766907
	rule: ccx_rules_ocp.external.rules.nodes_kubelet_version_check.report; error key: NODE_KUBELET_VERSION
	rule: ccx_rules_ocp.external.rules.samples_op_failed_image_import_check.report; error key: SAMPLES_FAILED_IMAGE_IMPORT_ERR
	rule: ccx_rules_ocp.external.rules.cluster_wide_proxy_auth_check.report; error key: AUTH_OPERATOR_PROXY_ERROR
2:53PM DBG rule hits for 11789772.5d5892d3-1f74-4ccf-91af-548dfc9767aa (request ID missing):
	rule: ccx_rules_ocp.external.rules.nodes_requirements_check.report; error key: NODES_MINIMUM_REQUIREMENTS_NOT_MET
	rule: ccx_rules_ocp.external.bug_rules.bug_1766907.report; error key: BUGZILLA_BUG_1766907
	rule: ccx_rules_ocp.external.rules.nodes_kubelet_version_check.report; error key: NODE_KUBELET_VERSION
	rule: ccx_rules_ocp.external.rules.samples_op_failed_image_import_check.report; error key: SAMPLES_FAILED_IMAGE_IMPORT_ERR
	rule: ccx_rules_ocp.external.rules.cluster_wide_proxy_auth_check.report; error key: AUTH_OPERATOR_PROXY_ERROR
2:53PM INF Stored info report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG read duration=2287 offset=8
2:53PM DBG org_filtering duration=440 offset=8
2:53PM DBG marshalling duration=2023 offset=8
2:53PM DBG time_check duration=120 offset=8
2:53PM DBG db_store_report duration=119 offset=8
2:53PM DBG db_store_recommendations duration=11 offset=8
2:53PM DBG db_store_info_report duration=102 offset=8
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM DBG processing of message took '0.005895183' seconds offset=8 partition=0 topic=ccx.ocp.results
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM INF Message consumed duration=6 offset=8
func (*KafkaConsumer) Serve ¶
func (consumer *KafkaConsumer) Serve()
Serve starts listening for messages and processing them. It blocks the current thread.
func (*KafkaConsumer) Setup ¶
func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type MessageProcessor ¶ added in v1.4.0
type MessageProcessor interface {
// contains filtered or unexported methods
}
MessageProcessor offers the interface for processing a received message
type OCPRulesProcessor ¶ added in v1.4.0
type OCPRulesProcessor struct { }
OCPRulesProcessor satisfies the MessageProcessor interface
type Report ¶
type Report map[string]*json.RawMessage
Report represents a report sent in a message consumed from any broker
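Because Report maps attribute names to *json.RawMessage values, each attribute stays as undecoded JSON until it is needed. The sketch below shows that pattern with a local copy of the type. The helper decodeField, the function parseExample, and the attribute names "info", "reports" and "key" in the sample payload are assumptions made for illustration and do not describe the real message schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// report is a local copy of the Report type shape shown above.
type report map[string]*json.RawMessage

// decodeField decodes a single attribute on demand (hypothetical helper).
// A missing key and a JSON null both yield an error.
func decodeField(r report, key string, target interface{}) error {
	raw, found := r[key]
	if !found || raw == nil {
		return fmt.Errorf("attribute %q is missing or null", key)
	}
	return json.Unmarshal(*raw, target)
}

// parseExample parses a made-up payload and returns the number of entries
// under its "info" attribute.
func parseExample() (int, error) {
	// Illustrative payload only; attribute names are assumptions.
	payload := []byte(`{"info": [{"key": "version_info|CLUSTER_VERSION_INFO"}], "reports": []}`)

	var r report
	if err := json.Unmarshal(payload, &r); err != nil {
		return 0, err
	}

	var info []map[string]string
	if err := decodeField(r, "info", &info); err != nil {
		return 0, err
	}
	return len(info), nil
}

func main() {
	count, err := parseExample()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("info entries:", count)
}
```

Deferring the decoding this way lets a consumer validate that the expected attributes are present before paying the cost of unmarshalling their contents.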