delivery

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2023 License: MIT Imports: 13 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPrefixMissing  = fmt.Errorf("prefix missing")
	ErrPrefixOverlap  = fmt.Errorf("producers must not have overlapping prefixes")
	ErrUnknownBatch   = fmt.Errorf("unknown batch")
	ErrInvalidBatch   = fmt.Errorf("invalid batch")
	ErrPrefixConflict = fmt.Errorf("deletes, updates, and batches cannot have prefix conflicts")
)

Functions

func AvroInt

func AvroInt(b int) []byte

func Ready added in v0.0.23

func Ready()

Ready should be called when the transport is synced up. This sets up metrics to start recording deltas once caught up, so the catch-up process doesn't taint the metrics.

Types

type Batch

type Batch struct {
	Number     int64                  `avro:"num"`
	Weight     []byte                 `avro:"weight"`
	ParentHash types.Hash             `avro:"parent"`
	Updates    map[string]BatchRecord `avro:"updates"`
	Hash       types.Hash
}

func UnmarshalBatch

func UnmarshalBatch(data []byte) (*Batch, error)

func (*Batch) EncodeAvro

func (b *Batch) EncodeAvro() []byte

type BatchRecord

type BatchRecord struct {
	Value    []byte     `avro:"value"`
	Count    int        `avro:"count"`
	Subbatch types.Hash `avro:"subbatch"`
	Delete   bool       `avro:"delete"`
}

type ChainUpdate

type ChainUpdate struct {
	// contains filtered or unexported fields
}

func (*ChainUpdate) Added

func (c *ChainUpdate) Added() []*PendingBatch

func (*ChainUpdate) Removed

func (c *ChainUpdate) Removed() []*PendingBatch

type Message

type Message interface {
	Key() []byte
	Value() []byte
}

type MessageProcessor

type MessageProcessor struct {
	// contains filtered or unexported fields
}

MessageProcessor aggregates messages and emits completed blocks. Messages will be discarded if their blocks are older than lastEmittedNum - reorgThreshold, but otherwise they will be emitted in the order they are completed, even if they are older than the initial block or there are gaps between the last emitted block and this one. Subsequent layers will be used to ensure blocks are emitted in something resembling a sensible order and that reorgs are handled.

func NewMessageProcessor

func NewMessageProcessor(lastEmittedNum int64, reorgThreshold int64, trackedPrefixes []*regexp.Regexp) *MessageProcessor

NewMessageProcessor instantiates a MessageProcessor. lastEmittedNum should indicate the last block number processed by this system for resumption. Blocks older than lastEmittedNum - reorgThreshold will be discarded. trackedPrefixes is a list of regular expressions indicating which messages are of interest to this consumer - use '.*' to get all messages.

func (*MessageProcessor) ProcessCompleteBatch added in v0.0.38

func (mp *MessageProcessor) ProcessCompleteBatch(pb *PendingBatch)

func (*MessageProcessor) ProcessMessage

func (mp *MessageProcessor) ProcessMessage(m ResumptionMessage) error

ProcessMessage takes in messages and assembles completed blocks of messages.

func (*MessageProcessor) Subscribe

func (mp *MessageProcessor) Subscribe(ch chan<- *PendingBatch) types.Subscription

Subscribe to pending batches.

func (*MessageProcessor) SubscribeReorgs

func (mp *MessageProcessor) SubscribeReorgs(ch chan<- map[int64]types.Hash) types.Subscription

SubscribeReorgs subscribes to reorg notifications.

func (*MessageProcessor) WhyNotReady

func (mp *MessageProcessor) WhyNotReady(hash types.Hash) string

type MessageType

type MessageType byte
const (
	BatchType          MessageType = iota // BatchType.$Hash
	SubBatchHeaderType                    // SubBatchHeaderType.$Hash.$BatchId
	SubBatchMsgType                       // SubBatchMsgType.$hash.$BatchId.$Index
	BatchMsgType                          // BatchMsgType.$hash./path/
	BatchDeleteMsgType                    // BatchDeleteMsgType.$hash./path/
	ReorgType                             // ReorgType.$Hash
	ReorgCompleteType                     // ReorgCompleteType.$Hash
	PingType           MessageType = 0xff
)

func (MessageType) GetKey

func (mt MessageType) GetKey(components ...[]byte) []byte

type OrderedMessageProcessor

type OrderedMessageProcessor struct {
	// contains filtered or unexported fields
}

func NewOrderedMessageProcessor

func NewOrderedMessageProcessor(lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (*OrderedMessageProcessor, error)

func (*OrderedMessageProcessor) Close

func (omp *OrderedMessageProcessor) Close()

func (*OrderedMessageProcessor) HandlePendingBatch

func (omp *OrderedMessageProcessor) HandlePendingBatch(pb *PendingBatch, reorg bool)

func (*OrderedMessageProcessor) HandleReorg

func (omp *OrderedMessageProcessor) HandleReorg(num int64, hash types.Hash)

func (*OrderedMessageProcessor) ProcessCompleteBatch added in v0.0.38

func (omp *OrderedMessageProcessor) ProcessCompleteBatch(pb *PendingBatch)

func (*OrderedMessageProcessor) ProcessMessage

func (omp *OrderedMessageProcessor) ProcessMessage(m ResumptionMessage) error

func (*OrderedMessageProcessor) Subscribe

func (omp *OrderedMessageProcessor) Subscribe(ch interface{}) types.Subscription

Subscribe enables subscribing to either ordered chain updates or unordered pending batches. Calling Subscribe on a chan *ChainUpdate will return a subscription for ordered chain updates. Calling Subscribe on a chan *PendingBatch will return a subscription for unordered pending batches.

func (*OrderedMessageProcessor) SubscribeReorg

func (omp *OrderedMessageProcessor) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription

func (*OrderedMessageProcessor) WhyNotReady

func (omp *OrderedMessageProcessor) WhyNotReady(hash types.Hash) string

type PendingBatch

type PendingBatch struct {
	Number     int64
	Weight     *big.Int
	ParentHash types.Hash
	Hash       types.Hash
	Values     map[string][]byte
	Deletes    map[string]struct{}
	// contains filtered or unexported fields
}

func (*PendingBatch) ApplyMessage

func (pb *PendingBatch) ApplyMessage(m ResumptionMessage) bool

func (*PendingBatch) DecPrefixCounter

func (pb *PendingBatch) DecPrefixCounter(p string) bool

func (*PendingBatch) Done

func (pb *PendingBatch) Done()

Done should be called on a pending batch after it has been applied.

func (*PendingBatch) Ready

func (pb *PendingBatch) Ready() bool

func (*PendingBatch) Resumption

func (pb *PendingBatch) Resumption() string

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(defaultTopic string, schema map[string]string) (*Producer, error)

func (*Producer) AddBlock

func (p *Producer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) (map[string][]Message, error)

func (*Producer) Reorg

func (p *Producer) Reorg(number int64, hash types.Hash) Message

Reorg should be called for large reorgs (> reorgThreshold). The number and hash should correspond to the common ancestor between the two reorged chains.

func (*Producer) ReorgDone

func (p *Producer) ReorgDone(number int64, hash types.Hash) Message

ReorgDone should be called at the end of a large reorg, to indicate to the consumer that all messages for the reorg are available.

func (*Producer) SendBatch

func (p *Producer) SendBatch(batchid types.Hash, deletes []string, update map[string][]byte) (map[string][]Message, error)

type ResumptionMessage

type ResumptionMessage interface {
	Message
	Offset() int64
	Source() string
	Time() time.Time
}

type SubBatchRecord

type SubBatchRecord struct {
	Delete  []string          `avro:"delete"`
	Updates map[string][]byte `avro:"updates"`
}

func (*SubBatchRecord) EncodeAvro

func (b *SubBatchRecord) EncodeAvro() []byte

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL