pubsub

package
v0.0.0-...-ce94876 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2019 License: MIT, MIT Imports: 20 Imported by: 0

README

go-libp2p-pubsub

A pubsub system with flooding and gossiping variants.

PubSub is a work in progress, with floodsub as an initial protocol, followed by gossipsub (spec, gossipsub.go).

Table of Contents

Install

go get github.com/libp2p/go-libp2p-pubsub

Usage

To be used for messaging in p2p instrastructure (as part of libp2p) such as IPFS, Ethereum, other blockchains, etc.

Implementations

See libp2p/specs/pubsub#Implementations.

Contribute

Contributions welcome. Please check out the issues.

Check out our contributing document for more information on how we work, and about contributing in general. Please be aware that all interactions related to multiformats are subject to the IPFS Code of Conduct.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT © Jeromy Johnson

Documentation

Index

Constants

View Source
const (
	FloodSubID = protocol.ID("/floodsub/1.0.0")
)
View Source
const (
	GossipSubID = protocol.ID("/meshsub/1.0.0")
)
View Source
const (
	RandomSubID = protocol.ID("/randomsub/1.0.0")
)
View Source
const SignPrefix = "libp2p-pubsub:"

Variables

View Source
var (
	// overlay parameters
	GossipSubD   = 6
	GossipSubDlo = 4
	GossipSubDhi = 12

	// gossip parameters
	GossipSubHistoryLength = 5
	GossipSubHistoryGossip = 3

	// heartbeat interval
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
	GossipSubHeartbeatInterval     = 1 * time.Second

	// fanout ttl
	GossipSubFanoutTTL = 60 * time.Second
)
View Source
var (
	RandomSubD = 6
)

Functions

This section is empty.

Types

type CacheEntry

type CacheEntry struct {
	// contains filtered or unexported fields
}

type FloodSubRouter

type FloodSubRouter struct {
	// contains filtered or unexported fields
}

func (*FloodSubRouter) AddPeer

func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID)

func (*FloodSubRouter) Attach

func (fs *FloodSubRouter) Attach(p *PubSub)

func (*FloodSubRouter) HandleRPC

func (fs *FloodSubRouter) HandleRPC(rpc *RPC)

func (*FloodSubRouter) Join

func (fs *FloodSubRouter) Join(topic string)

func (*FloodSubRouter) Leave

func (fs *FloodSubRouter) Leave(topic string)

func (*FloodSubRouter) Protocols

func (fs *FloodSubRouter) Protocols() []protocol.ID

func (*FloodSubRouter) Publish

func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*FloodSubRouter) RemovePeer

func (fs *FloodSubRouter) RemovePeer(peer.ID)

type GossipSubRouter

type GossipSubRouter struct {
	// contains filtered or unexported fields
}

GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.

func (*GossipSubRouter) AddPeer

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)

func (*GossipSubRouter) Attach

func (gs *GossipSubRouter) Attach(p *PubSub)

func (*GossipSubRouter) HandleRPC

func (gs *GossipSubRouter) HandleRPC(rpc *RPC)

func (*GossipSubRouter) Join

func (gs *GossipSubRouter) Join(topic string)

func (*GossipSubRouter) Leave

func (gs *GossipSubRouter) Leave(topic string)

func (*GossipSubRouter) Protocols

func (gs *GossipSubRouter) Protocols() []protocol.ID

func (*GossipSubRouter) Publish

func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*GossipSubRouter) RemovePeer

func (gs *GossipSubRouter) RemovePeer(p peer.ID)

type Message

type Message struct {
	*pb.Message
}

func (*Message) GetFrom

func (m *Message) GetFrom() peer.ID

type MessageCache

type MessageCache struct {
	// contains filtered or unexported fields
}

func NewMessageCache

func NewMessageCache(gossip, history int) *MessageCache

func (*MessageCache) Get

func (mc *MessageCache) Get(mid string) (*pb.Message, bool)

func (*MessageCache) GetGossipIDs

func (mc *MessageCache) GetGossipIDs(topic string) []string

func (*MessageCache) Put

func (mc *MessageCache) Put(msg *pb.Message)

func (*MessageCache) Shift

func (mc *MessageCache) Shift()

type Option

type Option func(*PubSub) error

func WithMessageAuthor

func WithMessageAuthor(author peer.ID) Option

WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.

func WithMessageSigning

func WithMessageSigning(enabled bool) Option

WithMessageSigning enables or disables message signing (enabled by default).

func WithStrictSignatureVerification

func WithStrictSignatureVerification(required bool) Option

WithStrictSignatureVerification enforces message signing. If set, unsigned messages will be discarded.

This currently defaults to false but, as we transition to signing by default, will eventually default to true.

func WithValidateThrottle

func WithValidateThrottle(n int) Option

WithValidateThrottle sets the upper bound on the number of active validation goroutines.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func NewFloodSub

func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewFloodSub returns a new PubSub object using the FloodSubRouter

func NewFloodsubWithProtocols

func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)

NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps

func NewGossipSub

func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewGossipSub returns a new PubSub object using GossipSubRouter as the router

func NewPubSub

func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)

NewPubSub returns a new PubSub management object

func NewRandomSub

func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewRandomSub returns a new PubSub object using RandomSubRouter as the router

func (*PubSub) GetTopics

func (p *PubSub) GetTopics() []string

GetTopics returns the topics this node is subscribed to

func (*PubSub) ListPeers

func (p *PubSub) ListPeers(topic string) []peer.ID

ListPeers returns a list of peers we are connected to.

func (*PubSub) Publish

func (p *PubSub) Publish(topic string, data []byte) error

Publish publishes data under the given topic

func (*PubSub) RegisterTopicValidator

func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error

RegisterTopicValidator registers a validator for topic

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)

Subscribe returns a new Subscription for the given topic

func (*PubSub) SubscribeByTopicDescriptor

func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)

SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor

func (*PubSub) UnregisterTopicValidator

func (p *PubSub) UnregisterTopicValidator(topic string) error

UnregisterTopicValidator removes a validator from a topic returns an error if there was no validator registered with the topic

type PubSubNotif

type PubSubNotif PubSub

func (*PubSubNotif) ClosedStream

func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream)

func (*PubSubNotif) Connected

func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn)

func (*PubSubNotif) Disconnected

func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn)

func (*PubSubNotif) Listen

func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr)

func (*PubSubNotif) ListenClose

func (p *PubSubNotif) ListenClose(n inet.Network, _ ma.Multiaddr)

func (*PubSubNotif) OpenedStream

func (p *PubSubNotif) OpenedStream(n inet.Network, s inet.Stream)

type PubSubRouter

type PubSubRouter interface {
	// Protocols returns the list of protocols supported by the router.
	Protocols() []protocol.ID
	// Attach is invoked by the PubSub constructor to attach the router to a
	// freshly initialized PubSub instance.
	Attach(*PubSub)
	// AddPeer notifies the router that a new peer has been connected.
	AddPeer(peer.ID, protocol.ID)
	// RemovePeer notifies the router that a peer has been disconnected.
	RemovePeer(peer.ID)
	// HandleRPC is invoked to process control messages in the RPC envelope.
	// It is invoked after subscriptions and payload messages have been processed.
	HandleRPC(*RPC)
	// Publish is invoked to forward a new message that has been validated.
	Publish(peer.ID, *pb.Message)
	// Join notifies the router that we want to receive and forward messages in a topic.
	// It is invoked after the subscription announcement.
	Join(topic string)
	// Leave notifies the router that we are no longer interested in a topic.
	// It is invoked after the unsubscription announcement.
	Leave(topic string)
}

PubSubRouter is the message router component of PubSub

type RPC

type RPC struct {
	pb.RPC
	// contains filtered or unexported fields
}

type RandomSubRouter

type RandomSubRouter struct {
	// contains filtered or unexported fields
}

RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects RandomSubD peers and forwards the message to them.

func (*RandomSubRouter) AddPeer

func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)

func (*RandomSubRouter) Attach

func (rs *RandomSubRouter) Attach(p *PubSub)

func (*RandomSubRouter) HandleRPC

func (rs *RandomSubRouter) HandleRPC(rpc *RPC)

func (*RandomSubRouter) Join

func (rs *RandomSubRouter) Join(topic string)

func (*RandomSubRouter) Leave

func (rs *RandomSubRouter) Leave(topic string)

func (*RandomSubRouter) Protocols

func (rs *RandomSubRouter) Protocols() []protocol.ID

func (*RandomSubRouter) Publish

func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*RandomSubRouter) RemovePeer

func (rs *RandomSubRouter) RemovePeer(p peer.ID)

type SubOpt

type SubOpt func(sub *Subscription) error

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func (*Subscription) Cancel

func (sub *Subscription) Cancel()

func (*Subscription) Next

func (sub *Subscription) Next(ctx context.Context) (*Message, error)

func (*Subscription) Topic

func (sub *Subscription) Topic() string

type Validator

type Validator func(context.Context, *Message) bool

Validator is a function that validates a message

type ValidatorOpt

type ValidatorOpt func(addVal *addValReq) error

ValidatorOpt is an option for RegisterTopicValidator

func WithValidatorConcurrency

func WithValidatorConcurrency(n int) ValidatorOpt

WithValidatorConcurrency is an option that sets topic validator throttle

func WithValidatorTimeout

func WithValidatorTimeout(timeout time.Duration) ValidatorOpt

WithValidatorTimeout is an option that sets the topic validator timeout

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL