pipeline

package
v0.18.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunCleanup

func RunCleanup(cfg *config.Config, db *store.Store, logger *logrus.Logger) error

RunCleanup performs the data cleanup

func RunSync

func RunSync(cfg *config.Config, db *store.Store, clients []near.Client) (int, error)

func RunSyncHistoricalDelegators added in v0.9.0

func RunSyncHistoricalDelegators(cfg *config.Config, db *store.Store, clients []near.Client, logger *logrus.Logger) error

func RunSyncTransactionsMissingInfoForHistorical added in v0.11.0

func RunSyncTransactionsMissingInfoForHistorical(cfg *config.Config, db *store.Store, clients []near.Client) error

Types

type AnalyzerTask added in v0.3.0

type AnalyzerTask struct {
	// contains filtered or unexported fields
}

AnalyzerTask performs data analysis on parsed heights data

func NewAnalyzerTask added in v0.3.0

func NewAnalyzerTask(db *store.Store, logger *logrus.Logger) AnalyzerTask

NewAnalyzerTask returns a new analyzer task

func (AnalyzerTask) Name added in v0.3.0

func (t AnalyzerTask) Name() string

Name returns the task name

func (AnalyzerTask) Run added in v0.3.0

func (t AnalyzerTask) Run(ctx context.Context, payload *Payload) error

Run executes the analyzer task

func (AnalyzerTask) ShouldRun added in v0.3.0

func (t AnalyzerTask) ShouldRun(payload *Payload) bool

ShouldRun returns true if there any heights to process

type DataStoreRetrieveTask added in v0.18.0

type DataStoreRetrieveTask struct {
	// contains filtered or unexported fields
}

func NewDataStoreRetrieveTask added in v0.18.0

func NewDataStoreRetrieveTask(
	datastoreConn *grpc.ClientConn,
	db *store.Store,
	rpc []near.Client,
	config *config.Config,
	logger *logrus.Logger,

) DataStoreRetrieveTask

func (DataStoreRetrieveTask) Name added in v0.18.0

func (t DataStoreRetrieveTask) Name() string

Name returns the task name

func (*DataStoreRetrieveTask) RPC added in v0.18.0

func (DataStoreRetrieveTask) Run added in v0.18.0

func (t DataStoreRetrieveTask) Run(ctx context.Context, payload *Payload) (err error)

func (DataStoreRetrieveTask) ShouldRun added in v0.18.0

func (t DataStoreRetrieveTask) ShouldRun(payload *Payload) bool

ShouldRun returns true if there any heights to process

type FetcherTask added in v0.3.0

type FetcherTask struct {
	// contains filtered or unexported fields
}

FetcherTask performs fetching data from the network node

func NewFetcherTask added in v0.3.0

func NewFetcherTask(
	db *store.Store,
	rpc []near.Client,
	config *config.Config,
	logger *logrus.Logger,
) FetcherTask

NewFetcherTask returns a new data fetcher task

func (FetcherTask) Name added in v0.3.0

func (t FetcherTask) Name() string

Name returns the task name

func (*FetcherTask) RPC added in v0.5.0

func (t *FetcherTask) RPC() near.Client

func (FetcherTask) Run added in v0.3.0

func (t FetcherTask) Run(ctx context.Context, payload *Payload) error

Run executes the data fetching

func (FetcherTask) ShouldRun added in v0.3.0

func (t FetcherTask) ShouldRun(payload *Payload) bool

ShouldRun returns true if there any heights to process

type HeightPayload added in v0.3.0

type HeightPayload struct {
	Height      uint64 `json:"height"`
	ErrorString string `json:"error_string"`
	Error       error
	Skip        bool `json:"skip"`

	Block                  *near.Block                   `json:"block"`
	Validators             []near.Validator              `json:"validators"`
	Chunks                 []near.ChunkDetails           `json:"chunks"`
	Transactions           []near.TransactionDetails     `json:"transactions"`
	Delegations            []near.AccountInfo            `json:"delegations"`
	Accounts               map[string]near.Account       `json:"accounts"`
	RewardFees             map[string]near.RewardFee     `json:"reward_fees"`
	DelegationsByValidator map[string][]near.AccountInfo `json:"delegations_by_validator"`
	CurrentEpoch           bool                          `json:"current_epoch"`
	PreviousValidators     []near.Validator              `json:"previous_validators"`
	PreviousEpochKickOut   []near.ValidatorKickout       `json:"previous_epoch_kick_out"`
	PreviousBlock          *near.Block                   `json:"previous_block"`

	Parsed *ParsedPayload
}

HeightPayload contains all raw data for a single height

func (*HeightPayload) DecodeError added in v0.18.0

func (p *HeightPayload) DecodeError()

func (*HeightPayload) SkipWithError added in v0.3.0

func (p *HeightPayload) SkipWithError(err error)

SkipWithError marks the payload as skipped

type ParsedPayload added in v0.3.0

type ParsedPayload struct {
	Block           *model.Block
	Epoch           *model.Epoch
	Transactions    []model.Transaction
	Validators      []model.Validator
	ValidatorAggs   []model.ValidatorAgg
	ValidatorEpochs []model.ValidatorEpoch
	DelegatorEpochs []model.DelegatorEpoch
	Accounts        []model.Account
	Events          []model.Event
}

ParsedPayload contains parsed data for a single height

type ParserTask added in v0.3.0

type ParserTask struct {
	// contains filtered or unexported fields
}

ParserTask performs raw block data parsing

func NewParserTask added in v0.3.0

func NewParserTask(db *store.Store, logger *logrus.Logger) ParserTask

NewParserTask returns a new parser task

func (ParserTask) Name added in v0.3.0

func (t ParserTask) Name() string

Name returns the task name

func (ParserTask) Run added in v0.3.0

func (t ParserTask) Run(ctx context.Context, payload *Payload) error

Run executes the parser task

func (ParserTask) ShouldRun added in v0.3.0

func (t ParserTask) ShouldRun(payload *Payload) bool

ShouldRun returns true if there any heights to process

type Payload added in v0.3.0

type Payload struct {
	Lag         int
	StartHeight uint64
	StartTime   time.Time
	EndHeight   uint64
	EndTime     time.Time
	Tip         *near.Block
	Heights     []*HeightPayload
}

Payload contains data for a single sync run

type PersistorTask added in v0.3.0

type PersistorTask struct {
	// contains filtered or unexported fields
}

PersistorTask saves the processed data in the database

func NewPersistorTask added in v0.3.0

func NewPersistorTask(db *store.Store, rewardsHTTPStoreAddress string, network string, chain_id string, logger *logrus.Logger) PersistorTask

NewPersistorTask returns a new persistor task

func (PersistorTask) Name added in v0.3.0

func (t PersistorTask) Name() string

Name returns the task name

func (PersistorTask) Run added in v0.3.0

func (t PersistorTask) Run(ctx context.Context, payload *Payload) error

Run executes the persistor task

func (PersistorTask) ShouldRun added in v0.3.0

func (t PersistorTask) ShouldRun(payload *Payload) bool

ShouldRun returns true if there any heights to process

type Task

type Task interface {
	Name() string
	ShouldRun(*Payload) bool
	Run(context.Context, *Payload) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL