mempool

package
v0.0.0-...-f47aca1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2020 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

TODO: Better handle abci client errors. (make it automatically handle connection errors)

Index

Constants

View Source
const (
	MempoolChannel = byte(0x30)

	// UnknownPeerID 是在没有peer(例如RPC)的情况下运行CheckTx时要使用的peerID。
	UnknownPeerID uint16 = 0
)

Reactor组件之一:Mempool

View Source
const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "mempool"
)

Variables

View Source
var (
	// ErrTxInCache is returned to the client if we saw tx earlier
	ErrTxInCache = errors.New("tx already exists in cache")
)

Functions

func IsPreCheckError

func IsPreCheckError(err error) bool

IsPreCheckError returns true if err is due to pre check failure.

func RegisterMessages

func RegisterMessages(cdc *amino.Codec)

Types

type CListMempool

type CListMempool struct {
	// contains filtered or unexported fields
}

CListMempool是一个有序的内存池,用于在达成共识回合之前提出交易。 在将交易添加到池之前,使用CheckTx abci消息检查交易有效性。 内存池使用并发列表结构来存储可以由多个并发读取器有效访问的事务。

func NewCListMempool

func NewCListMempool(
	config *cfg.MempoolConfig,
	proxyAppConn proxy.AppConnMempool,
	height int64,
	options ...CListMempoolOption,
) *CListMempool

NewCListMempool 返回具有给定配置和与应用程序的连接的新内存池。

func (*CListMempool) CheckTx

func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error

当我们在等待Update()或Reap()时,会阻塞。 cb:来自CheckTx命令的回调。它从另一个goroutine被调用。 常识:要么cb将被调用,要么err返回。

多个goroutine并发使用安全

func (*CListMempool) CloseWAL

func (mem *CListMempool) CloseWAL()

func (*CListMempool) EnableTxsAvailable

func (mem *CListMempool) EnableTxsAvailable()

NOTE: not thread safe - should only be called once, on startup

func (*CListMempool) Flush

func (mem *CListMempool) Flush()

XXX:不安全! 调用Flush可能会使内存池处于不一致状态.

func (*CListMempool) FlushAppConn

func (mem *CListMempool) FlushAppConn() error

Lock() must be help by the caller during execution.

func (*CListMempool) InitWAL

func (mem *CListMempool) InitWAL() error

func (*CListMempool) Lock

func (mem *CListMempool) Lock()

多个goroutine并发使用安全

func (*CListMempool) ReapMaxBytesMaxGas

func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

多goroutine并发安全

func (*CListMempool) ReapMaxTxs

func (mem *CListMempool) ReapMaxTxs(max int) types.Txs

多goroutine并发安全

func (*CListMempool) SetLogger

func (mem *CListMempool) SetLogger(l log.Logger)

SetLogger 设置日志

func (*CListMempool) Size

func (mem *CListMempool) Size() int

多个goroutine并发使用安全

func (*CListMempool) TxsAvailable

func (mem *CListMempool) TxsAvailable() <-chan struct{}

多goroutine并发安全

func (*CListMempool) TxsBytes

func (mem *CListMempool) TxsBytes() int64

多个goroutine并发使用安全

func (*CListMempool) TxsFront

func (mem *CListMempool) TxsFront() *clist.CElement

TxsFront返回排序列表中的第一个交易,以便peer goroutine调用.NextWait()。 FIXME: leaking implementation details!

多个goroutine并发使用安全

func (*CListMempool) TxsWaitChan

func (mem *CListMempool) TxsWaitChan() <-chan struct{}

TxsWaitChan 返回等待交易的通道。一旦内存池不为空,它将关闭(即内部`mem.txs`具有至少一个元素)

多个goroutine并发使用安全

func (*CListMempool) Unlock

func (mem *CListMempool) Unlock()

多个goroutine并发使用安全

func (*CListMempool) Update

func (mem *CListMempool) Update(
	height int64,
	txs types.Txs,
	deliverTxResponses []*abci.ResponseDeliverTx,
	preCheck PreCheckFunc,
	postCheck PostCheckFunc,
) error

Lock() must be help by the caller during execution.

type CListMempoolOption

type CListMempoolOption func(*CListMempool)

CListMempoolOption 在内存池上设置一个可选参数。

func WithMetrics

func WithMetrics(metrics *Metrics) CListMempoolOption

WithMetrics 设置监控器。

func WithPostCheck

func WithPostCheck(f PostCheckFunc) CListMempoolOption

WithPostCheck设置一个过滤器,让内存池在f(tx)返回false时拒绝tx。 这是在CheckTx之后运行的。

func WithPreCheck

func WithPreCheck(f PreCheckFunc) CListMempoolOption

WithPreCheck 设置一个过滤器,让内存池在f(tx)返回false时拒绝tx。 这是在CheckTx之前运行的。

type ErrMempoolIsFull

type ErrMempoolIsFull struct {
	// contains filtered or unexported fields
}

ErrMempoolIsFull means Tendermint & an application can't handle that much load

func (ErrMempoolIsFull) Error

func (e ErrMempoolIsFull) Error() string

type ErrPreCheck

type ErrPreCheck struct {
	Reason error
}

ErrPreCheck is returned when tx is too big

func (ErrPreCheck) Error

func (e ErrPreCheck) Error() string

type ErrTxTooLarge

type ErrTxTooLarge struct {
	// contains filtered or unexported fields
}

ErrTxTooLarge means the tx is too big to be sent in a message to other peers

func (ErrTxTooLarge) Error

func (e ErrTxTooLarge) Error() string

type Mempool

type Mempool interface {
	// CheckTx对应用程序执行交易,以确定其有效性以及是否应将其添加到内存池。
	CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error

	// ReapMaxBytesMaxGas可从内存池中提取交易,直至总计maxBytes个字节,条件是gasWanted的总数必须小于maxGas。
	// 如果两个最大值都为负,则所有返回的交易(〜所有可用交易)的大小没有上限。
	ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

	// ReapMaxTxs 从内存池中提取出max个交易。
	// 如果max是个负数,则所有返回的交易(〜所有可用交易)的大小没有上限。
	ReapMaxTxs(max int) types.Txs

	// Lock 锁定内存池,共识模块必须使用锁来保证安全更新
	Lock()

	// Unlock 解锁内存池
	Unlock()

	// Update 通知内存池,给定的这些交易已经被提交,可以被丢弃。
	// 注意:此方法必须在区块被提交之后再调用
	// 注意:Lock/Unlock必须被调用者管理
	Update(
		blockHeight int64,
		blockTxs types.Txs,
		deliverTxResponses []*abci.ResponseDeliverTx,
		newPreFn PreCheckFunc,
		newPostFn PostCheckFunc,
	) error

	// FlushAppConn刷新内存池连接,以确保完成异步reqResCb调用。例如,来自CheckTx。
	FlushAppConn() error

	// Flush 删除缓存和内存池的所有交易
	Flush()

	// TxsAvailable返回一个通道,该通道针对每个高度触发一次,并且仅在内存池中有可用交易时才触发。
	// 注意:如果EnableTxsAvailable没有被调用,那么返回的channel可能为空
	TxsAvailable() <-chan struct{}

	// EnableTxsAvailable 初始化TxsAvailable通道,确保在有可用交易时,它在每一个高度都触发一次
	EnableTxsAvailable()

	// Size 返回内存池中交易的总数
	Size() int

	// TxsBytes 返回内存池中所有交易的总大小
	TxsBytes() int64

	// InitWal 创建一个存放WAL文件的文件夹并创建/打开一个文件。如果有错误,它将是*PathError类型的错误。
	InitWAL() error

	// CloseWAL 关闭并丢弃基础的WAL文件。任何进一步的写入将不会写到磁盘。
	CloseWAL()
}

Mempool 定义了内存池的接口

更新内存池需要同步提交的块,app以此在提交时重置他们的瞬时状态

type Message

type Message interface{}

Message is a message sent or received by the Reactor.

type Metrics

type Metrics struct {
	// Size of the mempool.
	Size metrics.Gauge
	// Histogram of transaction sizes, in bytes.
	TxSizeBytes metrics.Histogram
	// Number of failed transactions.
	FailedTxs metrics.Counter
	// Number of times transactions are rechecked in the mempool.
	RecheckTimes metrics.Counter
}

Metrics contains metrics exposed by this package. see MetricsProvider for descriptions.

func NopMetrics

func NopMetrics() *Metrics

NopMetrics returns no-op Metrics.

func PrometheusMetrics

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics

PrometheusMetrics returns Metrics build using Prometheus client library. Optionally, labels can be provided along with their values ("foo", "fooValue").

type PeerState

type PeerState interface {
	GetHeight() int64
}

PeerState describes the state of a peer.

type PostCheckFunc

type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

PostCheckFunc是在CheckTx之后执行的可选过滤器,如果返回false,则拒绝交易。 一个例子是确保交易所需的gas不超过该区块可用的gas。

func PostCheckMaxGas

func PostCheckMaxGas(maxGas int64) PostCheckFunc

PostCheckMaxGas 检查所需气体是否小于或等于通过的maxGas。如果maxGas为-1,则返回nil。

type PreCheckFunc

type PreCheckFunc func(types.Tx) error

PreCheckFunc是在CheckTx之前执行的可选过滤器,如果返回false, 则拒绝交易。一个示例是确保事务不超过块大小。

func PreCheckAminoMaxBytes

func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc

PreCheckAminoMaxBytes 检查交易的大小加上amino开销是否小于或等于预期的maxBytes。

type Reactor

type Reactor struct {
	p2p.BaseReactor
	// contains filtered or unexported fields
}

Reactor 处理内存池交易并在peer之间广播他们 它包含一个peer的ID为键,计数器为值的map,来防止你把交易发送到给你这个交易的peer。

func NewReactor

func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor

NewReactor returns a new Reactor with the given config and mempool.

func (*Reactor) AddPeer

func (memR *Reactor) AddPeer(peer p2p.Peer)

AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.

func (*Reactor) GetChannels

func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor

GetChannels implements Reactor. It returns the list of channels for this reactor.

func (*Reactor) InitPeer

func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer

InitPeer implements Reactor by creating a state for the peer.

func (*Reactor) OnStart

func (memR *Reactor) OnStart() error

OnStart implements p2p.BaseReactor.

func (*Reactor) Receive

func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)

Receive implements Reactor. It adds any received transactions to the mempool.

func (*Reactor) RemovePeer

func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{})

RemovePeer implements Reactor.

func (*Reactor) SetLogger

func (memR *Reactor) SetLogger(l log.Logger)

SetLogger sets the Logger on the reactor and the underlying mempool.

type TxInfo

type TxInfo struct {
	// SenderID是在内存池中用于标识发件人的内部peer ID,每个交易存储2个字节,而不是p2p.ID的20个字节。
	SenderID uint16
	// SenderP2PID 是sender真正的p2p.ID,用于日志
	SenderP2PID p2p.ID
}

TxInfo 是在尝试将交易添加到内存池时传递的参数。

type TxMessage

type TxMessage struct {
	Tx types.Tx
}

TxMessage 一条包含一个交易的消息

func (*TxMessage) String

func (m *TxMessage) String() string

String 字符串

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL