minogrpc

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: BSD-3-Clause Imports: 39 Imported by: 3

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAddressFactory

func NewAddressFactory() mino.AddressFactory

NewAddressFactory returns a new address factory.

func ParseAddress

func ParseAddress(ip string, port uint16) net.Addr

ParseAddress is a helper to create a TCP network address.

Types

type Endpoint

type Endpoint struct {
	// We need this mutex to prevent two processes from concurrently checking
	// that the stream session must be created. Using a sync.Map would require
	// to use the "LoadOrStore" function, which would make us create the session
	// each time, but only saving it the first time.
	sync.RWMutex

	Handler mino.Handler
	Factory serde.Factory
	// contains filtered or unexported fields
}

Endpoint defines the requirement of an endpoint. Since the endpoint can be called multiple times concurrently we need a mutex and we need to use the same sender/receiver.

type Joinable

type Joinable interface {
	mino.Mino

	// ServeTLS returns true if this node is running with TLS for gRPC.
	ServeTLS() bool

	// GetCertificateChain returns the certificate chain of the instance.
	GetCertificateChain() certs.CertChain

	// GetCertificateStore returns the certificate storage which contains every
	// known peer certificate.
	GetCertificateStore() certs.Storage

	// GenerateToken returns a token that can be provided by a distant peer to
	// mutually share certificates with this instance.
	GenerateToken(expiration time.Duration) string

	// Join tries to mutually share certificates of the distant address in
	// parameter using the token as a credential. The certificate of the distant
	// address digest is compared against the one in parameter.
	//
	// The token and the certificate digest are provided by the distant peer
	// over a secure channel.
	//
	// Only the "host" and "path" parts are used in the URL, which must be of
	// form //<host>:<port>/<path>
	Join(addr *url.URL, token string, certHash []byte) error
}

Joinable is an extension of the mino.Mino interface to allow distant servers to join a network of participants.

type Minogrpc

type Minogrpc struct {
	// contains filtered or unexported fields
}

Minogrpc is an implementation of a minimalist network overlay using gRPC internally to communicate with distant peers.

- implements mino.Mino - implements fmt.Stringer

func NewMinogrpc

func NewMinogrpc(listen net.Addr, public *url.URL, router router.Router, opts ...Option) (
	*Minogrpc,
	error,
)

NewMinogrpc creates and starts a new instance. it will try to listen for the address and returns an error if it fails. "listen" is the local address, while "public" is the public node address. If public is empty it uses the local address. Public does not support any scheme, it should be of form //<hostname>:<port>/<path>.

func (*Minogrpc) CreateRPC

func (m *Minogrpc) CreateRPC(name string, h mino.Handler, f serde.Factory) (mino.RPC, error)

CreateRPC implements Mino. It returns a newly created rpc with the provided name and reserved for the current namespace. When contacting distant peers, it will only talk to mirrored RPCs with the same name and namespace.

func (*Minogrpc) GenerateToken

func (m *Minogrpc) GenerateToken(expiration time.Duration) string

GenerateToken implements minogrpc.Joinable. It generates and returns a new token that will be valid for the given amount of time.

func (*Minogrpc) GetAddress

func (m *Minogrpc) GetAddress() mino.Address

GetAddress implements mino.Mino. It returns the address of the server.

func (*Minogrpc) GetAddressFactory

func (m *Minogrpc) GetAddressFactory() mino.AddressFactory

GetAddressFactory implements mino.Mino. It returns the address factory.

func (Minogrpc) GetCertificateChain

func (o Minogrpc) GetCertificateChain() certs.CertChain

GetCertificateChain returns the certificate of the overlay with its private key set.

func (Minogrpc) GetCertificateStore

func (o Minogrpc) GetCertificateStore() certs.Storage

GetCertificateStore returns the certificate store.

func (*Minogrpc) GetTrafficWatcher

func (m *Minogrpc) GetTrafficWatcher() traffic.Watcher

GetTrafficWatcher returns the traffic watcher.

func (*Minogrpc) GracefulStop

func (m *Minogrpc) GracefulStop() error

GracefulStop first stops the grpc server then waits for the remaining handlers to close.

func (Minogrpc) Join

func (o Minogrpc) Join(addr *url.URL, token string, certHash []byte) error

Join sends a join request to a distant node with a token generated by the remote node. The certHash is used to make sure that no man-in-the-middle intercepts the communication. If the certHash is empty, it supposes that a transparent proxy is handling the TLS connection and that we can trust the CAs in place.

func (Minogrpc) ServeTLS

func (o Minogrpc) ServeTLS() bool

ServeTLS returns true if the gRPC server uses TLS

func (*Minogrpc) Stop

func (m *Minogrpc) Stop() error

Stop stops the server immediately.

func (*Minogrpc) String

func (m *Minogrpc) String() string

String implements fmt.Stringer. It prints a short description of the instance.

func (*Minogrpc) WithSegment

func (m *Minogrpc) WithSegment(segment string) mino.Mino

WithSegment returns a new mino instance that will have its URI path extended with the provided segment. The segment can not be empty and should match [a-zA-Z0-9]+

type Option

type Option func(*minoTemplate)

Option is the type to set some fields when instantiating an overlay.

func NoTLS

func NoTLS() Option

NoTLS sets up the gRPC server to serve plain connections only.

func WithCert

func WithCert(cert *tls.Certificate) Option

WithCert is an option to set the node's certificate in case it is not already present in the certificate store.

func WithCertificateKey

func WithCertificateKey(secret, public interface{}) Option

WithCertificateKey is an option to set the key of the server certificate.

func WithRandom

func WithRandom(r io.Reader) Option

WithRandom is an option to set the randomness if the certificate private key needs to be generated.

func WithStorage

func WithStorage(certs certs.Storage) Option

WithStorage is an option to set a different certificate storage.

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

RPC represents an RPC that has been registered by a client, which allows clients to call an RPC that will execute the provided handler.

- implements mino.RPC

func (*RPC) Call

func (rpc *RPC) Call(ctx context.Context,
	req serde.Message, players mino.Players) (<-chan mino.Response, error)

Call implements mino.RPC. It calls the RPC on each provided address.

Example
mA, err := NewMinogrpc(ParseAddress("127.0.0.1", 0), nil, tree.NewRouter(NewAddressFactory()))
if err != nil {
	panic("overlay A failed: " + err.Error())
}

rpcA := mino.MustCreateRPC(mA, "test", exampleHandler{}, exampleFactory{})

mB, err := NewMinogrpc(ParseAddress("127.0.0.1", 0), nil, tree.NewRouter(NewAddressFactory()))
if err != nil {
	panic("overlay B failed: " + err.Error())
}

mino.MustCreateRPC(mB, "test", exampleHandler{}, exampleFactory{})

mA.GetCertificateStore().Store(mB.GetAddress(), mB.GetCertificateChain())
mB.GetCertificateStore().Store(mA.GetAddress(), mA.GetCertificateChain())

addrs := mino.NewAddresses(mA.GetAddress(), mB.GetAddress())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resps, err := rpcA.Call(ctx, exampleMessage{value: "Hello World!"}, addrs)
if err != nil {
	panic("call failed: " + err.Error())
}

for resp := range resps {
	reply, err := resp.GetMessageOrError()
	if err != nil {
		panic("error in reply: " + err.Error())
	}

	if resp.GetFrom().Equal(mA.GetAddress()) {
		fmt.Println("A", reply.(exampleMessage).value)
	}
	if resp.GetFrom().Equal(mB.GetAddress()) {
		fmt.Println("B", reply.(exampleMessage).value)
	}
}
Output:

A Hello World!
B Hello World!

func (RPC) Stream

func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, mino.Receiver, error)

Stream implements mino.RPC. It will open a stream to one of the addresses with a bidirectional channel that will send and receive packets. The chosen address will open one or several streams to the rest of the players. The choice of the gateway is first the local node if it belongs to the list, otherwise the first node of the list.

The way routes are created depends on the router implementation chosen for the endpoint. It can for instance use a tree structure, which means the network for 8 nodes could look like this:

  Orchestrator
        |
     __ A __
    /       \
   B         C
 / | \     /   \
D  E  F   G     H

If C has to send a message to B, it will send it through node A. Similarly, if D has to send a message to G, it will move up the tree through B, A and finally C.

Example
mA, err := NewMinogrpc(ParseAddress("127.0.0.1", 0), nil, tree.NewRouter(NewAddressFactory()))
if err != nil {
	panic("overlay A failed: " + err.Error())
}

rpcA := mino.MustCreateRPC(mA, "test", exampleHandler{}, exampleFactory{})

mB, err := NewMinogrpc(ParseAddress("127.0.0.1", 0), nil, tree.NewRouter(NewAddressFactory()))
if err != nil {
	panic("overlay B failed: " + err.Error())
}

mino.MustCreateRPC(mB, "test", exampleHandler{}, exampleFactory{})

mA.GetCertificateStore().Store(mB.GetAddress(), mB.GetCertificateChain())
mB.GetCertificateStore().Store(mA.GetAddress(), mA.GetCertificateChain())

addrs := mino.NewAddresses(mA.GetAddress(), mB.GetAddress())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sender, recv, err := rpcA.Stream(ctx, addrs)
if err != nil {
	panic("stream failed: " + err.Error())
}

err = <-sender.Send(exampleMessage{value: "Hello World!"}, mB.GetAddress())
if err != nil {
	panic("failed to send: " + err.Error())
}

from, msg, err := recv.Recv(ctx)
if err != nil {
	panic("failed to receive: " + err.Error())
}

if from.Equal(mB.GetAddress()) {
	fmt.Println("B", msg.(exampleMessage).value)
}
Output:

B Hello World!

Directories

Path Synopsis
Package certs defines a certificate store that will provide primitives to store and get certificates for a given address.
Package certs defines a certificate store that will provide primitives to store and get certificates for a given address.
Package controller implements a controller for minogrpc.
Package controller implements a controller for minogrpc.
Package ptypes contains the protobuf definitions for the implementation of minogrpc.
Package ptypes contains the protobuf definitions for the implementation of minogrpc.
Package session defines an abstraction of a session during a distributed RPC.
Package session defines an abstraction of a session during a distributed RPC.
Package tokens defines a token holder to generate and validate access tokens.
Package tokens defines a token holder to generate and validate access tokens.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL