cfg

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// LogOnly only logs with no alert notification
	LogOnly = -1 * time.Second
)
View Source
const (
	// PrefixDelimiter for message prefix
	PrefixDelimiter = "-"
)

Variables

View Source
var (
	// AllowedPriorities a list of allowed priorities
	AllowedPriorities = []string{"P1", "P2", "P3", "P4", "P5"}
)

Functions

func Alert

func Alert(msg string)

Alert alerts to slack, email, text.

func AllMsgPayloads

func AllMsgPayloads(prefix string, payloadSizes []string, numOfMsg int) ([][]byte, int)

AllMsgPayloads generates a series of payloads based on specified payload sizes or the number of messages

func BrokerTopicsQuery

func BrokerTopicsQuery(brokerBaseURL, token string) ([]string, error)

BrokerTopicsQuery returns a list of broker and topic full names, or the error of this operation

func BuildTenantsUsageThread

func BuildTenantsUsageThread()

BuildTenantsUsageThread is the daemon thread that builds the last 30s tenants usage and exposes it to Prometheus metrics

func ClearIncident

func ClearIncident(component string)

ClearIncident clears an incident

func CloseOpsGenieAlert

func CloseOpsGenieAlert(component, alertID string, genieKey string) error

CloseOpsGenieAlert closes an OpsGenie alert

func ConnectBrokerHealthcheckTopic

func ConnectBrokerHealthcheckTopic(brokerURL, clusterName, pulsarURL string, tokenSupplier func() (string, error), completeChan chan error)

ConnectBrokerHealthcheckTopic reads the latest messages off broker's healthcheck topic

func CreateIncident

func CreateIncident(component, alias, msg, desc, priority string)

CreateIncident creates incident

func CreateOpsGenieAlert

func CreateOpsGenieAlert(msg Incident, genieKey string) error

CreateOpsGenieAlert creates an OpsGenie alert

func CreatePDIncident

func CreatePDIncident(component, alias, msg, pdIntegrationKey string) error

CreatePDIncident creates PagerDuty incident

func EvaluateBrokers

func EvaluateBrokers(urlPrefix, clusterName, pulsarURL string, tokenSupplier func() (string, error), duration time.Duration) (int, error)

EvaluateBrokers evaluates all brokers' health

func EvaluateClusterHealth

func EvaluateClusterHealth(client *k8s.Client) error

EvaluateClusterHealth evaluates and reports the k8s cluster health

func FuncLatencyGaugeOpt

func FuncLatencyGaugeOpt() prometheus.GaugeOpts

FuncLatencyGaugeOpt is the description of Pulsar Function latency gauge

func GenPayload

func GenPayload(prefix, size string) ([]byte, int)

GenPayload generates an array of bytes with prefix string and payload size. If the specified payload size is less than the prefix size, the payload will just be the prefix.

func GetBrokers

func GetBrokers(restBaseURL, clusterName string, tokenSupplier func() (string, error)) ([]string, error)

GetBrokers gets a list of brokers and ports

func GetGaugeType

func GetGaugeType(nameType string) prometheus.GaugeOpts

GetGaugeType gets the Prometheus gauge options based on type/subsystem

func GetMessageID

func GetMessageID(prefix, str string) int

GetMessageID returns the message index by parsing the template payload string with a prefix.

func GetOfflinePodsCounter

func GetOfflinePodsCounter(subsystem string) prometheus.GaugeOpts

GetOfflinePodsCounter returns prometheus GaugeOpts for kubernetes cluster pod offline counter

func GetPulsarClient

func GetPulsarClient(globalConfiguration *Configuration, pulsarURL string, tokenSupplier func() (string, error)) (pulsar.Client, error)

GetPulsarClient gets the pulsar client object. Note: the caller has to Close() the client object.

func HeartBeatToOpsGenie

func HeartBeatToOpsGenie(genieURL, genieKey string) error

HeartBeatToOpsGenie sends a heartbeat to OpsGenie

func HeartbeatCounterOpt

func HeartbeatCounterOpt() prometheus.CounterOpts

HeartbeatCounterOpt is the description for heart beat counter

func MonitorK8sPulsarCluster

func MonitorK8sPulsarCluster() error

MonitorK8sPulsarCluster starts the K8sPulsarClusterMonitor thread

func MonitorSites

func MonitorSites()

MonitorSites monitors a list of sites

func MsgLatencyGaugeOpt

func MsgLatencyGaugeOpt(typeName, desc string) prometheus.GaugeOpts

MsgLatencyGaugeOpt is the description for Pulsar message latency gauge

func NumOfBytes

func NumOfBytes(size string) int

NumOfBytes returns a number of bytes with specified size in MB or KB

func OfflinePodGaugeOpt

func OfflinePodGaugeOpt(subsystem, desc string) prometheus.GaugeOpts

OfflinePodGaugeOpt is offline pods counter

func PdV2Event

func PdV2Event(action, dedupKey, routingKey string, payload *pd.V2Payload) (*pd.V2EventResponse, error)

PdV2Event is pd client

func PromCounter

func PromCounter(opt prometheus.CounterOpts, cluster string)

PromCounter registers counter and increment

func PromGauge

func PromGauge(opt prometheus.GaugeOpts, cluster string, num float64)

PromGauge registers gauge reading

func PromGaugeInt

func PromGaugeInt(opt prometheus.GaugeOpts, cluster string, num int)

PromGaugeInt registers gauge reading in integer

func PromLatencySum

func PromLatencySum(opt prometheus.GaugeOpts, cluster string, remoteCluster string, latency time.Duration)

PromLatencySum exposes monitoring metrics to Prometheus

func PubSubDowntimeGaugeOpt

func PubSubDowntimeGaugeOpt() prometheus.GaugeOpts

PubSubDowntimeGaugeOpt is the description for downtime summary

func PubSubLatency

func PubSubLatency(globalConfiguration *Configuration, clusterName string, topicConfig TopicCfg, tokenSupplier func() (string, error), msgPrefix string, payloads [][]byte, maxPayloadSize int) (map[string]PubSubResult, error)

PubSubLatency reports the latency, including the successful produce and consume of a message

func PulsarAdminTenant

func PulsarAdminTenant(clusterURL string, tokenSupplier func() (string, error)) (int, error)

PulsarAdminTenant probes the tenant endpoint to get a list of tenants and returns the number of tenants on the cluster

func PulsarTenants

func PulsarTenants()

PulsarTenants gets a list of tenants on each cluster

func PushToPrometheusProxy

func PushToPrometheusProxy(proxyURL, authKey string) error

PushToPrometheusProxy pushes exp data to PrometheusProxy

func PushToPrometheusProxyThread

func PushToPrometheusProxyThread()

PushToPrometheusProxyThread is the daemon thread that scrapes and pushes metrics to prometheus proxy

func ReadConfigFile

func ReadConfigFile(configFile string)

ReadConfigFile reads configuration file.

func Remail

func Remail(globalConfig *Configuration, topic TopicCfg)

Remail consumes all messages and forwards them by adding the cluster name in the "ping-remailer" field of the message. If the message already contains this field or we are the sender, the message is simply discarded. To survive the loss of connection, the state of the client and consumer is checked on each iteration, and if it was closed it is recreated.

func RemailerThread

func RemailerThread()

RemailerThread runs a remailer thread on each topic.

func RemoveIncident

func RemoveIncident(component string)

RemoveIncident removes an existing incident

func ReportIncident

func ReportIncident(component, alias, msg, desc string, eval *AlertPolicyCfg) bool

ReportIncident reports an incident; the returned bool indicates whether an incident is created or not.

func ResolvePDIncident

func ResolvePDIncident(component, alias, pdIntegrationKey string) error

ResolvePDIncident resolves PagerDuty incident

func RunInterval

func RunInterval(fn monitorFunc, interval time.Duration)

RunInterval runs interval

func SendSlackNotification

func SendSlackNotification(webhookURL string, msg SlackMessage) error

SendSlackNotification will post to an 'Incoming Webhook' URL set up in Slack Apps. It accepts some text, and the Slack channel is saved within Slack.

func SiteLatencyGaugeOpt

func SiteLatencyGaugeOpt() prometheus.GaugeOpts

SiteLatencyGaugeOpt is the description for hosting site latency gauge

func StartHeartBeat

func StartHeartBeat()

StartHeartBeat starts heartbeat monitoring the program by OpsGenie

func TenantsGaugeOpt

func TenantsGaugeOpt() prometheus.GaugeOpts

TenantsGaugeOpt is the description for rest api tenant counts

func TestBrokers

func TestBrokers(topicCfg TopicCfg) error

TestBrokers evaluates and reports all brokers health

func TestTopicLatency

func TestTopicLatency(topicCfg TopicCfg)

TestTopicLatency tests generic message delivery in topics and the latency

func TestWsLatency

func TestWsLatency(config WsConfig)

TestWsLatency tests all clusters' websocket pub sub latency

func TopicLatencyTestThread

func TopicLatencyTestThread()

TopicLatencyTestThread tests a message delivery in topic and measures the latency.

func UptimeHeartBeat

func UptimeHeartBeat()

UptimeHeartBeat sends heartbeat to uptime counter

func VerboseAlert

func VerboseAlert(component, message string, silenceWindow time.Duration)

VerboseAlert is able to reduce the verbosity to Slack channel

func WebSocketTopicLatencyTestThread

func WebSocketTopicLatencyTestThread()

WebSocketTopicLatencyTestThread tests a message websocket delivery in topic and measures the latency.

Types

type AckMessage

type AckMessage struct {
	MessageID string `json:"messageId"`
}

AckMessage is the message struct to acknowledge a message

type AlertPolicyCfg

type AlertPolicyCfg struct {
	// first evaluation to count continuous failure
	Ceiling int `json:"ceiling"`
	// Second evaluation for moving window
	MovingWindowSeconds   int `json:"movingWindowSeconds"`
	CeilingInMovingWindow int `json:"ceilingInMovingWindow"`
}

AlertPolicyCfg is a set of criteria to evaluate triggers for incident alerts

type AlertVerbosity

type AlertVerbosity struct {
	// contains filtered or unexported fields
}

AlertVerbosity contains attributes required to calculate whether verbose alert is required or not

func (*AlertVerbosity) MustAlert

func (av *AlertVerbosity) MustAlert() bool

MustAlert returns whether the silence window has expired since the last alert.

type AnalyticsCfg

type AnalyticsCfg struct {
	APIKey            string `json:"apiKey"`
	IngestionURL      string `json:"ingestionUrl"`
	InsightsWriteKey  string `json:"insightsWriteKey"`
	InsightsAccountID string `json:"insightsAccountId"`
}

AnalyticsCfg is analytics usage and statistics tracking configuration

type BrokersCfg

type BrokersCfg struct {
	BrokerTestRequired bool           `json:"brokerTestRequired"`
	InClusterRESTURL   string         `json:"inclusterRestURL"`
	IntervalSeconds    int            `json:"intervalSeconds"`
	AlertPolicy        AlertPolicyCfg `json:"AlertPolicy"`
}

BrokersCfg monitors all brokers in the cluster

type ClusterHealth

type ClusterHealth struct {
	sync.RWMutex
	Status         k8s.ClusterStatusCode
	MissingBrokers int
}

ClusterHealth a cluster health struct

func (*ClusterHealth) Get

func (h *ClusterHealth) Get() (k8s.ClusterStatusCode, int)

Get gets the cluster health status

func (*ClusterHealth) Set

func (h *ClusterHealth) Set(status k8s.ClusterStatusCode, offlineBrokers int)

Set sets the cluster health status

type Configuration

type Configuration struct {
	// Name is the Pulsar cluster name, it is mandatory
	Name string `json:"name"`
	// ClusterName is the Pulsar cluster name if the Name cannot be used as the Pulsar cluster name, optional
	ClusterName      string                    `json:"clusterName"`
	TokenOAuthConfig *clientcredentials.Config `json:"tokenOAuthConfig"`
	// TokenFilePath is the file path to Pulsar JWT. It takes precedence over the token attribute.
	TokenFilePath string `json:"tokenFilePath"`
	// Token is a Pulsar JWT that can be used for both the Pulsar client and the http admin client
	Token             string             `json:"token"`
	BrokersConfig     BrokersCfg         `json:"brokersConfig"`
	TrustStore        string             `json:"trustStore"`
	K8sConfig         K8sClusterCfg      `json:"k8sConfig"`
	AnalyticsConfig   AnalyticsCfg       `json:"analyticsConfig"`
	PrometheusConfig  PrometheusCfg      `json:"prometheusConfig"`
	SlackConfig       SlackCfg           `json:"slackConfig"`
	OpsGenieConfig    OpsGenieCfg        `json:"opsGenieConfig"`
	PagerDutyConfig   PagerDutyCfg       `json:"pagerDutyConfig"`
	PulsarAdminConfig PulsarAdminRESTCfg `json:"pulsarAdminRestConfig"`
	PulsarTopicConfig []TopicCfg         `json:"pulsarTopicConfig"`
	SitesConfig       SitesCfg           `json:"sitesConfig"`
	WebSocketConfig   []WsConfig         `json:"webSocketConfig"`
	TenantUsageConfig TenantUsageCfg     `json:"tenantUsageConfig"`
	// contains filtered or unexported fields
}

Configuration - this server's configuration

var Config Configuration

Config - this server's configuration instance

func GetConfig

func GetConfig() *Configuration

GetConfig returns a reference to the Configuration

func (*Configuration) Init

func (c *Configuration) Init()

func (*Configuration) TokenSupplier

func (c *Configuration) TokenSupplier() func() (string, error)

type Incident

type Incident struct {
	Message     string    `json:"message"`
	Description string    `json:"description"`
	Priority    string    `json:"priority"`
	Entity      string    `json:"entity"`
	Alias       string    `json:"alias"`
	Tags        []string  `json:"tags"`
	Timestamp   time.Time `json:"timestamp"`
}

Incident is the struct for incident reporting

func NewIncident

func NewIncident(component, alias, msg, desc, priority string) Incident

NewIncident creates an Incident object

type IncidentAlertPolicy

type IncidentAlertPolicy struct {
	Entity            string
	Counters          int
	EvalWindowSeconds time.Duration
	Alerts            map[time.Time]bool
	LimitInWindow     int
	Limit             int
	LastUpdatedAt     time.Time
}

IncidentAlertPolicy tracks and reports incident when threshold is reached

type K8sClusterCfg

type K8sClusterCfg struct {
	Enabled         bool           `json:"enabled"`
	PulsarNamespace string         `json:"pulsarNamespace"`
	KubeConfigDir   string         `json:"kubeConfigDir"`
	AlertPolicy     AlertPolicyCfg `json:"AlertPolicy"`
}

K8sClusterCfg is the configuration to monitor a Kubernetes cluster; only to be enabled for in-cluster monitoring

type MsgResult

type MsgResult struct {
	InOrderDelivery bool
	Latency         time.Duration
	SentTime        time.Time
}

func WsLatencyTest

func WsLatencyTest(producerURL, subscriptionURL string, tokenSupplier func() (string, error)) (MsgResult, error)

WsLatencyTest latency test for websocket

type OpsClusterCfg

type OpsClusterCfg struct {
	Name        string         `json:"name"`
	URL         string         `json:"url"`
	AlertPolicy AlertPolicyCfg `json:"alertPolicy"`
}

OpsClusterCfg is each cluster's configuration

type OpsGenieAlertCloseRequest

type OpsGenieAlertCloseRequest struct {
	User   string `json:"user"`
	Source string `json:"source"`
	Note   string `json:"note"`
}

OpsGenieAlertCloseRequest is the POST request payload json

type OpsGenieAlertCreateResponse

type OpsGenieAlertCreateResponse struct {
	Result    string  `json:"result"`
	Took      float64 `json:"took"`
	RequestID string  `json:"requestId"`
}

OpsGenieAlertCreateResponse is the response struct returned by OpsGenie https://docs.opsgenie.com/docs/alert-api#section-create-alert

type OpsGenieAlertGetResponse

type OpsGenieAlertGetResponse struct {
	Data      alertGetData `json:"data"`
	Took      float64      `json:"took"`
	RequestID string       `json:"requestId"`
}

OpsGenieAlertGetResponse is the response struct returned by OpsGenie https://docs.opsgenie.com/docs/alert-api#section-create-alert

type OpsGenieCfg

type OpsGenieCfg struct {
	HeartBeatURL    string `json:"heartbeatUrl"`
	HeartbeatKey    string `json:"heartbeatKey"`
	AlertKey        string `json:"alertKey"`
	IntervalSeconds int    `json:"intervalSeconds"`
}

OpsGenieCfg is opsGenie configuration

type PagerDutyCfg

type PagerDutyCfg struct {
	IntegrationKey string `json:"integrationKey"`
}

PagerDutyCfg is PagerDuty configuration

type Payload

type Payload struct {
	Ceiling        int
	Floor          int
	DefaultPayload []byte // to save time for large payload size generation
}

Payload defines the payload size

func NewPayload

func NewPayload(size int) Payload

NewPayload returns a new Payload object with a fixed payload size

func (*Payload) GenDefaultPayload

func (p *Payload) GenDefaultPayload() []byte

GenDefaultPayload generates default payload size

func (Payload) PrefixDefaultPayload

func (p Payload) PrefixDefaultPayload(prefix string) []byte

PrefixDefaultPayload creates string prefix in the payload

func (Payload) PrefixPayload

func (p Payload) PrefixPayload(prefix string) []byte

PrefixPayload creates string prefix in the payload

type PrometheusCfg

type PrometheusCfg struct {
	Port                  string `json:"port"`
	ExposeMetrics         bool   `json:"exposeMetrics"`
	PrometheusProxyURL    string `json:"prometheusProxyURL"`
	PrometheusProxyAPIKey string `json:"prometheusProxyAPIKey"`
}

PrometheusCfg configures Prometheus setup

type PubSubResult

type PubSubResult struct {
	Latency           time.Duration
	InOrderDelivery   bool
	RemainingMessages int
	Success           bool //TODO usefull ?
}

PubSubResult give stats for one.

type PulsarAdminRESTCfg

type PulsarAdminRESTCfg struct {
	Token           string          `json:"Token"`
	Clusters        []OpsClusterCfg `json:"clusters"`
	IntervalSeconds int             `json:"intervalSeconds"`
}

PulsarAdminRESTCfg is for monitoring a list of Pulsar clusters

type PulsarMessage

type PulsarMessage struct {
	Payload    string                 `json:"payload"`
	Properties map[string]interface{} `json:"properties"`
	Context    string                 `json:"context,omitempty"`
}

PulsarMessage is the required message format for Pulsar Websocket message

type ReceivingMessage

type ReceivingMessage struct {
	Payload    string                 `json:"payload"`
	MessageID  string                 `json:"messageId"`
	Properties map[string]interface{} `json:"properties"`
	Context    string                 `json:"context,omitempty"`
}

ReceivingMessage is the Pulsar message for socket consumer

type SiteCfg

type SiteCfg struct {
	Headers         map[string]string `json:"headers"`
	URL             string            `json:"url"`
	Name            string            `json:"name"`
	IntervalSeconds int               `json:"intervalSeconds"`
	ResponseSeconds int               `json:"responseSeconds"`
	StatusCode      int               `json:"statusCode"`
	StatusCodeExpr  string            `json:"statusCodeExpr"`
	Retries         int               `json:"retries"`
	AlertPolicy     AlertPolicyCfg    `json:"alertPolicy"`
}

SiteCfg configures a general website

type SitesCfg

type SitesCfg struct {
	Sites []SiteCfg `json:"sites"`
}

SitesCfg configures a list of websites

type SlackCfg

type SlackCfg struct {
	AlertURL string `json:"alertUrl"`
	Verbose  bool   `json:"verbose"`
}

SlackCfg is slack configuration

type SlackMessage

type SlackMessage struct {
	Channel   string `json:"channel"`
	Text      string `json:"text"`
	Username  string `json:"username"`
	IconEmogi string `json:"icon_emogi"`
}

SlackMessage is the message struct to be posted for Slack

type TenantUsageCfg

type TenantUsageCfg struct {
	OutBytesLimit        uint64 `json:"outBytesLimit"`
	AlertIntervalMinutes int    `json:"alertIntervalMinutes"`
}

TenantUsageCfg tenant usage reporting and monitoring

type TopicCfg

type TopicCfg struct {
	Name                    string         `json:"name"`
	ClusterName             string         `json:"clusterName"` // used for broker monitoring if specified
	Token                   string         `json:"token"`
	TrustStore              string         `json:"trustStore"`
	NumberOfPartitions      int            `json:"numberOfPartitions"`
	LatencyBudgetMs         int            `json:"latencyBudgetMs"`
	PulsarURL               string         `json:"pulsarUrl"`
	AdminURL                string         `json:"adminUrl"`
	TopicName               string         `json:"topicName"`
	OutputTopic             string         `json:"outputTopic"`
	IntervalSeconds         int            `json:"intervalSeconds"`
	ExpectedMsg             string         `json:"expectedMsg"`
	PayloadSizes            []string       `json:"payloadSizes"`
	NumberOfMessages        int            `json:"numberOfMessages"`
	AlertPolicy             AlertPolicyCfg `json:"AlertPolicy"`
	DowntimeTrackerDisabled bool           `json:"downtimeTrackerDisabled"`
	RemoteClusters          []string       `json:"RemoteClusters"`
}

TopicCfg is topic configuration

type WsConfig

type WsConfig struct {
	Name            string         `json:"name"`
	Token           string         `json:"token"`
	Cluster         string         `json:"cluster"` // can be used for alert de-dupe
	LatencyBudgetMs int            `json:"latencyBudgetMs"`
	ProducerURL     string         `json:"producerUrl"`
	ConsumerURL     string         `json:"consumerUrl"`
	TopicName       string         `json:"topicName"`
	IntervalSeconds int            `json:"intervalSeconds"`
	Scheme          string         `json:"scheme"`
	Port            string         `json:"port"`
	Subscription    string         `json:"subscription"`
	URLQueryParams  string         `json:"urlQueryParams"`
	AlertPolicy     AlertPolicyCfg `json:"AlertPolicy"`
}

WsConfig is configuration to monitor WebSocket pub sub latency

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL