bstream

package module
v0.0.2-0...-d05d5d5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0 Imports: 36 Imported by: 0

README

StreamingFast Blocks Streaming Library

reference License

The bstream package manages flows of blocks and forks in a blockchain through a Handler-based interface similar to net/http.

Usage

Flows are composed by assembling Handlers:

type HandlerFunc func(blk *pbbstream.Block, obj interface{}) error

and are kicked off by passing them to a Source

Overview

All streaming features of streamingfast use this package.

Sources include:

  • FileSource feeds from 100-blocks files in some dstore-based location (some object storage, or local filesystem files)
  • LiveSource streams from a gRPC-based block streamer (fed from instrumented blockchain nodes directly).
  • JoiningSource which bridges a FileSource and a LiveSource transparently, so you can stream from files and then handoff to a real-time stream.

Handlers include:

  • Forkable (in forkable/) which manages chain reorganizations, undos, according to the chain's consensus (longest chain, etc..)
  • SubscriptionHub (in hub/): In-process hub to dispatch blocks from a remote source to all consumers inside a Go process
  • A few gates, that allow the flowing of blocks only upon certain conditions (BlockNumGate, BlockIDGate, RealtimeGate, RealtimeTripper, which can be inclusive or exclusive). See gates.go.

Contributing

Issues and PR in this repo related strictly to the low-level functionalities of bstream

Report any protocol-specific issues in their respective repositories

Please first refer to the general StreamingFast contribution guide, if you wish to contribute to this code base.

License

Apache 2.0

Documentation

Index

Constants

View Source
const (
	GateInclusive = GateType(iota)
	GateExclusive
)
View Source
const (
	StepNew  = StepType(1) //  First time we're seeing this block
	StepUndo = StepType(2) // We are undoing this block (it was came as New previously)

	StepIrreversible = StepType(16) // This block is now final and cannot be 'Undone' anymore (irreversible)

	StepStalled         = StepType(32)                                                  // This block passed the LIB and is definitely forked out
	StepNewIrreversible = StepType(StepNew | StepIrreversible)                          //5 First time we're seeing this block, but we already know that it is irreversible
	StepsAll            = StepType(StepNew | StepUndo | StepIrreversible | StepStalled) //7 useful for filters
)
View Source
const (
	FileSourceHeadTarget  = Target("filesource-head")
	LiveSourceHeadTarget  = Target("livesource-head")
	LiveSourceTailTarget  = Target("livesource-tail")
	NetworkHeadTarget     = Target("network-head")
	NetworkLIBTarget      = Target("network-lib")
	BlockStreamHeadTarget = Target("bstream-head")
	BlockStreamLIBTarget  = Target("bstream-lib")
	HubHeadTarget         = Target("hub-head")
	HubLIBTarget          = Target("hub-lib")
)

Variables

View Source
var EmptyCursor = &Cursor{
	Block:     BlockRefEmpty,
	HeadBlock: BlockRefEmpty,
	LIB:       BlockRefEmpty,
}
View Source
var ErrGetterUndefined = errors.New("no getter defined")
View Source
var ErrOpenEndedRange = errors.New("open ended range")
View Source
var ErrResolveCursor = errors.New("cannot resolve cursor")
View Source
var ErrStopBlockReached = errors.New("stop block reached")
View Source
var ErrTrackerBlockNotFound = errors.New("tracker block not found")
View Source
var GetMaxNormalLIBDistance = uint64(1000)
View Source
var GetProtocolFirstStreamableBlock = uint64(0)

bstreams.NewDBinBlockReader var GetBlockReaderFactory BlockReaderFactory bstream.NewDBinBlockWriter var GetBlockWriterFactory BlockWriterFactory var GetBlockWriterHeaderLen int

View Source
var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream"))
View Source
var NormalizeBlockID = func(in string) string {
	return in
}

Functions

func AssertBlockRefEqual

func AssertBlockRefEqual(t *testing.T, expected, actual BlockRef)

func AssertCursorEqual

func AssertCursorEqual(t *testing.T, expected, actual *Cursor)

func AssertProtoEqual

func AssertProtoEqual(t *testing.T, expected, actual proto.Message)

func BlockFileName

func BlockFileName(block *pbbstream.Block) string

func BlockFileNameWithSuffix

func BlockFileNameWithSuffix(block *pbbstream.Block, suffix string) string

func DoForProtocol

func DoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func() error) error

DoForProtocol extra the worker (a lambda) that will be invoked based on the received `kind` parameter. If the mapping exists, the worker is invoked and the error returned with the call. If the mapping does not exist, an error is returned. In all other cases, this function returns `nil`.

func EqualsBlockRefs

func EqualsBlockRefs(left, right BlockRef) bool

func FetchBlockFromMergedBlocksStore

func FetchBlockFromMergedBlocksStore(
	ctx context.Context,
	num uint64,
	store dstore.Store,
) (*pbbstream.Block, error)

func FetchBlockFromOneBlockStore

func FetchBlockFromOneBlockStore(
	ctx context.Context,
	num uint64,
	id string,
	store dstore.Store,
) (*pbbstream.Block, error)

func FetchBlockMetaByHashFromOneBlockStore

func FetchBlockMetaByHashFromOneBlockStore(
	ctx context.Context,
	id string,
	store dstore.Store,
) (*pbbstream.BlockMeta, error)

FetchBlockMetaByHashFromOneBlockStore fetches a block meta by its hash from a single block store. It will list all the blocks in the store and find the one that matches the hash. If the block is not found, it returns `nil, nil`.

func FetchBlockMetaFromOneBlockStore

func FetchBlockMetaFromOneBlockStore(
	ctx context.Context,
	num uint64,
	id string,
	store dstore.Store,
) (*pbbstream.BlockMeta, error)

func GetStreamHeadInfo

func GetStreamHeadInfo(ctx context.Context, addr string) (head BlockRef, lib BlockRef, err error)

func IsEmpty

func IsEmpty(ref BlockRef) bool

func MustDoForProtocol

func MustDoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func())

MustDoForProtocol perform the same work, but accept only non-error lambdas as the worker and an inexistant mapping will panic.

func NewOneBlocksSource

func NewOneBlocksSource(
	lowestBlockNum uint64,
	store dstore.Store,
	handler Handler,
	options ...OneBlocksSourceOption,
) (*oneBlocksSource, error)

func ParseFilename

func ParseFilename(filename string) (blockNum uint64, blockIDSuffix string, previousBlockIDSuffix string, libNum uint64, canonicalName string, err error)

func TestBlock

func TestBlock(id, prev string) *pbbstream.Block

func TestBlockFromJSON

func TestBlockFromJSON(jsonContent string) *pbbstream.Block

func TestBlockWithLIBNum

func TestBlockWithLIBNum(id, previousID string, newLIB uint64) *pbbstream.Block

func TestBlockWithNumbers

func TestBlockWithNumbers(id, prev string, num, prevNum uint64) *pbbstream.Block

func TestBlockWithTimestamp

func TestBlockWithTimestamp(id, prev string, timestamp time.Time) *pbbstream.Block

func TestJSONBlockWithLIBNum

func TestJSONBlockWithLIBNum(id, previousID string, newLIB uint64) string

func ToProtocol

func ToProtocol[B proto.Message](blk *pbbstream.Block) B

func TruncateBlockID

func TruncateBlockID(in string) string

func ValidateRegistry

func ValidateRegistry() error

Types

type BasicBlockRef

type BasicBlockRef struct {
	// contains filtered or unexported fields
}

BasicBlockRef assumes the id and num are completely separated and represents two independent piece of information. The `ID()` in this case is the `id` field and the `Num()` is the `num` field.

func NewBlockRef

func NewBlockRef(id string, num uint64) BasicBlockRef

func NewBlockRefFromID

func NewBlockRefFromID(id string) BasicBlockRef

NewBlockRefFromID is a convenience method when the string is assumed to have the block number in the first 8 characters of the id as a big endian encoded hexadecimal number and the full string represents the ID.

func (BasicBlockRef) ID

func (b BasicBlockRef) ID() string

func (BasicBlockRef) Num

func (b BasicBlockRef) Num() uint64

func (BasicBlockRef) String

func (b BasicBlockRef) String() string

type BlockDecoder

type BlockDecoder interface {
	Decode(blk *pbbstream.Block) (interface{}, error)
}

type BlockDecoderFunc

type BlockDecoderFunc func(blk *pbbstream.Block) (interface{}, error)

func (BlockDecoderFunc) Decode

func (f BlockDecoderFunc) Decode(blk *pbbstream.Block) (interface{}, error)

type BlockIDGate

type BlockIDGate struct {
	MaxHoldOff int
	// contains filtered or unexported fields
}

func NewBlockIDGate

func NewBlockIDGate(blockID string, gateType GateType, h Handler, opts ...GateOption) *BlockIDGate

func (*BlockIDGate) ProcessBlock

func (g *BlockIDGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

func (*BlockIDGate) SetLogger

func (g *BlockIDGate) SetLogger(logger *zap.Logger)

type BlockIndexProvider

type BlockIndexProvider interface {
	BlocksInRange(baseBlockNum, bundleSize uint64) (out []uint64, err error)
}

type BlockIndexProviderGetter

type BlockIndexProviderGetter interface {
	GetIndexProvider() BlockIndexProvider
}

type BlockNumGate

type BlockNumGate struct {
	MaxHoldOff int
	// contains filtered or unexported fields
}

func NewBlockNumGate

func NewBlockNumGate(blockNum uint64, gateType GateType, h Handler, opts ...GateOption) *BlockNumGate

func (*BlockNumGate) ProcessBlock

func (g *BlockNumGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

func (*BlockNumGate) SetLogger

func (g *BlockNumGate) SetLogger(logger *zap.Logger)

type BlockNumberGator

type BlockNumberGator struct {
	// contains filtered or unexported fields
}

func NewBlockNumberGator

func NewBlockNumberGator(blockNum uint64, opts ...GateOption) *BlockNumberGator

func NewExclusiveBlockNumberGator

func NewExclusiveBlockNumberGator(blockNum uint64, opts ...GateOption) *BlockNumberGator

func (*BlockNumberGator) Pass

func (g *BlockNumberGator) Pass(block *pbbstream.Block) bool

func (*BlockNumberGator) SetLogger

func (g *BlockNumberGator) SetLogger(logger *zap.Logger)

type BlockRef

type BlockRef interface {
	ID() string
	Num() uint64
	String() string
}

BlockRef represents a reference to a block and is mainly define as the pair `<BlockID, BlockNum>`. A `Block` interface should always implement the `BlockRef` interface.

The interface enforce also the creation of a `Stringer` object. We expected all format to be rendered in the form `#<BlockNum> (<Id>)`. This is to easy formatted output when using `zap.Stringer(...)`.

var BlockRefEmpty BlockRef = &emptyBlockRef{}

type BlockRefGetter

type BlockRefGetter func(context.Context) (BlockRef, error)

BlockRefGetter is a function to retrieve a block ref from any system.

func HighestBlockRefGetter

func HighestBlockRefGetter(getters ...BlockRefGetter) BlockRefGetter

func NetworkHeadBlockRefGetter

func NetworkHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter

func NetworkLIBBlockRefGetter

func NetworkLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter

func RetryableBlockRefGetter

func RetryableBlockRefGetter(attempts int, wait time.Duration, next BlockRefGetter) BlockRefGetter

func StreamHeadBlockRefGetter

func StreamHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter

func StreamLIBBlockRefGetter

func StreamLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter

type BlockWithObj

type BlockWithObj struct {
	Block *pbbstream.Block
	Obj   interface{}
}

type Buffer

type Buffer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(name string, logger *zap.Logger) *Buffer

func (*Buffer) AllBlocks

func (b *Buffer) AllBlocks() (out []*pbbstream.Block)

func (*Buffer) AppendHead

func (b *Buffer) AppendHead(blk *pbbstream.Block)

func (*Buffer) Contains

func (b *Buffer) Contains(blockNum uint64) bool

func (*Buffer) Delete

func (b *Buffer) Delete(blk *pbbstream.Block)

func (*Buffer) Exists

func (b *Buffer) Exists(id string) bool

func (*Buffer) GetByID

func (b *Buffer) GetByID(id string) (blk *pbbstream.Block)

func (*Buffer) Head

func (b *Buffer) Head() (blk *pbbstream.Block)

func (*Buffer) HeadBlocks

func (b *Buffer) HeadBlocks(count int) []*pbbstream.Block

func (*Buffer) Len

func (b *Buffer) Len() int

Len() locks the buffer and returns its length. Watch out for deadlocks between buffer.lock and promises.lock if using this internally.

func (*Buffer) PopTail

func (b *Buffer) PopTail() (blockRef *pbbstream.Block)

func (*Buffer) Tail

func (b *Buffer) Tail() (blk *pbbstream.Block)

func (*Buffer) TruncateTail

func (b *Buffer) TruncateTail(lowBlockNumInclusive uint64) (truncated []*pbbstream.Block)

type Cursor

type Cursor struct {
	Step  StepType
	Block BlockRef
	LIB   BlockRef // last block sent as irreversible if it exists, else known forkdb LIB

	// HeadBlock will be the same as Block when you receive a 'new' Step, except during a reorg.
	// During a reorg, (steps in ['new','redo','undo']) the HeadBlock will always point to the block that causes the reorg.
	// When the LIB is advancing (ex: DPOSLibNum changes, etc.), step='irreversible' and the HeadBlock will be the block
	// that causes previous blocks to become irreversible.
	HeadBlock BlockRef
}

func CursorFromOpaque

func CursorFromOpaque(in string) (*Cursor, error)

func FromString

func FromString(cur string) (*Cursor, error)

func (*Cursor) Equals

func (c *Cursor) Equals(cc *Cursor) bool

func (*Cursor) IsEmpty

func (c *Cursor) IsEmpty() bool

func (*Cursor) IsOnFinalBlock

func (c *Cursor) IsOnFinalBlock() bool

func (*Cursor) String

func (c *Cursor) String() string

func (*Cursor) ToOpaque

func (c *Cursor) ToOpaque() string

type Cursorable

type Cursorable interface {
	Cursor() *Cursor
}

type DBinBlockReader

type DBinBlockReader struct {
	Header *dbin.Header
	// contains filtered or unexported fields
}

DBinBlockReader reads the dbin format where each element is assumed to be a `Block`.

func NewDBinBlockReader

func NewDBinBlockReader(reader io.Reader) (out *DBinBlockReader, err error)

func NewDBinBlockReaderWithValidation

func NewDBinBlockReaderWithValidation(reader io.Reader, validateHeaderFunc func(contentType string) error) (out *DBinBlockReader, err error)

func (*DBinBlockReader) Read

func (l *DBinBlockReader) Read() (*pbbstream.Block, error)

func (*DBinBlockReader) ReadAsBlockMeta

func (l *DBinBlockReader) ReadAsBlockMeta() (*pbbstream.BlockMeta, error)

ReadAsBlockMeta reads the next message as a BlockMeta instead of as a Block leading to reduce memory constaint since the payload are "skipped". There is a memory pressure since we need to load the full block.

But at least it's not persisent memory.

type DBinBlockWriter

type DBinBlockWriter struct {
	// contains filtered or unexported fields
}

DBinBlockWriter reads the dbin format where each element is assumed to be a `Block`.

func NewDBinBlockWriter

func NewDBinBlockWriter(writer io.Writer) (*DBinBlockWriter, error)

NewDBinBlockWriter creates a new DBinBlockWriter that writes to 'dbin' format, the 'contentType' must be 3 characters long perfectly, version should represent a version of the content.

func (*DBinBlockWriter) Write

func (w *DBinBlockWriter) Write(block *pbbstream.Block) error

type EternalSource

type EternalSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func (*EternalSource) Run

func (s *EternalSource) Run()

func (*EternalSource) SetLogger

func (s *EternalSource) SetLogger(logger *zap.Logger)

type EternalSourceOption

type EternalSourceOption = func(s *EternalSource)

func EternalSourceWithLogger

func EternalSourceWithLogger(logger *zap.Logger) EternalSourceOption

type EternalSourceStartBackAtBlock

type EternalSourceStartBackAtBlock func() (BlockRef, error)

type FileSource

type FileSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewFileSource

func NewFileSource(
	blocksStore dstore.Store,
	startBlockNum uint64,
	h Handler,
	logger *zap.Logger,
	options ...FileSourceOption,

) *FileSource

func NewFileSourceFromCursor

func NewFileSourceFromCursor(
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	cursor *Cursor,
	h Handler,
	logger *zap.Logger,
	options ...FileSourceOption,
) *FileSource

func NewFileSourceThroughCursor

func NewFileSourceThroughCursor(
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	startBlockNum uint64,
	cursor *Cursor,
	h Handler,
	logger *zap.Logger,
	options ...FileSourceOption,
) *FileSource

func (*FileSource) Run

func (s *FileSource) Run()

func (*FileSource) SetLogger

func (s *FileSource) SetLogger(logger *zap.Logger)

type FileSourceFactory

type FileSourceFactory struct {
	// contains filtered or unexported fields
}

func NewFileSourceFactory

func NewFileSourceFactory(
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	logger *zap.Logger,
	options ...FileSourceOption,
) *FileSourceFactory

func (*FileSourceFactory) SourceFromBlockNum

func (g *FileSourceFactory) SourceFromBlockNum(start uint64, h Handler) Source

func (*FileSourceFactory) SourceFromCursor

func (g *FileSourceFactory) SourceFromCursor(cursor *Cursor, h Handler) Source

func (*FileSourceFactory) SourceThroughCursor

func (g *FileSourceFactory) SourceThroughCursor(start uint64, cursor *Cursor, h Handler) Source

type FileSourceOption

type FileSourceOption = func(s *FileSource)

func FileSourceWithBlockIndexProvider

func FileSourceWithBlockIndexProvider(prov BlockIndexProvider) FileSourceOption

func FileSourceWithBundleSize

func FileSourceWithBundleSize(bundleSize uint64) FileSourceOption

func FileSourceWithConcurrentPreprocess

func FileSourceWithConcurrentPreprocess(preprocFunc PreprocessFunc, threadCount int) FileSourceOption

func FileSourceWithRetryDelay

func FileSourceWithRetryDelay(delay time.Duration) FileSourceOption

func FileSourceWithStopBlock

func FileSourceWithStopBlock(stopBlock uint64) FileSourceOption

func FileSourceWithWhitelistedBlocks

func FileSourceWithWhitelistedBlocks(nums ...uint64) FileSourceOption

type ForkableObject

type ForkableObject interface {
	Cursorable
	Stepable
	ObjectWrapper
}

type ForkableSourceFactory

type ForkableSourceFactory interface {
	SourceFromBlockNum(uint64, Handler) Source // irreversible
	SourceFromCursor(*Cursor, Handler) Source
	SourceThroughCursor(uint64, *Cursor, Handler) Source
}

ForkableSourceFactory allows you to get a stream of fork-aware blocks from either a cursor or a final block

type Gate

type Gate interface {
	SetLogger(logger *zap.Logger)
}

type GateOption

type GateOption func(g Gate)

func GateOptionWithLogger

func GateOptionWithLogger(logger *zap.Logger) GateOption

type GateType

type GateType int

func (GateType) String

func (g GateType) String() string

type Gator

type Gator interface {
	Pass(block *pbbstream.Block) bool
}

type Handler

type Handler interface {
	ProcessBlock(blk *pbbstream.Block, obj interface{}) error
}

func WithHeadMetrics

func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler

type HandlerFunc

type HandlerFunc func(blk *pbbstream.Block, obj interface{}) error

func (HandlerFunc) ProcessBlock

func (h HandlerFunc) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

type JoiningSource

type JoiningSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

JoiningSource joins an irreversible-only source (file) to a fork-aware source close to HEAD (live) 1) it tries to get the source from LiveSourceFactory (using startblock or cursor) 2) if it can't, it will ask the FileSourceFactory for a source of those blocks. 3) when it receives blocks from Filesource, it looks at LiveSource the JoiningSource will instantiate and run an 'initialSource' until it can bridge the gap

func NewJoiningSource

func NewJoiningSource(
	fileSourceFactory,
	liveSourceFactory ForkableSourceFactory,
	h Handler,
	startBlockNum uint64,
	cursor *Cursor,
	cursorIsTarget bool,
	logger *zap.Logger) *JoiningSource

func (*JoiningSource) Run

func (s *JoiningSource) Run()

type LowSourceLimitGetter

type LowSourceLimitGetter interface {
	LowestBlockNum() uint64
}

type MinimalBlockNumFilter

type MinimalBlockNumFilter struct {
	// contains filtered or unexported fields
}

MinimalBlockNumFilter does not let anything through that is under MinimalBlockNum

func NewMinimalBlockNumFilter

func NewMinimalBlockNumFilter(blockNum uint64, h Handler) *MinimalBlockNumFilter

func (*MinimalBlockNumFilter) ProcessBlock

func (f *MinimalBlockNumFilter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

type MockSource

type MockSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewMockSource

func NewMockSource(blocks []*pbbstream.Block, handler Handler) *MockSource

func (*MockSource) Run

func (s *MockSource) Run()

func (*MockSource) SetLogger

func (s *MockSource) SetLogger(logger *zap.Logger)

type MultiplexedSource

type MultiplexedSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

MultiplexedSource contains a gator based on realtime

func NewMultiplexedSource

func NewMultiplexedSource(sourceFactories []SourceFactory, h Handler, opts ...MultiplexedSourceOption) *MultiplexedSource

func (*MultiplexedSource) Run

func (s *MultiplexedSource) Run()

func (*MultiplexedSource) SetLogger

func (s *MultiplexedSource) SetLogger(logger *zap.Logger)

type MultiplexedSourceOption

type MultiplexedSourceOption = func(s *MultiplexedSource)

func MultiplexedSourceWithLogger

func MultiplexedSourceWithLogger(logger *zap.Logger) MultiplexedSourceOption

type ObjectWrapper

type ObjectWrapper interface {
	WrappedObject() interface{}
}

type OneBlockDownloaderFunc

type OneBlockDownloaderFunc = func(ctx context.Context, oneBlockFile *OneBlockFile) (data []byte, err error)

func OneBlockDownloaderFromStore

func OneBlockDownloaderFromStore(blocksStore dstore.Store) OneBlockDownloaderFunc

type OneBlockFile

type OneBlockFile struct {
	sync.Mutex
	CanonicalName string
	Filenames     map[string]bool
	ID            string
	Num           uint64
	LibNum        uint64
	PreviousID    string
	MemoizeData   []byte
	Deleted       bool
}

OneBlockFile is the representation of a single block inside one or more duplicate files, before they are merged It has a truncated ID

func MustNewOneBlockFile

func MustNewOneBlockFile(fileName string) *OneBlockFile

func NewOneBlockFile

func NewOneBlockFile(fileName string) (*OneBlockFile, error)

func (*OneBlockFile) Data

func (f *OneBlockFile) Data(ctx context.Context, oneBlockDownloader OneBlockDownloaderFunc) ([]byte, error)

func (*OneBlockFile) String

func (f *OneBlockFile) String() string

func (*OneBlockFile) ToBstreamBlock

func (f *OneBlockFile) ToBstreamBlock() *pbbstream.Block

type OneBlocksSourceOption

type OneBlocksSourceOption func(*oneBlocksSource)

func OneBlocksSourceWithSkipperFunc

func OneBlocksSourceWithSkipperFunc(f func(string) bool) OneBlocksSourceOption

OneBlocksSourceWithSkipperFunc allows a lookup function to prevent downloading the same file over and over

type ParsableTestBlock

type ParsableTestBlock struct {
	ID        string `json:"id,omitempty"`
	ParentID  string `json:"prev,omitempty"`
	ParentNum uint64 `json:"prevnum,omitempty"`
	Number    uint64 `json:"num,omitempty"`
	LIBNum    uint64 `json:"libnum,omitempty"`
	Timestamp string `json:"time,omitempty"`
	Kind      int32  `json:"kind,omitempty"`
	Version   int32  `json:"version,omitempty"`
}

type PreprocessFunc

type PreprocessFunc func(blk *pbbstream.Block) (interface{}, error)

type PreprocessedBlock

type PreprocessedBlock struct {
	Block *pbbstream.Block
	Obj   interface{}
}

func (*PreprocessedBlock) ID

func (p *PreprocessedBlock) ID() string

func (*PreprocessedBlock) Num

func (p *PreprocessedBlock) Num() uint64

func (*PreprocessedBlock) String

func (p *PreprocessedBlock) String() string

type Preprocessor

type Preprocessor struct {
	// contains filtered or unexported fields
}

Preprocessor will run a preprocess func only if `obj` is empty or if it matches a ForkableObject where the WrappedObject() is nil

func NewPreprocessor

func NewPreprocessor(preprocFunc PreprocessFunc, next Handler) *Preprocessor

func (*Preprocessor) ProcessBlock

func (p *Preprocessor) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error)

type Pretty

type Pretty interface {
	Pretty() string
}

type Range

type Range struct {
	// contains filtered or unexported fields
}

func MustParseRange

func MustParseRange(in string, opts ...RangeOptions) *Range

func NewInclusiveRange

func NewInclusiveRange(startBlock, endBlock uint64) *Range

func NewOpenRange

func NewOpenRange(startBlock uint64) *Range

func NewRangeContaining

func NewRangeContaining(blockNum uint64, size uint64) (*Range, error)

func NewRangeExcludingEnd

func NewRangeExcludingEnd(startBlock, endBlock uint64) *Range

func ParseRange

func ParseRange(in string, opts ...RangeOptions) (*Range, error)

ParseRange will parse a range of format 5-10, by default it will make an inclusive start & end use options to set exclusive boundaries

func (*Range) Contains

func (r *Range) Contains(blockNum uint64) bool

func (*Range) EndBlock

func (r *Range) EndBlock() *uint64

func (*Range) Equals

func (r *Range) Equals(other *Range) bool

func (*Range) IsNext

func (r *Range) IsNext(next *Range, size uint64) bool

func (*Range) MarshalLogObject

func (r *Range) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Range) Next

func (r *Range) Next(size uint64) *Range

func (*Range) Previous

func (r *Range) Previous(size uint64) *Range

func (*Range) ReachedEndBlock

func (r *Range) ReachedEndBlock(blockNum uint64) bool

func (*Range) Size

func (r *Range) Size() (uint64, error)

func (*Range) Split

func (r *Range) Split(chunkSize uint64) ([]*Range, error)

func (*Range) StartBlock

func (r *Range) StartBlock() uint64

func (*Range) String

func (r *Range) String() string

type RangeOptions

type RangeOptions func(p *Range) *Range

func WithExclusiveEnd

func WithExclusiveEnd() RangeOptions

func WithExclusiveStart

func WithExclusiveStart() RangeOptions

type RealtimeGate

type RealtimeGate struct {
	// contains filtered or unexported fields
}

func NewRealtimeGate

func NewRealtimeGate(timeToRealtime time.Duration, h Handler, opts ...GateOption) *RealtimeGate

func (*RealtimeGate) ProcessBlock

func (g *RealtimeGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

func (*RealtimeGate) SetLogger

func (g *RealtimeGate) SetLogger(logger *zap.Logger)

type RealtimeTripper

type RealtimeTripper struct {
	// contains filtered or unexported fields
}

RealtimeTripper is a pass-through handler that executes a function before the first block goes through.

func NewRealtimeTripper

func NewRealtimeTripper(timeToRealtime time.Duration, tripFunc func(), h Handler, opts ...GateOption) *RealtimeTripper

func (*RealtimeTripper) ProcessBlock

func (t *RealtimeTripper) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

func (*RealtimeTripper) SetLogger

func (t *RealtimeTripper) SetLogger(logger *zap.Logger)

type RecentBlockGetter

type RecentBlockGetter struct {
	// contains filtered or unexported fields
}

RecentBlockGetter requires a source that shuts down when ProcessBlock fails

func NewRecentBlockGetter

func NewRecentBlockGetter(sampleSize int) *RecentBlockGetter

func (*RecentBlockGetter) LatestBlock

func (g *RecentBlockGetter) LatestBlock() *pbbstream.Block

func (*RecentBlockGetter) ProcessBlock

func (g *RecentBlockGetter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error

type Shutterer

type Shutterer interface {
	Shutdown(error)
	Terminating() <-chan struct{}
	IsTerminating() bool
	Terminated() <-chan struct{}
	IsTerminated() bool
	OnTerminating(f func(error))
	OnTerminated(f func(error))
	Err() error
}

type Source

type Source interface {
	Shutterer
	Run()
}

type SourceFactory

type SourceFactory func(h Handler) Source

type SourceFromNumFactory

type SourceFromNumFactory func(startBlockNum uint64, h Handler) Source

type SourceFromNumFactoryWithSkipFunc

type SourceFromNumFactoryWithSkipFunc func(startBlockNum uint64, h Handler, skipFunc func(idSuffix string) bool) Source

type SourceFromRefFactory

type SourceFromRefFactory func(startBlockRef BlockRef, h Handler) Source

type StepType

type StepType int

func (StepType) Matches

func (t StepType) Matches(t2 StepType) bool

func (StepType) String

func (t StepType) String() string

type Stepable

type Stepable interface {
	Step() StepType
	FinalBlockHeight() uint64
	ReorgJunctionBlock() BlockRef
}

type TailLock

type TailLock struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TailLock manages inflight block queries, to feed into truncation mechanism, so it happens only when buffer is full, and no one is querying the blocks.

func NewTailLock

func NewTailLock(opts ...TailLockOption) *TailLock

func (*TailLock) LowerBound

func (g *TailLock) LowerBound() uint64

func (*TailLock) TailLock

func (g *TailLock) TailLock(blockNum uint64) (releaseFunc func())

type TailLockOption

type TailLockOption = func(s *TailLock)

func TailLockWithLogger

func TailLockWithLogger(logger *zap.Logger) TailLockOption

type Target

type Target string

type TestBlockIndexProvider

type TestBlockIndexProvider struct {
	Blocks           []uint64
	LastIndexedBlock uint64
	ThrowError       error
}

func (*TestBlockIndexProvider) BlocksInRange

func (t *TestBlockIndexProvider) BlocksInRange(lowBlockNum uint64, bundleSize uint64) (out []uint64, err error)

type TestBlockReader

type TestBlockReader struct {
	// contains filtered or unexported fields
}

func (*TestBlockReader) Read

func (r *TestBlockReader) Read() (*pbbstream.Block, error)

type TestBlockReaderBin

type TestBlockReaderBin struct {
	DBinReader *dbin.Reader
}

func (*TestBlockReaderBin) Read

func (l *TestBlockReaderBin) Read() (*pbbstream.Block, error)

type TestBlockWriterBin

type TestBlockWriterBin struct {
	DBinWriter *dbin.Writer
}

func (*TestBlockWriterBin) Write

func (w *TestBlockWriterBin) Write(blk *pbbstream.Block) error

type TestSource

type TestSource struct {
	*shutter.Shutter

	StartBlockID      string
	StartBlockNum     uint64
	Cursor            *Cursor
	PassThroughCursor bool
	// contains filtered or unexported fields
}

func NewTestSource

func NewTestSource(h Handler) *TestSource

func (*TestSource) Push

func (t *TestSource) Push(b *pbbstream.Block, obj interface{}) error

func (*TestSource) Run

func (t *TestSource) Run()

func (*TestSource) SetLogger

func (t *TestSource) SetLogger(logger *zap.Logger)

type TestSourceFactory

type TestSourceFactory struct {
	Created           chan *TestSource
	FromBlockNumFunc  func(uint64, Handler) Source
	FromCursorFunc    func(*Cursor, Handler) Source
	ThroughCursorFunc func(uint64, *Cursor, Handler) Source
	LowestBlkNum      uint64
}

func NewTestSourceFactory

func NewTestSourceFactory() *TestSourceFactory

func (*TestSourceFactory) LowestBlockNum

func (t *TestSourceFactory) LowestBlockNum() uint64

func (*TestSourceFactory) NewSource

func (t *TestSourceFactory) NewSource(h Handler) Source

func (*TestSourceFactory) NewSourceFromRef

func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source

func (*TestSourceFactory) SourceFromBlockNum

func (t *TestSourceFactory) SourceFromBlockNum(blockNum uint64, h Handler) Source

func (*TestSourceFactory) SourceFromCursor

func (t *TestSourceFactory) SourceFromCursor(cursor *Cursor, h Handler) Source

func (*TestSourceFactory) SourceThroughCursor

func (t *TestSourceFactory) SourceThroughCursor(start uint64, cursor *Cursor, h Handler) Source

type TimeThresholdGator

type TimeThresholdGator struct {
	// contains filtered or unexported fields
}

func NewTimeThresholdGator

func NewTimeThresholdGator(threshold time.Duration, opts ...GateOption) *TimeThresholdGator

func (*TimeThresholdGator) Pass

func (g *TimeThresholdGator) Pass(block *pbbstream.Block) bool

func (*TimeThresholdGator) SetLogger

func (g *TimeThresholdGator) SetLogger(logger *zap.Logger)

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker tracks the chain progress and block history. Allows many processes to take decisions on the state of different pieces being sync'd (live) or in catch-up mode.

func NewTracker

func NewTracker(nearBlocksCount uint64) *Tracker

func (*Tracker) AddGetter

func (t *Tracker) AddGetter(target Target, f BlockRefGetter)

func (*Tracker) Clone

func (t *Tracker) Clone() *Tracker

func (*Tracker) Get

func (t *Tracker) Get(ctx context.Context, target Target) (BlockRef, error)

func (*Tracker) GetRelativeBlock

func (t *Tracker) GetRelativeBlock(ctx context.Context, potentiallyNegativeBlockNum int64, target Target) (uint64, error)

func (*Tracker) IsNear

func (t *Tracker) IsNear(ctx context.Context, from Target, to Target) (bool, error)

func (*Tracker) IsNearManualCheck

func (t *Tracker) IsNearManualCheck(from, to uint64) bool

IsNearManualCheck allows you to manually check two "already resolved values" for nearness.

func (*Tracker) IsNearWithResults

func (t *Tracker) IsNearWithResults(ctx context.Context, from Target, to Target) (fromBlockRef BlockRef, toBlockRef BlockRef, isNear bool, err error)

IsNearWithResults returns BlockRefs for the two targets. It can short-circuit the lookup for `from` if `to` is near the beginning of the chain, within the `nearBlocksCount`, in which case `fromBlockRef` will be nil.

func (*Tracker) SetNearBlocksCount

func (t *Tracker) SetNearBlocksCount(count int64)

Directories

Path Synopsis
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL