p2p

package module
v0.0.0-...-9e68871 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2018 License: Apache-2.0 Imports: 22 Imported by: 1

README

P2P

GoDoc Build Status

A simple P2P (peer-to-peer) network implementation.

Introduction

Features
  1. Peer discovery and auto management.
  2. Scalability of services.
  3. Broadcast message (flood protocol).
Network

In the P2P network, each node will choose some nodes as their neighbors.

             +------+
    +------->+ node +<------+
    |        +------+       |
    |                       |
    v                       v
+---+--+                 +--+---+
| node |                 | node |
+---+--+                 +--+---+
    ^                       ^
    |                       |
    |        +------+       |
    +------->+ node +<------+
             +------+
Node & Peer

The neighbor "nodes" are looked as "peers" for a node. A node will management the connections for neighbor peers and provide some services for other nodes. We use gRPC for communication between the node and neighbor peers.

                                gRPC
                                  |
                                  v       +----+
                       +-----connection-->+peer|
                       |                  +----+
                       |
                       |
                       |
                   +---+--+               +----+
+----Service---+-->+ node +--connection-->+peer|
|PeerManager   |   +---+--+               +----+
|MessageManager|       |
+--------------+       |
                       |
                       |                  +----+
                       +-----connection-->+peer|
                                          +----+

Install

Fist, use go get to install the latest version of the library:

go get -u github.com/lynn9388/p2p

Next, include this package in your application:

import "github.com/lynn9388/p2p"

Example

The code below shows how to create and launch a new node.

func main() {
	port := flag.Int("port", 9388, "port for server")
	flag.Parse()

	node := p2p.NewNode("localhost:" + strconv.Itoa(*port))
	node.StartServer()
	defer node.StopServer()
	node.PeerManager.StartDiscoverPeers("localhost:9388")
	defer node.PeerManager.StopDiscoverPeers()
	node.Wait()
}

Try to run several examples with different port.

For more information you can check the GoDoc

Documentation

Overview

Package p2p implements a node in P2P network.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterMessageServiceServer

func RegisterMessageServiceServer(s *grpc.Server, srv MessageServiceServer)

func RegisterNodeServiceServer

func RegisterNodeServiceServer(s *grpc.Server, srv NodeServiceServer)

func RegisterPeerServiceServer

func RegisterPeerServiceServer(s *grpc.Server, srv PeerServiceServer)

Types

type MessageManager

type MessageManager struct {
	ProcessSet  map[string]Process // process for every message type
	MessageLogs []messageLog       // logs for sent/received messages
}

MessageManager is the service to receive and process messages.

func NewMessageManager

func NewMessageManager() *MessageManager

NewMessageManager returns a initialized message manager.

func (*MessageManager) ReceiveMessage

func (mm *MessageManager) ReceiveMessage(ctx context.Context, msg *any.Any) (*any.Any, error)

ReceiveMessage receives message from a peer and process it.

func (*MessageManager) RegisterProcess

func (mm *MessageManager) RegisterProcess(x proto.Message, p Process)

RegisterProcess registers a process for a type of message.

func (*MessageManager) SendMessage

func (mm *MessageManager) SendMessage(ctx context.Context, sender string, conn *grpc.ClientConn, msg proto.Message, timeout time.Duration) (*any.Any, error)

SendMessage sends message to a peer through a connection.

type MessageServiceClient

type MessageServiceClient interface {
	// ReceiveMessage receives message from a peer.
	ReceiveMessage(ctx context.Context, in *any.Any, opts ...grpc.CallOption) (*any.Any, error)
}

MessageServiceClient is the client API for MessageService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewMessageServiceClient

func NewMessageServiceClient(cc *grpc.ClientConn) MessageServiceClient

type MessageServiceServer

type MessageServiceServer interface {
	// ReceiveMessage receives message from a peer.
	ReceiveMessage(context.Context, *any.Any) (*any.Any, error)
}

MessageServiceServer is the server API for MessageService service.

type Node

type Node struct {
	Addr            string         // network address
	Server          *grpc.Server   // gRPC server
	Waiter          sync.WaitGroup // wait server running in background
	PeerManager     *PeerManager   // peer manager
	*MessageManager                // message manager
}

Node is a independent entity in the P2P network.

func NewNode

func NewNode(addr string) *Node

NewNode initials a new node with specific network address.

func (*Node) Broadcast

func (n *Node) Broadcast(msg proto.Message, timeout time.Duration) error

Broadcast sends a broadcast message to neighbor peers.

func (*Node) ReceiveBroadcast

func (n *Node) ReceiveBroadcast(ctx context.Context, msg *any.Any) (*any.Any, error)

ReceiveBroadcast receives message and relay message to neighbor peers. The node will not broadcast messages with same content within 1 minutes.

func (*Node) SendMessage

func (n *Node) SendMessage(addr string, msg proto.Message, timeout time.Duration) (*any.Any, error)

SendMessage sends a message to a peer.

func (*Node) StartServer

func (n *Node) StartServer()

StartServer starts server to provide services. This must be called after registering any other external service.

func (*Node) StopServer

func (n *Node) StopServer()

StopServer stops the server.

func (*Node) Wait

func (n *Node) Wait()

Wait keeps the server of the node running in background.

type NodeServiceClient

type NodeServiceClient interface {
	// ReceiveBroadcast receives message and broadcasts it to neighbor peers.
	ReceiveBroadcast(ctx context.Context, in *any.Any, opts ...grpc.CallOption) (*any.Any, error)
}

NodeServiceClient is the client API for NodeService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewNodeServiceClient

func NewNodeServiceClient(cc *grpc.ClientConn) NodeServiceClient

type NodeServiceServer

type NodeServiceServer interface {
	// ReceiveBroadcast receives message and broadcasts it to neighbor peers.
	ReceiveBroadcast(context.Context, *any.Any) (*any.Any, error)
}

NodeServiceServer is the server API for NodeService service.

type PeerManager

type PeerManager struct {
	Peers map[string]*peer // known remote peers
	Mux   sync.RWMutex     // mutual exclusion lock for peers
	// contains filtered or unexported fields
}

PeerManager manages the peers that a local node known.

func NewPeerManager

func NewPeerManager(self string) *PeerManager

NewPeerManager returns a new peer manager with its own network address.

func (*PeerManager) AddPeers

func (pm *PeerManager) AddPeers(addresses ...string)

AddPeers adds peers to the peer manager if a peer's network address is unknown before.

func (*PeerManager) Disconnect

func (pm *PeerManager) Disconnect(addr string) error

Disconnect closes the connection to the peer.

func (*PeerManager) DisconnectAll

func (pm *PeerManager) DisconnectAll()

DisconnectAll closes all the connections to known peers.

func (*PeerManager) GetConnection

func (pm *PeerManager) GetConnection(addr string) (*grpc.ClientConn, error)

GetConnection returns a connection to a peer.

func (*PeerManager) GetNeighbors

func (pm *PeerManager) GetNeighbors(ctx context.Context, addr *wrappers.StringValue) (*Peers, error)

GetNeighbors returns the already known neighbor peers, and add the requester into the known peers list if it's not known before.

func (*PeerManager) GetPeerState

func (pm *PeerManager) GetPeerState(addr string) connectivity.State

GetPeerState returns the state of connection to a peer

func (*PeerManager) GetPeers

func (pm *PeerManager) GetPeers() []string

GetPeers returns all the peers' addresses in the peer manager.

func (*PeerManager) GetPeersNum

func (pm *PeerManager) GetPeersNum() int

GetPeersNum returns the number of peers in the peer manager.

func (*PeerManager) RemovePeer

func (pm *PeerManager) RemovePeer(addr string) error

RemovePeer removes a peer from the peer manager. It disconnects the connection relative to the peer before removing.

func (*PeerManager) StartDiscoverPeers

func (pm *PeerManager) StartDiscoverPeers(bootstraps ...string)

StartDiscoverPeers starts discovering new peers via bootstraps.

func (*PeerManager) StopDiscoverPeers

func (pm *PeerManager) StopDiscoverPeers()

StopDiscoverPeers stops discovering new peers and disconnect all connections.

func (*PeerManager) Wait

func (pm *PeerManager) Wait()

Wait keeps the peer manager running in background.

type PeerServiceClient

type PeerServiceClient interface {
	// GetNeighbors returns the already known neighbor peers.
	GetNeighbors(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*Peers, error)
}

PeerServiceClient is the client API for PeerService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewPeerServiceClient

func NewPeerServiceClient(cc *grpc.ClientConn) PeerServiceClient

type PeerServiceServer

type PeerServiceServer interface {
	// GetNeighbors returns the already known neighbor peers.
	GetNeighbors(context.Context, *wrappers.StringValue) (*Peers, error)
}

PeerServiceServer is the server API for PeerService service.

type Peers

type Peers struct {
	Peers                []string `protobuf:"bytes,1,rep,name=Peers,proto3" json:"Peers,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Peers is a list of peer's network address.

func (*Peers) Descriptor

func (*Peers) Descriptor() ([]byte, []int)

func (*Peers) GetPeers

func (m *Peers) GetPeers() []string

func (*Peers) ProtoMessage

func (*Peers) ProtoMessage()

func (*Peers) Reset

func (m *Peers) Reset()

func (*Peers) String

func (m *Peers) String() string

func (*Peers) XXX_DiscardUnknown

func (m *Peers) XXX_DiscardUnknown()

func (*Peers) XXX_Marshal

func (m *Peers) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Peers) XXX_Merge

func (dst *Peers) XXX_Merge(src proto.Message)

func (*Peers) XXX_Size

func (m *Peers) XXX_Size() int

func (*Peers) XXX_Unmarshal

func (m *Peers) XXX_Unmarshal(b []byte) error

type Process

type Process func(*any.Any) (*any.Any, error)

Process is the function to process one type of message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL