comm

package
v0.0.0-...-e3e94bd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

README

Communicate 模块

1. 建立连接的过程

gossip-6.png

2. 接收消息的流程

gossip-7.jpg

Documentation

Index

Constants

View Source
const (
	DefaultDialTimeout  = 3 * time.Second
	DefaultConnTimeout  = 2 * time.Second
	DefaultRecvBuffSize = 20
	DefaultSendBuffSize = 20
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelDeMultiplexer

type ChannelDeMultiplexer struct {
	// contains filtered or unexported fields
}

ChannelDeMultiplexer 是一个通道多路复用器,可以注册通道(AddChannel),同时可以根据注册时 定义的谓词(common.MessageAcceptor)将消息发送到对应通道(DeMultiplex)。

func NewChannelDeMultiplexer

func NewChannelDeMultiplexer() *ChannelDeMultiplexer

func (*ChannelDeMultiplexer) AddChannel

func (cdm *ChannelDeMultiplexer) AddChannel(pred common.MessageAcceptor) <-chan interface{}

AddChannel 方法用于向多路复用器注册一个通道。如果多路复用器已经停止(isStopped() 返回 true), 则返回一个被关闭的通道,以防止外部接收者一直等待。否则,创建一个带有缓冲区的通道(bidirectionalCh), 将其注册到 channels 切片中,并返回该通道。

func (*ChannelDeMultiplexer) DeMultiplex

func (cdm *ChannelDeMultiplexer) DeMultiplex(msg interface{})

DeMultiplex 方法用于接收一个消息,并根据注册的谓词将消息发送到相应的通道。如果多路复用器已 经停止(isStopped() 返回 true),则直接返回。否则,遍历 channels 切片,对每个通道,如果谓词判 断该消息应该发送到该通道,则将消息发送到通道的 ch 通道中。

func (*ChannelDeMultiplexer) Stop

func (cdm *ChannelDeMultiplexer) Stop()

Stop 方法用于停止多路复用器的操作。首先,检查 stopCh 通道是否已经关闭,如果已经关闭,则直接返回。否则, 关闭 stopCh 通道,等待所有的 DeMultiplex 调用完成,然后锁定互斥锁,关闭所有已注册通道的 ch 通道,并清 空 channels 切片。

type Comm

type Comm interface {
	GetPKIid() common.PKIid

	Send(msg *protoext.SignedGossipMessage, peers ...*discovery.NetworkMember)

	SendWithAck(msg *protoext.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*discovery.NetworkMember) SendResults

	// Probe 探测远程节点,如果有响应则返回 nil,如果没有响应则返回错误信息。
	Probe(peer *discovery.NetworkMember) error

	Handshake(peer *discovery.NetworkMember) (api.PeerIdentity, error)

	Accept(common.MessageAcceptor) <-chan protoext.ReceivedMessage

	PresumedDead() <-chan common.PKIid

	IdentitySwitch() <-chan common.PKIid

	CloseConn(peer *discovery.NetworkMember)

	SetLogger(logger *hlogging.HyperchainLogger)

	Stop()
}

func NewCommInstance

func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper, peerIdentity api.PeerIdentity, secureDialOpts api.PeerSecureDialOpts, sa api.SecurityAdvisor, commMetrics *metrics.CommMetrics, config CommConfig, dialOpts ...grpc.DialOption) (Comm, error)

type CommConfig

type CommConfig struct {
	DialTimeout  time.Duration // 建立连接的超时时间,默认 3 秒
	ConnTimeout  time.Duration // 发送/接收消息的超时时间,默认 2 秒
	RecvBuffSize int
	SendBuffSize int
}

type ConnConfig

type ConnConfig struct {
	RecvBuffSize int
	SendBuffSize int
}

type ReceivedMessageImpl

type ReceivedMessageImpl struct {
	// contains filtered or unexported fields
}

func (*ReceivedMessageImpl) Ack

func (rmi *ReceivedMessageImpl) Ack(err error)

func (*ReceivedMessageImpl) GetConnectionInfo

func (rmi *ReceivedMessageImpl) GetConnectionInfo() *protoext.ConnectionInfo

func (*ReceivedMessageImpl) GetEnvelope

func (rmi *ReceivedMessageImpl) GetEnvelope() *pbgossip.Envelope

func (*ReceivedMessageImpl) GetSignedGossipMessage

func (rmi *ReceivedMessageImpl) GetSignedGossipMessage() *protoext.SignedGossipMessage

func (*ReceivedMessageImpl) Respond

func (rmi *ReceivedMessageImpl) Respond(msg *pbgossip.GossipMessage)

type SendResult

type SendResult struct {
	discovery.NetworkMember
	// contains filtered or unexported fields
}

type SendResults

type SendResults []SendResult

func (SendResults) AckCount

func (srs SendResults) AckCount() int

func (SendResults) NackCount

func (srs SendResults) NackCount() int

func (SendResults) String

func (srs SendResults) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL