storage

package
v1.28.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2021 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NSStatus is the redis namespace prefix for job/gauge statuses.
	NSStatus = "status"
	// NSLatest is the redis namespace prefix for latest measurements.
	NSLatest = "latest"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheManager

// CacheManager stores the latest measurement for each gauge, along with
// auxiliary information (job and gauge statuses) that is safe to lose.
type CacheManager interface {
	// LoadJobStatuses returns statuses of currently running jobs.
	// It returns a map where keys are job ids.
	LoadJobStatuses() (map[string]core.Status, error)
	// LoadGaugeStatuses returns statuses of gauges for the given job.
	// It returns a map where keys are gauge codes.
	LoadGaugeStatuses(jobID string) (map[string]core.Status, error)
	// SaveStatus saves harvest status for an entire job (if code is empty) or for a single gauge.
	// count is the number of saved measurements.
	SaveStatus(jobID, code string, err error, count int) error

	// LoadLatestMeasurements returns the latest measurements.
	// It accepts a map where keys are scripts (not job ids!) and values are sets of gauge codes.
	LoadLatestMeasurements(from map[string]core.StringSet) (map[core.GaugeID]core.Measurement, error)
	// SaveLatestMeasurements saves the given measurements. If there are multiple values per gauge, the most recent one will be saved.
	// Input measurements are supposed to be filtered against previous latest values from the cache.
	// This is done inside the job (it also ensures we don't save dupe measurements in db).
	SaveLatestMeasurements(ctx context.Context, in <-chan *core.Measurement) <-chan error

	// Close is called when the cache must be shut down.
	Close()
}

CacheManager is used to store the latest measurement for each gauge, along with auxiliary information that is safe to lose.

type DatabaseManager

// DatabaseManager stores all harvested measurements. It also stores jobs,
// so that they persist between service restarts.
type DatabaseManager interface {
	// ListJobs returns a slice of currently active jobs.
	ListJobs() ([]core.JobDescription, error)
	// GetJob returns an active job by its id.
	GetJob(id string) (*core.JobDescription, error)
	// AddJob creates a new job from the description and starts it immediately.
	// onSave argument is used to ensure transactional behavior when adding the job to the scheduler.
	AddJob(job core.JobDescription, onSave func(job core.JobDescription) error) error
	// DeleteJob stops a running job and deletes it.
	// onDelete argument is used to ensure transactional behavior when adding job to scheduler.
	// NOTE(review): "adding job to scheduler" looks copied from AddJob; presumably this
	// should read "removing job from scheduler" — confirm against the implementation.
	DeleteJob(id string, onDelete func(id string) error) error

	// SaveMeasurements saves measurements from the channel in db, until the channel is closed.
	// It supports context cancelation.
	// It returns a channel where one single int will be written (the total number of saved
	// measurements) and a channel of errors.
	SaveMeasurements(ctx context.Context, in <-chan *core.Measurement) (<-chan int, <-chan error)
	// GetMeasurements returns measurements stored in db.
	GetMeasurements(query MeasurementsQuery) ([]core.Measurement, error)
	// GetNearestMeasurement returns the measurement nearest to the timestamp (without interpolation).
	GetNearestMeasurement(script, code string, to time.Time, tolerance time.Duration) (*core.Measurement, error)

	// Close is called when db should be shut down.
	Close()
}

DatabaseManager is used to store all harvested measurements. It is also used to store jobs so that they persist between service restarts.

type DbManager

type DbManager struct {
	// contains filtered or unexported fields
}

DbManager implements DatabaseManager using sql database

func (DbManager) AddJob

func (mgr DbManager) AddJob(job core.JobDescription, onSave func(job core.JobDescription) error) error

AddJob implements DatabaseManager interface

func (DbManager) Close

func (mgr DbManager) Close()

Close implements DatabaseManager interface

func (DbManager) DeleteJob

func (mgr DbManager) DeleteJob(id string, onDelete func(id string) error) error

DeleteJob implements DatabaseManager interface

func (DbManager) GetJob

func (mgr DbManager) GetJob(id string) (*core.JobDescription, error)

GetJob implements DatabaseManager interface

func (DbManager) GetMeasurements

func (mgr DbManager) GetMeasurements(query MeasurementsQuery) ([]core.Measurement, error)

GetMeasurements implements DatabaseManager interface

func (DbManager) GetNearestMeasurement added in v1.16.0

func (mgr DbManager) GetNearestMeasurement(script, code string, to time.Time, tolerance time.Duration) (*core.Measurement, error)

GetNearestMeasurement implements DatabaseManager interface

func (DbManager) ListJobs

func (mgr DbManager) ListJobs() ([]core.JobDescription, error)

ListJobs implements DatabaseManager interface

func (DbManager) SaveMeasurements

func (mgr DbManager) SaveMeasurements(ctx context.Context, in <-chan *core.Measurement) (<-chan int, <-chan error)

SaveMeasurements implements DatabaseManager interface

type DbTestSuite

type DbTestSuite struct {
	suite.Suite
	// contains filtered or unexported fields
}

func (*DbTestSuite) SetupTest

func (s *DbTestSuite) SetupTest()

func (*DbTestSuite) TearDownSuite

func (s *DbTestSuite) TearDownSuite()

func (*DbTestSuite) TestAddJobCallbackError

func (s *DbTestSuite) TestAddJobCallbackError()

func (*DbTestSuite) TestAddJobDupe

func (s *DbTestSuite) TestAddJobDupe()

func (*DbTestSuite) TestAddJobOk

func (s *DbTestSuite) TestAddJobOk()

func (*DbTestSuite) TestDeleteJobCallbackError

func (s *DbTestSuite) TestDeleteJobCallbackError()

func (*DbTestSuite) TestDeleteJobMissing

func (s *DbTestSuite) TestDeleteJobMissing()

func (*DbTestSuite) TestDeleteJobOk

func (s *DbTestSuite) TestDeleteJobOk()

func (*DbTestSuite) TestGetJobNotFound

func (s *DbTestSuite) TestGetJobNotFound()

func (*DbTestSuite) TestGetJobSuccess

func (s *DbTestSuite) TestGetJobSuccess()

func (*DbTestSuite) TestGetMeasurements

func (s *DbTestSuite) TestGetMeasurements()

func (*DbTestSuite) TestGetNearestMeasurement added in v1.16.0

func (s *DbTestSuite) TestGetNearestMeasurement()

func (*DbTestSuite) TestListJobs

func (s *DbTestSuite) TestListJobs()

func (*DbTestSuite) TestSaveMeasurements

func (s *DbTestSuite) TestSaveMeasurements()

func (*DbTestSuite) TestSaveMeasurementsCancel

func (s *DbTestSuite) TestSaveMeasurementsCancel()

func (*DbTestSuite) TestSaveMeasurementsChunks

func (s *DbTestSuite) TestSaveMeasurementsChunks()

type EmbeddedCacheManager

type EmbeddedCacheManager struct {
	RedisCacheManager
	// contains filtered or unexported fields
}

EmbeddedCacheManager is cache manager that uses embedded redis https://github.com/alicebob/miniredis

func NewEmbeddedCacheManager

func NewEmbeddedCacheManager() (*EmbeddedCacheManager, error)

NewEmbeddedCacheManager creates new miniredis cache manager

func (EmbeddedCacheManager) Close

func (cache EmbeddedCacheManager) Close()

Close implements CacheManager interface

type MeasurementsQuery

// MeasurementsQuery is an intermediate data struct used to convert an HTTP request
// into database queries.
type MeasurementsQuery struct {
	// Script is presumably the name of the script the measurements belong to — confirm against callers.
	Script string
	// Code is presumably the gauge code — confirm against callers.
	Code   string
	// From and To bound the queried period. NewMeasurementsQuery describes how
	// empty bounds are defaulted and how long periods are trimmed.
	From   *time.Time
	To     *time.Time
}

MeasurementsQuery is intermediate data struct to convert HTTP request to database queries

func NewMeasurementsQuery

func NewMeasurementsQuery(script, code, fromS, toS string) (*MeasurementsQuery, error)

NewMeasurementsQuery builds a db query from raw string arguments (passed via URL). If both TO and FROM are empty, a period of 30 days from the current db time will be used. If TO is an empty string, the current time from the db will be used. If the given period is longer than 30 days, it will be trimmed to 30 days ending at the TO timestamp.

type PostgresManager

// PostgresManager implements DatabaseManager interface.
// It embeds DbManager, the sql-database implementation of DatabaseManager.
type PostgresManager struct {
	DbManager
}

PostgresManager implements DatabaseManager interface

func NewPostgresManager

func NewPostgresManager(pgConnStr string, chunkSize int, withoutTimescale bool) (*PostgresManager, error)

NewPostgresManager creates new PostgresManager with connection string and chunk size

type RedisCacheManager

type RedisCacheManager struct {
	// contains filtered or unexported fields
}

RedisCacheManager is cache manager that uses real redis

func NewRedisCacheManager

func NewRedisCacheManager(host, port string) (*RedisCacheManager, error)

NewRedisCacheManager creates new redis cache manager

func (RedisCacheManager) Close

func (cache RedisCacheManager) Close()

Close implements CacheManager interface

func (RedisCacheManager) LoadGaugeStatuses

func (cache RedisCacheManager) LoadGaugeStatuses(jobID string) (map[string]core.Status, error)

LoadGaugeStatuses implements CacheManager interface

func (RedisCacheManager) LoadJobStatuses

func (cache RedisCacheManager) LoadJobStatuses() (map[string]core.Status, error)

LoadJobStatuses implements CacheManager interface

func (RedisCacheManager) LoadLatestMeasurements

func (cache RedisCacheManager) LoadLatestMeasurements(from map[string]core.StringSet) (map[core.GaugeID]core.Measurement, error)

LoadLatestMeasurements implements CacheManager interface

func (RedisCacheManager) SaveLatestMeasurements

func (cache RedisCacheManager) SaveLatestMeasurements(ctx context.Context, in <-chan *core.Measurement) <-chan error

SaveLatestMeasurements implements CacheManager interface

func (RedisCacheManager) SaveStatus

func (cache RedisCacheManager) SaveStatus(jobID, code string, err error, count int) error

SaveStatus implements CacheManager interface

type SqliteManager

// SqliteManager implements DatabaseManager interface for a Sqlite database.
// It embeds DbManager, the sql-database implementation of DatabaseManager.
// It cannot be used for concurrent write access; test usage only.
type SqliteManager struct {
	DbManager
}

SqliteManager implements DatabaseManager interface for a Sqlite database (https://github.com/mattn/go-sqlite3).

func NewSqliteDb

func NewSqliteDb(chunkSize int) (*SqliteManager, error)

NewSqliteDb creates a SqliteManager with the given chunkSize. SqliteManager cannot be used for concurrent write access. Test usage only.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL