Documentation ¶
Index ¶
- Variables
- func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
- func GetHashID(key []byte) []byte
- func IDToString(id []byte) string
- func NewInode(id string, addr string) *cm.Node
- type Config
- type GrpcTransport
- func (g *GrpcTransport) CheckPredecessor(node *cm.Node) error
- func (g *GrpcTransport) DeleteKey(node *cm.Node, key []byte) error
- func (g *GrpcTransport) DeleteKeys(node *cm.Node, keys [][]byte) error
- func (g *GrpcTransport) FindSuccessor(node *cm.Node, id []byte) (*cm.Node, error)
- func (g *GrpcTransport) GetKey(node *cm.Node, key []byte) (*cm.GetResponse, error)
- func (g *GrpcTransport) GetPredecessor(node *cm.Node) (*cm.Node, error)
- func (g *GrpcTransport) GetServer() *grpc.Server
- func (g *GrpcTransport) GetSuccessor(node *cm.Node) (*cm.Node, error)
- func (g *GrpcTransport) Notify(node, pred *cm.Node) error
- func (g *GrpcTransport) RequestKeys(node *cm.Node, from, to []byte) ([]*cm.KV, error)
- func (g *GrpcTransport) SetKey(node *cm.Node, key, value []byte) error
- func (g *GrpcTransport) SetPredecessor(node *cm.Node, pred *cm.Node) error
- func (g *GrpcTransport) SetSuccessor(node *cm.Node, succ *cm.Node) error
- func (g *GrpcTransport) Start() error
- func (g *GrpcTransport) Stop() error
- type Message
- type Node
- func (n *Node) CheckPredecessor(ctx context.Context, id *cm.ID) (*cm.ER, error)
- func (n *Node) Delete(key []byte) error
- func (n *Node) Find(key []byte) (*cm.Node, error)
- func (n *Node) FindSuccessor(ctx context.Context, id *cm.ID) (*cm.Node, error)
- func (node *Node) FingerTableString() string
- func (n *Node) Get(key []byte) ([]byte, error)
- func (n *Node) GetConfig() *Config
- func (n *Node) GetPredecessor(ctx context.Context, r *cm.ER) (*cm.Node, error)
- func (n *Node) GetShutdownCh() chan struct{}
- func (n *Node) GetStorage() Storage
- func (n *Node) GetSuccessor(ctx context.Context, r *cm.ER) (*cm.Node, error)
- func (n *Node) Notify(ctx context.Context, node *cm.Node) (*cm.ER, error)
- func (n *Node) Set(key []byte, value []byte) error
- func (n *Node) SetPredecessor(ctx context.Context, pred *cm.Node) (*cm.ER, error)
- func (n *Node) SetSuccessor(ctx context.Context, succ *cm.Node) (*cm.ER, error)
- func (n *Node) Stop()
- func (n *Node) XDelete(ctx context.Context, req *cm.DeleteRequest) (*cm.DeleteResponse, error)
- func (n *Node) XGet(ctx context.Context, req *cm.GetRequest) (*cm.GetResponse, error)
- func (n *Node) XMultiDelete(ctx context.Context, req *cm.MultiDeleteRequest) (*cm.DeleteResponse, error)
- func (n *Node) XRequestKeys(ctx context.Context, req *cm.RequestKeysRequest) (*cm.RequestKeysResponse, error)
- func (n *Node) XSet(ctx context.Context, req *cm.SetRequest) (*cm.SetResponse, error)
- type Storage
- type Transport
- type TxStorage
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func Dial ¶
func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func IDToString ¶
IDToString converts a []byte to a big.Int string, useful for debugging/logging.
Types ¶
type Config ¶
type Config struct { Id string Addr string ServerOpts []grpc.ServerOption DialOpts []grpc.DialOption Hash func() hash.Hash // Hash function to use HashSize int StabilizeMin time.Duration // Minimum stabilization time StabilizeMax time.Duration // Maximum stabilization time Timeout time.Duration MaxIdle time.Duration }
一个Node一个Config
func DefaultConfig ¶
func DefaultConfig() *Config
type GrpcTransport ¶
type GrpcTransport struct { *cm.UnimplementedChordServer // contains filtered or unexported fields }
func NewGrpcTransport ¶
func NewGrpcTransport(cnf *Config) (*GrpcTransport, error)
func NewGrpcTransport(config *Config) (cm.ChordClient, error) {
func (*GrpcTransport) CheckPredecessor ¶
func (g *GrpcTransport) CheckPredecessor(node *cm.Node) error
func (*GrpcTransport) DeleteKey ¶
func (g *GrpcTransport) DeleteKey(node *cm.Node, key []byte) error
func (*GrpcTransport) DeleteKeys ¶
func (g *GrpcTransport) DeleteKeys(node *cm.Node, keys [][]byte) error
func (*GrpcTransport) FindSuccessor ¶
FindSuccessor the successor ID of a remote node.
func (*GrpcTransport) GetKey ¶
func (g *GrpcTransport) GetKey(node *cm.Node, key []byte) (*cm.GetResponse, error)
func (*GrpcTransport) GetPredecessor ¶
GetPredecessor the successor ID of a remote node.
func (*GrpcTransport) GetServer ¶
func (g *GrpcTransport) GetServer() *grpc.Server
func (*GrpcTransport) GetSuccessor ¶
GetSuccessor the successor ID of a remote node.
func (*GrpcTransport) RequestKeys ¶
func (*GrpcTransport) SetKey ¶
func (g *GrpcTransport) SetKey(node *cm.Node, key, value []byte) error
func (*GrpcTransport) SetPredecessor ¶
func (*GrpcTransport) SetSuccessor ¶
func (*GrpcTransport) Start ¶
func (g *GrpcTransport) Start() error
type Node ¶
type Node struct { *cm.Node *cm.UnimplementedChordServer // contains filtered or unexported fields }
func NewNode ¶
NewNode creates a new Chord node. Returns error if node alreadyexists in the chord ring
func (*Node) CheckPredecessor ¶
func (*Node) FindSuccessor ¶
func (*Node) FingerTableString ¶
FingerTableString takes a node and converts it's finger table into a string.
func (*Node) GetPredecessor ¶
func (*Node) GetShutdownCh ¶
func (n *Node) GetShutdownCh() chan struct{}
func (*Node) GetStorage ¶
func (*Node) GetSuccessor ¶
ctx context.Context上下文 GetSuccessor gets the successor on the node..
func (*Node) Notify ¶
已验证逻辑,transfer_Keys部分存疑 Notify notifies Chord that Node(Client) thinks it is our predecessor Notify(n0): n0通知n它的存在,若此时n没有前序节点或,n0比n现有的前序节点更加靠近n,则n将其设置为前序节点。
func (*Node) SetPredecessor ¶
SetPredecessor sets the predecessor on the node..
func (*Node) SetSuccessor ¶
SetSuccessor sets the successor on the node..
func (*Node) XDelete ¶
func (n *Node) XDelete(ctx context.Context, req *cm.DeleteRequest) (*cm.DeleteResponse, error)
func (*Node) XGet ¶
func (n *Node) XGet(ctx context.Context, req *cm.GetRequest) (*cm.GetResponse, error)
获取key对应的数据
func (*Node) XMultiDelete ¶
func (n *Node) XMultiDelete(ctx context.Context, req *cm.MultiDeleteRequest) (*cm.DeleteResponse, error)
func (*Node) XRequestKeys ¶
func (n *Node) XRequestKeys(ctx context.Context, req *cm.RequestKeysRequest) (*cm.RequestKeysResponse, error)
func (*Node) XSet ¶
func (n *Node) XSet(ctx context.Context, req *cm.SetRequest) (*cm.SetResponse, error)
type Transport ¶
type Transport interface { Start() error Stop() error //RPC GetSuccessor(*cm.Node) (*cm.Node, error) FindSuccessor(*cm.Node, []byte) (*cm.Node, error) GetPredecessor(*cm.Node) (*cm.Node, error) Notify(*cm.Node, *cm.Node) error CheckPredecessor(*cm.Node) error SetPredecessor(*cm.Node, *cm.Node) error SetSuccessor(*cm.Node, *cm.Node) error //Storage GetKey(*cm.Node, []byte) (*cm.GetResponse, error) SetKey(*cm.Node, []byte, []byte) error DeleteKey(*cm.Node, []byte) error RequestKeys(*cm.Node, []byte, []byte) ([]*cm.KV, error) DeleteKeys(*cm.Node, [][]byte) error }
要实现一个 RPC 框架,只需要把以下三点实现了就基本完成了:
Call ID 映射:可以直接使用函数字符串,也可以使用整数 ID。映射表一般就是一个哈希表。 序列化反序列化:可以自己写,也可以使用 Protobuf 或者 FlatBuffers 之类的。 网络传输库:可以自己写 Socket,或者用 Asio,ZeroMQ,Netty 之类。
Transport enables a node to talk to the other nodes in the ring