txpool

package
v0.0.0-...-1f8a15b
Warning

This package is not in the latest version of its module.

Published: Dec 12, 2023 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation


Constants

const (
	NoNonceGaps       = 0b010000
	EnoughBalance     = 0b001000
	NotTooMuchGas     = 0b000100
	EnoughFeeCapBlock = 0b000010
	IsLocal           = 0b000001

	BaseFeePoolBits = NoNonceGaps + EnoughBalance + NotTooMuchGas
)
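The constants above are single-bit flags that can be combined into one marker value. The following is a minimal sketch of how such flags might be combined and tested; the helper `hasAll` is hypothetical and not part of the package, and whether the package itself tests markers this way is an assumption.

```go
package main

import "fmt"

// Marker bits, copied from the constants above.
const (
	NoNonceGaps       = 0b010000
	EnoughBalance     = 0b001000
	NotTooMuchGas     = 0b000100
	EnoughFeeCapBlock = 0b000010
	IsLocal           = 0b000001

	BaseFeePoolBits = NoNonceGaps + EnoughBalance + NotTooMuchGas
)

// hasAll reports whether every bit in mask is set in marker.
// Hypothetical helper for illustration only.
func hasAll(marker, mask uint8) bool {
	return marker&mask == mask
}

func main() {
	// A local transaction with no nonce gaps, enough balance and acceptable gas,
	// but whose fee cap is below the pending block's base fee.
	marker := uint8(NoNonceGaps | EnoughBalance | NotTooMuchGas | IsLocal)

	fmt.Printf("marker          = %06b\n", marker)          // 011101
	fmt.Printf("BaseFeePoolBits = %06b\n", BaseFeePoolBits) // 011100
	fmt.Println(hasAll(marker, BaseFeePoolBits))            // true
	fmt.Println(hasAll(marker, EnoughFeeCapBlock))          // false
}
```

Because the three bits in BaseFeePoolBits are the highest of the five defined bits, a plain numeric comparison `marker >= BaseFeePoolBits` gives the same answer as the mask test for any marker built only from these five bits.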

Variables

var ErrPoolDisabled = fmt.Errorf("TxPool Disabled")
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")
var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0}

Functions

func ChainConfig

func ChainConfig(tx kv.Getter) (*chain.Config, error)

func LastSeenBlock

func LastSeenBlock(tx kv.Getter) (uint64, error)

func MainLoop

func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func())

MainLoop does the following. It sends pending byHash entries to p2p:

  • new byHash
  • all pooled byHash to recently connected peers
  • all local pooled byHash to random peers periodically

It also promotes and demotes transactions and handles reorgs.

func PutChainConfig

func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error

func PutLastSeenBlock

func PutLastSeenBlock(tx kv.Putter, n uint64, buf []byte) error

func SortByNonceLess

func SortByNonceLess(a, b *metaTx) bool

func StartGrpc

func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto.MiningServer, addr string, creds *credentials.TransportCredentials, logger log.Logger) (*grpc.Server, error)

Types

type BestQueue

type BestQueue struct {
	// contains filtered or unexported fields
}

func (BestQueue) Len

func (p BestQueue) Len() int

func (BestQueue) Less

func (p BestQueue) Less(i, j int) bool

func (*BestQueue) Pop

func (p *BestQueue) Pop() interface{}

func (*BestQueue) Push

func (p *BestQueue) Push(x interface{})

func (BestQueue) Swap

func (p BestQueue) Swap(i, j int)
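The five methods above (Len, Less, Swap, Push, Pop) match the method set that the standard library's `container/heap.Interface` requires. The sketch below shows a queue of that shape driven by `container/heap`; the `item` type, its `marker` field, the ordering rule and the helpers `newQueue` and `popAll` are all assumptions made for illustration, since the real queue's fields are unexported.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for the package's unexported metaTx.
type item struct {
	id     string
	marker uint8 // in this sketch, a higher marker means a "better" transaction
}

// bestQueue has the same method set as BestQueue above.
type bestQueue []*item

func (q bestQueue) Len() int            { return len(q) }
func (q bestQueue) Less(i, j int) bool  { return q[i].marker > q[j].marker }
func (q bestQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *bestQueue) Push(x interface{}) { *q = append(*q, x.(*item)) }
func (q *bestQueue) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // drop the reference so the item can be collected
	*q = old[:n-1]
	return it
}

// newQueue builds a heap from the given items (hypothetical helper).
func newQueue(items ...*item) *bestQueue {
	q := &bestQueue{}
	for _, it := range items {
		heap.Push(q, it)
	}
	return q
}

// popAll drains the heap and returns the ids in pop order (hypothetical helper).
func popAll(q *bestQueue) []string {
	var ids []string
	for q.Len() > 0 {
		ids = append(ids, heap.Pop(q).(*item).id)
	}
	return ids
}

func main() {
	q := newQueue(
		&item{id: "a", marker: 0b011100},
		&item{id: "b", marker: 0b011111},
		&item{id: "c", marker: 0b000100},
	)
	fmt.Println(popAll(q)) // [b a c]
}
```

Note that callers go through `heap.Push` and `heap.Pop` rather than calling the Push and Pop methods directly; the methods only append and remove at the end of the slice, and `container/heap` maintains the ordering.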

type BySenderAndNonce

type BySenderAndNonce struct {
	// contains filtered or unexported fields
}

BySenderAndNonce is designed to perform the most expensive operation in TxPool, "recalculate all ephemeral fields of all transactions", using this algorithm:

  • for all senders, iterate over all transactions in order of growing nonce

Performance decisions:

  • All senders are stored inside one large BTree, because iterating over one BTree is faster than iterating over map[senderId]BTree
  • sortByNonce is used as a non-pointer wrapper, because iterating over a BTree of pointers is 2x slower
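The idea of keeping every sender in one ordered structure keyed by (sender, nonce) can be sketched with the standard library alone. The example below swaps the BTree for a sorted slice of value-type keys, which is a simplification and not what the package does; the `key` type and the helpers `buildIndex` and `ascendSender` are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// key is a hypothetical value-type entry ordered by (senderID, nonce).
type key struct {
	senderID uint64
	nonce    uint64
}

// less orders keys by sender first, then by growing nonce.
func less(a, b key) bool {
	if a.senderID != b.senderID {
		return a.senderID < b.senderID
	}
	return a.nonce < b.nonce
}

// buildIndex returns a sorted copy of keys. A sorted slice stands in for the
// single large BTree here.
func buildIndex(keys []key) []key {
	idx := make([]key, len(keys))
	copy(idx, keys)
	sort.Slice(idx, func(i, j int) bool { return less(idx[i], idx[j]) })
	return idx
}

// ascendSender visits one sender's nonces in growing order. It seeks to the
// first key of the sender and then scans a contiguous range.
func ascendSender(idx []key, senderID uint64, visit func(nonce uint64)) {
	pivot := key{senderID: senderID, nonce: 0}
	start := sort.Search(len(idx), func(i int) bool { return !less(idx[i], pivot) })
	for i := start; i < len(idx) && idx[i].senderID == senderID; i++ {
		visit(idx[i].nonce)
	}
}

func main() {
	idx := buildIndex([]key{{2, 7}, {1, 3}, {2, 5}, {1, 4}, {2, 6}})
	ascendSender(idx, 2, func(nonce uint64) {
		fmt.Println("sender 2, nonce", nonce) // nonces 5, 6, 7 in that order
	})
}
```

With a composite key, one sender's transactions occupy a contiguous range, so iterating over all senders in nonce order is a single pass over one structure.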

type Fetch

type Fetch struct {
	// contains filtered or unexported fields
}

Fetch connects to sentry and implements the eth/66 protocol for transaction messages. It tries to "prime" the sentry with a StatusData message containing the given genesis hash and list of forks, but with zero max block and total difficulty. Sentry should have logic not to overwrite statusData with messages from the tx pool.

func NewFetch

func NewFetch(ctx context.Context, sentryClients []direct.SentryClient, pool Pool, stateChangesClient StateChangesClient, coreDB kv.RoDB, db kv.RwDB,
	chainID uint256.Int, logger log.Logger) *Fetch

NewFetch creates a new fetch object that will work with given sentry clients. Since the SentryClient here is an interface, it is suitable for mocking in tests (mock will need to implement all the functions of the SentryClient interface).

func (*Fetch) ConnectCore

func (f *Fetch) ConnectCore()

func (*Fetch) ConnectSentries

func (f *Fetch) ConnectSentries()

ConnectSentries initialises the connection to the sentry.

func (*Fetch) SetWaitGroup

func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup)

type GrpcDisabled

type GrpcDisabled struct {
	txpool_proto.UnimplementedTxpoolServer
}

func (*GrpcDisabled) Add

func (*GrpcDisabled) All

func (*GrpcDisabled) FindUnknown

func (*GrpcDisabled) Nonce

func (*GrpcDisabled) OnAdd

func (*GrpcDisabled) Pending

func (*GrpcDisabled) Status

func (*GrpcDisabled) Transactions

func (*GrpcDisabled) Version

func (*GrpcDisabled) Version(ctx context.Context, empty *emptypb.Empty) (*types2.VersionReply, error)

type GrpcServer

type GrpcServer struct {
	txpool_proto.UnimplementedTxpoolServer

	NewSlotsStreams *NewSlotsStreams
	// contains filtered or unexported fields
}

func NewGrpcServer

func NewGrpcServer(ctx context.Context, txPool txPool, db kv.RoDB, chainID uint256.Int, logger log.Logger) *GrpcServer

func (*GrpcServer) Add

func (*GrpcServer) All

func (*GrpcServer) FindUnknown

func (*GrpcServer) Nonce

Nonce returns the nonce for the given address.

func (*GrpcServer) OnAdd

func (*GrpcServer) Pending

func (*GrpcServer) Status

func (*GrpcServer) Version

type MockSentry

type MockSentry struct {
	*sentry.SentryServerMock

	StreamWg sync.WaitGroup
	// contains filtered or unexported fields
}

func NewMockSentry

func NewMockSentry(ctx context.Context) *MockSentry

func (*MockSentry) HandShake

func (*MockSentry) Messages

func (*MockSentry) PeerEvents

func (*MockSentry) Send

func (ms *MockSentry) Send(req *sentry.InboundMessage) (errs []error)

func (*MockSentry) SetStatus

type NewSlotsStreams

type NewSlotsStreams struct {
	// contains filtered or unexported fields
}

NewSlotsStreams: it is safe to use this type as a non-pointer.

func (*NewSlotsStreams) Add

func (s *NewSlotsStreams) Add(stream txpool_proto.Txpool_OnAddServer) (remove func())

func (*NewSlotsStreams) Broadcast

func (s *NewSlotsStreams) Broadcast(reply *txpool_proto.OnAddReply, logger log.Logger)
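The two signatures above suggest a subscribe-and-broadcast registry in which Add returns a function that removes the subscription. The sketch below shows that pattern under stated assumptions: the `streams` type, the string message and the callback-based subscriber are hypothetical stand-ins for the gRPC stream and reply types, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// streams is a hypothetical registry of subscribers, guarded by a mutex.
type streams struct {
	mu     sync.Mutex
	nextID uint
	subs   map[uint]func(msg string)
}

// Add registers a subscriber and returns a function that unregisters it.
func (s *streams) Add(send func(msg string)) (remove func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.subs == nil {
		s.subs = make(map[uint]func(msg string))
	}
	id := s.nextID
	s.nextID++
	s.subs[id] = send
	return func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		delete(s.subs, id)
	}
}

// Broadcast delivers msg to every current subscriber and returns how many
// subscribers were notified. Subscribers must not call remove from inside
// the callback, because the lock is held during delivery.
func (s *streams) Broadcast(msg string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, send := range s.subs {
		send(msg)
	}
	return len(s.subs)
}

func main() {
	var s streams
	removeA := s.Add(func(msg string) { fmt.Println("A got", msg) })
	s.Add(func(msg string) { fmt.Println("B got", msg) })

	fmt.Println(s.Broadcast("tx1")) // both subscribers notified
	removeA()
	fmt.Println(s.Broadcast("tx2")) // only B notified
}
```

Returning the remove function from Add lets the caller tie the subscription's lifetime to its own scope, for example with `defer remove()` in a stream handler.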

type PendingPool

type PendingPool struct {
	// contains filtered or unexported fields
}

PendingPool differs from the other pools: its best is a slice instead of a heap. Maintaining the "slice sort" invariant is more expensive, but it allows a cheap copy of the pending.best slice for mining (because txs and metaTx are considered immutable).
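The trade-off described above can be illustrated with a small sketch: insertion into a sorted slice costs a search plus a shift, but taking a snapshot is a single copy of pointers. The `tx` type, its `tip` field, the descending-tip ordering and the `pendingBest` type are assumptions made for this example, not the package's actual fields or ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// tx is a hypothetical immutable transaction record.
type tx struct {
	id  string
	tip uint64
}

// pendingBest keeps transactions sorted by tip, highest first.
type pendingBest struct {
	best []*tx
}

// Add inserts t at its sorted position: binary search, then shift the tail.
func (p *pendingBest) Add(t *tx) {
	i := sort.Search(len(p.best), func(i int) bool { return p.best[i].tip < t.tip })
	p.best = append(p.best, nil)
	copy(p.best[i+1:], p.best[i:])
	p.best[i] = t
}

// Snapshot returns a copy of the sorted slice. Only pointers are copied,
// which is safe to share as long as the pointed-to records are never mutated.
func (p *pendingBest) Snapshot() []*tx {
	out := make([]*tx, len(p.best))
	copy(out, p.best)
	return out
}

func main() {
	p := &pendingBest{}
	p.Add(&tx{id: "a", tip: 5})
	p.Add(&tx{id: "b", tip: 9})
	p.Add(&tx{id: "c", tip: 7})

	snap := p.Snapshot()
	p.Add(&tx{id: "d", tip: 10}) // does not affect the snapshot

	for _, t := range snap {
		fmt.Println(t.id, t.tip) // b 9, c 7, a 5
	}
}
```

A heap would make insertion cheaper, but reading its elements in priority order requires popping them, which is why a consumer that wants an ordered read-only view benefits from the sorted slice.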

func NewPendingSubPool

func NewPendingSubPool(t SubPoolType, limit int) *PendingPool

func (*PendingPool) Add

func (p *PendingPool) Add(i *metaTx, logger log.Logger)

func (*PendingPool) Best

func (p *PendingPool) Best() *metaTx

func (*PendingPool) DebugPrint

func (p *PendingPool) DebugPrint(prefix string)

func (*PendingPool) EnforceBestInvariants

func (p *PendingPool) EnforceBestInvariants()

func (*PendingPool) EnforceWorstInvariants

func (p *PendingPool) EnforceWorstInvariants()

func (*PendingPool) Len

func (p *PendingPool) Len() int

func (*PendingPool) PopWorst

func (p *PendingPool) PopWorst() *metaTx

func (*PendingPool) Remove

func (p *PendingPool) Remove(i *metaTx)

func (*PendingPool) Updated

func (p *PendingPool) Updated(mt *metaTx)

func (*PendingPool) Worst

func (p *PendingPool) Worst() *metaTx

type Pool

type Pool interface {
	ValidateSerializedTxn(serializedTxn []byte) error

	// Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer
	AddRemoteTxs(ctx context.Context, newTxs types.TxSlots)
	AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)
	OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error
	// IdHashKnown check whether transaction with given Id hash is known to the pool
	IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
	FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)
	Started() bool
	GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
	GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error)

	AddNewGoodPeer(peerID types.PeerID)
}

Pool is the interface for the transaction pool. This interface exists for the convenience of testing, not (yet) because there are multiple implementations.
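Since the interface exists for testing, a test can supply its own small implementation. The sketch below shows the general pattern with a hypothetical, narrowed-down interface `hashKnower`; the real IdHashKnown also takes a kv.Tx argument, which is omitted here so the example stays self-contained. The names `fakePool` and `filterUnknown` are likewise assumptions.

```go
package main

import (
	"bytes"
	"fmt"
)

// hashKnower is a hypothetical subset of Pool used only in this sketch.
type hashKnower interface {
	IdHashKnown(hash []byte) (bool, error)
}

// fakePool is a test double that knows a fixed set of hashes.
type fakePool struct {
	known [][]byte
}

func (f *fakePool) IdHashKnown(hash []byte) (bool, error) {
	for _, k := range f.known {
		if bytes.Equal(k, hash) {
			return true, nil
		}
	}
	return false, nil
}

// filterUnknown is hypothetical code under test: it depends only on the
// interface, so it works with the fake as well as with a real pool.
func filterUnknown(p hashKnower, hashes [][]byte) ([][]byte, error) {
	var unknown [][]byte
	for _, h := range hashes {
		ok, err := p.IdHashKnown(h)
		if err != nil {
			return nil, err
		}
		if !ok {
			unknown = append(unknown, h)
		}
	}
	return unknown, nil
}

func main() {
	pool := &fakePool{known: [][]byte{{0x01}, {0x02}}}
	unknown, err := filterUnknown(pool, [][]byte{{0x01}, {0x03}})
	fmt.Println(len(unknown), err) // 1 <nil>
}
```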

type Send

type Send struct {
	// contains filtered or unexported fields
}

Send sends concrete P2P messages to Sentry. It is the same as Fetch but for outbound traffic, and it does not initiate any messages by itself.

func NewSend

func NewSend(ctx context.Context, sentryClients []direct.SentryClient, pool Pool, logger log.Logger) *Send

func (*Send) AnnouncePooledTxs

func (f *Send) AnnouncePooledTxs(types []byte, sizes []uint32, hashes types2.Hashes, maxPeers uint64) (hashSentTo []int)

func (*Send) BroadcastPooledTxs

func (f *Send) BroadcastPooledTxs(rlps [][]byte, maxPeers uint64) (txSentTo []int)

BroadcastPooledTxs broadcasts the given RLPs to random peers.

func (*Send) PropagatePooledTxsToPeersList

func (f *Send) PropagatePooledTxsToPeersList(peers []types2.PeerID, types []byte, sizes []uint32, hashes []byte)

func (*Send) SetWaitGroup

func (f *Send) SetWaitGroup(wg *sync.WaitGroup)

type SentryClient

type SentryClient interface {
	sentry.SentryClient
	Protocol() uint
}

type StateChangesClient

type StateChangesClient interface {
	StateChanges(ctx context.Context, in *remote.StateChangeRequest, opts ...grpc.CallOption) (remote.KV_StateChangesClient, error)
}

type SubPool

type SubPool struct {
	// contains filtered or unexported fields
}

func NewSubPool

func NewSubPool(t SubPoolType, limit int) *SubPool

func (*SubPool) Add

func (p *SubPool) Add(i *metaTx, logger log.Logger)

func (*SubPool) Best

func (p *SubPool) Best() *metaTx

func (*SubPool) DebugPrint

func (p *SubPool) DebugPrint(prefix string)

func (*SubPool) EnforceInvariants

func (p *SubPool) EnforceInvariants()

func (*SubPool) Len

func (p *SubPool) Len() int

func (*SubPool) PopBest

func (p *SubPool) PopBest() *metaTx

func (*SubPool) PopWorst

func (p *SubPool) PopWorst() *metaTx

func (*SubPool) Remove

func (p *SubPool) Remove(i *metaTx)

func (*SubPool) Updated

func (p *SubPool) Updated(i *metaTx)

func (*SubPool) Worst

func (p *SubPool) Worst() *metaTx

type SubPoolMarker

type SubPoolMarker uint8

SubPoolMarker is an ordered bitset of five bits that is used to sort transactions into sub-pools. Meaning of the bits:

  1. Absence of nonce gaps. Set to 1 for a transaction whose nonce is N, when the state nonce for the sender is M and there are transactions for all nonces between M and N from the same sender. Set to 0 if the transaction's nonce is separated from the state nonce by one or more nonce gaps.
  2. Sufficient balance for gas. Set to 1 if the balance of the sender's account in the state is B, the nonce of the sender in the state is M, the nonce of the transaction is N, and the sum of feeCap x gasLimit + transferred_value over all transactions from this sender with nonces M ... N is no more than B. Set to 0 otherwise. In other words, this bit is set if there is currently a guarantee that the transaction and all its required prior transactions will be able to pay for gas.
  3. Not too much gas. Set to 1 if the transaction doesn't use too much gas.
  4. Dynamic fee requirement. Set to 1 if the feeCap of the transaction is no less than the baseFee of the currently pending block. Set to 0 otherwise.
  5. Local transaction. Set to 1 if the transaction is local.
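Bits 1 and 2 described above depend on a walk over one sender's transactions in growing nonce order. The following sketch computes just those two bits under stated assumptions: the `tx` type and `markSender` function are hypothetical, `cost` stands for feeCap x gasLimit + transferred_value, amounts are plain uint64 values small enough not to overflow, and all nonces are at or above the state nonce.

```go
package main

import "fmt"

// The two marker bits computed in this sketch, copied from the constants.
const (
	NoNonceGaps   = 0b010000
	EnoughBalance = 0b001000
)

// tx is a hypothetical transaction summary.
type tx struct {
	nonce uint64
	cost  uint64 // feeCap x gasLimit + transferred value
}

// markSender takes one sender's transactions sorted by growing nonce, plus the
// sender's state nonce and balance, and returns one marker per transaction.
func markSender(txs []tx, stateNonce, balance uint64) []uint8 {
	markers := make([]uint8, len(txs))
	expected := stateNonce // next nonce that keeps the chain gap-free
	var cumulative uint64  // total cost of this and all earlier transactions
	for i, t := range txs {
		// Bit 1: set only while every nonce from the state nonce up to this
		// transaction is present.
		if t.nonce == expected {
			markers[i] |= NoNonceGaps
			expected++
		}
		// Bit 2: set while the running total still fits into the balance.
		cumulative += t.cost
		if cumulative <= balance {
			markers[i] |= EnoughBalance
		}
	}
	return markers
}

func main() {
	txs := []tx{{nonce: 5, cost: 40}, {nonce: 6, cost: 50}, {nonce: 8, cost: 20}}
	for i, m := range markSender(txs, 5, 100) {
		fmt.Printf("nonce %d: %06b\n", txs[i].nonce, m)
	}
	// nonce 5: 011000
	// nonce 6: 011000
	// nonce 8: 000000
}
```

In the example, nonce 7 is missing, so the transaction with nonce 8 loses the gap bit, and the running total of 110 exceeds the balance of 100, so it loses the balance bit as well.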

type SubPoolType

type SubPoolType uint8
const BaseFeeSubPool SubPoolType = 2
const PendingSubPool SubPoolType = 1
const QueuedSubPool SubPoolType = 3

func (SubPoolType) String

func (sp SubPoolType) String() string

type TxPool

type TxPool struct {
	// contains filtered or unexported fields
}

TxPool holds all pool-related data structures and tiny lock-based methods. Most of the logic is implemented by pure, test-friendly functions.

  • txpool doesn't start any goroutines: "leave concurrency to user" design
  • txpool has no DB-TX fields: "leave db transactions management to user" design
  • txpool has a _chainDB field, but it must maximize the local state cache hit-rate and perform a minimum of _chainDB transactions

It keeps TxSlot objects immutable.

func New

func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache,
	chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64, logger log.Logger,
) (*TxPool, error)

func (*TxPool) AddLocalTxs

func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)

func (*TxPool) AddNewGoodPeer

func (p *TxPool) AddNewGoodPeer(peerID types.PeerID)

func (*TxPool) AddRemoteTxs

func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots)

func (*TxPool) AppendAllAnnouncements

func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)

func (*TxPool) AppendLocalAnnouncements

func (p *TxPool) AppendLocalAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)

func (*TxPool) AppendRemoteAnnouncements

func (p *TxPool) AppendRemoteAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)

func (*TxPool) CountContent

func (p *TxPool) CountContent() (int, int, int)

func (*TxPool) FilterKnownIdHashes

func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)

func (*TxPool) GetKnownBlobTxn

func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error)

func (*TxPool) GetRlp

func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error)

func (*TxPool) IdHashKnown

func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error)

func (*TxPool) IsLocal

func (p *TxPool) IsLocal(idHash []byte) bool

func (*TxPool) NonceFromAddress

func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool)

func (*TxPool) OnNewBlock

func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error

func (*TxPool) PeekBest

func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error)

func (*TxPool) ResetYieldedStatus

func (p *TxPool) ResetYieldedStatus()

func (*TxPool) Started

func (p *TxPool) Started() bool

func (*TxPool) ValidateSerializedTxn

func (p *TxPool) ValidateSerializedTxn(serializedTxn []byte) error

ValidateSerializedTxn checks that the serialized txn does not exceed a certain max size.
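A size check of this kind can be sketched as follows. The limit `maxTxnSize`, the error value and the function name are hypothetical; the actual limit used by the package is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// maxTxnSize is a hypothetical limit chosen for this sketch only.
const maxTxnSize = 128 * 1024

// errTooLarge is a hypothetical sentinel error.
var errTooLarge = errors.New("serialized txn exceeds max size")

// validateSerializedTxn rejects payloads longer than maxTxnSize bytes.
func validateSerializedTxn(serialized []byte) error {
	if len(serialized) > maxTxnSize {
		return errTooLarge
	}
	return nil
}

func main() {
	fmt.Println(validateSerializedTxn(make([]byte, 100)))          // <nil>
	fmt.Println(validateSerializedTxn(make([]byte, maxTxnSize+1))) // serialized txn exceeds max size
}
```

Checking the length before decoding lets a caller reject oversized input without spending time on parsing it.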

func (*TxPool) YieldBest

func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error)

type WorstQueue

type WorstQueue struct {
	// contains filtered or unexported fields
}

func (WorstQueue) Len

func (p WorstQueue) Len() int

func (WorstQueue) Less

func (p WorstQueue) Less(i, j int) bool

func (*WorstQueue) Pop

func (p *WorstQueue) Pop() interface{}

func (*WorstQueue) Push

func (p *WorstQueue) Push(x interface{})

func (WorstQueue) Swap

func (p WorstQueue) Swap(i, j int)
