collect

package module
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: MIT Imports: 22 Imported by: 0

README

go-libp2p-collect


A pub-sub-collect system built on libp2p.

Repo Lead Maintainer

@huiscool

Table of Contents

Install

go get github.com/daotl/go-libp2p-collect

And add the following line to your go.mod:

replace github.com/libp2p/go-libp2p-pubsub => github.com/daotl/go-libp2p-pubsub {{VERSION}}

Usage

To be used for messaging and data collection in p2p infrastructure (as part of libp2p) such as IPFS, Ethereum, other blockchains, etc.

Documentation

See the API documentation.

Contribute

Contributions welcome. Please check out the issues.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT

Copyright (c) 2020 DAOT Labs. All rights reserved.

Documentation

Overview

Package collect provides facilities for the Publish/Subscribe/Collect pattern of message propagation and data collections, useful for distributed query.

Index

Constants

View Source
const (
	HostWiresProcotolID = "/hostwires/1.0.0"
)
View Source
const (
	IntBFSProtocolID = protocol.ID("/intbfs/1.0.0")
)

Variables

View Source
var (
	ErrTopicJoined    = errors.New("ErrTopicJoined")
	ErrTopicNotJoined = errors.New("ErrTopicNotJoined")
	ErrNilTopicWires  = errors.New("ErrNilTopicWires")
)

Functions

func DefaultMsgIDFn

func DefaultMsgIDFn(pmsg *pubsub.PbMessage) string

DefaultMsgIDFn should be used with DefaultMsgIDFn.

Types

type AsyncPubSub

type AsyncPubSub struct {
	// contains filtered or unexported fields
}

AsyncPubSub encapsulates pubsub, provides async methods to get subscribe messages. AsyncPubSub also manages the joined topics

func NewAsyncPubSub

func NewAsyncPubSub(h host.Host, opts ...TopicOpt) (apsub *AsyncPubSub, err error)

NewAsyncPubSub returns a new Topics instance. If WithCustomPubSubFactory is not set, a default randomsub will be used.

func (*AsyncPubSub) Close

func (ap *AsyncPubSub) Close() error

Close the topics

func (*AsyncPubSub) LoadTopicItem

func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)

LoadTopicItem .

func (*AsyncPubSub) Publish

func (ap *AsyncPubSub) Publish(ctx context.Context, topic string, data []byte) (err error)

Publish a message with given topic

func (*AsyncPubSub) SetTopicItem

func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error

SetTopicItem .

func (*AsyncPubSub) Subscribe

func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)

Subscribe a topic Subscribe a same topic is ok, but the previous handle will be replaced.

func (*AsyncPubSub) Topics

func (ap *AsyncPubSub) Topics() (out []string)

Topics returns the subscribed topics

func (*AsyncPubSub) Unsubscribe

func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)

Unsubscribe the given topic

type BasicPubSubCollector

type BasicPubSubCollector struct {
	// contains filtered or unexported fields
}

BasicPubSubCollector implements of psc.BasicPubSubCollector Interface. It broadcasts request by libp2p.PubSub. When the remote nodes receive the request, they will try to dial the request node and return the response directly.

func NewBasicPubSubCollector

func NewBasicPubSubCollector(h host.Host, options ...InitOpt) (bpsc *BasicPubSubCollector, err error)

NewBasicPubSubCollector returns a new BasicPubSubCollector

func (*BasicPubSubCollector) Close

func (bpsc *BasicPubSubCollector) Close() (err error)

Close the BasicPubSubCollector.

func (*BasicPubSubCollector) Join

func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)

Join the overlay network defined by topic Join the same topic is allowed here. Rejoin will refresh the requestHandler.

func (*BasicPubSubCollector) Leave

func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)

Leave the overlay. The registered topichandles and responseHandlers will be closed.

func (*BasicPubSubCollector) Publish

func (bpsc *BasicPubSubCollector) Publish(topic string, payload []byte, opts ...PubOpt) (err error)

Publish a serilized request payload.

type Conf

type Conf struct {
	// Router is an option to select different router type
	// Router must be one of `basic`, `relay` and `intbfs`
	Router string `json:"router"`
	// ProtocolPrefix is the protocol name prefix
	ProtocolPrefix string `json:"protocol"`
	// RequestCacheSize .
	// RequestCache is used to store the request control message,
	// which is for response routing.
	RequestCacheSize int `json:"requestCacheSize"`
	// ResponseCacheSize .
	// ResponseCache is used to deduplicate the response.
	ResponseCacheSize int `json:"responseCacheSize"`
}

Conf is the static configuration of PubSubCollector

func MakeDefaultConf

func MakeDefaultConf() Conf

MakeDefaultConf returns a default Conf instance

type EvalReqReq

type EvalReqReq struct {
	// contains filtered or unexported fields
}

type FinalRespHandler

type FinalRespHandler func(context.Context, *Response)

FinalRespHandler is the callback function when the root node receiving a response. It will be called only in the root node. It will be called more than one time when the number of responses is larger than one.

type HostWires

type HostWires struct {
	host.Host
	// contains filtered or unexported fields
}

func NewHostWires

func NewHostWires(h host.Host) *HostWires

func (*HostWires) ClosedStream

func (hw *HostWires) ClosedStream(network.Network, network.Stream)

func (*HostWires) Connected

func (hw *HostWires) Connected(n network.Network, c network.Conn)

func (*HostWires) Disconnected

func (hw *HostWires) Disconnected(n network.Network, c network.Conn)

func (*HostWires) ID

func (hw *HostWires) ID() peer.ID

func (*HostWires) Listen

func (hw *HostWires) Listen(network.Network, ma.Multiaddr)

func (*HostWires) ListenClose

func (hw *HostWires) ListenClose(network.Network, ma.Multiaddr)

func (*HostWires) Neighbors

func (hw *HostWires) Neighbors() []peer.ID

func (*HostWires) OpenedStream

func (hw *HostWires) OpenedStream(network.Network, network.Stream)

func (*HostWires) SendMsg

func (hw *HostWires) SendMsg(to peer.ID, data []byte) error

func (*HostWires) SetListener

func (hw *HostWires) SetListener(wn WireListener)

func (*HostWires) SetMsgHandler

func (hw *HostWires) SetMsgHandler(h MsgHandler)

type InitOpt

type InitOpt func(*InitOpts) error

InitOpt is options used in NewBasicPubSubCollector

func WithConf

func WithConf(conf Conf) InitOpt

WithConf specifies configuration of pubsubcollector

func WithLogger

func WithLogger(l Logger) InitOpt

WithLogger .

func WithRequestIDGenerator

func WithRequestIDGenerator(ridFn ReqIDFn) InitOpt

WithRequestIDGenerator .

type InitOpts

type InitOpts struct {
	Conf    Conf
	ReqIDFn ReqIDFn
	MsgIDFn pubsub.MsgIDFn
	Logger  Logger
	Wires   TopicWires
}

InitOpts is options used in NewBasicPubSubCollector

func NewInitOpts

func NewInitOpts(opts []InitOpt) (out *InitOpts, err error)

NewInitOpts returns initopts

type IntBFS

type IntBFS struct {
	// contains filtered or unexported fields
}

IntBFS don't care about topic. IntBFS contains 5 parts: 1. query machanism 2. peer profiles 3. peer ranking 4. distance function 5. random perturbation

func NewIntBFS

func NewIntBFS(wires Wires, opts *IntBFSOptions) (*IntBFS, error)

func (*IntBFS) Close

func (ib *IntBFS) Close() error

func (*IntBFS) HandlePeerDown

func (ib *IntBFS) HandlePeerDown(p peer.ID)

func (*IntBFS) HandlePeerUp

func (ib *IntBFS) HandlePeerUp(p peer.ID)

func (*IntBFS) Publish

func (ib *IntBFS) Publish(data []byte, opts ...PubOpt) error

type IntBFSCollector

type IntBFSCollector struct {
	// contains filtered or unexported fields
}

IntBFSCollector . We used a topic-defined overlay here.

func NewIntBFSCollector

func NewIntBFSCollector(h host.Host, opts ...InitOpt) (*IntBFSCollector, error)

func (*IntBFSCollector) Close

func (ic *IntBFSCollector) Close() error

func (*IntBFSCollector) Join

func (ic *IntBFSCollector) Join(topic string, opts ...JoinOpt) (err error)

func (*IntBFSCollector) Leave

func (ic *IntBFSCollector) Leave(topic string) (err error)

func (*IntBFSCollector) Publish

func (ic *IntBFSCollector) Publish(topic string, data []byte, opts ...PubOpt) error

type IntBFSOptions

type IntBFSOptions struct {
	ProfileFactory
	RequestHandler
	ReqIDFn
	Logger
	Topic string
}

IntBFSOptions .

func MakeDefaultIntBFSOptions

func MakeDefaultIntBFSOptions() *IntBFSOptions

MakeDefaultIntBFSOptions .

type Intermediate

type Intermediate = pb.Intermediate

Intermediate type alias

type JoinOpt

type JoinOpt func(*JoinOpts) error

JoinOpt is optional options in PubSubCollector.Join

func WithProfileFactory

func WithProfileFactory(pf ProfileFactory) JoinOpt

func WithRequestHandler

func WithRequestHandler(rqhandle RequestHandler) JoinOpt

WithRequestHandler registers request handler

type JoinOpts

type JoinOpts struct {
	RequestHandler
	ProfileFactory
}

JoinOpts is the aggregated options

func NewJoinOptions

func NewJoinOptions(opts []JoinOpt) (out *JoinOpts, err error)

NewJoinOptions returns an option collection

type Logger

type Logger interface {
	Logf(level, format string, args ...interface{})
}

Logger . level is {"debug", "info", "warn", "error", "fatal"}; format and args are compatable with fmt.Printf.

func MakeDefaultLogger

func MakeDefaultLogger() Logger

MakeDefaultLogger .

type Message

type Message = pubsub.Message

Message is type alias

type Msg

type Msg struct {
	*pb.Msg
	// contains filtered or unexported fields
}

type MsgHandler

type MsgHandler func(from peer.ID, data []byte)

MsgHandler .

type PeerSet

type PeerSet struct {
	// contains filtered or unexported fields
}

func NewPeerSet

func NewPeerSet() *PeerSet

func (*PeerSet) Add

func (ps *PeerSet) Add(p peer.ID)

func (*PeerSet) Del

func (ps *PeerSet) Del(p peer.ID)

func (*PeerSet) Equal

func (ps *PeerSet) Equal(another *PeerSet) bool

func (*PeerSet) Slice

func (ps *PeerSet) Slice() []peer.ID

func (*PeerSet) String

func (ps *PeerSet) String() string

type Profile

type Profile interface {
	//
	Insert(req *Request, resp *Response)
	//
	Less(that Profile, req *Request) bool
}

Profile stores query profiles

type ProfileFactory

type ProfileFactory func() Profile

ProfileFactory generates a Profile

type PubOpt

type PubOpt func(*PubOpts) error

PubOpt is optional options in PubSubCollector.Publish

func WithFinalRespHandler

func WithFinalRespHandler(handler FinalRespHandler) PubOpt

WithFinalRespHandler registers notifHandler

func WithRequestContext

func WithRequestContext(ctx context.Context) PubOpt

WithRequestContext adds cancellation or timeout for a request default is withCancel. (ctx will be cancelled when request is closed)

type PubOpts

type PubOpts struct {
	RequestContext  context.Context
	FinalRespHandle FinalRespHandler
}

PubOpts is the aggregated options

func NewPublishOptions

func NewPublishOptions(opts []PubOpt) (out *PubOpts, err error)

NewPublishOptions returns an option collection

type PubReq

type PubReq struct {
	// contains filtered or unexported fields
}

type PubSubCollector

type PubSubCollector interface {

	// Join the overlay network defined by topic.
	// Register RequestHandle and ResponseHandle in opts.
	Join(topic string, opts ...JoinOpt) error

	// Publish a serialized request. Request should be encasulated in data argument.
	Publish(topic string, data []byte, opts ...PubOpt) error

	// Leave the overlay
	Leave(topic string) error

	io.Closer
}

PubSubCollector is a group communication module on topic-based overlay network. It helps to dispatch request, and wait for corresponding responses. In relay mode, PubSubCollector can also help to reduce the response.

func NewCollector

func NewCollector(h host.Host, opts ...InitOpt) (PubSubCollector, error)

NewCollector creates PubSubCollector.

type PubSubFactory

type PubSubFactory func(h host.Host) (*pubsub.PubSub, error)

PubSubFactory initializes a pubsub in Topics

type RelayPubSubCollector

type RelayPubSubCollector struct {
	// contains filtered or unexported fields
}

RelayPubSubCollector .

func NewRelayPubSubCollector

func NewRelayPubSubCollector(h host.Host, options ...InitOpt) (r *RelayPubSubCollector, err error)

NewRelayPubSubCollector .

func (*RelayPubSubCollector) Close

func (r *RelayPubSubCollector) Close() (err error)

Close the BasicPubSubCollector.

func (*RelayPubSubCollector) Join

func (r *RelayPubSubCollector) Join(topic string, options ...JoinOpt) (err error)

Join the overlay network defined by topic. Register RequestHandle and ResponseHandle in opts.

func (*RelayPubSubCollector) Leave

func (r *RelayPubSubCollector) Leave(topic string) (err error)

Leave the overlay

func (*RelayPubSubCollector) Publish

func (r *RelayPubSubCollector) Publish(topic string, data []byte, opts ...PubOpt) (err error)

Publish a serialized request. Request should be encasulated in data argument.

type ReqIDFn

type ReqIDFn func(*Request) RequestID

ReqIDFn is used to generate id for each request

type Request

type Request = pb.Request

Request type alias

type RequestHandler

type RequestHandler func(ctx context.Context, req *Request) *Intermediate

RequestHandler is the callback function when receiving a request. It will be called in every node joined the network. The return value will be sent to the root (directly or relayedly).

type RequestID

type RequestID = pb.RequestID

RequestID type alias

func DefaultReqIDFn

func DefaultReqIDFn(rq *Request) RequestID

DefaultReqIDFn returns default ReqIDGenerator. SHA-512 hash function is called in it.

type Response

type Response = pb.Response

Response type alias

type TopicHandle

type TopicHandle func(topic string, msg *Message)

TopicHandle is the handle function of subscription. WARNING: DO NOT change msg, if a msg includes multiple topics, they share a message.

type TopicHub

type TopicHub struct {
	// contains filtered or unexported fields
}

func NewTopicHub

func NewTopicHub(tw TopicWires) *TopicHub

func (*TopicHub) Close

func (th *TopicHub) Close() error

func (*TopicHub) HandlePeerDown

func (th *TopicHub) HandlePeerDown(p peer.ID, topic string)

func (*TopicHub) HandlePeerUp

func (th *TopicHub) HandlePeerUp(p peer.ID, topic string)

func (*TopicHub) Join

func (th *TopicHub) Join(topic string) error

func (*TopicHub) JoinWithNotifiee

func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error

func (*TopicHub) Leave

func (th *TopicHub) Leave(topic string) error

func (*TopicHub) Wires

func (th *TopicHub) Wires(topic string) Wires

type TopicMsgHandler

type TopicMsgHandler = pubsub.TopicMsgHandler

TopicMsgHandler .

type TopicOpt

type TopicOpt func(*AsyncPubSub) error

TopicOpt is the option in NewTopics

func WithCustomPubSubFactory

func WithCustomPubSubFactory(psubFact PubSubFactory) TopicOpt

WithCustomPubSubFactory initials the pubsub based on given factory

func WithSelfNotif

func WithSelfNotif(enable bool) TopicOpt

WithSelfNotif decides whether a node receives self-published messages. If WithSelfNotif is set to true, the node will receive messages published by itself. Default is set to false.

type TopicWireListener

type TopicWireListener = pubsub.TopicWireListener

TopicWireListener .

type TopicWires

type TopicWires interface {
	ID() peer.ID
	Join(topic string) error
	Leave(topic string) error
	Topics() []string
	Neighbors(topic string) []peer.ID
	SetListener(twn TopicWireListener)
	SendMsg(topic string, to peer.ID, data []byte) error
	SetTopicMsgHandler(th TopicMsgHandler)
	io.Closer
}

type WireListener

type WireListener interface {
	HandlePeerUp(p peer.ID)
	HandlePeerDown(p peer.ID)
}

type Wires

type Wires interface {
	ID() peer.ID
	Neighbors() []peer.ID
	SetListener(wn WireListener)
	SendMsg(to peer.ID, data []byte) error
	SetMsgHandler(h MsgHandler)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL