rpc

package module
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2022 License: MIT Imports: 12 Imported by: 3

README

go-libp2p-pubsub-rpc

Made by Textile Chat on Slack standard-readme compliant

RPC over libp2p pubsub with error handling

Table of Contents

Background

go-libp2p-pubsub-rpc is an extension to go-libp2p-pubsub that provides RPC-like functionality:

  • Request/Response pattern with a peer
  • Request/Multi-Response pattern with multiple peers

Install

go get github.com/textileio/go-libp2p-pubsub-rpc

Usage

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	core "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/textileio/go-libp2p-pubsub-rpc"
	"github.com/textileio/go-libp2p-pubsub-rpc/peer"
)

// check aborts the example with the underlying error, so that a failed setup
// step is reported directly instead of surfacing later as a nil dereference.
func check(err error) {
	if err != nil {
		panic(err)
	}
}

// main demonstrates the request/response pattern between two peers, an error
// response, and the request/multi-response pattern across three peers that
// all join the same topic.
func main() {

	//
	// PING PONG WITH TWO PEERS
	//

	// create a peer
	p1, err := peer.New(peer.Config{
		RepoPath:   "/tmp/repo1",
		EnableMDNS: true, // can be used when peers are on the same network
	})
	check(err)

	// create another peer
	p2, err := peer.New(peer.Config{
		RepoPath:   "/tmp/repo2",
		EnableMDNS: true,
	})
	check(err)

	// eventHandler prints every topic peer event it is called with.
	eventHandler := func(from core.ID, topic string, msg []byte) {
		fmt.Printf("%s event: %s %s\n", topic, from, msg)
	}
	// messageHandler answers "ping" with "pong" and rejects anything else
	// with an error.
	messageHandler := func(from core.ID, topic string, msg []byte) ([]byte, error) {
		fmt.Printf("%s message: %s %s\n", topic, from, msg)
		if string(msg) != "ping" {
			return nil, errors.New("invalid request")
		}
		return []byte("pong"), nil
	}

	// no need to subscribe if only publishing
	t1, err := p1.NewTopic(context.Background(), "mytopic", true)
	check(err)
	t1.SetEventHandler(eventHandler)     // event handler reports topic membership events
	t1.SetMessageHandler(messageHandler) // message handler is any func that returns a response and error

	t2, err := p2.NewTopic(context.Background(), "mytopic", true)
	check(err)
	t2.SetEventHandler(eventHandler)
	t2.SetMessageHandler(messageHandler) // using same message handler as peer1, but this could be anything

	time.Sleep(time.Second) // wait for mdns discovery

	// peer1 requests "pong" from peer2
	rc1, err := t1.Publish(context.Background(), []byte("ping"))
	check(err)
	r1 := <-rc1
	check(r1.Err)
	fmt.Printf("peer1 received \"%s\" from %s\n", r1.Data, r1.From)

	// peer2 requests "pong" from peer1
	rc2, err := t2.Publish(context.Background(), []byte("ping"))
	check(err)
	r2 := <-rc2
	check(r2.Err)
	fmt.Printf("peer2 received \"%s\" from %s\n", r2.Data, r2.From)

	// peers can respond with an error; here r3.Err is the expected outcome,
	// so it is printed rather than treated as a failure
	rc3, err := t2.Publish(context.Background(), []byte("not a ping"))
	check(err)
	r3 := <-rc3
	fmt.Printf("peer2 received error \"%s\" from %s\n", r3.Err, r3.From)

	//
	// PING PONG WITH MULTIPLE PEERS
	//

	// create another peer
	p3, err := peer.New(peer.Config{
		RepoPath:   "/tmp/repo3",
		EnableMDNS: true,
	})
	check(err)
	t3, err := p3.NewTopic(context.Background(), "mytopic", true)
	check(err)
	t3.SetEventHandler(eventHandler)
	t3.SetMessageHandler(messageHandler)

	time.Sleep(time.Second) // wait for mdns discovery

	// peer1 requests "pong" from peer2 and peer3
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	rc, err := t1.Publish(ctx, []byte("ping"), rpc.WithMultiResponse(true))
	check(err)
	for r := range rc {
		// report a failed response and keep draining the remaining ones
		if r.Err != nil {
			fmt.Printf("peer1 received error \"%s\" from %s\n", r.Err, r.From)
			continue
		}
		fmt.Printf("peer1 received \"%s\" from %s\n", r.Data, r.From)
	}
}

Contributing

Pull requests and bug reports are very welcome ❤️

This repository falls under the Textile Code of Conduct.

Feel free to get in touch by:

Changelog

A changelog is published along with each release.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrResponseNotReceived indicates a response was not received after publishing a message.
	ErrResponseNotReceived = errors.New("response not received")
)

Functions

This section is empty.

Types

type EventHandler

type EventHandler func(from peer.ID, topic string, msg []byte)

EventHandler is used to receive topic peer events.

type MessageHandler

type MessageHandler func(from peer.ID, topic string, msg []byte) ([]byte, error)

MessageHandler is used to receive topic messages.

type PublishOption

type PublishOption func(*PublishOptions) error

PublishOption defines a Publish option.

func WithIgnoreResponse

func WithIgnoreResponse(enable bool) PublishOption

WithIgnoreResponse indicates whether or not Publish will wait for responses from the receivers. Default: disabled.

func WithMultiResponse

func WithMultiResponse(enable bool) PublishOption

WithMultiResponse indicates whether or not Publish will wait for multiple responses before returning. Default: disabled.

func WithPubOpts

func WithPubOpts(opts ...pubsub.PubOpt) PublishOption

WithPubOpts sets native pubsub.PubOpt options.

func WithRepublishing added in v0.0.5

func WithRepublishing(enable bool) PublishOption

WithRepublishing indicates whether or not Publish will continue republishing to newly joined peers as long as the context has not expired or been canceled. Default: disabled.

type PublishOptions

type PublishOptions struct {
	// contains filtered or unexported fields
}

PublishOptions defines options for Publish.

type Response

type Response struct {
	// ID is the cid.Cid of the received message.
	ID string
	// From is the peer.ID of the sender.
	From peer.ID
	// Data is the message data.
	Data []byte
	// Err is an error from the sender.
	Err error
}

Response wraps a message response.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic provides a nice interface to a libp2p pubsub topic.

func NewTopic

func NewTopic(ctx context.Context, ps *pubsub.PubSub, host peer.ID, topic string, subscribe bool) (*Topic, error)

NewTopic returns a new topic for the host.

func (*Topic) Close

func (t *Topic) Close() error

Close the topic.

func (*Topic) Publish

func (t *Topic) Publish(
	ctx context.Context,
	data []byte,
	opts ...PublishOption,
) (<-chan Response, error)

Publish data. Note that the data may arrive at peers more than once. As a result, if WithMultiResponse is supplied, the responses may be duplicated as well. See PublishOptions for option details.

func (*Topic) SetEventHandler

func (t *Topic) SetEventHandler(handler EventHandler)

SetEventHandler sets a handler func that will be called with peer events.

func (*Topic) SetMessageHandler

func (t *Topic) SetMessageHandler(handler MessageHandler)

SetMessageHandler sets a handler func that will be called with topic messages. A subscription is required for the handler to be called.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL