blocksprovider

package
v0.0.0-...-579b097 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BftMinRetryInterval       = 50 * time.Millisecond
	BftMaxRetryInterval       = 10 * time.Second
	BftBlockCensorshipTimeout = 20 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BFTCensorshipMonitor

type BFTCensorshipMonitor struct {
	// contains filtered or unexported fields
}

BFTCensorshipMonitor monitors the progress of headers receivers versus the progress of the block receiver. We ask for a stream of headers from all sources except the one supplying blocks. We track the progress of header receivers against the block reception progress. If there is a header that is ahead of the last block, and a timeout had passed since that header was received, we declare that censorship was detected. When censorship is detected, ErrCensorship is sent to the errorCh which can be read by ErrorsChannel() method.

func NewBFTCensorshipMonitor

func NewBFTCensorshipMonitor(
	chainID string,
	updatableVerifier UpdatableBlockVerifier,
	requester DeliverClientRequester,
	progressReporter BlockProgressReporter,
	fetchSources []*orderers.Endpoint,
	blockSourceIndex int,
	timeoutConf TimeoutConfig,
) *BFTCensorshipMonitor

func (*BFTCensorshipMonitor) ErrorsChannel

func (m *BFTCensorshipMonitor) ErrorsChannel() <-chan error

func (*BFTCensorshipMonitor) GetSuspicion

func (m *BFTCensorshipMonitor) GetSuspicion() (bool, uint64)

GetSuspicion returns the suspicion flag, and the header block number that is ahead. If suspicion==false, then suspicionBlockNumber==0.

Used mainly for testing.

func (*BFTCensorshipMonitor) Monitor

func (m *BFTCensorshipMonitor) Monitor()

Monitor the progress of headers and compare to the progress of block fetching, trying to detect block censorship. Continuously try and relaunch the goroutines that monitor individual orderers. If an orderer is faulty we increase the interval between retries but never quit.

This method should be run using a dedicated goroutine.

func (*BFTCensorshipMonitor) Stop

func (m *BFTCensorshipMonitor) Stop()

type BFTCensorshipMonitorFactory

type BFTCensorshipMonitorFactory struct{}

BFTCensorshipMonitorFactory creates an instance of a BFTCensorshipMonitor. It is an implementation of the CensorshipDetectorFactory interface which abstracts the creation of a BFTCensorshipMonitor.

func (*BFTCensorshipMonitorFactory) Create

func (f *BFTCensorshipMonitorFactory) Create(chainID string, updatableVerifier UpdatableBlockVerifier, requester DeliverClientRequester, progressReporter BlockProgressReporter, fetchSources []*orderers.Endpoint, blockSourceIndex int, timeoutConf TimeoutConfig) CensorshipDetector

type BFTDeliverer

type BFTDeliverer struct {
	ChannelID    string
	BlockHandler BlockHandler
	Ledger       LedgerInfo

	UpdatableBlockVerifier    UpdatableBlockVerifier
	Dialer                    Dialer
	OrderersSourceFactory     OrdererConnectionSourceFactory
	CryptoProvider            bccsp.BCCSP
	DoneC                     chan struct{}
	Signer                    identity.SignerSerializer
	DeliverStreamer           DeliverStreamer
	CensorshipDetectorFactory CensorshipDetectorFactory
	Logger                    *flogging.FabricLogger

	// The initial value of the actual retry interval, which is increased on every failed retry
	InitialRetryInterval time.Duration
	// The maximal value of the actual retry interval, which cannot increase beyond this value
	MaxRetryInterval time.Duration
	// If a certain header from a header receiver is in front of the block receiver for more that this time, a
	// censorship event is declared and the block source is changed.
	BlockCensorshipTimeout time.Duration
	// After this duration, the MaxRetryDurationExceededHandler is called to decide whether to keep trying
	MaxRetryDuration time.Duration
	// This function is called after MaxRetryDuration of failed retries to decide whether to keep trying
	MaxRetryDurationExceededHandler MaxRetryDurationExceededHandler

	// TLSCertHash should be nil when TLS is not enabled
	TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0])
	// contains filtered or unexported fields
}

BFTDeliverer fetches blocks using a block receiver and maintains a BFTCensorshipMonitor. It maintains a shuffled orderer source slice, and will cycle through it trying to find a "good" orderer to fetch blocks from. After it selects an orderer to fetch blocks from, it assigns all the rest of the orderers to the censorship monitor. The censorship monitor will request block attestations (header+sigs) from said orderers, and will monitor their progress relative to the block fetcher. If a censorship suspicion is detected, the BFTDeliverer will try to find another orderer to fetch from.

func (*BFTDeliverer) BlockProgress

func (d *BFTDeliverer) BlockProgress() (uint64, time.Time)

func (*BFTDeliverer) DeliverBlocks

func (d *BFTDeliverer) DeliverBlocks()

func (*BFTDeliverer) FetchBlocks

func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint)

func (*BFTDeliverer) Initialize

func (d *BFTDeliverer) Initialize(channelConfig *common.Config)

func (*BFTDeliverer) Stop

func (d *BFTDeliverer) Stop()

Stop

type BFTHeaderReceiver

type BFTHeaderReceiver struct {
	// contains filtered or unexported fields
}

BFTHeaderReceiver receives a stream of blocks from an orderer, where each block contains a header and metadata. It keeps track of the last header it received, and the time it was received. The header receivers verify each block as it arrives.

TODO The header receiver will receive (or ask for) full config blocks - in a later commit. TODO The header receiver will maintain its own private block verifier (bundle) - in a later commit.

func NewBFTHeaderReceiver

func NewBFTHeaderReceiver(
	chainID string,
	endpoint string,
	client orderer.AtomicBroadcast_DeliverClient,
	updatableBlockVerifier UpdatableBlockVerifier,
	previousReceiver *BFTHeaderReceiver,
	logger *flogging.FabricLogger,
) *BFTHeaderReceiver

NewBFTHeaderReceiver create a new BFTHeaderReceiver.

If the previousReceiver is not nil, the lastHeader and lastHeaderTime are copied to the new instance. This allows a new receiver to start from the last know good header that has been received.

func (*BFTHeaderReceiver) DeliverHeaders

func (hr *BFTHeaderReceiver) DeliverHeaders()

DeliverHeaders starts to deliver headers from the stream client

func (*BFTHeaderReceiver) GetErrorStopTime

func (hr *BFTHeaderReceiver) GetErrorStopTime() time.Time

func (*BFTHeaderReceiver) IsStarted

func (hr *BFTHeaderReceiver) IsStarted() bool

func (*BFTHeaderReceiver) IsStopped

func (hr *BFTHeaderReceiver) IsStopped() bool

func (*BFTHeaderReceiver) LastBlock

func (hr *BFTHeaderReceiver) LastBlock() (*common.Block, time.Time, error)

LastBlock returns the last block which was verified

func (*BFTHeaderReceiver) LastBlockNum

func (hr *BFTHeaderReceiver) LastBlockNum() (uint64, time.Time, error)

LastBlockNum returns the last block number which was verified

func (*BFTHeaderReceiver) Stop

func (hr *BFTHeaderReceiver) Stop() error

Stop the reception of headers and close the client connection

type BlockHandler

type BlockHandler interface {
	// HandleBlock gives the block to the next stage of processing after fetching it from a remote orderer.
	HandleBlock(channelID string, block *common.Block) error
}

BlockHandler abstracts the next stage of processing after the block is fetched from the orderer. In the peer the block is given to the gossip service. In the orderer the block is placed in a buffer from which the chain or the follower pull blocks.

type BlockProgressReporter

type BlockProgressReporter interface {
	// BlockProgress returns the last block fetched from an orderer, and the time it was fetched.
	// If the fetch time IsZero == true, no block had been fetched yet (block number will always be zero in that case).
	BlockProgress() (uint64, time.Time)
}

BlockProgressReporter provides information on the last block fetched from an orderer.

type BlockReceiver

type BlockReceiver struct {
	// contains filtered or unexported fields
}

func (*BlockReceiver) ProcessIncoming

func (br *BlockReceiver) ProcessIncoming(onSuccess func(blockNum uint64, channelConfig *common.Config)) error

ProcessIncoming processes incoming messages until stopped or encounters an error.

func (*BlockReceiver) Start

func (br *BlockReceiver) Start()

Start starts a goroutine that continuously receives blocks.

func (*BlockReceiver) Stop

func (br *BlockReceiver) Stop()

type CensorshipDetector

type CensorshipDetector interface {
	Monitor()
	Stop()
	ErrorsChannel() <-chan error
}

type CensorshipDetectorFactory

type CensorshipDetectorFactory interface {
	Create(
		chainID string,
		updatableVerifier UpdatableBlockVerifier,
		requester DeliverClientRequester,
		progressReporter BlockProgressReporter,
		fetchSources []*orderers.Endpoint,
		blockSourceIndex int,
		timeoutConf TimeoutConfig,
	) CensorshipDetector
}

type DeliverAdapter

type DeliverAdapter struct{}

func (DeliverAdapter) Deliver

type DeliverClientRequester

type DeliverClientRequester interface {
	SeekInfoHeadersFrom(ledgerHeight uint64) (*common.Envelope, error)
	Connect(seekInfoEnv *common.Envelope, endpoint *orderers.Endpoint) (orderer.AtomicBroadcast_DeliverClient, func(), error)
}

DeliverClientRequester connects to an orderer, requests a stream of blocks or headers, and provides a deliver client.

type DeliverStreamer

type DeliverStreamer interface {
	Deliver(context.Context, *grpc.ClientConn) (orderer.AtomicBroadcast_DeliverClient, error)
}

type Deliverer

type Deliverer struct {
	ChannelID              string
	BlockHandler           BlockHandler
	Ledger                 LedgerInfo
	UpdatableBlockVerifier UpdatableBlockVerifier
	Dialer                 Dialer
	OrderersSourceFactory  OrdererConnectionSourceFactory
	CryptoProvider         bccsp.BCCSP
	DoneC                  chan struct{}
	Signer                 identity.SignerSerializer
	DeliverStreamer        DeliverStreamer
	Logger                 *flogging.FabricLogger

	// The maximal value of the actual retry interval, which cannot increase beyond this value
	MaxRetryInterval time.Duration
	// The initial value of the actual retry interval, which is increased on every failed retry
	InitialRetryInterval time.Duration
	// After this duration, the MaxRetryDurationExceededHandler is called to decide whether to keep trying
	MaxRetryDuration time.Duration
	// This function is called after MaxRetryDuration of failed retries to decide whether to keep trying
	MaxRetryDurationExceededHandler MaxRetryDurationExceededHandler

	// TLSCertHash should be nil when TLS is not enabled
	TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0])
	// contains filtered or unexported fields
}

Deliverer the CFT implementation of the deliverservice.BlockDeliverer interface.

func (*Deliverer) DeliverBlocks

func (d *Deliverer) DeliverBlocks()

DeliverBlocks used to pull out blocks from the ordering service to distribute them across peers

func (*Deliverer) Initialize

func (d *Deliverer) Initialize(channelConfig *cb.Config)

func (*Deliverer) Stop

func (d *Deliverer) Stop()

Stop stops blocks delivery provider

type DeliveryRequester

type DeliveryRequester struct {
	// contains filtered or unexported fields
}

DeliveryRequester is used to connect to an orderer and request the delivery of various types of block delivery streams. The type of stream requested depends upon the orderer.SeekInfo created.

func NewDeliveryRequester

func NewDeliveryRequester(
	channelID string,
	signer identity.SignerSerializer,
	tlsCertHash []byte,
	dialer Dialer,
	deliverStreamer DeliverStreamer,
) *DeliveryRequester

func (*DeliveryRequester) Connect

func (dr *DeliveryRequester) Connect(seekInfoEnv *common.Envelope, endpoint *orderers.Endpoint) (orderer.AtomicBroadcast_DeliverClient, func(), error)

func (*DeliveryRequester) SeekInfoBlocksFrom

func (dr *DeliveryRequester) SeekInfoBlocksFrom(ledgerHeight uint64) (*common.Envelope, error)

SeekInfoBlocksFrom produces a signed SeekInfo envelope requesting a stream of blocks from a certain block number.

func (*DeliveryRequester) SeekInfoHeadersFrom

func (dr *DeliveryRequester) SeekInfoHeadersFrom(ledgerHeight uint64) (*common.Envelope, error)

SeekInfoHeadersFrom produces a signed SeekInfo envelope requesting a stream of headers (block attestations) from a certain block number.

func (*DeliveryRequester) SeekInfoNewestHeader

func (dr *DeliveryRequester) SeekInfoNewestHeader() (*common.Envelope, error)

SeekInfoNewestHeader produces a signed SeekInfo envelope requesting the newest header (block attestation) available to the orderer. Only a single header is expected in response, not a stream.

type Dialer

type Dialer interface {
	Dial(address string, rootCerts [][]byte) (*grpc.ClientConn, error)
}

type DialerAdapter

type DialerAdapter struct {
	ClientConfig comm.ClientConfig
}

func (DialerAdapter) Dial

func (da DialerAdapter) Dial(address string, rootCerts [][]byte) (*grpc.ClientConn, error)

type DurationExceededHandler

type DurationExceededHandler interface {
	DurationExceededHandler() (stopRetries bool)
}

type ErrCensorship

type ErrCensorship struct {
	Message string
}

func (*ErrCensorship) Error

func (e *ErrCensorship) Error() string

type ErrFatal

type ErrFatal struct {
	Message string
}

func (*ErrFatal) Error

func (e *ErrFatal) Error() string

type ErrStopping

type ErrStopping struct {
	Message string
}

func (*ErrStopping) Error

func (e *ErrStopping) Error() string

type GossipServiceAdapter

type GossipServiceAdapter interface {
	// AddPayload adds payload to the local state sync buffer
	AddPayload(chainID string, payload *gossip.Payload) error

	// Gossip the message across the peers
	Gossip(msg *gossip.GossipMessage)
}

GossipServiceAdapter serves to provide basic functionality required from gossip service by delivery service

type LedgerInfo

type LedgerInfo interface {
	// LedgerHeight returns current local ledger height
	LedgerHeight() (uint64, error)

	// GetCurrentBlockHash returns the block header hash of the last block in the ledger.
	GetCurrentBlockHash() ([]byte, error)
}

LedgerInfo an adapter to provide the interface to query the ledger committer for current ledger height

type MaxRetryDurationExceededHandler

type MaxRetryDurationExceededHandler func() (stopRetries bool)

MaxRetryDurationExceededHandler is a function that decides what to do in case the total time the component spends in reconnection attempts is exceeded. If it returns true, it means that the component should stop retrying.

In the peer, with gossip and a dynamic leader, stopping causes the gossip leader to yield. In the peer, with gossip and a static leader, we never stop.

type OrdererConnectionSource

type OrdererConnectionSource orderers.ConnectionSourcer

type OrdererConnectionSourceFactory

type OrdererConnectionSourceFactory orderers.ConnectionSourceCreator

type TimeoutConfig

type TimeoutConfig struct {
	// The initial value of the actual retry interval, which is increased on every failed retry
	MinRetryInterval time.Duration
	// The maximal value of the actual retry interval, which cannot increase beyond this value
	MaxRetryInterval time.Duration
	// The value of the bft censorship detection timeout
	BlockCensorshipTimeout time.Duration
}

func (*TimeoutConfig) ApplyDefaults

func (t *TimeoutConfig) ApplyDefaults()

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL