gather

package
v2.0.0-beta.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2020 License: MIT Imports: 20 Imported by: 0

README

How to use this package

Make sure NATS is running and that both the publisher and the subscriber are open.

// NATS streaming server
m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath})
if err := m.natsServer.Open(); err != nil {
    m.logger.Error("Failed to start nats streaming server", zap.Error(err))
    return err
}

publisher := nats.NewAsyncPublisher("nats-publisher")
if err := publisher.Open(); err != nil {
    m.logger.Error("Failed to connect to streaming server", zap.Error(err))
    return err
}

subscriber := nats.NewQueueSubscriber("nats-subscriber")
if err := subscriber.Open(); err != nil {
    m.logger.Error("Failed to connect to streaming server", zap.Error(err))
    return err
}

Make sure the scraperTargetStorageService is accessible

scraperTargetSvc influxdb.ScraperTargetStoreService = m.boltClient

Set up the recorder. Make sure the subscriber subscribes using the correct recorder with the correct write service.

recorder := gather.PlatformWriter{
    Timeout: time.Millisecond * 30,
    Writer: writer,
}
subscriber.Subscribe(MetricsSubject, "", &RecorderHandler{
    Logger:   logger,
    Recorder: recorder,
})

Start the scheduler

scraperScheduler, err := gather.NewScheduler(m.logger, 10, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
    m.logger.Error("Failed to create scraper subscriber", zap.Error(err))
    return err
}

Documentation

Index

Constants

View Source
const (
	MetricsSubject = "metrics"
)

nats subjects

Variables

This section is empty.

Functions

This section is empty.

Types

type MetricType

type MetricType int

MetricType is prometheus metrics type.

const (
	MetricTypeCounter MetricType = iota
	MetricTypeGauge
	MetricTypeSummary
	MetricTypeUntyped
	MetricTypeHistogrm
)

the set of metric types

func (MetricType) String

func (x MetricType) String() string

String returns the string value of MetricType.

func (*MetricType) UnmarshalJSON

func (x *MetricType) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the unmarshaler interface.

func (MetricType) Valid

func (x MetricType) Valid() bool

Valid returns whether the metrics type is valid.

type Metrics

type Metrics struct {
	Name      string                 `json:"name"`
	Tags      map[string]string      `json:"tags"`
	Fields    map[string]interface{} `json:"fields"`
	Timestamp time.Time              `json:"timestamp"`
	Type      MetricType             `json:"type"`
}

Metrics is the default influx based metrics.

type MetricsCollection

type MetricsCollection struct {
	OrgID        influxdb.ID  `json:"orgID"`
	BucketID     influxdb.ID  `json:"bucketID"`
	MetricsSlice MetricsSlice `json:"metrics"`
}

MetricsCollection is the struct including metrics and other requirements.

type MetricsSlice

type MetricsSlice []Metrics

MetricsSlice is a slice of Metrics

func (MetricsSlice) Points

func (ms MetricsSlice) Points() (models.Points, error)

Points converts the MetricsSlice to models.Points.

func (MetricsSlice) Reader

func (ms MetricsSlice) Reader() (io.Reader, error)

Reader returns an io.Reader that enumerates the metrics. All metrics are allocated into the underlying buffer.

type PointWriter

type PointWriter struct {
	Writer storage.PointsWriter
}

PointWriter uses the storage.PointsWriter interface to record metrics.

func (PointWriter) Record

func (s PointWriter) Record(collected MetricsCollection) error

Record records the metrics and writes them using the storage.PointsWriter interface.

type Recorder

type Recorder interface {
	// Subscriber nats.Subscriber
	Record(collected MetricsCollection) error
}

Recorder records time-based metrics.

type RecorderHandler

type RecorderHandler struct {
	Recorder Recorder
	// contains filtered or unexported fields
}

RecorderHandler implements nats.Handler interface.

func NewRecorderHandler

func NewRecorderHandler(log *zap.Logger, recorder Recorder) *RecorderHandler

func (*RecorderHandler) Process

func (h *RecorderHandler) Process(s nats.Subscription, m nats.Message)

Process consumes the job queue and uses the recorder to record.

type Scheduler

type Scheduler struct {
	Targets influxdb.ScraperTargetStoreService
	// Interval is between each metrics gathering event.
	Interval time.Duration
	// Timeout is the maximum time duration allowed by each TCP request
	Timeout time.Duration

	// Publisher will send the gather requests and gathered metrics to the queue.
	Publisher nats.Publisher
	// contains filtered or unexported fields
}

Scheduler is struct to run scrape jobs.

func NewScheduler

func NewScheduler(
	log *zap.Logger,
	numScrapers int,
	targets influxdb.ScraperTargetStoreService,
	p nats.Publisher,
	s nats.Subscriber,
	interval time.Duration,
	timeout time.Duration,
) (*Scheduler, error)

NewScheduler creates a new Scheduler and subscriptions for scraper jobs.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run will retrieve scraper targets from the target storage and publish them to the NATS job queue for gathering.

type Scraper

type Scraper interface {
	Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error)
}

Scraper gathers metrics from a scraper target.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL