wal

package
v1.0.0
Warning

This package is not in the latest version of its module.
Published: Apr 5, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Constants

const (
	DefaultMaxSegmentAge = time.Hour
)

Variables

var DefaultWatchConfig = WatchConfig{
	MinReadFrequency: 250 * time.Millisecond,
	MaxReadFrequency: time.Second,
	DrainTimeout:     15 * time.Second,
}

DefaultWatchConfig holds the opinionated defaults for operating the Watcher.

Functions

func ReadWAL

func ReadWAL(dir string) ([]loki.Entry, error)

ReadWAL reads all entries in the WAL located under dir. It is mainly used for testing.

Types

type CleanupEventSubscriber

type CleanupEventSubscriber interface {
	WriteCleanup
}

CleanupEventSubscriber is an interface that objects can implement to receive cleanup events from the WAL Writer. Once they implement it, they can subscribe by registering themselves on the Writer with writer.SubscribeCleanup.

type Config

type Config struct {
	// Whether WAL-support should be enabled.
	//
	// WAL support is a WIP. Do not enable in production setups until https://github.com/grafana/loki/issues/8197
	// is finished.
	Enabled bool

	// Path where the WAL is written to.
	Dir string

	// MaxSegmentAge is the threshold at which a WAL segment is considered old enough to be cleaned up. Default: 1h.
	//
	// Note that this functionality will likely be deprecated in favour of a programmatic cleanup mechanism.
	MaxSegmentAge time.Duration

	// WatchConfig configures the backoff retry used by a WAL watcher when it reads segments on its
	// own polling timer rather than in response to the notification channel.
	WatchConfig WatchConfig
}

Config contains all WAL-related settings.

func (*Config) UnmarshalYAML

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements the YAML Unmarshaler interface.

type Marker

type Marker interface {
	// LastMarkedSegment should return the last segment stored in the marker.
	// Must return -1 if there is no mark.
	//
	// The Watcher will start reading the first segment whose value is greater
	// than the return value.
	LastMarkedSegment() int
}

Marker allows the Watcher to start from a specific segment in the WAL. Implementers can use this interface to save and restore save points.
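A minimal sketch of a Marker implementation may help illustrate the -1 contract. The Marker interface is redeclared locally so the example compiles on its own; memoryMarker and its Mark method are hypothetical names, and a real implementation would presumably persist the value so it survives restarts.

```go
package main

import (
	"fmt"
	"sync"
)

// Marker mirrors the interface documented above. It is redeclared here only
// so that this sketch is self-contained.
type Marker interface {
	LastMarkedSegment() int
}

// memoryMarker is a hypothetical in-memory save point.
type memoryMarker struct {
	mu     sync.Mutex
	marked bool
	last   int
}

// Compile-time check that memoryMarker satisfies Marker.
var _ Marker = (*memoryMarker)(nil)

// Mark records segment as the latest save point. It is a hypothetical helper
// and not part of the Marker interface.
func (m *memoryMarker) Mark(segment int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.marked = true
	m.last = segment
}

// LastMarkedSegment returns -1 until Mark has been called at least once.
func (m *memoryMarker) LastMarkedSegment() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.marked {
		return -1
	}
	return m.last
}

func main() {
	m := &memoryMarker{}
	fmt.Println(m.LastMarkedSegment()) // no mark yet
	m.Mark(3)
	fmt.Println(m.LastMarkedSegment()) // a Watcher would resume after this segment
}
```

With this marker, a Watcher following the documented contract would start from the first segment numbered higher than the returned value.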

type Reader

type Reader interface {
	Next() bool
	Err() error
	Record() []byte
}

Reader is a dependency interface to inject generic WAL readers into the Watcher.
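The Next/Err/Record shape suggests the usual iterator pattern: call Next until it returns false, then check Err. The sketch below assumes those semantics, which the page does not spell out. sliceReader and readAll are hypothetical names introduced for illustration, and the Reader interface is redeclared locally.

```go
package main

import "fmt"

// Reader mirrors the interface documented above, redeclared so the sketch
// is self-contained.
type Reader interface {
	Next() bool
	Err() error
	Record() []byte
}

// sliceReader is a hypothetical Reader backed by an in-memory slice, the
// kind of stand-in that could be injected in a test.
type sliceReader struct {
	records [][]byte
	pos     int
	err     error
}

// Next advances to the next record and reports whether one is available.
func (r *sliceReader) Next() bool {
	if r.err != nil || r.pos >= len(r.records) {
		return false
	}
	r.pos++
	return true
}

// Err returns the error that stopped iteration, if any.
func (r *sliceReader) Err() error { return r.err }

// Record returns the current record. It must only be called after a
// successful Next.
func (r *sliceReader) Record() []byte { return r.records[r.pos-1] }

// readAll consumes r and returns a copy of every record, plus the error
// reported by the reader once iteration stops.
func readAll(r Reader) ([][]byte, error) {
	var out [][]byte
	for r.Next() {
		out = append(out, append([]byte(nil), r.Record()...))
	}
	return out, r.Err()
}

func main() {
	r := &sliceReader{records: [][]byte{[]byte("first"), []byte("second")}}
	recs, err := readAll(r)
	fmt.Println(len(recs), err)
}
```

Copying each record is a defensive choice here, on the assumption that a reader may reuse its internal buffer between calls to Next.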

type WAL

type WAL interface {
	// Log marshals the record and writes it into the WAL.
	Log(*wal.Record) error

	Delete() error
	Sync() error
	Dir() string
	Close()
	NextSegment() (int, error)
}

WAL is an interface that abstracts callers from the Prometheus WAL implementation.

func New

func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (WAL, error)

New creates a new wrapper, instantiating the actual wlog.WL underneath.

type WatchConfig

type WatchConfig struct {
	// MinReadFrequency controls the minimum interval at which the Watcher polls the WAL for new records. If the poll is
	// successful, the interval remains the same. If not, it is increased using an exponential backoff.
	MinReadFrequency time.Duration

	// MaxReadFrequency controls the maximum interval at which the Watcher polls the WAL for new records. It caps the
	// polling interval, preventing the exponential backoff from making it too long.
	MaxReadFrequency time.Duration

	// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
	// After that time, the Watcher is stopped immediately, dropping all the work in process.
	DrainTimeout time.Duration
}

WatchConfig allows the user to configure the Watcher.

For the read frequency settings, note that the Watcher learns about new records through two mechanisms. First, it is notified by the Writer whenever the WAL is written. Second, it has a timer that fires periodically. The timer implements an exponential back-off strategy to prevent the Watcher from reading too often when there is no new data.
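The timer-driven back-off can be sketched as follows. This is an illustration under stated assumptions, not the Watcher's actual code: the doubling factor, the nextInterval helper, and the minFreq/maxFreq constants (which copy the values of DefaultWatchConfig) are all introduced here for the example.

```go
package main

import (
	"fmt"
	"time"
)

// These constants copy the values of DefaultWatchConfig for illustration.
const (
	minFreq = 250 * time.Millisecond
	maxFreq = time.Second
)

// nextInterval returns the polling interval to use after a read attempt.
// A successful read keeps the interval unchanged, as the MinReadFrequency
// comment states. A failed read doubles it (the factor of two is an
// assumption) without exceeding ceiling.
func nextInterval(current, ceiling time.Duration, readOK bool) time.Duration {
	if readOK {
		return current
	}
	next := current * 2
	if next > ceiling {
		return ceiling
	}
	return next
}

func main() {
	interval := minFreq
	// Simulate four consecutive polls that find no new data.
	for i := 0; i < 4; i++ {
		fmt.Println("polling again in", interval)
		interval = nextInterval(interval, maxFreq, false)
	}
}
```

Under these assumptions the interval grows from MinReadFrequency until it reaches MaxReadFrequency and then stays there.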

type Watcher

type Watcher struct {
	MaxSegment int
	// contains filtered or unexported fields
}

func NewWatcher

func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, logger log.Logger, config WatchConfig, marker Marker) *Watcher

NewWatcher creates a new Watcher.

func (*Watcher) Drain

func (w *Watcher) Drain()

Drain moves the Watcher to a draining state, in which it assumes no more data is being written to the WAL and attempts to read until the end of the last written segment. The goroutine calling Drain blocks until all data is read or a timeout occurs.
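The blocking behavior described for Drain resembles a wait bounded by a timer. The sketch below shows that general pattern only. drainWithTimeout and demoTimeout are hypothetical names, and the real Watcher presumably uses WatchConfig.DrainTimeout as its limit.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is a short, hypothetical stand-in for DrainTimeout.
const demoTimeout = 50 * time.Millisecond

// drainWithTimeout runs drain in a goroutine and blocks until it finishes
// or timeout elapses. It reports whether drain completed in time.
func drainWithTimeout(drain func(), timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		drain()
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return true
	case <-timer.C:
		// The drain goroutine is abandoned here; pending work is dropped.
		return false
	}
}

func main() {
	// A drain that finishes immediately.
	fmt.Println("finished:", drainWithTimeout(func() {}, demoTimeout))

	// A drain that never finishes within the timeout.
	block := make(chan struct{})
	fmt.Println("finished:", drainWithTimeout(func() { <-block }, demoTimeout))
	close(block) // release the abandoned goroutine
}
```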

func (*Watcher) NotifyWrite

func (w *Watcher) NotifyWrite()

NotifyWrite allows the Watcher to subscribe to write events published by the Writer. When a write event is received, it emits a signal that triggers a segment read on the Watcher's main routine. If the readNotify channel is not currently being listened on, the main routine is either processing a segment or waiting because an unhandled error occurred. In that case the signal is dropped and the Watcher waits for the next one.
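Dropping a signal when nobody is listening is commonly written in Go as a non-blocking send. The following sketch assumes that is roughly what happens on the readNotify channel. The notifier type is hypothetical, and the boolean return value exists only to make the behavior observable.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the part of the Watcher that owns
// the readNotify channel.
type notifier struct {
	readNotify chan struct{}
}

// notify attempts to signal the main routine without blocking. It reports
// whether the signal was delivered; a false result means it was dropped
// because no receiver was waiting on the unbuffered channel.
func (n *notifier) notify() bool {
	select {
	case n.readNotify <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	n := &notifier{readNotify: make(chan struct{})}

	// No goroutine is receiving, so the signal is dropped rather than
	// blocking the caller.
	fmt.Println("delivered:", n.notify())
}
```

The design choice here is that the publisher of write events never stalls on a busy reader. A dropped signal costs nothing lasting, because a later notification or the polling timer triggers the next read.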

func (*Watcher) Start

func (w *Watcher) Start()

Start runs the watcher main loop.

func (*Watcher) Stop

func (w *Watcher) Stop()

Stop stops the Watcher, shutting down the main routine.

type WatcherMetrics

type WatcherMetrics struct {
	// contains filtered or unexported fields
}

func NewWatcherMetrics

func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics

type WriteCleanup

type WriteCleanup interface {
	// SeriesReset is called to notify that segments have been deleted. All segments with a number
	// lower than or equal to segmentNum are safe to be reclaimed.
	SeriesReset(segmentNum int)
}

WriteCleanup is responsible for cleaning up resources used in the process of reading the WAL.
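As an illustration of the SeriesReset contract, the sketch below discards cached state that belongs to reclaimed segments. seriesCache and its lastSeen map are hypothetical. The WriteCleanup interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// WriteCleanup mirrors the interface documented above, redeclared so the
// sketch is self-contained.
type WriteCleanup interface {
	SeriesReset(segmentNum int)
}

// seriesCache is a hypothetical consumer that remembers, per series
// reference, the last segment in which the series was seen.
type seriesCache struct {
	mu       sync.Mutex
	lastSeen map[uint64]int
}

// Compile-time check that seriesCache satisfies WriteCleanup.
var _ WriteCleanup = (*seriesCache)(nil)

// SeriesReset drops every series whose last segment is lower than or equal
// to segmentNum, since those segments are safe to reclaim.
func (c *seriesCache) SeriesReset(segmentNum int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for ref, seg := range c.lastSeen {
		if seg <= segmentNum {
			delete(c.lastSeen, ref)
		}
	}
}

func main() {
	c := &seriesCache{lastSeen: map[uint64]int{1: 0, 2: 1, 3: 2}}
	c.SeriesReset(1)
	fmt.Println(len(c.lastSeen), "series left after reset")
}
```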

type WriteEventSubscriber

type WriteEventSubscriber interface {
	// NotifyWrite allows others to be notified when the Writer writes to the underlying WAL.
	NotifyWrite()
}

WriteEventSubscriber is an interface that objects can implement to receive an event whenever the Writer writes to the WAL. They subscribe to the Writer via writer.SubscribeWrite.
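The subscription model can be pictured as a simple fan-out. The sketch below is not the Writer's implementation: fanout, publish, and counter are hypothetical names, and only the WriteEventSubscriber interface and the SubscribeWrite method name come from this page.

```go
package main

import (
	"fmt"
	"sync"
)

// WriteEventSubscriber mirrors the interface documented above, redeclared
// so the sketch is self-contained.
type WriteEventSubscriber interface {
	NotifyWrite()
}

// fanout is a hypothetical publisher that keeps a list of subscribers.
type fanout struct {
	mu   sync.RWMutex
	subs []WriteEventSubscriber
}

// SubscribeWrite registers s to receive future write events.
func (f *fanout) SubscribeWrite(s WriteEventSubscriber) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.subs = append(f.subs, s)
}

// publish notifies every registered subscriber of a write.
func (f *fanout) publish() {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, s := range f.subs {
		s.NotifyWrite()
	}
}

// counter is a hypothetical subscriber that counts the events it receives.
type counter struct{ n int }

func (c *counter) NotifyWrite() { c.n++ }

func main() {
	f := &fanout{}
	a, b := &counter{}, &counter{}
	f.SubscribeWrite(a)
	f.SubscribeWrite(b)

	f.publish()
	f.publish()
	fmt.Println(a.n, b.n)
}
```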

type WriteTo

type WriteTo interface {
	// WriteCleanup is used to allow the Watcher to react upon being notified of WAL cleanup events, such as segments
	// being reclaimed.
	WriteCleanup

	// StoreSeries is called when series are found in WAL entries by the Watcher, along with the segmentNum they were
	// found in.
	StoreSeries(series []record.RefSeries, segmentNum int)

	AppendEntries(entries wal.RefEntries, segmentNum int) error
}

WriteTo is an interface used by the Watcher to send the samples it's read from the WAL on to somewhere else, or clean them up. It's the intermediary between all information read by the Watcher and the final destination.

Based on https://github.com/prometheus/prometheus/blob/main/tsdb/wlog/watcher.go#L46

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer implements loki.EntryHandler, exposing a channel where scraping targets can write entries. It reads from that channel and writes incoming entries to a WAL. Because the Writer is responsible for all mutating operations on the WAL, it also runs a routine that cleans up old segments.

func NewWriter

func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Writer, error)

NewWriter creates a new Writer.

func (*Writer) Chan

func (wrt *Writer) Chan() chan<- loki.Entry

func (*Writer) Stop

func (wrt *Writer) Stop()

func (*Writer) SubscribeCleanup

func (wrt *Writer) SubscribeCleanup(subscriber CleanupEventSubscriber)

SubscribeCleanup adds a new CleanupEventSubscriber that will receive cleanup events.

func (*Writer) SubscribeWrite

func (wrt *Writer) SubscribeWrite(subscriber WriteEventSubscriber)

SubscribeWrite adds a new WriteEventSubscriber that will receive write events.
