expingest

package
v0.0.0-...-13e2633

Warning: This package is not in the latest version of its module.
Published: Jun 3, 2020 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package expingest contains the new ingestion system for Horizon. It currently runs completely independently of the old one, which means the new system can be a number of ledgers behind or ahead of the old system.

Constants

const (
	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. This value is stored in KV store and is used to decide
	// if there's a need to reprocess the ledger state or reingest data.
	//
	// Version history:
	// - 1: Initial version
	// - 2: Added the orderbook, offers processors and distributed ingestion.
	// - 3: Fixed a bug that could potentially result in invalid state
	//      (#1722). Update the version to clear the state.
	// - 4: Fixed a bug in AccountSignersChanged method.
	// - 5: Added trust lines.
	// - 6: Added accounts and accounts data.
	// - 7: Fixes a bug in AccountSignersChanged method.
	// - 8: Fixes AccountSigners processor to remove preauth tx signer
	//      when preauth tx is failed.
	// - 9: Fixes a bug in asset stats processor that counted unauthorized
	//      trustlines.
	// - 10: Fixes a bug in meta processing (fees are now processed before
	//      everything else).
	CurrentVersion = 10

	// MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion
	MaxDBConnections = 2
)
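The comment on CurrentVersion says the stored value is used to decide whether state must be reprocessed. A minimal sketch of such a check follows. The helper name needsStateRebuild and the rule "rebuild when the stored version is lower" are assumptions made for illustration, not the package's actual logic.

```go
package main

import "fmt"

// currentVersion mirrors the CurrentVersion constant documented above.
const currentVersion = 10

// needsStateRebuild is a hypothetical helper. It assumes that a version
// stored in the key-value store that is lower than the running code's
// version means the ledger state has to be rebuilt.
func needsStateRebuild(storedVersion int) bool {
	return storedVersion < currentVersion
}

func main() {
	for _, stored := range []int{0, 9, 10} {
		fmt.Printf("stored=%d rebuild=%t\n", stored, needsStateRebuild(stored))
	}
}
```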

Variables

var (

	// ErrReingestRangeConflict indicates that the reingest range overlaps with
	// horizon's most recently ingested ledger
	ErrReingestRangeConflict = errors.New("reingest range overlaps with horizon ingestion")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	CoreSession       *db.Session
	StellarCoreURL    string
	StellarCoreCursor string
	NetworkPassphrase string

	HistorySession           *db.Session
	HistoryArchiveURL        string
	DisableStateVerification bool

	// MaxStreamRetries determines how many times the reader will retry when encountering
	// errors while streaming xdr bucket entries from the history archive.
	// Set MaxStreamRetries to 0 if there should be no retry attempts
	MaxStreamRetries int

	IngestFailedTransactions bool
}

type OrderBookStream

type OrderBookStream struct {
	OrderBookGraph orderbook.OBGraph
	HistoryQ       history.IngestionQ
	// contains filtered or unexported fields
}

OrderBookStream updates an in-memory graph to be consistent with offers in the Horizon DB. Any offers which are created, modified, or removed from the Horizon DB during ingestion will be applied to the in-memory order book graph. OrderBookStream assumes that no other component will update the in-memory graph. However, it is safe for other goroutines to use the in-memory graph for read operations.

func (*OrderBookStream) Run

func (o *OrderBookStream) Run(ctx context.Context)

Run will call Update() every 30 seconds until the given context is terminated.

func (*OrderBookStream) Update

func (o *OrderBookStream) Update() error

Update will query the Horizon DB for offers which have been created, removed, or updated since the last time Update() was called. Those changes will then be applied to the in-memory order book graph. After calling this function, the in-memory order book graph should be consistent with the Horizon DB (assuming no error is returned).
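The single-writer, many-readers arrangement described for OrderBookStream can be pictured with a small map guarded by sync.RWMutex. Everything in this sketch is hypothetical (the offer and graph types, the Deleted flag, the apply method); the real orderbook.OBGraph has its own API and may synchronize differently.

```go
package main

import (
	"fmt"
	"sync"
)

// offer is a hypothetical, simplified row read from the database.
type offer struct {
	ID      int64
	Price   float64
	Deleted bool // true when the offer was removed since the last update
}

// graph is a hypothetical stand-in for an in-memory order book.
type graph struct {
	mu     sync.RWMutex
	offers map[int64]offer
}

// apply is called by the single writer with all changes since the last update.
func (g *graph) apply(changes []offer) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, o := range changes {
		if o.Deleted {
			delete(g.offers, o.ID)
			continue
		}
		g.offers[o.ID] = o // covers both created and modified offers
	}
}

// size is safe to call from any number of reader goroutines.
func (g *graph) size() int {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return len(g.offers)
}

func main() {
	g := &graph{offers: map[int64]offer{}}
	g.apply([]offer{{ID: 1, Price: 1.5}, {ID: 2, Price: 2.0}})
	g.apply([]offer{{ID: 1, Deleted: true}, {ID: 2, Price: 2.1}})
	fmt.Println("offers in graph:", g.size())
}
```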

type ProcessorRunner

type ProcessorRunner struct {
	// contains filtered or unexported fields
}

func (*ProcessorRunner) DisableMemoryStatsLogging

func (s *ProcessorRunner) DisableMemoryStatsLogging()

func (*ProcessorRunner) EnableMemoryStatsLogging

func (s *ProcessorRunner) EnableMemoryStatsLogging()

func (*ProcessorRunner) RunAllProcessorsOnLedger

func (*ProcessorRunner) RunHistoryArchiveIngestion

func (s *ProcessorRunner) RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error)

func (*ProcessorRunner) RunTransactionProcessorsOnLedger

func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error)

func (*ProcessorRunner) SetHistoryAdapter

func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter adapters.HistoryArchiveAdapterInterface)

func (*ProcessorRunner) SetLedgerBackend

func (s *ProcessorRunner) SetLedgerBackend(ledgerBackend ledgerbackend.LedgerBackend)

type ProcessorRunnerInterface

type ProcessorRunnerInterface interface {
	SetLedgerBackend(ledgerBackend ledgerbackend.LedgerBackend)
	SetHistoryAdapter(historyAdapter adapters.HistoryArchiveAdapterInterface)
	EnableMemoryStatsLogging()
	DisableMemoryStatsLogging()
	RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error)
	RunTransactionProcessorsOnLedger(sequence uint32) (io.StatsLedgerTransactionProcessorResults, error)
	RunAllProcessorsOnLedger(sequence uint32) (
		io.StatsChangeProcessorResults,
		io.StatsLedgerTransactionProcessorResults,
		error,
	)
}

type System

type System struct {
	Metrics struct {
		// LedgerIngestionTimer exposes timing metrics about the rate and
		// duration of ledger ingestion (including updating DB and graph).
		LedgerIngestionTimer metrics.Timer

		// LedgerInMemoryIngestionTimer exposes timing metrics about the rate and
		// duration of ingestion into in-memory graph only.
		LedgerInMemoryIngestionTimer metrics.Timer

		// StateVerifyTimer exposes timing metrics about the rate and
		// duration of state verification.
		StateVerifyTimer metrics.Timer
	}
	// contains filtered or unexported fields
}

func NewSystem

func NewSystem(config Config) (*System, error)

func (*System) ReingestRange

func (s *System) ReingestRange(fromLedger, toLedger uint32, force bool) error

ReingestRange runs the ingestion pipeline on the given range of ledgers, ingesting history data only.

func (*System) Run

func (s *System) Run()

Run starts the ingestion system. The ingestion system supports distributed ingestion, which means that Horizon ingestion can run on multiple machines while only one randomly chosen node leads the ingestion.

It needs to support the Cartesian product of the following run scenarios:

  • Init from empty state (1a) or resuming from existing state (1b).
  • Ingestion system version has been upgraded (2a) or not (2b).
  • Current node is leading ingestion (3a) or not (3b).

We always clear the state when the ingestion system is upgraded, so 2a and 2b are included in 1a.

We ensure that only one instance is a leader because in each round instances try to acquire a lock on the `LastLedgerExpIngest` value in the key-value store, and only one instance will be able to acquire it. This happens in both initial processing and ledger processing, so this solves 3a and 3b in both 1a and 1b.

Finally, 1a and 1b are tricky because we need to keep the latest version of the order book graph in the memory of each Horizon instance. To solve this:

* For state init:

  • If the instance is a leader, we update the order book graph by running the state pipeline normally.
  • If the instance is NOT a leader, we build the graph from offers present in the database. We completely omit the state pipeline in this case.

* For resuming:

  • If the instance is a leader, it runs the full ledger pipeline, including updating the database.
  • If the instance is NOT a leader, it runs the ledger pipeline without updating the database, so the order book graph is updated but the database is not overwritten.
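The scenario matrix above can be summarized as a small decision function. This is only an illustrative sketch: the action type, its values, and chooseAction are hypothetical names, and the real system takes these decisions inside its own processing loop.

```go
package main

import "fmt"

// action is a hypothetical label for what an instance does in one round.
type action string

const (
	runStatePipeline        action = "run state pipeline (updates DB and graph)"
	buildGraphFromDB        action = "build graph from offers in DB"
	runFullLedgerPipeline   action = "run ledger pipeline (updates DB and graph)"
	runLedgerPipelineNoDBUp action = "run ledger pipeline (updates graph only)"
)

// chooseAction maps the three scenario dimensions to an action.
func chooseAction(hasState, upgraded, leader bool) action {
	// State is always cleared on upgrade, so an upgraded system
	// behaves like one starting from empty state.
	if upgraded {
		hasState = false
	}
	switch {
	case !hasState && leader:
		return runStatePipeline
	case !hasState && !leader:
		return buildGraphFromDB
	case leader:
		return runFullLedgerPipeline
	default:
		return runLedgerPipelineNoDBUp
	}
}

func main() {
	for _, hasState := range []bool{false, true} {
		for _, leader := range []bool{true, false} {
			fmt.Printf("hasState=%t leader=%t -> %s\n",
				hasState, leader, chooseAction(hasState, false, leader))
		}
	}
}
```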

func (*System) Shutdown

func (s *System) Shutdown()

func (*System) StressTest

func (s *System) StressTest(numTransactions, changesPerTransaction int) error

func (*System) VerifyRange

func (s *System) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error

VerifyRange runs the ingestion pipeline on the range of ledgers. When verifyState is true it verifies the state when ingestion is complete.
