Package ingest

Version v0.0.0-...-a387ffb, published Apr 26, 2024. License: Apache-2.0.

README

Ingestion Finite State Machine

The following states are possible:

  • start
  • stop
  • build
  • resume
  • verifyRange
  • historyRange
  • reingestHistoryRange
  • waitForCheckpoint

There is also the stressTest state, but that exists in its own world and is used only for testing ingestion, as its name implies.

Definitions

There are some important terms that need to be defined for clarity as they're used extensively in both the codebase and this breakdown:

  • the historyQ member provides an interface into both the historical data (time-series) as well as the state data (cumulative) in the database and does not refer to the history archives
  • the lastIngestedLedger thus corresponds to the last ledger that Horizon fully ingested into the database (both time-series and cumulative data)
  • the lastHistoryLedger, however, corresponds to the last ledger that Horizon ingested only into the time-series tables (taken from the history_ledgers table); it can differ from the lastIngestedLedger because time-series data can be ingested independently of cumulative data (via the db reingest range subcommand). I'll usually refer to it as the lastKnownLedger
  • the lastCheckpoint corresponds to the last checkpoint ledger (reminder: a checkpoint ledger is one for which (ledger# + 1) mod 64 == 0; see the sketch just below) and thus to a matching history archive upload.
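To make the checkpoint arithmetic concrete, here is a small standalone sketch (the helper names are mine, and it assumes the default frequency of 64):

package main

import "fmt"

const checkpointFrequency = 64 // default; configurable via Config.CheckpointFrequency

// isCheckpoint reports whether seq is a checkpoint ledger: (seq + 1) mod 64 == 0.
func isCheckpoint(seq uint32) bool {
	return (seq+1)%checkpointFrequency == 0
}

// latestCheckpointAtOrBefore returns the newest checkpoint ledger <= seq,
// or 0 if no checkpoint exists yet.
func latestCheckpointAtOrBefore(seq uint32) uint32 {
	if seq < checkpointFrequency-1 {
		return 0
	}
	return ((seq+1)/checkpointFrequency)*checkpointFrequency - 1
}

func main() {
	fmt.Println(isCheckpoint(63))                // true: (63+1) % 64 == 0
	fmt.Println(isCheckpoint(100))               // false
	fmt.Println(latestCheckpointAtOrBefore(100)) // 63
	fmt.Println(latestCheckpointAtOrBefore(127)) // 127
}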

One of the most important jobs of the FSM described here is to make sure that lastIngestedLedger and lastHistoryLedger are equal: the historyRange updates the latter, but not the former, so that we can track when state data is behind history data.

In general, only one node should ever be writing to a database at once, globally. Hence, there are a few checks at the start of most states to ensure this.

State Diagram

(The state diagram itself is not reproduced here; in the repository it is generated from a Mermaid definition. A rough textual reconstruction of the transitions, based on the sections below, follows.)
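In lieu of the diagram, this standalone sketch collects the transitions described in the sections below (the package's real State type, shown later on this page, is an integer enum; the strings here are just for printing):

package main

import "fmt"

type State string

const (
	Start                State = "start"
	Stop                 State = "stop"
	Build                State = "build"
	Resume               State = "resume"
	VerifyRange          State = "verifyRange"
	HistoryRange         State = "historyRange"
	ReingestHistoryRange State = "reingestHistoryRange"
	WaitForCheckpoint    State = "waitForCheckpoint"
)

// transitions lists, for each state, the possible next states described in
// this README. verifyRange and stressTest are not covered here, so they are
// omitted from the map.
var transitions = map[State][]State{
	Start:                {Build, HistoryRange, WaitForCheckpoint, Resume, Start},
	Build:                {Resume, Start, Stop},
	Resume:               {Resume, Start},
	HistoryRange:         {Start},
	ReingestHistoryRange: {Stop},
	WaitForCheckpoint:    {Start},
}

func main() {
	for from, to := range transitions {
		fmt.Printf("%-20s -> %v\n", from, to)
	}
}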

Tables

Within the Horizon database, there are a number of tables touched by ingestion that are worth differentiating explicitly. With these in mind, the subsequently-described states and their respective operations should be much clearer.

The database contains:

  • History tables -- all tables that contain historical time-series data, such as history_transactions, history_operations, etc.
  • Transaction processors -- processors that ingest transactions (described by the io.LedgerTransaction interface) into the history tables.
  • State tables -- all tables that contain the current cumulative state, such as accounts, offers, etc.
  • Change processors -- processors that ingest deltas into the state tables. These deltas aren't tied to a particular transaction; rather, each describes a transition of a ledger entry from one state to another (described by the io.Change interface). They can come from both tx meta (time-series data, where the "change" occurs from one ledger to the next) and history archives (cumulative data, where the "change" occurs from the genesis ledger to the checkpoint). A simplified sketch of both processor shapes follows this list.
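To make the split concrete, here is a standalone sketch; the interface shapes below are simplified stand-ins of my own, not the actual io definitions:

package main

// Simplified stand-ins for the real io.LedgerTransaction / io.Change types;
// the actual definitions live in the ingestion SDK.
type LedgerTransaction struct{ /* envelope, result, meta */ }
type Change struct{ /* entry type, pre-state, post-state */ }

// A transaction processor fills history (time-series) tables.
type transactionProcessor interface {
	ProcessTransaction(tx LedgerTransaction) error
}

// A change processor fills state (cumulative) tables; changes may come from
// tx meta (ledger-to-ledger deltas) or from a history archive
// (genesis-to-checkpoint deltas).
type changeProcessor interface {
	ProcessChange(change Change) error
}

func main() {}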

start state

As you might expect, this state kicks off the FSM process.

There are a few possible branches in this state.

DB upgrade or fresh start

The "happiest path" is the following: either the ingestion database is empty, so we can start purely from scratch, or the state data in a database is outdated, meaning it needs to be upgraded and so can effectively be started from scratch after catch-up.

This branches differently depending on the last known ledger:

  • If it's newer than the last checkpoint, we need to wait for a new checkpoint to get the latest cumulative data. Note that though we could probably apply incremental changes to the cumulative data ledger by ledger, that would be more effort than it's worth relative to just waiting for the next history archive to be published. Next state: waitForCheckpoint.

  • If it's older, however, then we can just fill in the missing gap (i.e. up to the latest checkpoint) and build up (only) the time-series data. Next state: historyRange.

In the other cases (matching last-known and last-checkpoint ledger, or no last-known), next state: build.

Otherwise

If we can't have a clean slate to work with, we need to fix partial state. Specifically,

  • If the last-known ledger is ahead of the last-ingested ledger, then Horizon's cumulative state data is behind its historical time-series data in the database. Here, we'll reset the time-series DB and start over. Next state: start, with lastIngestedLedger == 0.

  • If the time-series database is newer than the last-known ledger (can occur if ingestion was done for a different range earlier, for example), then Horizon needs to become aware of the missing ledgers. Next state: historyRange from "last known" to "last stored" in time-series db.

Next state: resume
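Putting the branches above together, the start decision logic looks roughly like this (an illustrative sketch; none of these identifiers are Horizon's actual ones):

// startState is a sketch of the branching described above; the parameter and
// return values are illustrative, not Horizon's actual code.
func startState(lastIngested, lastKnown, lastCheckpoint uint32, versionOutdated bool) string {
	// Fresh database or outdated state data: rebuild cumulative state.
	if lastIngested == 0 || versionOutdated {
		switch {
		case lastKnown == 0 || lastKnown == lastCheckpoint:
			return "build"
		case lastKnown > lastCheckpoint:
			return "waitForCheckpoint" // wait for a newer history archive
		default:
			return "historyRange" // fill the time-series gap up to lastCheckpoint
		}
	}

	// Otherwise, fix partial state before resuming.
	if lastKnown > lastIngested {
		return "start" // reset the time-series data and retry with lastIngested == 0
	}
	if lastIngested > lastKnown {
		return "historyRange" // make the history tables aware of the missing ledgers
	}
	return "resume"
}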

build state

This is the big kahuna of the state machine: there aren't many state transitions aside from success/failure, and all roads should ultimately lead to build in order to get ingestion done. This state only establishes a baseline for the cumulative data, though.

Properties

This state tracks the:

  • checkpointLedger, which is Horizon's last-known (though possibly not-yet-processed) checkpoint ledger, and
  • stop, which, when set, causes the FSM to transition to the stop state once this state completes.

Process

If any of the checks (incl. the aforementioned sync checks) fail, we'll move to the start state. Sometimes, though, we want to stop, instead (see buildState.stop).

The actual ingestion involves a few steps:

  • turn a checkpoint's history archive into cumulative db data
  • update the ingestion database's version
  • update the last-ingested ledger in the time-series db
  • commit to the ingestion db

These are detailed later, in the Ingestion section. Suffice it to say that at the end of this state, either we've errored out (described above), stopped (on error or success, if buildState.stop is set), or resumed from the checkpoint ledger.
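In pseudocode-style Go, the happy path of build amounts to the following; every helper below is an invented placeholder:

// Placeholders for the operations above; none of these identifiers exist in Horizon.
var (
	ingestHistoryArchive     func(checkpointLedger uint32) error // archive -> cumulative (state) tables
	updateIngestVersion      func(version int) error             // record the ingestion version in the KV store
	updateLastIngestedLedger func(seq uint32) error              // advance the last-ingested marker
	commitIngestTx           func() error                        // commit the ingestion DB transaction
)

// buildSketch strings the four steps together; on any error the FSM goes back
// to start (or to stop, if buildState.stop is set).
func buildSketch(checkpointLedger uint32, version int) error {
	if err := ingestHistoryArchive(checkpointLedger); err != nil {
		return err
	}
	if err := updateIngestVersion(version); err != nil {
		return err
	}
	if err := updateLastIngestedLedger(checkpointLedger); err != nil {
		return err
	}
	return commitIngestTx()
}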

resume state

This state ingests time-series data for a single ledger range, then loops back to itself for the next ledger range.

Properties

This state just tracks one thing:

  • latestSuccessfullyProcessedLedger, whose name should be self-explanatory: the highest ledger number that has been successfully ingested so far.

Process

First, note the difference between resumeState.latestSuccessfullyProcessedLedger and the queried lastIngestedLedger: one of these is tied to the state machine, while the other is associated with the actual time-series database.

The following are problematic error conditions:

  • the former is larger than the latter
  • the versions (of the DB and current ledgers) mismatch
  • the last-known ledger of the time-series db doesn't match the last-ingested ledger

In any of these cases, next state: start.

Otherwise, we have ingestLedger == lastIngestedLedger + 1, and will proceed to process the range from ingestLedger onward.

With the range prepared, only one other primary state transition is possible. If the last-known ledger of the Core backend is outdated relative to the above ingestLedger, we'll block until the requisite ledger is seen and processed by Core. Next state: resume again, with the last-processed ledger set to whatever is last-known to Core.

Otherwise, we can actually turn the ledger into time-series data: this is exactly the responsibility of RunAllProcessorsOnLedger and all of its subsequent friends. The deltas for the ledger(s) are ingested into the time-series db, then verified.

Next state: resume again, except now targeting the next ledger.
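The resume loop can be summarized by the following sketch (identifiers are illustrative, not Horizon's actual code):

// resumeSketch mirrors the checks and transitions above; the caller would feed
// the result back into the FSM.
func resumeSketch(lastProcessed, lastIngested, lastKnownHistory, coreLatest uint32, versionOK bool) (next string, nextLastProcessed uint32) {
	// Problematic conditions: go back to start.
	if lastProcessed > lastIngested || !versionOK || lastKnownHistory != lastIngested {
		return "start", 0
	}

	ingestLedger := lastIngested + 1

	// Core has not closed the ledger we need yet: block, then loop with the
	// last-processed ledger set to whatever Core knows about.
	if coreLatest < ingestLedger {
		return "resume", coreLatest
	}

	// Otherwise RunAllProcessorsOnLedger turns the ledger into time-series
	// data, and we loop targeting the next ledger.
	return "resume", ingestLedger
}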

historyRange state

The purpose of this state is to ingest a particular ledger range into the time-series (history) tables. Since the next state will be start, the cumulative state will be rebuilt afterwards anyway.

Properties

This tracks an inclusive ledger range: [fromLedger, toLedger].

Next state: start

reingestHistoryRange state

This state acts much like the historyRange state.

Properties

This tracks an inclusive ledger range, [fromLedger, toLedger], as well as a force flag that will override certain restrictions.

Next state: stop

waitForCheckpoint state

This pauses the state machine for 10 seconds then tries again, in hopes that a new checkpoint ledger has been created (remember, checkpoints occur every 64 ledgers).

Next state: start

Ingestion

TODO

Range Preparation

TODO: See maybePrepareRange

Documentation

Overview

Package ingest contains the new ingestion system for horizon. It currently runs completely independently of the old one, which means the new system can be ledgers behind or ahead of the old system.

Index

Constants

const (
	// MaxSupportedProtocolVersion defines the maximum supported version of
	// the Stellar protocol.
	MaxSupportedProtocolVersion uint32 = 21

	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. This value is stored in KV store and is used to decide
	// if there's a need to reprocess the ledger state or reingest data.
	//
	// Version history:
	// - 1: Initial version
	// - 2: Added the orderbook, offers processors and distributed ingestion.
	// - 3: Fixed a bug that could potentially result in invalid state
	//      (#1722). Update the version to clear the state.
	// - 4: Fixed a bug in AccountSignersChanged method.
	// - 5: Added trust lines.
	// - 6: Added accounts and accounts data.
	// - 7: Fixes a bug in AccountSignersChanged method.
	// - 8: Fixes AccountSigners processor to remove preauth tx signer
	//      when preauth tx is failed.
	// - 9: Fixes a bug in asset stats processor that counted unauthorized
	//      trustlines.
	// - 10: Fixes a bug in meta processing (fees are now processed before
	//      everything else).
	// - 11: Protocol 14: CAP-23 and CAP-33.
	// - 12: Trigger state rebuild due to `absTime` -> `abs_time` rename
	//       in ClaimableBalances predicates.
	// - 13: Trigger state rebuild to include more than just authorized assets.
	// - 14: Trigger state rebuild to include claimable balances in the asset stats processor.
	// - 15: Fixed bug in asset stat ingestion where clawback is enabled (#3846).
	// - 16: Extract claimants to a separate table for better performance of
	//       claimable balances for claimant queries.
	// - 17: Add contract_id column to exp_asset_stats table which is derived by ingesting
	//       contract data ledger entries.
	// - 18: Ingest contract asset balances so we can keep track of expired / restore asset
	//       balances for asset stats.
	CurrentVersion = 18

	// MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion:
	//  * Ledger ingestion,
	//  * State verifications,
	//  * Metrics updates.
	MaxDBConnections = 3

	// 100 ledgers per flush has shown in stress tests
	// to be best point on performance curve, default to that.
	MaxLedgersPerFlush uint32 = 100
)
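To illustrate how CurrentVersion is meant to be used, here is a sketch written as if inside the package; readStoredVersion is a hypothetical placeholder rather than part of the API:

// readStoredVersion is a hypothetical stand-in for reading the ingestion
// version persisted in the KV store; it is not part of this package's API.
var readStoredVersion func() (int, error)

// needsStateRebuild reports whether the stored version lags CurrentVersion,
// which is the signal to reprocess ledger state or reingest data.
func needsStateRebuild() (bool, error) {
	stored, err := readStoredVersion()
	if err != nil {
		return false, err
	}
	return stored < CurrentVersion, nil
}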

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	StellarCoreURL         string
	CaptiveCoreBinaryPath  string
	CaptiveCoreStoragePath string
	CaptiveCoreToml        *ledgerbackend.CaptiveCoreToml
	CaptiveCoreConfigUseDB bool
	NetworkPassphrase      string

	HistorySession        db.SessionInterface
	HistoryArchiveURLs    []string
	HistoryArchiveCaching bool

	DisableStateVerification     bool
	EnableReapLookupTables       bool
	EnableExtendedLogLedgerStats bool

	MaxReingestRetries          int
	ReingestRetryBackoffSeconds int

	// The checkpoint frequency will be 64 unless you are using an exotic test setup.
	CheckpointFrequency                  uint32
	StateVerificationCheckpointFrequency uint32
	StateVerificationTimeout             time.Duration

	RoundingSlippageFilter int

	MaxLedgerPerFlush uint32
	SkipTxmeta        bool
}
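For orientation, a minimal configuration sketch follows. It assumes the stellar/go db and ledgerbackend packages are imported (this package itself is internal to Horizon, so the sketch reflects internal wiring), and every concrete value is a placeholder:

// buildConfig is a sketch, not working code: the session must be a live
// db.SessionInterface, and the captive-core paths, toml, and archive URL are
// placeholders for a real deployment.
func buildConfig(session db.SessionInterface) ingest.Config {
	return ingest.Config{
		NetworkPassphrase:      "Test SDF Network ; September 2015",
		HistorySession:         session,
		HistoryArchiveURLs:     []string{"https://history.stellar.org/prd/core-testnet/core_testnet_001"},
		CaptiveCoreBinaryPath:  "/usr/bin/stellar-core",
		CaptiveCoreStoragePath: "/var/horizon/captive-core",
		CaptiveCoreToml:        &ledgerbackend.CaptiveCoreToml{}, // normally parsed from a captive-core config file
		CheckpointFrequency:    64,                               // the default outside exotic test setups
	}
}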

type ErrReingestRangeConflict

type ErrReingestRangeConflict struct {
	// contains filtered or unexported fields
}

ErrReingestRangeConflict indicates that the reingest range overlaps with horizon's most recently ingested ledger

func (ErrReingestRangeConflict) Error

func (e ErrReingestRangeConflict) Error() string

type Metrics

type Metrics struct {
	// MaxSupportedProtocolVersion exposes the maximum protocol version
	// supported by this version of Horizon.
	MaxSupportedProtocolVersion prometheus.Gauge

	// LocalLatestLedger exposes the last ingested ledger by this ingesting instance.
	LocalLatestLedger prometheus.Gauge

	// LedgerIngestionDuration exposes timing metrics about the rate and
	// duration of ledger ingestion (including updating DB and graph).
	LedgerIngestionDuration prometheus.Summary

	// LedgerIngestionTradeAggregationDuration exposes timing metrics about the rate and
	// duration of rebuilding trade aggregation buckets.
	LedgerIngestionTradeAggregationDuration prometheus.Summary

	// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
	// duration of reaping lookup tables.
	LedgerIngestionReapLookupTablesDuration prometheus.Summary

	// StateVerifyDuration exposes timing metrics about the rate and
	// duration of state verification.
	StateVerifyDuration prometheus.Summary

	// StateInvalidGauge exposes state invalid metric. 1 if state is invalid,
	// 0 otherwise.
	StateInvalidGauge prometheus.GaugeFunc

	// StateVerifyLedgerEntriesCount exposes total number of ledger entries
	// checked by the state verifier by type.
	StateVerifyLedgerEntriesCount *prometheus.GaugeVec

	// LedgerStatsCounter exposes ledger stats counters (like number of ops/changes).
	LedgerStatsCounter *prometheus.CounterVec

	// ProcessorsRunDuration exposes processors run durations.
	// Deprecated in favor of: ProcessorsRunDurationSummary.
	ProcessorsRunDuration *prometheus.CounterVec

	// ProcessorsRunDurationSummary exposes processors run durations.
	ProcessorsRunDurationSummary *prometheus.SummaryVec

	// LoadersRunDurationSummary exposes run durations for the ingestion loaders.
	LoadersRunDurationSummary *prometheus.SummaryVec

	// LoadersStatsSummary exposes stats for the ingestion loaders.
	LoadersStatsSummary *prometheus.SummaryVec

	// HistoryArchiveStatsCounter counts how many HTTP requests are sent to the history archive server.
	HistoryArchiveStatsCounter *prometheus.CounterVec
}

type MockFilters

type MockFilters struct {
	mock.Mock
}

func (*MockFilters) GetFilters

type OrderBookStream

type OrderBookStream struct {

	// LatestLedgerGauge exposes the local (order book graph)
	// latest processed ledger
	LatestLedgerGauge prometheus.Gauge
	// contains filtered or unexported fields
}

OrderBookStream updates an in memory graph to be consistent with offers in the Horizon DB. Any offers which are created, modified, or removed from the Horizon DB during ingestion will be applied to the in memory order book graph. OrderBookStream assumes that no other component will update the in memory graph. However, it is safe for other go routines to use the in memory graph for read operations.

func NewOrderBookStream

func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream

NewOrderBookStream constructs and initializes an OrderBookStream instance

func (*OrderBookStream) Run

func (o *OrderBookStream) Run(ctx context.Context)

Run will call Update() every 30 seconds until the given context is terminated.

func (*OrderBookStream) Update

func (o *OrderBookStream) Update(ctx context.Context) error

Update will query the Horizon DB for offers which have been created, removed, or updated since the last time Update() was called. Those changes will then be applied to the in memory order book graph. After calling this function, the in memory order book graph should be consistent with the Horizon DB (assuming no error is returned).
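A minimal usage sketch, assuming a history.IngestionQ and an order book graph are provided by the surrounding Horizon setup (their construction, and the exact import paths, are not shown and are assumptions):

// runOrderBookStream is a sketch: historyQ and graph are assumed to come from
// the surrounding Horizon wiring, which is omitted here.
func runOrderBookStream(ctx context.Context, historyQ history.IngestionQ, graph orderbook.OBGraph) {
	stream := ingest.NewOrderBookStream(historyQ, graph)

	// Run blocks, calling Update roughly every 30 seconds to keep the
	// in-memory graph consistent with the Horizon DB, until ctx is cancelled.
	stream.Run(ctx)
}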

type ParallelSystems

type ParallelSystems struct {
	// contains filtered or unexported fields
}

func NewParallelSystems

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error)

func (*ParallelSystems) ReingestRange

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error

func (*ParallelSystems) Shutdown

func (ps *ParallelSystems) Shutdown()

type ProcessorRunner

type ProcessorRunner struct {
	// contains filtered or unexported fields
}

func (*ProcessorRunner) DisableMemoryStatsLogging

func (s *ProcessorRunner) DisableMemoryStatsLogging()

func (*ProcessorRunner) EnableMemoryStatsLogging

func (s *ProcessorRunner) EnableMemoryStatsLogging()

func (*ProcessorRunner) RunAllProcessorsOnLedger

func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
	stats ledgerStats,
	err error,
)

func (*ProcessorRunner) RunGenesisStateIngestion

func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)

func (*ProcessorRunner) RunHistoryArchiveIngestion

func (s *ProcessorRunner) RunHistoryArchiveIngestion(
	checkpointLedger uint32,
	skipChecks bool,
	ledgerProtocolVersion uint32,
	bucketListHash xdr.Hash,
) (ingest.StatsChangeProcessorResults, error)

func (*ProcessorRunner) RunTransactionProcessorsOnLedgers

func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error)

Runs only the transaction processors on the inbound list of ledgers, updating history tables based on their transactions. It intentionally makes no effort to insert or purge transactions in history_transactions_filtered_tmp; thus, this method does not support tx sub-processing for the ledgers passed in, i.e. the tx submission queue will not see these ledgers.

func (*ProcessorRunner) SetHistoryAdapter

func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)

type ProcessorRunnerInterface

type ProcessorRunnerInterface interface {
	SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
	EnableMemoryStatsLogging()
	DisableMemoryStatsLogging()
	RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
	RunHistoryArchiveIngestion(
		checkpointLedger uint32,
		skipChecks bool,
		ledgerProtocolVersion uint32,
		bucketListHash xdr.Hash,
	) (ingest.StatsChangeProcessorResults, error)
	RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) error
	RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
		stats ledgerStats,
		err error,
	)
}

type State

type State int
const (
	None State = iota
	Start
	Stop
	Build
	Resume
	WaitForCheckpoint
	StressTest
	VerifyRange
	HistoryRange
	ReingestHistoryRange
)

type System

type System interface {
	Run()
	RegisterMetrics(*prometheus.Registry)
	Metrics() Metrics
	StressTest(numTransactions, changesPerTransaction int) error
	VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
	BuildState(sequence uint32, skipChecks bool) error
	ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
	BuildGenesisState() error
	Shutdown()
	GetCurrentState() State
	RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
}

func NewSystem

func NewSystem(config Config) (System, error)
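Tying it together, a lifecycle sketch; cfg would come from something like the configuration sketch shown earlier, and error handling is trimmed:

// runIngestion is a sketch of wiring a System into a process.
func runIngestion(cfg ingest.Config, registry *prometheus.Registry) error {
	system, err := ingest.NewSystem(cfg)
	if err != nil {
		return err
	}
	defer system.Shutdown()

	system.RegisterMetrics(registry) // expose the Metrics described above
	system.Run()                     // drives the FSM: start -> build -> resume -> ...
	return nil
}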
