batchInserter

package
v0.7.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 11, 2023 License: Apache-2.0 Imports: 11 Imported by: 2

Documentation

Overview

Package batchInserter wraps db.Inserter so that records are grouped into batches before being inserted, reducing the number of database requests needed.

Constants

const (
	InsertingQueueDepth            = "inserting_queue_depth"
	DroppedEventsFromDbFailCounter = "dropped_events_db_fail_count"
)

Variables

var (
	ErrBadBeginning = errors.New("invalid value for the beginning time of the record")
	ErrBadData      = errors.New("data nil or empty")
)

Functions

func Metrics

func Metrics() []xmetrics.Metric

Types

type BatchInserter

type BatchInserter struct {
	// contains filtered or unexported fields
}

BatchInserter batches events that need to be inserted. It ensures that no event waits in a batch for longer than a set period of time and that no batch exceeds a specified size.
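The two limits described above (a maximum batch size and a maximum wait time) can be sketched with a channel and a timer. This is a minimal, standalone illustration and not the package's actual implementation: the `batch` and `batchAll` helpers and the use of `int` items are assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// batch reads items from in and groups them into slices. A batch is flushed
// when it reaches maxSize items or when maxWait has elapsed since its first
// item arrived. Hypothetical helper, for illustration only.
func batch(in <-chan int, maxSize int, maxWait time.Duration, flush func([]int)) {
	var (
		pending []int
		timer   *time.Timer
		expired <-chan time.Time // nil (never ready) until a batch is open
	)
	emit := func() {
		if timer != nil {
			timer.Stop()
		}
		expired = nil
		if len(pending) > 0 {
			flush(pending)
			pending = nil // start a fresh slice for the next batch
		}
	}
	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // flush whatever is left, then exit
				return
			}
			if len(pending) == 0 {
				// The wait limit is measured from the first item of a batch.
				timer = time.NewTimer(maxWait)
				expired = timer.C
			}
			pending = append(pending, item)
			if len(pending) >= maxSize {
				emit()
			}
		case <-expired:
			emit()
		}
	}
}

// batchAll runs batch over a fixed slice of items and collects the batches.
// The wait limit is set high so that only the size limit triggers a flush.
func batchAll(items []int, maxSize int) [][]int {
	in := make(chan int, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	var out [][]int
	batch(in, maxSize, time.Hour, func(b []int) { out = append(out, b) })
	return out
}

func main() {
	fmt.Println(batchAll([]int{1, 2, 3, 4, 5}, 2)) // prints [[1 2] [3 4] [5]]
}
```

In the sketch the timer starts when the first item of a batch arrives, so a lone item is flushed after at most `maxWait` even if no further items follow.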

func NewBatchInserter

func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter, timeTracker TimeTracker) (*BatchInserter, error)

NewBatchInserter creates a BatchInserter with the given values, ensuring that the configuration and other values given are valid. If a configuration value isn't valid, a default value is used instead.
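A minimal sketch of the "fall back to a default" behavior follows. The `Config` type here is a local copy of the documented struct; the `withDefaults` helper, the default values, and the rule that non-positive values count as invalid are all assumptions for illustration, since the documentation does not state them.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a local copy of the documented Config struct.
type Config struct {
	ParseWorkers     int
	MaxInsertWorkers int
	MaxBatchSize     int
	MaxBatchWaitTime time.Duration
	QueueSize        int
}

// Placeholder defaults, chosen only for this example. The real package's
// defaults are not given in the documentation.
const (
	defaultParseWorkers     = 5
	defaultMaxInsertWorkers = 5
	defaultMaxBatchSize     = 1
	defaultMaxBatchWaitTime = time.Second
	defaultQueueSize        = 5
)

// withDefaults returns a copy of c in which every non-positive value has
// been replaced by its placeholder default. Treating non-positive values
// as invalid is an assumption.
func withDefaults(c Config) Config {
	if c.ParseWorkers <= 0 {
		c.ParseWorkers = defaultParseWorkers
	}
	if c.MaxInsertWorkers <= 0 {
		c.MaxInsertWorkers = defaultMaxInsertWorkers
	}
	if c.MaxBatchSize <= 0 {
		c.MaxBatchSize = defaultMaxBatchSize
	}
	if c.MaxBatchWaitTime <= 0 {
		c.MaxBatchWaitTime = defaultMaxBatchWaitTime
	}
	if c.QueueSize <= 0 {
		c.QueueSize = defaultQueueSize
	}
	return c
}

func main() {
	c := withDefaults(Config{MaxBatchSize: -1, QueueSize: 100})
	fmt.Printf("%+v\n", c)
}
```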

func (*BatchInserter) Insert

func (b *BatchInserter) Insert(rwt RecordWithTime) error

Insert adds the event to the queue inside of BatchInserter, preparing for it to be inserted. This call blocks if the queue is full. If certain required fields of the record are empty, an error is returned.
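The kind of check Insert performs can be sketched as below. This is a guess based on the messages of ErrBadBeginning and ErrBadData, not the package's code: the local `record` type with a `Data` field stands in for db.Record, and the exact conditions (zero time, empty data) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// record stands in for db.Record; the Data field is an assumption.
type record struct {
	Data []byte
}

// recordWithTime mirrors the documented RecordWithTime struct.
type recordWithTime struct {
	Record    record
	Beginning time.Time
}

// Local copies of the documented sentinel errors.
var (
	errBadBeginning = errors.New("invalid value for the beginning time of the record")
	errBadData      = errors.New("data nil or empty")
)

// received is a sample timestamp used by the example.
var received = time.Date(2023, time.April, 11, 0, 0, 0, 0, time.UTC)

// validate rejects records with a zero beginning time or with no data.
// The order and exact form of the checks are assumptions.
func validate(rwt recordWithTime) error {
	if rwt.Beginning.IsZero() {
		return errBadBeginning
	}
	if len(rwt.Record.Data) == 0 {
		return errBadData
	}
	return nil
}

func main() {
	fmt.Println(validate(recordWithTime{}))
	fmt.Println(validate(recordWithTime{Beginning: received}))
	fmt.Println(validate(recordWithTime{Record: record{Data: []byte("event")}, Beginning: received}))
}
```

Callers can compare the returned error against the sentinel values (for example with `errors.Is`) to tell the two failure cases apart.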

func (*BatchInserter) Start

func (b *BatchInserter) Start()

Start starts the batcher, which pulls from the queue inside of the BatchInserter.

func (*BatchInserter) Stop

func (b *BatchInserter) Stop()

Stop closes the internal queue and waits for the workers to finish processing what has already been added. This can block as it waits for everything to stop. After Stop() is called, Insert() should not be called again, or there will be a panic. TODO: ensure consumers can't cause a panic?
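The panic mentioned above comes from Go itself: sending on a closed channel panics. One possible way to address the TODO, shown here as a standalone sketch and not as the package's approach, is to guard the channel with a flag and a read/write lock so that a late insert returns an error instead. The `queue` type, its methods, and `errStopped` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStopped is returned when insert is called after stop.
var errStopped = errors.New("queue stopped")

// queue wraps a channel so that it is never sent on after being closed.
type queue struct {
	mu      sync.RWMutex
	stopped bool
	items   chan int
}

func newQueue(size int) *queue {
	return &queue{items: make(chan int, size)}
}

// insert sends v unless the queue has been stopped. Concurrent inserts
// share the read lock; stop takes the write lock, so the channel cannot
// be closed while a send is in progress.
func (q *queue) insert(v int) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopped {
		return errStopped
	}
	q.items <- v // may block while the queue is full
	return nil
}

// stop closes the channel exactly once. It waits for in-flight inserts.
func (q *queue) stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.stopped {
		q.stopped = true
		close(q.items)
	}
}

func main() {
	q := newQueue(2)
	fmt.Println(q.insert(1)) // prints <nil>
	q.stop()
	fmt.Println(q.insert(2)) // prints queue stopped
}
```

A trade-off of this design is that `stop` waits for any insert blocked on a full queue, so consumers must keep draining the channel until it is closed.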

type Config

type Config struct {
	ParseWorkers     int
	MaxInsertWorkers int
	MaxBatchSize     int
	MaxBatchWaitTime time.Duration
	QueueSize        int
}

Config holds the configuration values for a batch inserter.

type Measures

type Measures struct {
	InsertingQueue               metrics.Gauge
	DroppedEventsFromDbFailCount metrics.Counter
}

func NewMeasures

func NewMeasures(p provider.Provider) *Measures

NewMeasures constructs a Measures given a go-kit metrics Provider.

type RecordWithTime added in v0.6.0

type RecordWithTime struct {
	Record    db.Record
	Beginning time.Time
}

RecordWithTime provides the db record and the time at which the event was received by a service.

type TimeTracker added in v0.6.0

type TimeTracker interface {
	TrackTime(time.Duration)
}
