pipeline

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: Apache-2.0 Imports: 39 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Done = errors.New("done")

Functions

func BuildRequestDetails added in v0.1.0

func BuildRequestDetails(
	ctx context.Context,
	request *pbsubstreamsrpc.Request,
	getRecentFinalBlock getBlockFunc,
	resolveCursor CursorResolver,
	getHeadBlock getBlockFunc) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error)

func BuildRequestDetailsFromSubrequest added in v1.0.2

func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails)

func NewStoreBoundary added in v0.0.21

func NewStoreBoundary(
	interval uint64,
	requestStartBlockNum uint64,
	requestStopBlock uint64,
) *storeBoundary

Types

type CursorResolver added in v1.0.2

type CursorResolver func(context.Context, *bstream.Cursor) (reorgJunctionBlock, currentHead bstream.BlockRef, err error)

func NewCursorResolver added in v1.0.2

func NewCursorResolver(hub *hub.ForkableHub, mergedBlocksStore, forkedBlocksStore dstore.Store) CursorResolver

type ForkHandler added in v0.0.20

type ForkHandler struct {
	// contains filtered or unexported fields
}

TODO(abourget): The scope of this object and the Engine
are now pretty similar: keeping track of certain pieces
of info that are reversible, and handling the back and forth
between undos and redos.
Perhaps what we could have here is to have those undo handlers
live on the Pipeline (where it makes sense)
and have some nested structs handle

func NewForkHandler added in v0.0.21

func NewForkHandler() *ForkHandler

type Option

type Option func(p *Pipeline)

func WithFinalBlocksOnly added in v1.1.1

func WithFinalBlocksOnly() Option

func WithHighestStage added in v1.1.9

func WithHighestStage(stage uint32) Option

func WithPendingUndoMessage added in v1.1.1

func WithPendingUndoMessage(msg *pbsubstreamsrpc.Response) Option

WithPendingUndoMessage allows sending a message right before we send the first 'BlockScopedData'.

func WithPostBlockHook

func WithPostBlockHook(f substreams.BlockHook) Option

func WithPostJobHook

func WithPostJobHook(f substreams.PostJobHook) Option

func WithPreBlockHook

func WithPreBlockHook(f substreams.BlockHook) Option

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(
	ctx context.Context,
	outputGraph *outputmodules.Graph,
	stores *Stores,
	execoutStorage *execout.Configs,
	wasmRuntime *wasm.Registry,
	execOutputCache *cache.Engine,
	runtimeConfig config.RuntimeConfig,
	respFunc substreams.ResponseFunc,
	opts ...Option,
) *Pipeline

func (*Pipeline) GetStoreMap added in v0.1.0

func (p *Pipeline) GetStoreMap() store.Map

func (*Pipeline) Init added in v0.0.14

func (p *Pipeline) Init(ctx context.Context) (err error)

func (*Pipeline) InitTier1StoresAndBackprocess added in v1.1.9

func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error)

func (*Pipeline) InitTier2Stores added in v1.1.9

func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error)

func (*Pipeline) OnStreamTerminated added in v0.1.0

func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error

OnStreamTerminated flushes the stores and sets trailers when the stream terminated gracefully from our point of view. If the stream terminated gracefully, we return `nil`; otherwise, the original error is returned.

func (*Pipeline) ProcessBlock added in v0.0.14

func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error)

func (*Pipeline) ProcessFromExecOutput added in v1.4.0

func (p *Pipeline) ProcessFromExecOutput(
	ctx context.Context,
	clock *pbsubstreams.Clock,
	cursor *bstream.Cursor,
) (err error)

type Stores added in v0.1.0

type Stores struct {
	StoreMap store.Map
	// contains filtered or unexported fields
}

func NewStores added in v0.1.0

func NewStores(ctx context.Context, storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isTier2Request bool) *Stores

func (*Stores) SetStoreMap added in v0.1.0

func (s *Stores) SetStoreMap(storeMap store.Map)

type UndoHandler added in v0.0.21

type UndoHandler func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL