chord

package module
v0.0.0-...-370c9f9 Latest
Published: May 17, 2021 License: MIT Imports: 19 Imported by: 0

README

Chord

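Configuration

Main host

MainNode and the Orderer run on the same host. To start (or restart) them, run ./script/setup.sh up on the Main host; this starts the Orderer, MainDhtNode, and Deliver.

To shut down MainNode and the Orderer, run ./script/setup.sh down

To delete existing blocks, run rm -rf /tmp/hyperledger/

Note: on the Main host, the fabric project must sit in the same parent directory as chord. Before starting, set LocalAddress in config.go to the Main host's IP address on the subnet.

DhtNode host

Several DhtNodes can be started on one machine and joined to the MainNode's ring. To start (or restart) them, run ./script/join_node.sh join

To shut down the DhtNodes on the current machine, run ./script/setup.sh close

Note: before starting, set ADDR in join_node.sh to the current DhtNode host's IP address on the subnet, and set PORTS and NODEIDS to the listening ports and IDs of the DhtNodes to be defined. Both lists must have the same number of entries, and no two DhtNodes may share an ID. If a DHT node goes down and has to rejoin, MainNode and all DhtNodes must be restarted.

Broadcast host

Once the Main host and the DhtNode hosts are up, run ./script/setup.sh msg [message count] [concurrency] [message size in KB] on the Broadcast host.

Note: before starting, set ORDERER_ADDRESS in setup.sh to the Main host's address plus the port on which the Orderer listens for messages.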

Documentation

Index

Constants

This section is empty.

Variables

var (
	ERR_NO_SUCCESSOR  = errors.New("cannot find successor")
	ERR_NODE_EXISTS   = errors.New("node with id already exists")
	ERR_KEY_NOT_FOUND = errors.New("key not found")
	ERR_HASHSIZE      = errors.New("hashsize should not be less than hash func size")
)
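Because these are exported sentinel errors, callers can presumably test for them with errors.Is, even when the error has been wrapped. The sketch below is standalone: errKeyNotFound, lookup, and isMissing are hypothetical stand-ins, and real code would compare against chord.ERR_KEY_NOT_FOUND instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errKeyNotFound mirrors chord.ERR_KEY_NOT_FOUND for this standalone sketch.
var errKeyNotFound = errors.New("key not found")

// lookup is a hypothetical helper that wraps the sentinel with extra context.
func lookup(store map[string]string, key string) (string, error) {
	v, ok := store[key]
	if !ok {
		return "", fmt.Errorf("lookup %q: %w", key, errKeyNotFound)
	}
	return v, nil
}

// isMissing reports whether err is, or wraps, the key-not-found sentinel.
func isMissing(err error) bool {
	return errors.Is(err, errKeyNotFound)
}

func main() {
	_, err := lookup(map[string]string{"a": "1"}, "b")
	if isMissing(err) {
		fmt.Println("missing key:", err)
	}
}
```

Matching with errors.Is rather than == keeps the check working if an intermediate layer adds context with %w.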

Functions

func Dial

func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func GetHashID

func GetHashID(key []byte) []byte

GetHashID is intended for testing.
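As a rough illustration of how a key can be turned into a ring identifier, the sketch below hashes the key bytes. The choice of SHA-1 and the name hashID are assumptions; the package selects its hash function through Config.Hash, and GetHashID may behave differently.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// hashID is a hypothetical stand-in for GetHashID; SHA-1 is an assumption.
func hashID(key []byte) []byte {
	h := sha1.New()
	h.Write(key) // hash.Hash.Write never returns an error
	return h.Sum(nil)
}

func main() {
	id := hashID([]byte("node-1"))
	fmt.Printf("%d bytes: %x\n", len(id), id)
}
```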

func IDToString

func IDToString(id []byte) string

IDToString converts a []byte to a big.Int string, useful for debugging/logging.
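A minimal sketch of such a conversion, assuming the ID is read as a big-endian unsigned integer (idToString is a hypothetical name, not the package's function):

```go
package main

import (
	"fmt"
	"math/big"
)

// idToString interprets id as a big-endian unsigned integer
// and returns its decimal representation.
func idToString(id []byte) string {
	return new(big.Int).SetBytes(id).String()
}

func main() {
	fmt.Println(idToString([]byte{0x01, 0x00})) // prints 256
}
```

Decimal strings are easier to compare by eye in logs than raw byte slices, which is likely why the helper exists.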

func NewInode

func NewInode(id string, addr string) *cm.Node

NewInode creates a single node.

Types

type Config

type Config struct {
	Id   string
	Addr string

	ServerOpts []grpc.ServerOption
	DialOpts   []grpc.DialOption

	Hash     func() hash.Hash // Hash function to use
	HashSize int

	StabilizeMin time.Duration // Minimum stabilization time
	StabilizeMax time.Duration // Maximum stabilization time

	Timeout time.Duration
	MaxIdle time.Duration
}

Each Node has its own Config.

func DefaultConfig

func DefaultConfig() *Config

func (*Config) Validate

func (c *Config) Validate() error

HashSize is the number of entries in the finger table.
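The ERR_HASHSIZE message suggests a check of HashSize against the output size of the hash function. The sketch below shows what such a check might look like. It is not the package's Validate: the trimmed config type, the helper sha1Config, and the assumption that HashSize is counted in bits (one finger per bit of the ID) are all illustrative.

```go
package main

import (
	"crypto/sha1"
	"errors"
	"fmt"
	"hash"
)

// errHashSize mirrors ERR_HASHSIZE for this standalone sketch.
var errHashSize = errors.New("hashsize should not be less than hash func size")

// config is a trimmed, hypothetical copy of Config with only the hash fields.
type config struct {
	Hash     func() hash.Hash
	HashSize int
}

// validate assumes HashSize is in bits while Hash().Size() is in bytes.
func (c *config) validate() error {
	if c.HashSize < c.Hash().Size()*8 {
		return errHashSize
	}
	return nil
}

// sha1Config builds a config that uses SHA-1 (an assumption) with the given size.
func sha1Config(size int) *config {
	return &config{Hash: sha1.New, HashSize: size}
}

func main() {
	fmt.Println(sha1Config(160).validate()) // 160-bit IDs, 160 fingers
	fmt.Println(sha1Config(8).validate())   // too small for SHA-1
}
```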

type GrpcTransport

type GrpcTransport struct {
	*cm.UnimplementedChordServer
	// contains filtered or unexported fields
}

func NewGrpcTransport

func NewGrpcTransport(cnf *Config) (*GrpcTransport, error)

NewGrpcTransport returns a GrpcTransport built from cnf.

func (*GrpcTransport) CheckPredecessor

func (g *GrpcTransport) CheckPredecessor(node *cm.Node) error

func (*GrpcTransport) DeleteKey

func (g *GrpcTransport) DeleteKey(node *cm.Node, key []byte) error

func (*GrpcTransport) DeleteKeys

func (g *GrpcTransport) DeleteKeys(node *cm.Node, keys [][]byte) error

func (*GrpcTransport) FindSuccessor

func (g *GrpcTransport) FindSuccessor(node *cm.Node, id []byte) (*cm.Node, error)

FindSuccessor asks a remote node for the successor of id.

func (*GrpcTransport) GetKey

func (g *GrpcTransport) GetKey(node *cm.Node, key []byte) (*cm.GetResponse, error)

func (*GrpcTransport) GetPredecessor

func (g *GrpcTransport) GetPredecessor(node *cm.Node) (*cm.Node, error)

GetPredecessor gets the predecessor of a remote node.

func (*GrpcTransport) GetServer

func (g *GrpcTransport) GetServer() *grpc.Server

func (*GrpcTransport) GetSuccessor

func (g *GrpcTransport) GetSuccessor(node *cm.Node) (*cm.Node, error)

GetSuccessor gets the successor of a remote node.

func (*GrpcTransport) Notify

func (g *GrpcTransport) Notify(node, pred *cm.Node) error

func (*GrpcTransport) RequestKeys

func (g *GrpcTransport) RequestKeys(node *cm.Node, from, to []byte) ([]*cm.KV, error)

func (*GrpcTransport) SetKey

func (g *GrpcTransport) SetKey(node *cm.Node, key, value []byte) error

func (*GrpcTransport) SetPredecessor

func (g *GrpcTransport) SetPredecessor(node *cm.Node, pred *cm.Node) error

func (*GrpcTransport) SetSuccessor

func (g *GrpcTransport) SetSuccessor(node *cm.Node, succ *cm.Node) error

func (*GrpcTransport) Start

func (g *GrpcTransport) Start() error

func (*GrpcTransport) Stop

func (g *GrpcTransport) Stop() error

Stop shuts down the TCP transport.

type Message

type Message struct {
	ConfigSeq uint64
	NormalMsg *cb.Envelope
	ConfigMsg *cb.Envelope
}

type Node

type Node struct {
	*cm.Node
	*cm.UnimplementedChordServer
	// contains filtered or unexported fields
}

func NewNode

func NewNode(cnf *Config, joinNode *cm.Node) (*Node, error)

NewNode creates a new Chord node. It returns an error if the node already exists in the Chord ring.

func (*Node) CheckPredecessor

func (n *Node) CheckPredecessor(ctx context.Context, id *cm.ID) (*cm.ER, error)

func (*Node) Delete

func (n *Node) Delete(key []byte) error

func (*Node) Find

func (n *Node) Find(key []byte) (*cm.Node, error)

func (*Node) FindSuccessor

func (n *Node) FindSuccessor(ctx context.Context, id *cm.ID) (*cm.Node, error)

func (*Node) FingerTableString

func (node *Node) FingerTableString() string

FingerTableString takes a node and converts its finger table into a string.

func (*Node) Get

func (n *Node) Get(key []byte) ([]byte, error)

func (*Node) GetConfig

func (n *Node) GetConfig() *Config

func (*Node) GetPredecessor

func (n *Node) GetPredecessor(ctx context.Context, r *cm.ER) (*cm.Node, error)

func (*Node) GetShutdownCh

func (n *Node) GetShutdownCh() chan struct{}

func (*Node) GetStorage

func (n *Node) GetStorage() Storage

func (*Node) GetSuccessor

func (n *Node) GetSuccessor(ctx context.Context, r *cm.ER) (*cm.Node, error)

GetSuccessor gets the successor on the node. ctx is the request context.

func (*Node) Notify

func (n *Node) Notify(ctx context.Context, node *cm.Node) (*cm.ER, error)

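Notify notifies Chord that Node (the client) thinks it is our predecessor. Notify(n0): n0 tells n that it exists; if n has no predecessor, or n0 is closer to n than n's current predecessor, n adopts n0 as its predecessor. The logic has been verified, although the transfer_Keys part is still in doubt.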
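The predecessor rule described for Notify can be sketched as follows. This is an in-memory illustration only: ringNode, newRingNode, and between are hypothetical names, IDs are assumed to be equal-length byte slices compared as big-endian numbers, and key transfer is left out.

```go
package main

import (
	"bytes"
	"fmt"
)

// between reports whether id lies strictly inside the ring interval (a, b).
// IDs are assumed to have equal length so bytes.Compare orders them numerically.
func between(id, a, b []byte) bool {
	switch bytes.Compare(a, b) {
	case -1: // a < b: ordinary interval
		return bytes.Compare(a, id) < 0 && bytes.Compare(id, b) < 0
	case 1: // a > b: the interval wraps past zero
		return bytes.Compare(a, id) < 0 || bytes.Compare(id, b) < 0
	default: // a == b: the whole ring except a itself
		return !bytes.Equal(id, a)
	}
}

// ringNode is a hypothetical in-memory node with a one-byte ID.
type ringNode struct {
	id   []byte
	pred *ringNode
}

func newRingNode(id byte) *ringNode {
	return &ringNode{id: []byte{id}}
}

// notify applies the rule: adopt the candidate if there is no predecessor yet,
// or if the candidate sits between the current predecessor and this node.
func (n *ringNode) notify(candidate *ringNode) bool {
	if n.pred == nil || between(candidate.id, n.pred.id, n.id) {
		n.pred = candidate
		return true
	}
	return false
}

func main() {
	n := newRingNode(50)
	fmt.Println(n.notify(newRingNode(10))) // no predecessor yet
	fmt.Println(n.notify(newRingNode(30))) // 30 is closer to 50 than 10
	fmt.Println(n.notify(newRingNode(20))) // 20 is not between 30 and 50
}
```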

func (*Node) Set

func (n *Node) Set(key []byte, value []byte) error

func (*Node) SetPredecessor

func (n *Node) SetPredecessor(ctx context.Context, pred *cm.Node) (*cm.ER, error)

SetPredecessor sets the predecessor on the node.

func (*Node) SetSuccessor

func (n *Node) SetSuccessor(ctx context.Context, succ *cm.Node) (*cm.ER, error)

SetSuccessor sets the successor on the node.

func (*Node) Stop

func (n *Node) Stop()

Stop removes this node from the ring and links its predecessor and successor to each other.
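The relinking step can be pictured with a small in-memory ring. This is a sketch under assumptions: peer, ring, and leave are hypothetical names, and the real node presumably performs the same update through remote SetPredecessor and SetSuccessor calls rather than direct pointer writes.

```go
package main

import "fmt"

// peer is a hypothetical in-memory ring member.
type peer struct {
	id         string
	pred, succ *peer
}

// ring links the given IDs into a circular doubly linked list.
func ring(ids ...string) []*peer {
	peers := make([]*peer, len(ids))
	for i, id := range ids {
		peers[i] = &peer{id: id}
	}
	for i, p := range peers {
		p.succ = peers[(i+1)%len(peers)]
		p.pred = peers[(i-1+len(peers))%len(peers)]
	}
	return peers
}

// leave unlinks p by pointing its predecessor and successor at each other.
func (p *peer) leave() {
	if p.pred != nil && p.pred != p {
		p.pred.succ = p.succ
	}
	if p.succ != nil && p.succ != p {
		p.succ.pred = p.pred
	}
	p.pred, p.succ = nil, nil
}

func main() {
	ps := ring("a", "b", "c")
	ps[1].leave()
	fmt.Println(ps[0].succ.id, ps[2].pred.id)
}
```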

func (*Node) XDelete

func (n *Node) XDelete(ctx context.Context, req *cm.DeleteRequest) (*cm.DeleteResponse, error)

func (*Node) XGet

func (n *Node) XGet(ctx context.Context, req *cm.GetRequest) (*cm.GetResponse, error)

XGet gets the data stored under key.

func (*Node) XMultiDelete

func (n *Node) XMultiDelete(ctx context.Context, req *cm.MultiDeleteRequest) (*cm.DeleteResponse, error)

func (*Node) XRequestKeys

func (n *Node) XRequestKeys(ctx context.Context, req *cm.RequestKeysRequest) (*cm.RequestKeysResponse, error)

func (*Node) XSet

func (n *Node) XSet(ctx context.Context, req *cm.SetRequest) (*cm.SetResponse, error)

type Storage

type Storage interface {
	Get([]byte) ([]byte, error)
	Set([]byte, []byte) error
	Delete([]byte) error
	Between([]byte, []byte) ([]*cm.KV, error)
	MDelete(...[]byte) error
}
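To show the shape of a type with these five methods, here is a minimal map-backed sketch. It does not reproduce the package's own storage: memStorage and kv are hypothetical (kv stands in for cm.KV), and Between compares raw keys over a non-wrapping range [from, to), whereas a Chord implementation would more likely compare hashed IDs on the ring.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errKeyNotFound mirrors ERR_KEY_NOT_FOUND for this standalone sketch.
var errKeyNotFound = errors.New("key not found")

// kv is a local stand-in for cm.KV.
type kv struct{ Key, Value []byte }

// memStorage is a hypothetical map-backed store guarded by a mutex.
type memStorage struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemStorage() *memStorage {
	return &memStorage{data: make(map[string][]byte)}
}

func (s *memStorage) Get(key []byte) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[string(key)]
	if !ok {
		return nil, errKeyNotFound
	}
	return v, nil
}

func (s *memStorage) Set(key, value []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[string(key)] = value
	return nil
}

func (s *memStorage) Delete(key []byte) error {
	return s.MDelete(key)
}

func (s *memStorage) MDelete(keys ...[]byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, k := range keys {
		delete(s.data, string(k))
	}
	return nil
}

// Between returns the pairs with from <= key < to, sorted by key.
func (s *memStorage) Between(from, to []byte) ([]*kv, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []*kv
	for k, v := range s.data {
		kb := []byte(k)
		if bytes.Compare(kb, from) >= 0 && bytes.Compare(kb, to) < 0 {
			out = append(out, &kv{Key: kb, Value: v})
		}
	}
	sort.Slice(out, func(i, j int) bool {
		return bytes.Compare(out[i].Key, out[j].Key) < 0
	})
	return out, nil
}

func main() {
	s := newMemStorage()
	s.Set([]byte("a"), []byte("1"))
	s.Set([]byte("b"), []byte("2"))
	pairs, _ := s.Between([]byte("a"), []byte("c"))
	fmt.Println(len(pairs))
}
```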

type Transport

type Transport interface {
	Start() error
	Stop() error

	//RPC
	GetSuccessor(*cm.Node) (*cm.Node, error)
	FindSuccessor(*cm.Node, []byte) (*cm.Node, error)
	GetPredecessor(*cm.Node) (*cm.Node, error)
	Notify(*cm.Node, *cm.Node) error
	CheckPredecessor(*cm.Node) error
	SetPredecessor(*cm.Node, *cm.Node) error
	SetSuccessor(*cm.Node, *cm.Node) error

	//Storage
	GetKey(*cm.Node, []byte) (*cm.GetResponse, error)
	SetKey(*cm.Node, []byte, []byte) error
	DeleteKey(*cm.Node, []byte) error
	RequestKeys(*cm.Node, []byte, []byte) ([]*cm.KV, error)
	DeleteKeys(*cm.Node, [][]byte) error
}

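To implement an RPC framework, it is essentially enough to implement the following three pieces:

Call ID mapping: function-name strings can be used directly, or integer IDs. The mapping table is usually just a hash table.
Serialization and deserialization: hand-written, or handled by Protobuf, FlatBuffers, and the like.
Network transport library: hand-written sockets, or a library such as Asio, ZeroMQ, or Netty.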

Transport enables a node to talk to the other nodes in the ring
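The three RPC ingredients named in the note above can be sketched in a few lines. This illustration is not how the package works (it uses gRPC): the call ID mapping is a plain Go map keyed by method name, serialization uses encoding/json, and the network hop is replaced by a direct function call on the encoded bytes. The names request, handlers, and dispatch are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request is the wire format: a call ID (method name) plus encoded arguments.
type request struct {
	Method string          `json:"method"`
	Params json.RawMessage `json:"params"`
}

// handler decodes its own parameters and returns a JSON-encodable result.
type handler func(params json.RawMessage) (interface{}, error)

// handlers is the call ID mapping: a hash table from method name to function.
var handlers = map[string]handler{
	"add": func(p json.RawMessage) (interface{}, error) {
		var args [2]int
		if err := json.Unmarshal(p, &args); err != nil {
			return nil, err
		}
		return args[0] + args[1], nil
	},
}

// dispatch plays the server side: decode, look up the call ID, invoke, encode.
// In a real framework the wire bytes would arrive over a socket.
func dispatch(wire []byte) ([]byte, error) {
	var req request
	if err := json.Unmarshal(wire, &req); err != nil {
		return nil, err
	}
	h, ok := handlers[req.Method]
	if !ok {
		return nil, fmt.Errorf("unknown method %q", req.Method)
	}
	res, err := h(req.Params)
	if err != nil {
		return nil, err
	}
	return json.Marshal(res)
}

func main() {
	out, err := dispatch([]byte(`{"method":"add","params":[2,3]}`))
	fmt.Println(string(out), err) // prints: 5 <nil>
}
```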

type TxStorage

type TxStorage interface {
	Storage
	GetMsgChan() chan *Message
}

func NewtxStorage

func NewtxStorage(hashFunc func() hash.Hash) TxStorage

Directories

Path Synopsis
examples
models
