collect

package module
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2021 License: MIT Imports: 23 Imported by: 0

README

go-libp2p-collect

Discourse posts Build Status

A pub-sub-collect system.

This is the a pub-sub-collect implementation for libp2p.

We currently provide following implementations:

  • floodcollect, which is based on floodpub——the baseline flooding protocol.

Table of Contents

Install

go get github.com/bdware/go-libp2p-collect

Usage

To be used for messaging and data collection in p2p instrastructure (as part of libp2p) such as BDWare, IPFS, Ethereum, other blockchains, etc.

Documentation

See API documentation.

Contribute

Contributions welcome. Please check out the issues.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT

Copyright (c) 2020 The BDWare Authors. All rights reserved.

Documentation

Overview

Package collect provides facilities for the Publish/Subscribe/Collect pattern of message propagation and data collections, useful for distributed query.

Index

Constants

View Source
const (
	HostWiresProcotolID = "/hostwires/1.0.0"
)
View Source
const (
	IntBFSProtocolID = protocol.ID("/intbfs/1.0.0")
)

Variables

View Source
var (
	ErrTopicJoined    = errors.New("ErrTopicJoined")
	ErrTopicNotJoined = errors.New("ErrTopicNotJoined")
	ErrNilTopicWires  = errors.New("ErrNilTopicWires")
)

Functions

func DefaultMsgIDFn

func DefaultMsgIDFn(pmsg *pubsub.PbMessage) string

DefaultMsgIDFn should be used with DefaultMsgIDFn.

Types

type AsyncPubSub

type AsyncPubSub struct {
	// contains filtered or unexported fields
}

AsyncPubSub encapsulates pubsub, provides async methods to get subscribe messages. AsyncPubSub also manages the joined topics

func NewAsyncPubSub

func NewAsyncPubSub(h host.Host, opts ...TopicOpt) (apsub *AsyncPubSub, err error)

NewAsyncPubSub returns a new Topics instance. If WithCustomPubSubFactory is not set, a default randomsub will be used.

func (*AsyncPubSub) Close

func (ap *AsyncPubSub) Close() error

Close the topics

func (*AsyncPubSub) LoadTopicItem

func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)

LoadTopicItem .

func (*AsyncPubSub) Publish

func (ap *AsyncPubSub) Publish(ctx context.Context, topic string, data []byte) (err error)

Publish a message with given topic

func (*AsyncPubSub) SetTopicItem

func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error

SetTopicItem .

func (*AsyncPubSub) Subscribe

func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)

Subscribe a topic Subscribe a same topic is ok, but the previous handle will be replaced.

func (*AsyncPubSub) Topics

func (ap *AsyncPubSub) Topics() (out []string)

Topics returns the subscribed topics

func (*AsyncPubSub) Unsubscribe

func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)

Unsubscribe the given topic

type BasicPubSubCollector

type BasicPubSubCollector struct {
	// contains filtered or unexported fields
}

BasicPubSubCollector implements of psc.BasicPubSubCollector Interface. It broadcasts request by libp2p.PubSub. When the remote nodes receive the request, they will try to dial the request node and return the response directly.

func NewBasicPubSubCollector

func NewBasicPubSubCollector(h host.Host, options ...InitOpt) (bpsc *BasicPubSubCollector, err error)

NewBasicPubSubCollector returns a new BasicPubSubCollector

func (*BasicPubSubCollector) Close

func (bpsc *BasicPubSubCollector) Close() (err error)

Close the BasicPubSubCollector.

func (*BasicPubSubCollector) Join

func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)

Join the overlay network defined by topic Join the same topic is allowed here. Rejoin will refresh the requestHandler.

func (*BasicPubSubCollector) Leave

func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)

Leave the overlay. The registered topichandles and responseHandlers will be closed.

func (*BasicPubSubCollector) Publish

func (bpsc *BasicPubSubCollector) Publish(topic string, payload []byte, opts ...PubOpt) (err error)

Publish a serilized request payload.

type Conf

type Conf struct {
	// Router is an option to select different router type
	// Router must be one of `basic`, `relay` and `intbfs`
	Router string `json:"router"`
	// ProtocolPrefix is the protocol name prefix
	ProtocolPrefix string `json:"protocol"`
	// RequestCacheSize .
	// RequestCache is used to store the request control message,
	// which is for response routing.
	RequestCacheSize int `json:"requestCacheSize"`
	// ResponseCacheSize .
	// ResponseCache is used to deduplicate the response.
	ResponseCacheSize int `json:"responseCacheSize"`
	// Fanout .
	Fanout int `json:"fanout"`
	// RandomFanout .
	RandomFanout int `json:"randomFanout"`
	// MaxHitsToSend
	MaxHitsToSend int `json:"maxHitsToSend"`
}

Conf is the static configuration of PubSubCollector

func MakeDefaultConf

func MakeDefaultConf() Conf

MakeDefaultConf returns a default Conf instance

type FinalRespHandler

type FinalRespHandler func(context.Context, *Response)

FinalRespHandler is the callback function when the root node receiving a response. It will be called only in the root node. It will be called more than one time when the number of responses is larger than one.

type HostWires added in v0.0.9

type HostWires struct {
	host.Host
	// contains filtered or unexported fields
}

func NewHostWires added in v0.0.9

func NewHostWires(h host.Host) *HostWires

func (*HostWires) ClosedStream added in v0.0.9

func (hw *HostWires) ClosedStream(network.Network, network.Stream)

func (*HostWires) Connected added in v0.0.9

func (hw *HostWires) Connected(n network.Network, c network.Conn)

func (*HostWires) Disconnected added in v0.0.9

func (hw *HostWires) Disconnected(n network.Network, c network.Conn)

func (*HostWires) ID added in v0.0.9

func (hw *HostWires) ID() peer.ID

func (*HostWires) Listen added in v0.0.9

func (hw *HostWires) Listen(network.Network, ma.Multiaddr)

func (*HostWires) ListenClose added in v0.0.9

func (hw *HostWires) ListenClose(network.Network, ma.Multiaddr)

func (*HostWires) Neighbors added in v0.0.9

func (hw *HostWires) Neighbors() []peer.ID

func (*HostWires) OpenedStream added in v0.0.9

func (hw *HostWires) OpenedStream(network.Network, network.Stream)

func (*HostWires) SendMsg added in v0.0.9

func (hw *HostWires) SendMsg(to peer.ID, data []byte) error

func (*HostWires) SetListener added in v0.0.9

func (hw *HostWires) SetListener(wn WireListener)

func (*HostWires) SetMsgHandler added in v0.0.9

func (hw *HostWires) SetMsgHandler(h MsgHandler)

type InitOpt

type InitOpt func(*InitOpts) error

InitOpt is options used in NewBasicPubSubCollector

func WithConf

func WithConf(conf Conf) InitOpt

WithConf specifies configuration of pubsubcollector

func WithLogger

func WithLogger(l Logger) InitOpt

WithLogger .

func WithRequestIDGenerator

func WithRequestIDGenerator(ridFn ReqIDFn) InitOpt

WithRequestIDGenerator .

type InitOpts

type InitOpts struct {
	Conf    Conf
	ReqIDFn ReqIDFn
	MsgIDFn pubsub.MsgIDFn
	Logger  Logger
	Wires   TopicWires
}

InitOpts is options used in NewBasicPubSubCollector

func NewInitOpts

func NewInitOpts(opts []InitOpt) (out *InitOpts, err error)

NewInitOpts returns initopts

type IntBFS added in v0.0.9

type IntBFS struct {
	// contains filtered or unexported fields
}

IntBFS don't care about topic. IntBFS contains 5 parts: 1. query machanism 2. peer profiles 3. peer ranking 4. distance function 5. random perturbation

func NewIntBFS added in v0.0.9

func NewIntBFS(wires Wires, opts *IntBFSOptions) (*IntBFS, error)

func (*IntBFS) Close added in v0.0.9

func (ib *IntBFS) Close() error

func (*IntBFS) HandlePeerDown added in v0.0.9

func (ib *IntBFS) HandlePeerDown(p peer.ID)

func (*IntBFS) HandlePeerUp added in v0.0.9

func (ib *IntBFS) HandlePeerUp(p peer.ID)

func (*IntBFS) Publish added in v0.0.9

func (ib *IntBFS) Publish(data []byte, opts ...PubOpt) error

type IntBFSCollector added in v0.0.9

type IntBFSCollector struct {
	// contains filtered or unexported fields
}

IntBFSCollector . We used a topic-defined overlay here.

func NewIntBFSCollector added in v0.0.9

func NewIntBFSCollector(h host.Host, opts ...InitOpt) (*IntBFSCollector, error)

func (*IntBFSCollector) Close added in v0.0.9

func (ic *IntBFSCollector) Close() error

func (*IntBFSCollector) Join added in v0.0.9

func (ic *IntBFSCollector) Join(topic string, opts ...JoinOpt) (err error)

func (*IntBFSCollector) Leave added in v0.0.9

func (ic *IntBFSCollector) Leave(topic string) (err error)

func (*IntBFSCollector) Publish added in v0.0.9

func (ic *IntBFSCollector) Publish(topic string, data []byte, opts ...PubOpt) error

type IntBFSOptions added in v0.0.9

type IntBFSOptions struct {
	Fanout        int
	RandomFanout  int
	MaxHitsToSend int
	ProfileFactory
	RequestHandler
	ReqIDFn
	Logger
	Topic string
}

IntBFSOptions .

func MakeDefaultIntBFSOptions added in v0.0.9

func MakeDefaultIntBFSOptions() *IntBFSOptions

MakeDefaultIntBFSOptions .

type Intermediate

type Intermediate = pb.Intermediate

Intermediate type alias

type JoinOpt

type JoinOpt func(*JoinOpts) error

JoinOpt is optional options in PubSubCollector.Join

func WithProfileFactory added in v0.0.9

func WithProfileFactory(pf ProfileFactory) JoinOpt

func WithRequestHandler

func WithRequestHandler(rqhandle RequestHandler) JoinOpt

WithRequestHandler registers request handler

type JoinOpts

type JoinOpts struct {
	RequestHandler
	ProfileFactory
}

JoinOpts is the aggregated options

func NewJoinOptions

func NewJoinOptions(opts []JoinOpt) (out *JoinOpts, err error)

NewJoinOptions returns an option collection

type Logger

type Logger interface {
	Logf(level, format string, args ...interface{})
}

Logger . level is {"debug", "info", "warn", "error", "fatal"}; format and args are compatable with fmt.Printf.

func MakeDefaultLogger

func MakeDefaultLogger() Logger

MakeDefaultLogger .

type Message

type Message = pubsub.Message

Message is type alias

type Msg added in v0.0.9

type Msg struct {
	*pb.Msg
	// contains filtered or unexported fields
}

type MsgHandler added in v0.0.9

type MsgHandler func(from peer.ID, data []byte)

MsgHandler .

type PeerSet

type PeerSet struct {
	// contains filtered or unexported fields
}

func NewPeerSet

func NewPeerSet() *PeerSet

func (*PeerSet) Add

func (ps *PeerSet) Add(p peer.ID)

func (*PeerSet) Del

func (ps *PeerSet) Del(p peer.ID)

func (*PeerSet) Equal

func (ps *PeerSet) Equal(another *PeerSet) bool

func (*PeerSet) Slice

func (ps *PeerSet) Slice() []peer.ID

func (*PeerSet) String

func (ps *PeerSet) String() string

type Profile added in v0.0.9

type Profile interface {
	//
	Insert(req *Request, resp *Response)
	//
	Less(that Profile, req *Request) bool
}

Profile stores query profiles

type ProfileFactory added in v0.0.9

type ProfileFactory func() Profile

ProfileFactory generates a Profile

type PubOpt

type PubOpt func(*PubOpts) error

PubOpt is optional options in PubSubCollector.Publish

func WithFinalRespHandler

func WithFinalRespHandler(handler FinalRespHandler) PubOpt

WithFinalRespHandler registers notifHandler

func WithRequestContext

func WithRequestContext(ctx context.Context) PubOpt

WithRequestContext adds cancellation or timeout for a request default is withCancel. (ctx will be cancelled when request is closed)

type PubOpts

type PubOpts struct {
	RequestContext  context.Context
	FinalRespHandle FinalRespHandler
}

PubOpts is the aggregated options

func NewPublishOptions

func NewPublishOptions(opts []PubOpt) (out *PubOpts, err error)

NewPublishOptions returns an option collection

type PubReq added in v0.0.9

type PubReq struct {
	// contains filtered or unexported fields
}

type PubSubCollector

type PubSubCollector interface {

	// Join the overlay network defined by topic.
	// Register RequestHandle and ResponseHandle in opts.
	Join(topic string, opts ...JoinOpt) error

	// Publish a serialized request. Request should be encasulated in data argument.
	Publish(topic string, data []byte, opts ...PubOpt) error

	// Leave the overlay
	Leave(topic string) error

	io.Closer
}

PubSubCollector is a group communication module on topic-based overlay network. It helps to dispatch request, and wait for corresponding responses. In relay mode, PubSubCollector can also help to reduce the response.

func NewCollector added in v0.0.9

func NewCollector(h host.Host, opts ...InitOpt) (PubSubCollector, error)

NewCollector creates PubSubCollector.

type PubSubFactory

type PubSubFactory func(h host.Host) (*pubsub.PubSub, error)

PubSubFactory initializes a pubsub in Topics

type RelayPubSubCollector

type RelayPubSubCollector struct {
	// contains filtered or unexported fields
}

RelayPubSubCollector .

func NewRelayPubSubCollector

func NewRelayPubSubCollector(h host.Host, options ...InitOpt) (r *RelayPubSubCollector, err error)

NewRelayPubSubCollector .

func (*RelayPubSubCollector) Close

func (r *RelayPubSubCollector) Close() (err error)

Close the BasicPubSubCollector.

func (*RelayPubSubCollector) Join

func (r *RelayPubSubCollector) Join(topic string, options ...JoinOpt) (err error)

Join the overlay network defined by topic. Register RequestHandle and ResponseHandle in opts.

func (*RelayPubSubCollector) Leave

func (r *RelayPubSubCollector) Leave(topic string) (err error)

Leave the overlay

func (*RelayPubSubCollector) Publish

func (r *RelayPubSubCollector) Publish(topic string, data []byte, opts ...PubOpt) (err error)

Publish a serialized request. Request should be encasulated in data argument.

type ReqIDFn

type ReqIDFn func(*Request) RequestID

ReqIDFn is used to generate id for each request

type Request

type Request = pb.Request

Request type alias

type RequestHandler

type RequestHandler func(ctx context.Context, req *Request) *Intermediate

RequestHandler is the callback function when receiving a request. It will be called in every node joined the network. The return value will be sent to the root (directly or relayedly).

type RequestID

type RequestID = pb.RequestID

RequestID type alias

func DefaultReqIDFn

func DefaultReqIDFn(rq *Request) RequestID

DefaultReqIDFn returns default ReqIDGenerator. fnv hash function is called in it.

type RequestResult added in v0.0.12

type RequestResult struct {
	// contains filtered or unexported fields
}

type Response

type Response = pb.Response

Response type alias

type TopicHandle

type TopicHandle func(topic string, msg *Message)

TopicHandle is the handle function of subscription. WARNING: DO NOT change msg, if a msg includes multiple topics, they share a message.

type TopicHub added in v0.0.9

type TopicHub struct {
	// contains filtered or unexported fields
}

func NewTopicHub added in v0.0.9

func NewTopicHub(tw TopicWires) *TopicHub

func (*TopicHub) Close added in v0.0.9

func (th *TopicHub) Close() error

func (*TopicHub) HandlePeerDown added in v0.0.9

func (th *TopicHub) HandlePeerDown(p peer.ID, topic string)

func (*TopicHub) HandlePeerUp added in v0.0.9

func (th *TopicHub) HandlePeerUp(p peer.ID, topic string)

func (*TopicHub) Join added in v0.0.9

func (th *TopicHub) Join(topic string) error

func (*TopicHub) JoinWithNotifiee added in v0.0.9

func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error

func (*TopicHub) Leave added in v0.0.9

func (th *TopicHub) Leave(topic string) error

func (*TopicHub) Wires added in v0.0.9

func (th *TopicHub) Wires(topic string) Wires

type TopicMsgHandler added in v0.0.9

type TopicMsgHandler = pubsub.TopicMsgHandler

TopicMsgHandler .

type TopicOpt

type TopicOpt func(*AsyncPubSub) error

TopicOpt is the option in NewTopics

func WithCustomPubSubFactory

func WithCustomPubSubFactory(psubFact PubSubFactory) TopicOpt

WithCustomPubSubFactory initials the pubsub based on given factory

func WithSelfNotif

func WithSelfNotif(enable bool) TopicOpt

WithSelfNotif decides whether a node receives self-published messages. If WithSelfNotif is set to true, the node will receive messages published by itself. Default is set to false.

type TopicWireListener added in v0.0.9

type TopicWireListener = pubsub.TopicWireListener

TopicWireListener .

type TopicWires added in v0.0.9

type TopicWires interface {
	ID() peer.ID
	Join(topic string) error
	Leave(topic string) error
	Topics() []string
	Neighbors(topic string) []peer.ID
	SetListener(twn TopicWireListener)
	SendMsg(topic string, to peer.ID, data []byte) error
	SetTopicMsgHandler(th TopicMsgHandler)
	io.Closer
}

type WireListener added in v0.0.9

type WireListener interface {
	HandlePeerUp(p peer.ID)
	HandlePeerDown(p peer.ID)
}

type Wires added in v0.0.9

type Wires interface {
	ID() peer.ID
	Neighbors() []peer.ID
	SetListener(wn WireListener)
	SendMsg(to peer.ID, data []byte) error
	SetMsgHandler(h MsgHandler)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL