algo

package
v0.0.0-...-e3e94bd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (
	/*
		一开始,启动者向一组远程 peer 节点发送 hello 消息(针对不同的 peer 节点,启动者会在 hello 消息中放置不同的 nonce),接着启动者会设置一个等待 digest 消息的超时时间,这个
		超时时间一到,启动者就会处理在超时时间内收到的 digests,然后构造 request 消息,向其他 peer 节点发送 request 消息,默认情况下,等待 digest 消息的超时时间是 1 秒。

		收到 hello 消息的 peer 节点,会将 nonce 值暂存在本地,然后设置等待 request 消息的超时时间,这个超时时间一过,peer 节点就会将暂存在本地的 nonce 给删除掉,一旦删除掉,后续,
		启动者如果再基于该 nonce 向 peer 节点发送 request 消息,则会被 peer 节点忽视。默认情况下,peer 节点设置的等待 request 消息的超时时间是 1.5 秒。接着 peer 节点会将存储在
		本地状态机中的值逐一取出,在这里我们把这些值看成是 digest,hello 消息中不仅包含 nonce,其实还包含一个 context,一般情况下,context 代表发送 hello 消息的发送者地址(或姓名),
		即启动者的地址(或姓名),peer 节点利用摘要过滤器 DigestFilter,逐一过滤存储在本地状态机中的 digest。注意观察摘要过滤器的定义,我们可以发现,摘要过滤器它是一个返回值是函数的
		函数,我们将 DigestFilter 视为母函数,DigestFilter 的返回值视为子函数,母函数的入参是 interface{},参数名就叫 context,因此,我们不难猜出母函数返回的子函数应当与 hello 消
		息中包含的 context 相关,也就是与启动者相关,而子函数的入参是摘要值,所以摘要过滤器会将哪些摘要值过滤出来可能取决于启动者的身份。

		收到 digest 消息的启动者,会将 digest 逐一存储到本地,并建立 digest 与发送者之间的联系,即启动者需要知道每个 digest 是由哪些 peer 节点发送的,之所以是 “哪些”,是因为不同的
		peer 节点可能会发送相同的 digest。当等待 digest 的超时时间一过,则启动者会根据在超时时间内收到的 digest 构建一个 request 消息,并且此时启动者将不会再接收新的 digest 消息。、
		request 构造规则可以通过举一个例子来说明:例如有两个 peer 节点:p1 和 p2,p1 给启动者发送的 digests 是 [1 2 3],p2 给启动者发送的 digests 是 [2 4 3],那么启动者收到的
		digests 经过去重处理后是 [1 2 3 4],digest 与发送者之间的联系如下:
			1 => {p1}
			2 => {p1, p2}
			3 => {p1, p2}
			4 => {p2}
		启动者会随机构造 2 个 request 消息,它首先遍历 [1 2 3 4],首先是 1 这个 digest,它仅由 p1 发送,所以启动者构造 req1{[1], p1};接着遍历到 2,它由 p1 和 p2 发送,启动者从 p1
		和p2 中随机选一个,例如选到 p1,那么启动者更新 req1{[1, 2], p1};接着遍历到 3,它由 p1 和 p2 发送,启动者从 p1 和 p2 中随机选一个,例如选到 p2,那么启动者构造 req2{[3], p1};
		最后遍历到 4,它仅由 p2 发送,因此,启动者更新 req2{[3, 4], p2}。启动者分别将 req1 和 req2 发送给 p1 和 p2,并进入等待 response 消息的超时时间内,一旦超时时间一过,则启动者会
		结束本次 pull 进程。

		收到 request 消息的 peer 节点,会解析 request 消息,得到其中的 digests,然后逐一提取其中的 digest,并判断本地状态机中是否有存储该 digest,其次还会根据摘要过滤器 DigestFilter
		判断此 digest 能不能发送给启动者,如果本地状态机存有该 digest 且过滤器判断结果是可以发送,那么 peer 节点就会构造 response 消息,将能发送的 digest(item)发送给启动者。

		收到 response 消息的启动者,会将 response 消息中的 items 存储到本地状态机中。

		从上面的过程可以看出,DigestWaitTime 必须小于 RequestWaitTime,如果 DigestWaitTime 大于或等于 RequestWaitTime,那么 peer 节点会率先因为超时(RequestWaitTime)将暂存
		在本地的 nonce 删除掉。之后启动者才因为超时(DigestWaitTime)向 peer 节点发送 request 消息,这样的话就已经迟了,peer 节点会因为在本地找不到 request 消息中的 nonce 而忽
		视掉启动者发送来的 request。而 ResponseWaitTime 的大小则与 DigestWaitTime 和 RequestWaitTime 无关,它主要取决于网络质量的好坏。
	*/
	DigestWaitTime   = 1000 * time.Millisecond
	RequestWaitTime  = 1500 * time.Millisecond
	ResponseWaitTime = 2000 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DigestFilter

type DigestFilter func(context interface{}) func(digestItem string) bool

DigestFilter 会根据消息的上下文过滤出来要发送给启动者的摘要。

type PullAdapter

type PullAdapter interface {
	SelectPeers() []string

	Hello(dest string, nonce uint64)

	// context 可能代表的是 digest 消息的接收者地址
	SendDigest(digest []string, nonce uint64, context interface{})

	SendReq(dest string, items []string, nonce uint64)

	// context 可能代表的是 response 消息的接收者地址
	SendRes(items []string, context interface{}, nonce uint64)
}

type PullEngince

type PullEngince struct {
	PullAdapter
	PullEngineConfig
	// contains filtered or unexported fields
}

func NewPullEngine

func NewPullEngine(participant PullAdapter, sleepTime time.Duration, config PullEngineConfig) *PullEngince

func NewPullEngineWithFilter

func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter, config PullEngineConfig) *PullEngince

func (*PullEngince) Add

func (pe *PullEngince) Add(seqs ...string)

func (*PullEngince) OnDigest

func (pe *PullEngince) OnDigest(digests []string, nonce uint64, context interface{})

OnDigest 告诉 PullEngine 有一个 digest 到了,处理远程对等节点返回的摘要信息,并将相关的项目添加到PullEngine的状态中。

func (*PullEngince) OnHello

func (pe *PullEngince) OnHello(nonce uint64, context interface{})

OnHello 告诉 PullEngine 有一条 hello 消息到了,将自己本地存储的 digest 发送给 hello 消息的发送者。

func (*PullEngince) OnReq

func (pe *PullEngince) OnReq(items []string, nonce uint64, context interface{})

OnReq 告诉 PullEngine 有一个 request 到了,将 OnReq 方法的第一个入参发送给发送 request 的 peer 节点。

func (*PullEngince) OnRes

func (pe *PullEngince) OnRes(items []string, nonce uint64)

OnRes 提醒 PullEngine 有一个 response 消息到了,将 OnRes 的第一个入参存储到本地 state 中。

func (*PullEngince) Remove

func (pe *PullEngince) Remove(seqs ...string)

func (*PullEngince) Stop

func (pe *PullEngince) Stop()

type PullEngineConfig

type PullEngineConfig struct {
	DigestWaitTime   time.Duration
	RequestWaitTime  time.Duration
	ResponseWaitTime time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL