ingest

package
v0.0.0-...-798156f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2018 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Overview

Package ingest contains the ingestion system for frontier. This system takes data produced by the connected digitalbits-core database, transforms it and inserts it into the frontier database.

Index

Constants

View Source
const (
	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. As rows are ingested into the frontier database, this version is
	// used to tag them.  In the future, any breaking changes introduced by a
	// developer should be accompanied by an increase in this value.
	//
	// Scripts, that have yet to be ported to this codebase can then be leveraged
	// to re-ingest old data with the new algorithm, providing a seamless
	// transition when the ingested data's structure changes.
	CurrentVersion = 13
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Address

type Address string

Address is a type of a param provided to BatchInsertBuilder that gets exchanged to record ID in a DB.

type AssetsModified

type AssetsModified map[string]xdr.Asset

AssetsModified tracks all the assets modified during a cycle of ingestion

func (AssetsModified) IngestOperation

func (assetsModified AssetsModified) IngestOperation(err error, op *xdr.Operation, source *xdr.AccountId, coreQ *core.Q) error

IngestOperation updates the assetsModified using the passed in operation

func (AssetsModified) UpdateAssetStats

func (assetsModified AssetsModified) UpdateAssetStats(is *Session)

UpdateAssetStats updates the db with the latest asset stats for the assets that were modified

type BatchInsertBuilder

type BatchInsertBuilder struct {
	TableName TableName
	Columns   []string
	// contains filtered or unexported fields
}

BatchInsertBuilder works like sq.InsertBuilder but has a better support for batching large number of rows.

func (*BatchInsertBuilder) Exec

func (b *BatchInsertBuilder) Exec(DB *db.Session) error

func (*BatchInsertBuilder) GetAddresses

func (b *BatchInsertBuilder) GetAddresses() (adds []Address)

func (*BatchInsertBuilder) ReplaceAddressesWithIDs

func (b *BatchInsertBuilder) ReplaceAddressesWithIDs(mapping map[Address]int64)

func (*BatchInsertBuilder) Values

func (b *BatchInsertBuilder) Values(params ...interface{})

type Cursor

type Cursor struct {
	// FirstLedger is the beginning of the range of ledgers (inclusive) that will
	// attempt to be ingested in this session.
	FirstLedger int32
	// LastLedger is the end of the range of ledgers (inclusive) that will
	// attempt to be ingested in this session.
	LastLedger int32

	// CoreDB is the digitalbits-core db that data is ingested from.
	CoreDB *db.Session

	Metrics        *IngesterMetrics
	AssetsModified AssetsModified

	// Err is the error that caused this iteration to fail, if any.
	Err error
	// contains filtered or unexported fields
}

Cursor iterates through a digitalbits core database's ledgers

func NewCursor

func NewCursor(first, last int32, i *System) *Cursor

NewCursor initializes a new ingestion cursor

func (*Cursor) BeforeAndAfter

func (c *Cursor) BeforeAndAfter(target xdr.LedgerKey) (
	before *xdr.LedgerEntry,
	after *xdr.LedgerEntry,
	err error,
)

BeforeAndAfter loads the ledger entry for `target` before the current operation was applied and after the operation was applied.

func (*Cursor) InLedger

func (c *Cursor) InLedger() bool

InLedger returns true if the cursor is on a ledger.

func (*Cursor) InOperation

func (c *Cursor) InOperation() bool

InOperation returns true if the cursor is on a operation. Will return false after advancing to a new transaction but before advancing on to the transaciton's first operation.

func (*Cursor) InTransaction

func (c *Cursor) InTransaction() bool

InTransaction returns true if the cursor is pointing to a transaction. This will return false after advancing to a new ledger but prior to advancing into the ledger's first transaction.

func (*Cursor) Ledger

func (c *Cursor) Ledger() *core.LedgerHeader

Ledger returns the current ledger

func (*Cursor) LedgerID

func (c *Cursor) LedgerID() int64

LedgerID returns the current ledger's id, as used by the history system.

func (*Cursor) LedgerRange

func (c *Cursor) LedgerRange() (start int64, end int64)

LedgerRange returns the beginning and end of id values that map to the current ledger. Useful for clearing a ledgers worth of data.

func (*Cursor) LedgerSequence

func (c *Cursor) LedgerSequence() int32

LedgerSequence returns the current ledger's sequence

func (*Cursor) NextLedger

func (c *Cursor) NextLedger() bool

NextLedger advances `c` to the next ledger in the iteration, loading a new LedgerBundle from the core database. Returns false if an error occurs or the iteration is complete.

func (*Cursor) NextOp

func (c *Cursor) NextOp() bool

NextOp advances `c` to the next operation in the current transaction. Returns false if the current transaction has nothing left to visit.

func (*Cursor) NextTx

func (c *Cursor) NextTx() bool

NextTx advances `c` to the next transaction in the current ledger. Returns false if the current ledger has no transactions left to visit.

func (*Cursor) Operation

func (c *Cursor) Operation() *xdr.Operation

Operation returns the current operation

func (*Cursor) OperationChanges

func (c *Cursor) OperationChanges() xdr.LedgerEntryChanges

OperationChanges returns all of LedgerEntryChanges that occurred in the course of applying the current operation.

func (*Cursor) OperationCount

func (c *Cursor) OperationCount() int

OperationCount returns the count of operations in the current transaction

func (*Cursor) OperationID

func (c *Cursor) OperationID() int64

OperationID returns the current operations id, as used by the history system.

func (*Cursor) OperationOrder

func (c *Cursor) OperationOrder() int32

OperationOrder returns the order of the current operation amongst the current transaction's operations.

func (*Cursor) OperationResult

func (c *Cursor) OperationResult() *xdr.OperationResultTr

OperationResult returns the current operation's result record

func (*Cursor) OperationSourceAccount

func (c *Cursor) OperationSourceAccount() xdr.AccountId

OperationSourceAccount returns the current operation's effective source account (i.e. default's to the transaction's source account).

func (*Cursor) OperationType

func (c *Cursor) OperationType() xdr.OperationType

OperationType returns the current operation type

func (*Cursor) Operations

func (c *Cursor) Operations() []xdr.Operation

Operations returns the current transactions operations.

func (*Cursor) SuccessfulLedgerOperationCount

func (c *Cursor) SuccessfulLedgerOperationCount() (ret int)

SuccessfulLedgerOperationCount returns the count of operations in the current ledger

func (*Cursor) SuccessfulTransactionCount

func (c *Cursor) SuccessfulTransactionCount() (ret int)

SuccessfulTransactionCount returns the count of transactions in the current ledger that succeeded.

func (*Cursor) Transaction

func (c *Cursor) Transaction() *core.Transaction

Transaction returns the current transaction

func (*Cursor) TransactionFee

func (c *Cursor) TransactionFee() *core.TransactionFee

TransactionFee returns the txfeehistory row for the current transaction.

func (*Cursor) TransactionID

func (c *Cursor) TransactionID() int64

TransactionID returns the current tranaction's id, as used by the history system.

func (*Cursor) TransactionMetaBundle

func (c *Cursor) TransactionMetaBundle() *meta.Bundle

TransactionMetaBundle provides easier access to the meta data regarding the application of the current transaction.

func (*Cursor) TransactionSourceAccount

func (c *Cursor) TransactionSourceAccount() xdr.AccountId

TransactionSourceAccount returns the current transaction's source account id

type EffectIngestion

type EffectIngestion struct {
	Dest        *Ingestion
	OperationID int64
	// contains filtered or unexported fields
}

EffectIngestion is a helper struct to smooth the ingestion of effects. this struct will track what the correct operation to use and order to use when adding effects into an ingestion.

func (*EffectIngestion) Add

func (ei *EffectIngestion) Add(aid xdr.AccountId, typ history.EffectType, details interface{}) bool

Add writes an effect to the database while automatically tracking the index to use.

func (*EffectIngestion) Finish

func (ei *EffectIngestion) Finish() error

Finish marks this ingestion as complete, returning any error that was recorded.

type IngesterMetrics

type IngesterMetrics struct {
	ClearLedgerTimer  metrics.Timer
	IngestLedgerTimer metrics.Timer
	LoadLedgerTimer   metrics.Timer
}

IngesterMetrics tracks all the metrics for the ingestion subsystem

type Ingestion

type Ingestion struct {
	// DB is the sql connection to be used for writing any rows into the frontier
	// database.
	DB *db.Session
	// contains filtered or unexported fields
}

Ingestion receives write requests from a Session

func (*Ingestion) Clear

func (ingest *Ingestion) Clear(start int64, end int64) error

Clear removes a range of data from the history database, exclusive of the end id provided.

func (*Ingestion) ClearAll

func (ingest *Ingestion) ClearAll() error

ClearAll clears the entire history database

func (*Ingestion) Close

func (ingest *Ingestion) Close() error

Close finishes the current transaction and finishes this ingestion.

func (*Ingestion) Effect

func (ingest *Ingestion) Effect(address Address, opid int64, order int, typ history.EffectType, details interface{}) error

Effect adds a new row into the `history_effects` table.

func (*Ingestion) Flush

func (ingest *Ingestion) Flush() error

Flush writes the currently buffered rows to the db, and if successful starts a new transaction.

func (*Ingestion) Ledger

func (ingest *Ingestion) Ledger(
	id int64,
	header *core.LedgerHeader,
	txs int,
	ops int,
)

Ledger adds a ledger to the current ingestion

func (*Ingestion) Operation

func (ingest *Ingestion) Operation(
	id int64,
	txid int64,
	order int32,
	source xdr.AccountId,
	typ xdr.OperationType,
	details map[string]interface{},

) error

Operation ingests the provided operation data into a new row in the `history_operations` table

func (*Ingestion) OperationParticipants

func (ingest *Ingestion) OperationParticipants(op int64, aids []xdr.AccountId)

OperationParticipants ingests the provided accounts `aids` as participants of operation with id `op`, creating a new row in the `history_operation_participants` table.

func (*Ingestion) Rollback

func (ingest *Ingestion) Rollback() (err error)

Rollback aborts this ingestions transaction

func (*Ingestion) Start

func (ingest *Ingestion) Start() (err error)

Start makes the ingestion reeady, initializing the insert builders and tx

func (*Ingestion) Trade

func (ingest *Ingestion) Trade(
	opid int64,
	order int32,
	buyer xdr.AccountId,
	trade xdr.ClaimOfferAtom,
	ledgerClosedAt int64,
) error

Trade records a trade into the history_trades table

func (*Ingestion) Transaction

func (ingest *Ingestion) Transaction(
	id int64,
	tx *core.Transaction,
	fee *core.TransactionFee,
)

Transaction ingests the provided transaction data into a new row in the `history_transactions` table

func (*Ingestion) TransactionParticipants

func (ingest *Ingestion) TransactionParticipants(tx int64, aids []xdr.AccountId)

TransactionParticipants ingests the provided account ids as participants of transaction with id `tx`, creating a new row in the `history_transaction_participants` table.

func (*Ingestion) UpdateAccountIDs

func (ingest *Ingestion) UpdateAccountIDs(tables []TableName) error

UpdateAccountIDs updates IDs of the accounts before inserting objects into DB.

type LedgerBundle

type LedgerBundle struct {
	Sequence        int32
	Header          core.LedgerHeader
	TransactionFees []core.TransactionFee
	Transactions    []core.Transaction
}

LedgerBundle represents a single ledger's worth of novelty created by one ledger close

func (*LedgerBundle) Load

func (lb *LedgerBundle) Load(db *db.Session) error

Load runs queries against `core` to fill in the records of the bundle.

type Session

type Session struct {
	Cursor    *Cursor
	Ingestion *Ingestion
	// Network is the passphrase for the network being imported
	Network string

	// StellarCoreURL is the http endpoint of the digitalbits-core that data is being
	// ingested from.
	StellarCoreURL string

	// ClearExisting causes the session to clear existing data from the frontier db
	// when the session is run.
	ClearExisting bool

	// SkipCursorUpdate causes the session to skip
	// reporting the "last imported ledger" cursor to
	// digitalbits-core
	SkipCursorUpdate bool

	// Metrics is a reference to where the session should record its metric information
	Metrics *IngesterMetrics

	// Err is the error that caused this session to fail, if any.
	Err error

	// Ingested is the number of ledgers that were successfully ingested during
	// this session.
	Ingested int
}

Session represents a single attempt at ingesting data into the history database.

func NewSession

func NewSession(i *System) *Session

NewSession initialize a new ingestion session

func (*Session) Run

func (is *Session) Run()

Run starts an attempt to ingest the range of ledgers specified in this session.

type System

type System struct {
	// HorizonDB is the connection to the frontier database that ingested data will
	// be written to.
	HorizonDB *db.Session

	// CoreDB is the digitalbitsv-core db that data is ingested from.
	CoreDB *db.Session

	Metrics IngesterMetrics

	// Network is the passphrase for the network being imported
	Network string

	// StellarCoreURL is the http endpoint of the digitalbits-core that data is being
	// ingested from.
	StellarCoreURL string

	// SkipCursorUpdate causes the ingestor to skip
	// reporting the "last imported ledger" cursor to
	// digitalbits-core
	SkipCursorUpdate bool

	// HistoryRetentionCount is the desired minimum number of ledgers to
	// keep in the history database, working backwards from the latest core
	// ledger.  0 represents "all ledgers".
	HistoryRetentionCount uint
	// contains filtered or unexported fields
}

System represents the data ingestion subsystem of frontier.

func New

func New(network string, coreURL string, core, frontier *db.Session) *System

New initializes the ingester, causing it to begin polling the digitalbits-core database for now ledgers and ingesting data into the frontier database.

func (*System) Backfill

func (i *System) Backfill(n uint) error

Backfill ingests history in reverse chronological order, from the current frontier elder query for `n` ledgers

func (*System) ClearAll

func (i *System) ClearAll() error

ClearAll removes all previously ingested historical data from the frontier database.

func (*System) RebaseHistory

func (i *System) RebaseHistory(sequence int32) error

RebaseHistory re-establishes frontier's history database using the provided sequence as a starting point.

func (*System) ReingestAll

func (i *System) ReingestAll() (int, error)

ReingestAll re-ingests all ledgers

func (*System) ReingestOutdated

func (i *System) ReingestOutdated() (n int, err error)

ReingestOutdated finds old ledgers and reimports them.

func (*System) ReingestRange

func (i *System) ReingestRange(start, end int32) (int, error)

ReingestRange reingests a range of ledgers, from `start` to `end`, inclusive.

func (*System) ReingestSingle

func (i *System) ReingestSingle(sequence int32) error

ReingestSingle re-ingests a single ledger

func (*System) Tick

func (i *System) Tick() *Session

Tick triggers the ingestion system to ingest any new ledger data, provided that there currently is not an import session in progress.

type TableName

type TableName string
const (
	AssetStatsTableName              TableName = "asset_stats"
	EffectsTableName                 TableName = "history_effects"
	LedgersTableName                 TableName = "history_ledgers"
	OperationParticipantsTableName   TableName = "history_operation_participants"
	OperationsTableName              TableName = "history_operations"
	TradesTableName                  TableName = "history_trades"
	TransactionParticipantsTableName TableName = "history_transaction_participants"
	TransactionsTableName            TableName = "history_transactions"
)

Directories

Path Synopsis
Package participants contains functions to derive a set of "participant" addresses for various data structures in the DigitalBits network's ledger.
Package participants contains functions to derive a set of "participant" addresses for various data structures in the DigitalBits network's ledger.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL