live

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2020 License: Apache-2.0 Imports: 29 Imported by: 0

README

CLI call

echo '{"lowBlockNum": 40000000, "highBlockNum": 41000000, "descending": true, "limit": 100, "query": "status:executed"}' | grpcurl -plaintext -d @ localhost:50002 search.Archive.StreamMatches
echo '{"lowBlockNum": 40000000, "highBlockNum": 41000000, "descending": true, "limit": 100, "query": "status:executed"}' | grpcurl -plaintext -d @ localhost:50001 search.Router.StreamMatches

gRPC UI

grpcui -port 60002 -plaintext localhost:50002

Planned renames

  • shardIndex:

    • startBlock -> lowBlock
    • endBlock -> highBlock
  • queries also:

    • lowBlockNum
    • highBlockNum

or should it be hi and lo ? loBlockNum, hiBlockNum

Case where we should fail

  • Going backwards, if you're giving me a forked head block, I fail.

    The backwards cursor should pass the head block in there, even as we navigate towards the beginning of the chain.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetMeshLIB

func GetMeshLIB(getAllPeers GetSearchPeersFunc, backendThreshold int) bstream.BlockRef

Types

type GetSearchPeersFunc

type GetSearchPeersFunc func() []*dmesh.SearchPeer

type IndexedBlock

type IndexedBlock struct {
	Idx *search.SingleIndex
	Blk *bstream.Block
}

type LiveBackend

type LiveBackend struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func New

func New(dmeshClient dmeshClient.SearchClient, searchPeer *dmesh.SearchPeer, headDelayTolerance uint64, shutdownDelay time.Duration) *LiveBackend

func (*LiveBackend) Check

func (*LiveBackend) GetHeadInfo

func (*LiveBackend) Launch

func (b *LiveBackend) Launch(grpcListenAddr string)

func (*LiveBackend) SetupSubscriptionHub

func (b *LiveBackend) SetupSubscriptionHub(
	startBlock bstream.BlockRef,
	blockMapper search.BlockMapper,
	blocksStore dstore.Store,
	blockstreamAddr string,
	liveIndexesPath string,
	realtimeTolerance time.Duration,
	truncationThreshold int,
) error

func (*LiveBackend) StreamHeadInfo

func (*LiveBackend) StreamMatches

func (b *LiveBackend) StreamMatches(req *pb.BackendRequest, stream pb.Backend_StreamMatchesServer) error

Backend.StreamMatches gRPC implementation

func (*LiveBackend) WaitHubReady

func (b *LiveBackend) WaitHubReady()

type LiveQuery

type LiveQuery struct {
	Ctx context.Context

	MatchCollector search.MatchCollector

	Request    *pb.BackendRequest
	BleveQuery *search.BleveQuery

	IncomingMatches chan *pb.SearchMatch

	LastBlockRead uint64

	// fwd only
	LiveMarkerReached          bool
	LiveMarkerLastSentBlockNum uint64
	// contains filtered or unexported fields
}

func (*LiveQuery) ForwardProcessBlock

func (q *LiveQuery) ForwardProcessBlock(blk *bstream.Block, obj interface{}) error

func (*LiveQuery) ProcessMatches

func (q *LiveQuery) ProcessMatches(matches []search.SearchMatch, blk *bstream.Block, irrBlockNum uint64, step forkable.StepType) error

func (*LiveQuery) ProcessSingleBlocks

func (q *LiveQuery) ProcessSingleBlocks(ctx context.Context, indexedBlock *IndexedBlock, matchCollector search.MatchCollector, incomingMatches chan *pb.SearchMatch) (err error)

type TailManager

type TailManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TailManager truncates the buffer under the guard of the LowBlockGuard (populated by the SubscriptionHub), as well as managing which lower block is published on dmesh, to slowly drain our requests that we're about to truncate.. then truncates the buffer, and cleans up the bleve indexes below.

func NewTailManager

func NewTailManager(getSearchPeers GetSearchPeersFunc, dmeshClient dmeshClient.Client, searchPeer *dmesh.SearchPeer, buffer *bstream.Buffer, minTargetBufferSize int, backendThreshold int, targetLIB bstream.BlockRef) *TailManager

func (*TailManager) CurrentLIB

func (t *TailManager) CurrentLIB() bstream.BlockRef

func (*TailManager) Launch

func (t *TailManager) Launch()

func (*TailManager) Ready

func (t *TailManager) Ready() bool

func (*TailManager) TailLock

func (t *TailManager) TailLock(startBlockNum uint64) (releaseFunc func(), err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL