rehydration

Version: v1.51.0
Published: May 14, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package rehydration contains interfaces and implementations used across all rehydration receivers

Constants

const TimeFormat = "2006-01-02T15:04"

TimeFormat is the time layout used for the starting and ending times

Variables

var ErrInvalidEntityPath = errors.New("invalid entity path")

ErrInvalidEntityPath is the error for invalid entity path

Functions

func GzipDecompress

func GzipDecompress(contents []byte) ([]byte, error)

GzipDecompress does a gzip decompression on the passed in contents
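
As an illustration of what a function with this signature might do, here is a sketch built on the standard compress/gzip package. It is not the package's actual implementation; gzipDecompress and gzipCompress are hypothetical stand-ins, and the JSON payload is made up.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipCompress is a hypothetical helper used only to build input for the example.
func gzipCompress(contents []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(contents); err != nil {
		return nil, err
	}
	// Close flushes the remaining data and writes the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gzipDecompress is a stand-in with the same signature as GzipDecompress.
func gzipDecompress(contents []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(contents))
	if err != nil {
		return nil, fmt.Errorf("new gzip reader: %w", err)
	}
	defer r.Close()

	out, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("read gzip stream: %w", err)
	}
	return out, nil
}

func main() {
	compressed, err := gzipCompress([]byte(`{"resourceLogs":[]}`))
	if err != nil {
		fmt.Println("compress error:", err)
		return
	}
	plain, err := gzipDecompress(compressed)
	if err != nil {
		fmt.Println("decompress error:", err)
		return
	}
	fmt.Println(string(plain))
}
```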

func IsInTimeRange

func IsInTimeRange(entityTime, startingTime, endingTime time.Time) bool

IsInTimeRange returns true if startingTime <= entityTime <= endingTime

func ParseEntityPath

func ParseEntityPath(entityName string) (entityTime *time.Time, telemetryType component.DataType, err error)

ParseEntityPath parses entityName and returns the time and telemetry type of the entity. It returns an error if the entity name cannot be parsed.

Types

type CheckPoint

type CheckPoint struct {
	// LastTs is the time created from the folder path of the last consumed entity
	LastTs time.Time `json:"last_ts"`

	// ParsedEntities is a lookup of all entities that were parsed in the LastTs path
	ParsedEntities map[string]struct{} `json:"parsed_entities"`
}

CheckPoint is the checkpoint used with a storage extension to keep track of what's been rehydrated.

func NewCheckpoint

func NewCheckpoint() *CheckPoint

NewCheckpoint creates a new CheckPoint

func (*CheckPoint) ShouldParse

func (c *CheckPoint) ShouldParse(entityTime time.Time, entityName string) bool

ShouldParse returns true if the entity should be parsed based on its time and name. It returns false for entities whose time is before LastTs or whose name is already tracked as parsed.

func (*CheckPoint) UpdateCheckpoint

func (c *CheckPoint) UpdateCheckpoint(newTs time.Time, lastEntityName string)

UpdateCheckpoint updates the checkpoint with lastEntityName. If newTs is after LastTs, it sets LastTs to newTs and clears its mapping of ParsedEntities. The lastEntityName is then tracked in the mapping of ParsedEntities.

type CheckpointStorage

type CheckpointStorage struct {
	// contains filtered or unexported fields
}

CheckpointStorage is a checkpoint storer backed by a storage extension

func NewCheckpointStorage

func NewCheckpointStorage(ctx context.Context, host component.Host, storageID, componentID component.ID, componentType component.DataType) (*CheckpointStorage, error)

NewCheckpointStorage creates a new CheckpointStorage based on the storage and component IDs

func (*CheckpointStorage) Close

func (c *CheckpointStorage) Close(ctx context.Context) error

Close closes the checkpoint storage

func (*CheckpointStorage) LoadCheckPoint

func (c *CheckpointStorage) LoadCheckPoint(ctx context.Context, key string) (*CheckPoint, error)

LoadCheckPoint loads a checkpoint for the passed in key. If no checkpoint is found, it returns an empty one.

func (*CheckpointStorage) SaveCheckpoint

func (c *CheckpointStorage) SaveCheckpoint(ctx context.Context, key string, checkpoint *CheckPoint) error

SaveCheckpoint saves the supplied checkpoint

type CheckpointStorer

type CheckpointStorer interface {
	// SaveCheckpoint saves the supplied checkpoint
	SaveCheckpoint(ctx context.Context, key string, checkpoint *CheckPoint) error

	// LoadCheckPoint loads a checkpoint for the passed in key.
	// If no checkpoint is found return an empty one
	LoadCheckPoint(ctx context.Context, key string) (*CheckPoint, error)

	// Close closes the storage client
	Close(ctx context.Context) error
}

CheckpointStorer handles storing of checkpoints for rehydration receivers

type Consumer

type Consumer interface {
	// Consume consumes entity contents at the path and unmarshals it.
	Consume(ctx context.Context, entityContent []byte) error
}

Consumer is responsible for turning entities into OTLP data and sending to the next consumer.

type LogsConsumer

type LogsConsumer struct {
	// contains filtered or unexported fields
}

LogsConsumer consumes rehydrated log entities and unmarshals them into pdata structures

func NewLogsConsumer

func NewLogsConsumer(nextConsumer consumer.Logs) *LogsConsumer

NewLogsConsumer creates a new logs consumer

func (*LogsConsumer) Consume

func (l *LogsConsumer) Consume(ctx context.Context, entityContent []byte) error

Consume unmarshals entityContent into plogs and consumes it

type MetricsConsumer

type MetricsConsumer struct {
	// contains filtered or unexported fields
}

MetricsConsumer consumes rehydrated metric entities and unmarshals them into pdata structures

func NewMetricsConsumer

func NewMetricsConsumer(nextConsumer consumer.Metrics) *MetricsConsumer

NewMetricsConsumer creates a new metrics consumer

func (*MetricsConsumer) Consume

func (m *MetricsConsumer) Consume(ctx context.Context, entityContent []byte) error

Consume unmarshals entityContent into pmetrics and consumes it

type MockConsumer

type MockConsumer struct {
	mock.Mock
}

MockConsumer is an autogenerated mock type for the Consumer type

func NewMockConsumer

func NewMockConsumer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockConsumer

NewMockConsumer creates a new instance of MockConsumer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockConsumer) Consume

func (_m *MockConsumer) Consume(ctx context.Context, entityContent []byte) error

Consume provides a mock function with given fields: ctx, entityContent

func (*MockConsumer) EXPECT

func (_m *MockConsumer) EXPECT() *MockConsumer_Expecter

type MockConsumer_Consume_Call

type MockConsumer_Consume_Call struct {
	*mock.Call
}

MockConsumer_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume'

func (*MockConsumer_Consume_Call) Return

func (*MockConsumer_Consume_Call) Run

func (_c *MockConsumer_Consume_Call) Run(run func(ctx context.Context, entityContent []byte)) *MockConsumer_Consume_Call

func (*MockConsumer_Consume_Call) RunAndReturn

type MockConsumer_Expecter

type MockConsumer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockConsumer_Expecter) Consume

func (_e *MockConsumer_Expecter) Consume(ctx interface{}, entityContent interface{}) *MockConsumer_Consume_Call

Consume is a helper method to define mock.On call

  • ctx context.Context
  • entityContent []byte

type NopStorage

type NopStorage struct{}

NopStorage is a no-op implementation of CheckpointStorer

func NewNopStorage

func NewNopStorage() *NopStorage

NewNopStorage creates a new NopStorage instance

func (*NopStorage) Close

func (n *NopStorage) Close(_ context.Context) error

Close returns nil

func (*NopStorage) LoadCheckPoint

func (n *NopStorage) LoadCheckPoint(_ context.Context, _ string) (*CheckPoint, error)

LoadCheckPoint returns an empty checkpoint

func (*NopStorage) SaveCheckpoint

func (n *NopStorage) SaveCheckpoint(_ context.Context, _ string, _ *CheckPoint) error

SaveCheckpoint returns nil

type TracesConsumer

type TracesConsumer struct {
	// contains filtered or unexported fields
}

TracesConsumer consumes rehydrated trace entities and unmarshals them into pdata structures

func NewTracesConsumer

func NewTracesConsumer(nextConsumer consumer.Traces) *TracesConsumer

NewTracesConsumer creates a new traces consumer

func (*TracesConsumer) Consume

func (l *TracesConsumer) Consume(ctx context.Context, entityContent []byte) error

Consume unmarshals entityContent into ptrace and consumes it
