discovery

package
v0.0.0-...-e3e94bd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Discovery服务

1. 默认配置

const (
	DefaultAliveTimeInterval            = 5 * time.Second
	DefaultAliveExpirationTimeout       = 5 * DefaultAliveTimeInterval
	DefaultAliveExpirationCheckInterval = DefaultAliveExpirationTimeout / 10
	DefaultReconnectInterval            = DefaultAliveExpirationTimeout
	DefaultMsgExpirationFactor          = 20
	DefaultMaxConnectionAttempts        = 120
)
  • DefaultAliveTimeInterval: 定义了 Discovery 服务每隔多长时间向外广播自己的 alive 消息,默认是 5 秒。

  • DefaultAliveExpirationTimeout: 定义了一段超时时间,如果某个节点在经过了 DefaultAliveExpirationTimeout 时间后还没有给 Discovery 发送 alive 消息,则会认为此节点陷入了沉寂,并且会主动断开与其建立的网络连接。

  • DefaultAliveExpirationCheckInterval: 定义了一段时间间隔,Discoveryalive 消息存储器会每隔 DefaultAliveExpirationCheckInterval 定义的时间间隔检查一遍所有存储的 alive 消息是否走到了生命的尽头,alive 消息的最大寿命等于 DefaultAliveExpirationTimeout * DefaultMsgExpirationFactor

  • DefaultReconnectInterval: 定义了一段时间间隔,如果 Discovery 第一次与某个节点建立连接时失败了,则会等待 DefaultReconnectInterval 定义的时间间隔再次尝试建立连接。

  • DefaultMsgExpirationFactor: 定义了一个乘法因子,它的用法可回看对 DefaultAliveExpirationCheckInterval 的定义。

  • DefaultMaxConnectionAttempts: 定义了最大连接尝试次数,如果 Discovery 第一次与某个节点建立连接时失败了,则会继续最多尝试与其建立连接 DefaultMaxConnectionAttempts - 1 次。

2. Discovery的服务逻辑

2.1 广播自己的alive消息

区块链系统在启动之初,会建立并启动 Discovery 服务,这个服务默认情况下,会每隔 5 秒(DefaultAliveTimeInterval 定义的时间间隔)对外广播自己的 alive 消息。alive 消息的结构和内容如下所示:

aliveMsg := GossipMessage{
    Tag: GossipMessage_EMPTY,
    Content: &GossipMessage_AliveMsg{
        Membership: &Member{
            Endpoint:   自己的 ExternalEndpoint,
            Metadata:   自己的 Metadata,
            PkiId:      自己的 PKIid,
        },
        Timestamp: &PeerTime{
            IncNum: Discovery 服务启动时的时间,
            SeqNum: 目前为自己构造 alive 消息的次数, // 实际上就是调用 aliveMsgAndInternalEndpoint()方法的次数
        }
    }
}

在得到自己的 alive 消息后,Discovery 会利用 CryptoService 服务接口,对 alive 消息进行签名,得到 envelope,紧接着构建 SignedGossipMessage

signedAliveMsg := &SignedGossipMessage{
    GossipMessage:  aliveMsg,
    Envelope:       envelope,
}

这个时候,Discovery 就会将带有 alive 消息的 SignedGossipMessage 消息结构广播给其他节点。

2.2 接收别人的alive消息

在收到别人的 alive 消息后,会先后进行下列操作,检查此消息是否值得被处理:

  1. 验证携有对方 alive 消息的 SignedGossipMessage 消息结构中所携带的签名(签名信息存储在 Envelope 字段中)是否合法。
  2. 验证对方发送来的 alive 消息是否太旧,主要是通过 GossipMessage_AliveMsg 消息结构中的 Timestamp 字段来判断。其实对于同一个 peer 节点来说,它的 Discovery 服务的启动时间正常来说是恒定不变的,因此判断一个 alive 消息是否是陈旧的,主要还是通过判断 SeqNum 来确定。
  3. 如果接收到的 alive 消息是来自于自己的,则直接忽略掉。

经过上述三个步骤的验证后,Discovery 服务会将此条消息存储到 aliveMsgStore 中,这里需要详细说明,aliveMsgStore 是一个会定期清理超时消息的内存数据库,默认情况下,aliveMsgStore 中每个 alive 消息的存活时间是 500 秒(DefaultAliveExpirationTimeout * DefaultMsgExpirationFactor 定义的超时时间),根据存储逻辑,在任何时间里,aliveMsgStore 只会为每个节点存储唯一一条 alive 消息,如果此条 alive 消息因为过期被清除掉的话,Discovery 处维护的对应节点也会被删除掉,但是锚点 peer 不会被删除,操作逻辑如下代码所示:

gossip-1.png

aliveMsgStore 里存储的每条 alive 消息的生命倒计时是从消息加入到 aliveMsgStore 中开始计算的。默认情况下,alive 消息的存活时间是 500 秒,因此,aliveMsgStore 会每隔 5 秒检查一下存储的所有 alive 消息是否有过期的。

之后,Discovery 会判断此条 alive 消息是否来自于一个已知的节点,如果不是,那么就表明有一个新节点向我们发送了 alive 消息,因此,我们就会根据发送来的 alive 消息,构建一个代表此新节点身份的 NetworkMember 结构,并将此结构存储到本地,即,将此新节点存储到 Discovery 中。不然的话,会判断发送此 alive 消息的节点,在 Discovery 处的状态,是被认为已经 dead 了,还是依然 alive 呢?对于前者,如果此条 alive 消息是在节点被置为 dead 状态后发送来的,那么就将此节点复活,否则就忽视掉此条 alive 消息。对于后者,如果此条 alive 消息足够新鲜,那么就更新此节点在 Discovery 处的信息。最后,我们再将此条 alive 消息转发给其他节点。

2.3 周期性检查有没有之前处于活跃状态的节点现在已经陷入沉寂

默认情况下,Discovery 会每隔 2.5 秒(DefaultAliveExpirationCheckInterval 定义的间隔时间)检查 aliveLastTS 字段里存储的每个节点上次发送来 alive 消息的时间到现在的时间间隔是否超过了 25 秒(DefaultAliveExpirationTimeout 定义的超时时间),如果超过了,则说明此节点很可能陷入了沉寂,那么我们就需要将这些陷入沉寂的节点移入到 deadLastTSdeadMembership 中,并断开与其建立的网络连接。

2.4 循环监听消息通道中新来的消息

Discovery 服务只会接收并处理以下三种消息:

  • AliveMessage
  • MembershipRequest
  • MembershipResponse
  1. 如果 Discovery 收到的是 AliveMessage 消息,则会按照 2.2 节所示的过程进行处理。

  2. 如果 Discovery 收到的是 MembershipRequest 消息,则会验证该消息是否新鲜,是否被正确签名,另外,这个节点能给我们发送 AliveMessage 消息,则表明此节点是处于 alive 状态,所以会将此节点通过 MembershipRequest 消息传过来的 alive 消息按照 2.2 节所示的过程进行处理。最后,Discovery 会构建 MembershipResponse 消息并回复,构建此消息的代码如下所示:

gossip-2.png

上述代码告诉了我们哪些 Membership 的消息能传给请求者,哪些不能。

  1. 如果 Discovery 收到的是 MembershipResponse 消息,则会处理其中的 alivedead 成员信息。

3. 构造MembershipRequest消息的过程

  1. func (impl *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) (*pbgossip.GossipMessage, error)

    createMembershipRequest 方法调用 getMySignedAliveMessage 方法获取经过签名的 SignedGossipMessage,然后通过以下代码,去构造带有 GossipMessage_MemReq 消息的 GossipMessage

    request := &GossipMessage{
        Tag: GossipMessage_EMPTY,
        Nonce: RandomUint64(),
        Content: &GossipMessage_MemReq{
            MemReq: &MembershipRequest{
                SelfInformation: signedGossipMessage.Envelope,
            },
        }
    }
    
  2. func (impl *gossipDiscoveryImpl) getMySignedAliveMessage(includeInternalEndpoint bool) (*protoext.SignedGossipMessage, error)

    getMySignedAliveMessage 方法调用 aliveMsgAndInternalEndpoint 方法获取经过组装的 GossipMessage 和自身的 InternalEndpoint

    getMySignedAliveMessage 调用 CryptoService 密码服务接口,使用 SignMessage 方法对 GossipMessage 进行签名,**目前,如何对其进行签名,还不太清楚,可以猜一下:**首先利用 protobufGossipMessage 消息进行序列化,得到字节切片 payload,然后利用签名算法 signerpayload 进行签名,得到 signature,紧接着就是构造 Envelope

    envelope := &Envelope{
        Payload: payload,
        Signature: signature,
    }
    

    然后再构造 SignedGossipMessage 消息结构:

    signedGossipMessage := &SignedGossipMessage{
        GossipMessage: gossipMessage,
        Envelope: envelope,
    }
    

    上面的签名过程,我们没有讲解 Envelope 消息结构内的 SecretEnvelope 字段如何生成,但是盲猜一下,应该是在生成签名的过程中生成的。

    最后,如果 includeInternalEndpoint 的值等于 false,则需要将 signedGossipMessage.Envelope.SecretEnvelope 设置为 nil

    最后的最后,getMySignedAliveMessage 方法将生成的 signedGossipMessage 消息返回到上层方法 createMembershipRequest,所以此时,我们可以回到第 1 步去观察 createMembershipRequest 方法如何基于返回的 SignedGossipMessage 构造 GossipMessage_MemReq

  3. func (impl *gossipDiscoveryImpl) aliveMsgAndInternalEndpoint() (*pbgossip.GossipMessage, string)

    aliveMsgAndInternalEndpoint 方法通过以下过程组装 GossipMessageGossipMessage 内存放的实际内容为 GossipMessage_AliveMsg

    gossipMessage := &GossipMessage{
        Tag: GossipMessage_EMPTY,
        Content: &GossipMessage_AliveMsg{ //实际消息
            AliveMsg: &AliveMessage{
                Membership: &Member{
                    Endpoint: impl.self.ExternalEndpoint,
                    Metadata: impl.self.Metadata,
                    PkiId: impl.self.PKIid,
                },
                Timestamp: &PeerTime{
                    IncNum: impl.incTime,
                    SeqNum: impl.seqNum,
                },
            }
        }
    }
    

    方法返回的 InternalEndpoint 则是 impl.self.InternalEndpoint。对于方法所构造的 GossipMessage,不做任何处理,直接返回到上层方法 getMySignedAliveMessage,所以此时,我们可以回到第 2 步去观察 getMySignedAliveMessage 方法如何基于返回的 GossipMessage 构造 SignedGossipMessage

Documentation

Index

Constants

View Source
const (
	DefaultAliveTimeInterval            = 5 * time.Second
	DefaultAliveExpirationTimeout       = 5 * DefaultAliveTimeInterval
	DefaultAliveExpirationCheckInterval = DefaultAliveExpirationTimeout / 10
	DefaultReconnectInterval            = DefaultAliveExpirationTimeout
	DefaultMsgExpirationFactor          = 20
	DefaultMaxConnectionAttempts        = 120
)

Variables

This section is empty.

Functions

func HasExternalEndpoint

func HasExternalEndpoint(nm NetworkMember) bool

HasExternalEndpoint 可以告诉我们给定的 NetworkMember 是否拥有不为空的 external endpoint。

Types

type AnchorPeerTracker

type AnchorPeerTracker interface {
	IsAnchorPeer(endpoint string) bool
}

AnchorPeerTracker 是一个传递给 discovery 的接口,用于检查端点是否是锚点 peer。

在 gossip 协议中,锚点 peer 是指在网络中作为固定参考点的特定节点。它通常是由网络管理员或系统设计者选择并预先配置的节点。 锚点 peer 的主要作用是提供网络的可靠性和稳定性。通过设置锚点 peer,可以确保网络中至少存在一些可信赖的节点,它们可以为其他节点提供准确的信息,并确保消息的可靠传递。

type CommService

type CommService interface {
	Gossip(msg *protoext.SignedGossipMessage)

	SendToPeer(peer *NetworkMember, msg *protoext.SignedGossipMessage)

	// Ping 向远程节点发送 ping 消息,然后返回一个布尔值表示对方是否回应了 pong。
	Ping(peer *NetworkMember) bool

	// Accept 返回一个 read-only 通道,其中存储着从远程节点那里收到的消息。
	Accept() <-chan protoext.ReceivedMessage

	// 返回一个 read-only 通道,其中存储着那些假定 dead peer。
	PresumedDead() <-chan common.PKIid

	// CloseConn 关闭与给定的节点之间的连接。
	CloseConn(peer *NetworkMember)

	// Forwar 将报文转发至下一跳,但不会将报文转发给最初接收报文的那一跳。
	Forward(msg protoext.ReceivedMessage)

	// IdentitySwitch 返回一个 read-only 通道,其中存储着那些身份改变的 peer。
	IdentitySwitch() <-chan common.PKIid
}

type CryptoService

type CryptoService interface {
	// 验证 alive 消息是否被认证
	ValidateAliveMsg(message *protoext.SignedGossipMessage) bool

	// 签署消息
	SignMessage(m *pbgossip.GossipMessage, internalEndpoint string) *pbgossip.Envelope
}

type DisclosurePolicy

type DisclosurePolicy func(nm *NetworkMember) (Sieve, EnvelopeFilter)

DisclosurePolicy 定义了某个远程 peer 有资格了解哪些信息,以及有资格了解某个 SignedGossipMessage 中的哪些信息。

type Discovery

type Discovery interface {
	// 根据提供的 peer 节点的 PKI-ID,搜寻并返回其对应关联的 NetworkMember。
	Lookup(pkiID common.PKIid) *NetworkMember

	// Self 返回该 instance 自身的 NetworkMember。
	Self() NetworkMember

	// UpdateMetadata 更新该 instance 自身的元数据。
	UpdateMetadata([]byte)

	// UpdateExternalEndpoint 更新该 instance 自身的 endpoint。
	UpdateExternalEndpoint(string)

	// Stop 停止该 instance。
	Stop()

	// GetMembership 返回当前 alive 的成员。
	GetMembership() []NetworkMember

	// InitiateSync 向给定数量的 peer 节点发送 GossipMessage_MemReq 消息,询问它们所知道的网络成员信息。
	InitiateSync(int)

	// Connect 使该实例与远程实例连接。identifier 参数是一个函数,可用于
	// 识别对等程序,并断言其 PKI-ID、是否在对等程序的 org 中,以及操作是
	// 否成功。
	Connect(NetworkMember, identifier)
}

func NewDiscoveryService

func NewDiscoveryService(self NetworkMember, commService CommService, cryptoService CryptoService, disclosurePolicy DisclosurePolicy, config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker, logger *hlogging.HyperchainLogger) Discovery

type DiscoveryConfig

type DiscoveryConfig struct {
	AliveTimeInterval            time.Duration // 用于发送 alive 消息的时间间隔
	AliveExpirationTimeout       time.Duration // alive 消息的过期时间
	AliveExpirationCheckInterval time.Duration // 检查 alive 消息是否过期的时间间隔
	ReconnectInterval            time.Duration // 重连的时间间隔
	MaxConnectionAttempts        int           // 最大连接尝试次数
	MsgExpirationFactor          int           // 消息过期时间的因子
	BootstrapPeers               []string      // 用于引导的节点 endpoint 列表
}

type EnvelopeFilter

type EnvelopeFilter func(message *protoext.SignedGossipMessage) *pbgossip.Envelope

type Members

type Members []NetworkMember

func (Members) ByID

func (members Members) ByID() map[string]NetworkMember

func (Members) Filter

func (members Members) Filter(filter func(nm NetworkMember) bool) Members

Filter 接收一个自定义的过滤器函数:func(nm NetworkMember) bool,经过此过滤器过滤的 NetworkMember 会被留下来, 其余的会被忽略掉。

func (Members) Intersect

func (members Members) Intersect(otherMembers Members) Members

Intersect 返回两个 Members 的交集。

func (Members) Map

func (members Members) Map(f func(NetworkMember) NetworkMember) Members

Map 会逐个对 Members 里的对象调用给定的函数:func(NetworkMember) NetworkMember。

type NetworkMember

type NetworkMember struct {
	Metadata         []byte
	PKIid            common.PKIid
	ExternalEndpoint string // endpoint 一般就是 IP 地址 port 的组合,例如 192.168.111.131:8000
	InternalEndpoint string
	Properties       *pbgossip.Properties
	Envelope         *pbgossip.Envelope
}

NetworkMember 结构体的作用是表示网络中的成员(peer)的信息。它包含了成员的地址、 元数据、PKIid(公钥基础设施标识符)、属性、以及一个Envelope(信封)对象,用于在 成员之间传递信息。该结构体用于描述网络中的各个成员,以便进行通信和交互。

func (NetworkMember) Clone

func (nm NetworkMember) Clone() NetworkMember

func (NetworkMember) PreferredEndpoint

func (nm NetworkMember) PreferredEndpoint() string

PreferredEndpoint 如果peer节点的InternalEndpoint不为空,那么它会优先选择连接到InternalEndpoint而不是标准的Endpoint。这通常与内部网络规则有关。

内部网络通常是指在同一个网络或子网内的节点之间的通信。这些节点可能位于同一个机房、数据中心或私有云中。在这种情况下,使用内部网络连接可以提供更快的速度、更低的延迟和更高的带宽。 相比之下,标准的Endpoint通常是指外部网络或公共互联网上的节点之间的通信。使用公共互联网连接可能会受到网络拥塞、延迟高等因素的影响。 因此,peer节点更倾向于使用InternalEndpoint连接,是为了获得更可靠、更高效的网络连接,以提高节点间通信的性能和稳性。

func (NetworkMember) String

func (nm NetworkMember) String() string

type PeerIdentification

type PeerIdentification struct {
	PKIid   common.PKIid
	SelfOrg bool // 对方是否与自己在同一组织内,在建立连接时,这个标志位如果等于 true,则表示对方节点与我在同一组织内,那么我就可以将我的 InternalEndpoint 发送给他。
}

PeerIdentification 结构体定义了对方 peer 节点的 PKI-ID,并且它其中的 SelfOrg 字段揭示了该 peer 节点是否与自己在同一组织内。

type Sieve

type Sieve func(message *protoext.SignedGossipMessage) bool

Sieve 是一个筛子,返回的布尔值说明了是否能将报文发送给远程对等点。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL