search

package module
v0.0.2-0...-f4c2c6f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2022 License: Apache-2.0 Imports: 34 Imported by: 0

README

reference License

The StreamingFast Search engine is an innovative, fork-aware blockchain search engine that covers both historical and real-time data. It is part of StreamingFast.

Features

It can run as a distributed system composed of real-time and archive backends, plus a router that directs requests to the right backends, which are discovered through an etcd cluster.

It supports massively parallelized indexing of the chain (given enough compute power, it can process 20 TB of data in 30 minutes). It is designed for high availability and scales horizontally.

It is fed by a StreamingFast source, such as EOSIO on StreamingFast.

Installation & Usage

See the different protocol-specific StreamingFast binaries at https://github.com/streamingfast/streamingfast#protocols

Current search implementations:

Contributing

Issues and PRs in this repo should relate strictly to the core search engine.

Report any protocol-specific issues in their respective repositories.

If you wish to contribute to this code base, please first refer to the general StreamingFast contribution guide.

This codebase uses unit tests extensively; please write and run tests.

License

Apache 2.0

Documentation

Index

Constants

View Source
const MaxInt = int(^uint(0) >> 1)
View Source
const TimeFormatBleveID = "2006-01-02T15-04-05.000"

Variables

View Source
var BoolFieldMapping *mapping.FieldMapping
View Source
var DisabledMapping *mapping.DocumentMapping

General purpose mappers

View Source
var DmeshArchiveLIBTarget = bstream.Target("dmesh-head-target")
View Source
var DynamicNestedDocMapping *mapping.DocumentMapping
View Source
var ErrEndOfRange = errors.New("end of block range")
View Source
var GetSearchMatchFactory func() SearchMatch
View Source
var SortableNumericFieldMapping *mapping.FieldMapping
View Source
// TestMatchCollector is a MatchCollector used for tests. It groups the
// document matches in results by transaction ID and keeps only documents
// whose block number lies within [lowBlockNum, highBlockNum]. Documents that
// testExplodeDocumentID flags as skippable are ignored. It returns one
// SearchMatch per transaction, in the order each transaction was first seen,
// with that transaction's action indexes sorted ascending. If ctx is done
// while iterating, it returns ctx.Err() and no matches.
var TestMatchCollector = func(ctx context.Context, lowBlockNum, highBlockNum uint64, results bsearch.DocumentMatchCollection) (out []SearchMatch, err error) {
	// actionsByTrx collects action indexes per transaction ID. The map does
	// not preserve insertion order, so seenOrder records first-seen order.
	actionsByTrx := make(map[string][]uint16)
	var seenOrder []*testTrxResult

	for _, match := range results {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}

		blockNum, trxID, actionIdx, skip := testExplodeDocumentID(match.ID)
		outOfRange := blockNum < lowBlockNum || blockNum > highBlockNum
		if skip || outOfRange {
			continue
		}

		if _, seen := actionsByTrx[trxID]; !seen {
			seenOrder = append(seenOrder, &testTrxResult{id: trxID, blockNum: blockNum})
		}
		actionsByTrx[trxID] = append(actionsByTrx[trxID], actionIdx)
	}

	for _, trx := range seenOrder {
		indexes := actionsByTrx[trx.id]
		sort.Slice(indexes, func(a, b int) bool { return indexes[a] < indexes[b] })

		out = append(out, &testSearchMatch{
			blockNumber:   trx.blockNum,
			trxIDPrefix:   trx.id,
			actionIndexes: indexes,
		})
	}

	return out, nil
}
View Source
var TxtFieldMapping *mapping.FieldMapping

Functions

func AsPreprocessBlock

func AsPreprocessBlock(b BlockMapper) bstream.PreprocessFunc

func CheckIndexIntegrity

func CheckIndexIntegrity(path string, shardSize uint64) (*indexMetaInfo, error)

func DmeshHighestArchiveBlockRefGetter

func DmeshHighestArchiveBlockRefGetter(getAllPeers GetSearchPeersFunc, backendThreshold int) bstream.BlockRefGetter

func DoForFirstNChunks

func DoForFirstNChunks(input []byte, nchunks int, chunkSize int, function func(idxRef int, chunkRef []byte))

func GetIrreversibleBlock

func GetIrreversibleBlock(blockmetaCli pbblockmeta.BlockIDClient, blockNum uint64, ctx context.Context, retries int) (bstream.BlockRef, error)

GetIrreversibleBlock will do whatever it takes to fetch the irreversible block at height `blockNum`, up to a number of retries. A `retries` value of -1 means retry forever.

func GetLibBlock

func GetLibBlock(blockmetaCli pbblockmeta.BlockIDClient) (bstream.BlockRef, error)

func GetLibInfo

func GetLibInfo(headinfoCli pbheadinfo.HeadInfoClient) (bstream.BlockRef, error)

func MapFirstNChunks

func MapFirstNChunks(input []byte, nChunks int, chunkSize int) map[string]string

func NewCursor

func NewCursor(blockNum uint64, headBlockID string, trxPrefix string) string

v0: 0:blocknum:(irr|headBlockID):trxPrefix

func ValidateRegistry

func ValidateRegistry() error

Types

type BleveQuery

type BleveQuery struct {
	Raw string

	FieldTransformer sqe.FieldTransformer

	FieldNames []string
	Validator  BleveQueryValidator
	// contains filtered or unexported fields
}

func NewParsedQuery

func NewParsedQuery(ctx context.Context, rawQuery string) (*BleveQuery, error)

func (*BleveQuery) BleveQuery

func (q *BleveQuery) BleveQuery() query.Query

func (*BleveQuery) Hash

func (q *BleveQuery) Hash() (string, error)

func (*BleveQuery) Parse

func (q *BleveQuery) Parse(ctx context.Context) error

func (*BleveQuery) Validate

func (q *BleveQuery) Validate() error

type BleveQueryFactory

type BleveQueryFactory func(rawQuery string) *BleveQuery
var GetBleveQueryFactory BleveQueryFactory

type BleveQueryValidator

type BleveQueryValidator interface {
	Validate(q *BleveQuery) error
}

type BlockMapper

type BlockMapper interface {
	Map(block *bstream.Block) ([]*document.Document, error)
	Validate() error
}

type BoundaryBlockInfo

type BoundaryBlockInfo struct {
	Num  uint64
	ID   string
	Time time.Time
}

type GetSearchPeersFunc

type GetSearchPeersFunc func() []*dmesh.SearchPeer

type IndexedField

type IndexedField struct {
	Name      string    `json:"name"`
	ValueType ValueType `json:"type"`
}

type IndexedFieldsMapFunc

type IndexedFieldsMapFunc func() map[string]*IndexedField

type MatchCollector

type MatchCollector func(ctx context.Context, lowBlockNum, highBlockNum uint64, results bsearch.DocumentMatchCollection) ([]SearchMatch, error)
var GetMatchCollector MatchCollector

type MultiError

type MultiError []error

func (MultiError) Error

func (m MultiError) Error() string

type PreIndexer

type PreIndexer struct {
	// contains filtered or unexported fields
}

PreIndexer is a bstream preprocessor that, given a bstream.Block, returns the corresponding bleve object instead of the raw block.

func NewPreIndexer

func NewPreIndexer(blockMapper BlockMapper, liveIndexesPath string) *PreIndexer

func (*PreIndexer) Preprocess

func (i *PreIndexer) Preprocess(blk *bstream.Block) (interface{}, error)

type QueryMetrics

// QueryMetrics holds counters and timings collected for a single search
// query; see the per-field comments for what each value measures.
type QueryMetrics struct {

	// The number of transactions seen in our shard results.
	TransactionSeenCount int64

	// The number of indexes actually walked, regardless of whether they were
	// used to return results (presumably including indexes whose search was
	// cancelled — original note: "cancel index").
	SearchedIndexesCount *atomic.Uint32

	// The number of searched indexes that were actually useful, i.e. those
	// that were used to return results.
	UtilizedIndexesCount *atomic.Uint32

	// The total transaction count across the utilized indexes.
	UtilizedTrxCount *atomic.Uint32

	// The cumulative time spent in each utilized index.
	UtilizedTotalDuration *atomic.Duration
	// contains filtered or unexported fields
}

func NewQueryMetrics

func NewQueryMetrics(zlog *zap.Logger, descending bool, query string, shardSize uint64, low, high uint64) *QueryMetrics

func (*QueryMetrics) Finalize

func (m *QueryMetrics) Finalize()

func (*QueryMetrics) MarkFirstResult

func (m *QueryMetrics) MarkFirstResult()

type SearchMatch

type SearchMatch interface {
	BlockNum() uint64
	TransactionIDPrefix() string

	GetIndex() uint64
	SetIndex(index uint64)

	FillProtoSpecific(match *pbsearch.SearchMatch, blk *bstream.Block) error
}

func RunSingleIndexQuery

func RunSingleIndexQuery(
	ctx context.Context,
	sortDesc bool,
	lowBlockNum, highBlockNum uint64,
	matchCollector MatchCollector,
	bquery *BleveQuery,
	index index.Index,
	releaseIndex func(),
	metrics *QueryMetrics,
) (
	out []SearchMatch,
	err error,
)

type ShardIndex

type ShardIndex struct {
	index.Index

	IndexBuilder    index.IndexBuilder
	IndexTargetPath string

	// These two values represent the "potential" start and end
	// block. It doesn't mean there is actual data within those two
	// blocks: ex: if block endBlock had 0 transactions, we wouldn't
	// shrink `endBlock`.
	//
	// The chain of [startBlock, endBlock] -> [startBlock, endBlock]
	// *must* be absolutely continuous from index to index within the
	// process, and between the different segments of indexes
	// (readOnly, merging, writable, and live)
	StartBlock     uint64 // inclusive
	StartBlockID   string
	StartBlockTime time.Time
	EndBlock       uint64 // inclusive
	EndBlockID     string
	EndBlockTime   time.Time

	Lock sync.RWMutex
	// contains filtered or unexported fields
}

func NewShardIndexWithAnalysisQueue

func NewShardIndexWithAnalysisQueue(baseBlockNum uint64, shardSize uint64, idx index.Index, pathFunc filePathFunc, analysisQueue *index.AnalysisQueue) (*ShardIndex, error)

func (*ShardIndex) Close

func (s *ShardIndex) Close() error

func (*ShardIndex) GetBoundaryBlocks

func (s *ShardIndex) GetBoundaryBlocks(idx index.Index) (start *BoundaryBlockInfo, end *BoundaryBlockInfo, err error)

func (*ShardIndex) RequestCoversFullRange

func (s *ShardIndex) RequestCoversFullRange(low, high uint64) bool

func (*ShardIndex) WritablePath

func (s *ShardIndex) WritablePath(suffix string) string

type SingleIndex

type SingleIndex struct {
	index.Index
	// contains filtered or unexported fields
}

func (*SingleIndex) Delete

func (i *SingleIndex) Delete()

func (*SingleIndex) GetIndex

func (i *SingleIndex) GetIndex() index.Index

type ValueType

type ValueType int32
const (
	AccountType ValueType = iota
	AddressType
	ActionType
	ActionIndexType
	AssetType
	BooleanType
	BlockNumType
	HexType
	FreeFormType
	NameType
	PermissionType
	TransactionIDType
	NumberType
)

Directories

Path Synopsis
app
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL