legs

v0.0.0-...-6862cae

Warning: This package is not in the latest version of its module.
Published: Mar 7, 2023 | License: Apache-2.0, MIT | Imports: 29 | Imported by: 0

README ¶

legs 🦵

Legs is an interface for go-data-transfer that provides a 1:1 mechanism for keeping a subscriber's copy of an IPLD DAG synchronized with the current state of a publisher.

Usage

Typically an application will be either a publisher or a subscriber, but it may be both.

Publisher

Create a legs publisher. Update its root to cause it to publish.

pub, err := dtsync.NewPublisher(host, dsstore, lsys, "/legs/topic")
if err != nil {
	panic(err)
}
...
// Publish updated root.
err = pub.UpdateRoot(ctx, lnk.(cidlink.Link).Cid)
if err != nil {
	panic(err)
}

Subscriber

The Subscriber handles subscribing to a topic, reading messages from the topic and tracking the state of each publisher.

Create a Subscriber:

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil)
if err != nil {
	panic(err)
}

Optionally, request notification of updates:

watcher, cancelWatcher := sub.OnSyncFinished()
defer cancelWatcher()
go watch(watcher)

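func watch(notifications <-chan legs.SyncFinished) {
    // The loop ends when cancelWatcher closes the channel.
    for syncFinished := range notifications {
        // syncFinished.Cid is now available in the local dataStore
        fmt.Println("Finished sync to", syncFinished.Cid, "with peer:", syncFinished.PeerID)
    }
}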

To shut down a Subscriber, call its Close() method.

A Subscriber can be created with a function that determines if the Subscriber accepts or rejects messages from a publisher. Use the AllowPeer option to specify the function.

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil, legs.AllowPeer(allowPeer))

The Subscriber keeps track of the latest head for each publisher that it has synced. This avoids exchanging the whole DAG from scratch on every update; only the part that has not yet been synced is downloaded. This value is not persisted by the library. To start a Subscriber that has already partially synced with a publisher, use the SetLatestSync method:

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil)
if err != nil {
    panic(err)
}
// Set up partially synced publishers
if err = sub.SetLatestSync(peerID1, lastSync1); err != nil {
    panic(err)
}
if err = sub.SetLatestSync(peerID2, lastSync2); err != nil {
    panic(err)
}
if err = sub.SetLatestSync(peerID3, lastSync3); err != nil {
    panic(err)
}

License

Legs is dual-licensed under Apache 2.0 and MIT terms:

Apache License, Version 2.0, (LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

Documentation ¶

Functions ¶

func ExploreRecursiveWithStop ¶

func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbuilder.SelectorSpec, stopLnk ipld.Link) ipld.Node

ExploreRecursiveWithStop builds a selector that recursively syncs a DAG until the link stopLnk is seen. This avoids having to sync DAGs from scratch on every update.

func ExploreRecursiveWithStopNode ¶

func ExploreRecursiveWithStopNode(limit selector.RecursionLimit, sequence ipld.Node, stopLnk ipld.Link) ipld.Node

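ExploreRecursiveWithStopNode builds a selector that recursively syncs a DAG until the link stopLnk is seen. It differs from ExploreRecursiveWithStop in that the sequence is given as an ipld.Node rather than a selectorbuilder.SelectorSpec. This avoids having to sync DAGs from scratch on every update.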

func LegSelector ¶

func LegSelector(limit selector.RecursionLimit, stopLnk ipld.Link) ipld.Node

LegSelector is a convenience function that returns the selector used by legs subscribers.

LegSelector is a "recurse all" selector that provides conditions to stop the traversal at a specific link (stopLnk).
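The idea shared by the three selector helpers above, stopping a recursive traversal once a known link is reached, can be illustrated without any IPLD machinery. The following is a minimal sketch under stated assumptions: the block type, the string keys, and walkUntil are hypothetical stand-ins, not part of legs or go-ipld-prime, and a simple chain of "previous" links stands in for the DAG.

```go
package main

import "fmt"

// block is a hypothetical stand-in for an IPLD node that links to its
// predecessor; an empty Prev marks the start of the chain.
type block struct {
	Prev string
}

// walkUntil follows Prev links from head and returns the visited keys,
// newest first. It stops when it reaches stop, runs off the end of the
// chain, hits a missing block, or has visited limit blocks (limit <= 0
// means no limit).
func walkUntil(store map[string]block, head, stop string, limit int) []string {
	var visited []string
	for cur := head; cur != "" && cur != stop; {
		if limit > 0 && len(visited) >= limit {
			break
		}
		b, ok := store[cur]
		if !ok {
			break
		}
		visited = append(visited, cur)
		cur = b.Prev
	}
	return visited
}

// demoChain builds a four-block chain: d -> c -> b -> a.
func demoChain() map[string]block {
	return map[string]block{
		"a": {Prev: ""},
		"b": {Prev: "a"},
		"c": {Prev: "b"},
		"d": {Prev: "c"},
	}
}

func main() {
	store := demoChain()
	// First sync: nothing is known yet, so the whole chain is walked.
	fmt.Println(walkUntil(store, "d", "", 0))
	// Later sync: b was the latest sync, so only newer blocks are walked.
	fmt.Println(walkUntil(store, "d", "b", 0))
}
```

Passing the previously synced link as the stop value is what lets a later sync skip everything that was already fetched.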

Types ¶

type BlockHookFunc ¶

type BlockHookFunc func(peer.ID, cid.Cid, SegmentSyncActions)

BlockHookFunc is the signature of a function that is called when a block is received.

type DefaultLatestSyncHandler ¶

type DefaultLatestSyncHandler struct {
	// contains filtered or unexported fields
}

func (*DefaultLatestSyncHandler) GetLatestSync ¶

func (h *DefaultLatestSyncHandler) GetLatestSync(p peer.ID) (cid.Cid, bool)

func (*DefaultLatestSyncHandler) SetLatestSync ¶

func (h *DefaultLatestSyncHandler) SetLatestSync(p peer.ID, c cid.Cid)

type LatestSyncHandler ¶

type LatestSyncHandler interface {
	SetLatestSync(peer peer.ID, cid cid.Cid)
	GetLatestSync(peer peer.ID) (cid.Cid, bool)
}

LatestSyncHandler defines how to store the latest synced cid for a given peer and how to fetch it. Legs guarantees this will not be called concurrently for the same peer, but it may be called concurrently for different peers.
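Because calls may arrive concurrently for different peers, a custom handler needs its own synchronization. Here is a minimal sketch of a map-backed handler, assuming string stand-ins for peer.ID and cid.Cid so that it has no external dependencies; mapSyncHandler and its helper names are hypothetical, and a real implementation would use the library types and could persist the values.

```go
package main

import (
	"fmt"
	"sync"
)

// peerID and contentID are hypothetical string stand-ins for peer.ID and
// cid.Cid, used only to keep this sketch free of external dependencies.
type (
	peerID    string
	contentID string
)

// mapSyncHandler mirrors the shape of LatestSyncHandler using the
// stand-in types above. The mutex makes it safe for concurrent calls
// on behalf of different peers.
type mapSyncHandler struct {
	mu     sync.RWMutex
	latest map[peerID]contentID
}

func newMapSyncHandler() *mapSyncHandler {
	return &mapSyncHandler{latest: make(map[peerID]contentID)}
}

// SetLatestSync records c as the latest synced value for p.
func (h *mapSyncHandler) SetLatestSync(p peerID, c contentID) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.latest[p] = c
}

// GetLatestSync returns the latest synced value for p, if one is known.
func (h *mapSyncHandler) GetLatestSync(p peerID) (contentID, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	c, ok := h.latest[p]
	return c, ok
}

// demoConcurrentPeers records a latest sync for several peers at once
// and returns how many peers ended up recorded.
func demoConcurrentPeers() int {
	h := newMapSyncHandler()
	var wg sync.WaitGroup
	for _, p := range []peerID{"peerA", "peerB", "peerC"} {
		wg.Add(1)
		go func(p peerID) {
			defer wg.Done()
			h.SetLatestSync(p, contentID("head-of-"+string(p)))
		}(p)
	}
	wg.Wait()
	return len(h.latest)
}

func main() {
	fmt.Println("peers recorded:", demoConcurrentPeers())
}
```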

type Option ¶

type Option func(*config) error

func AddrTTL ¶

func AddrTTL(addrTTL time.Duration) Option

AddrTTL sets the peerstore address time-to-live for addresses discovered from pubsub messages.

func AllowPeer ¶

func AllowPeer(allowPeer announce.AllowPeerFunc) Option

AllowPeer sets the function that determines whether to allow or reject messages from a peer.

func BlockHook ¶

func BlockHook(blockHook BlockHookFunc) Option

BlockHook adds a hook that is run when a block is received via Subscriber.Sync along with a SegmentSyncActions to control the sync flow if segmented sync is enabled. Note that if segmented sync is disabled, calls on SegmentSyncActions will have no effect. See: SegmentSyncActions, SegmentDepthLimit, ScopedBlockHook.

func DtManager ¶

func DtManager(dtManager dt.Manager, gs graphsync.GraphExchange) Option

DtManager provides an existing datatransfer manager.

func FilterIPs ¶

func FilterIPs(enable bool) Option

FilterIPs removes any private, loopback, or unspecified IP multiaddrs from addresses supplied in announce messages.

func HttpClient ¶

func HttpClient(client *http.Client) Option

HttpClient provides Subscriber with an existing http client.

func IdleHandlerTTL ¶

func IdleHandlerTTL(ttl time.Duration) Option

IdleHandlerTTL configures the time after which idle handlers are removed.

func RateLimiter ¶

func RateLimiter(limiterFor RateLimiterFor) Option

RateLimiter configures a function that is called for each sync to get the rate limiter for a specific peer.

func ResendAnnounce ¶

func ResendAnnounce(enable bool) Option

ResendAnnounce determines whether to resend the direct announce messages (those that are not received via pubsub) over pubsub.

func SegmentDepthLimit ¶

func SegmentDepthLimit(depth int64) Option

SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync. Setting the depth to a value less than zero disables segmented sync completely. Disabled by default. Note that for segmented sync to function at least one of BlockHook or ScopedBlockHook must be set.

func SyncRecursionLimit ¶

func SyncRecursionLimit(limit selector.RecursionLimit) Option

SyncRecursionLimit sets the recursion limit of the background syncing process. Defaults to selector.RecursionLimitNone if not specified.

func Topic ¶

func Topic(topic *pubsub.Topic) Option

Topic provides an existing pubsub topic.

func UseLatestSyncHandler ¶

func UseLatestSyncHandler(h LatestSyncHandler) Option

UseLatestSyncHandler sets the latest sync handler to use.

type Publisher ¶

type Publisher interface {
	// SetRoot sets the root CID without publishing it.
	SetRoot(context.Context, cid.Cid) error
	// UpdateRoot sets the root CID and publishes its update in the pubsub channel.
	UpdateRoot(context.Context, cid.Cid) error
	// UpdateRootWithAddrs publishes an update for the DAG in the pubsub channel using custom multiaddrs.
	UpdateRootWithAddrs(context.Context, cid.Cid, []ma.Multiaddr) error
	// Close publisher
	Close() error
}

Publisher is an interface for updating the published dag.

Example ¶
// Init legs publisher
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, _ = libp2p.New()
srcLnkS := makeLinkSystem(srcStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
if err != nil {
	panic(err)
}
defer pub.Close()

// Update root on publisher one with item
itm1 := basicnode.NewString("hello world")
lnk1, err := store(srcStore, itm1)
if err != nil {
	panic(err)
}
if err = pub.UpdateRoot(context.Background(), lnk1.(cidlink.Link).Cid); err != nil {
	panic(err)
}
log.Print("Publish 1:", lnk1.(cidlink.Link).Cid)

// Update root on publisher with a second item
itm2 := basicnode.NewString("hello world 2")
lnk2, err := store(srcStore, itm2)
if err != nil {
	panic(err)
}
if err = pub.UpdateRoot(context.Background(), lnk2.(cidlink.Link).Cid); err != nil {
	panic(err)
}
log.Print("Publish 2:", lnk2.(cidlink.Link).Cid)

type RateLimiterFor ¶

type RateLimiterFor func(publisher peer.ID) *rate.Limiter

type SegmentBlockHookFunc ¶

type SegmentBlockHookFunc func(peer.ID, cid.Cid, SegmentSyncActions)

SegmentBlockHookFunc is called for each synced block, similarly to BlockHookFunc, except that it provides SegmentSyncActions to the hook. This allows the user to control the flow of segmented sync by decoding the synced block and determining which CID should be used in the next segmented sync cycle.

SegmentSyncActions also allows the user to signal any errors that may occur during the hook execution to terminate the sync and mark it as failed.

type SegmentSyncActions ¶

type SegmentSyncActions interface {
	// SetNextSyncCid sets the cid that will be synced in the next
	// segmented sync. Note that the last call to this function during a
	// segmented sync cycle dictates which CID will be synced in the next
	// cycle.
	//
	// At least one call to this function must be made for the segmented
	// sync cycles to continue; otherwise, the CID that should be used in
	// the next segmented sync cycle cannot be known.
	//
	// If no calls are made to this function or next CID is set to
	// cid.Undef, the sync will terminate and any CIDs that are synced so
	// far will be included in a SyncFinished event.
	SetNextSyncCid(cid.Cid)

	// FailSync fails the sync and returns the given error as soon as the
	// current segment sync finishes. The last call to this function during
	// a segmented sync cycle dictates the error value. Passing nil as
	// error will cancel sync failure.
	FailSync(error)
}

SegmentSyncActions allows the user to control the flow of segmented sync by either choosing which CID should be synced in the next sync cycle or setting the error that should mark the sync as failed.
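The control flow described by these comments can be modeled in a few lines. This is a sketch under stated assumptions, not the library's implementation: the actions type, segmentedSync, and the hooks are hypothetical, string keys stand in for cid.Cid with "" playing the role of cid.Undef, and a map from each key to its predecessor stands in for the DAG.

```go
package main

import (
	"errors"
	"fmt"
)

// actions is a hypothetical stand-in for SegmentSyncActions, with string
// keys in place of cid.Cid and "" in place of cid.Undef.
type actions struct {
	next string
	err  error
}

func (a *actions) SetNextSyncCid(c string) { a.next = c }
func (a *actions) FailSync(err error)      { a.err = err }

// segmentedSync walks chain (key -> previous key) from head in segments
// of at most depth blocks, calling hook for every block. After each
// segment it fails if the hook set an error, and otherwise continues
// from the key most recently passed to SetNextSyncCid. It stops when no
// next key was set.
func segmentedSync(chain map[string]string, head string, depth int, hook func(string, *actions)) ([]string, error) {
	var synced []string
	for next := head; next != ""; {
		a := &actions{}
		cur := next
		for i := 0; i < depth && cur != ""; i++ {
			prev, ok := chain[cur]
			if !ok {
				return synced, fmt.Errorf("block %q not found", cur)
			}
			synced = append(synced, cur)
			hook(cur, a)
			cur = prev
		}
		if a.err != nil {
			return synced, a.err
		}
		next = a.next
	}
	return synced, nil
}

// adChain is a four-block chain: d -> c -> b -> a.
var adChain = map[string]string{"d": "c", "c": "b", "b": "a", "a": ""}

// followPrev is a hook that continues the sync from each block's
// predecessor.
func followPrev(c string, a *actions) { a.SetNextSyncCid(adChain[c]) }

// failAt returns a hook that behaves like followPrev but fails the sync
// when it sees bad.
func failAt(bad string) func(string, *actions) {
	return func(c string, a *actions) {
		followPrev(c, a)
		if c == bad {
			a.FailSync(errors.New("rejected block " + bad))
		}
	}
}

func main() {
	fmt.Println(segmentedSync(adChain, "d", 2, followPrev))
	fmt.Println(segmentedSync(adChain, "d", 2, failAt("d")))
}
```

In the failing case the error is raised on the first block, yet the rest of that segment is still synced before the error is returned, which matches the "as soon as the current segment sync finishes" wording above.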

type Subscriber ¶

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber creates a single pubsub subscriber that receives messages from a gossip pubsub topic, and creates a stateful message handler for each message source peer. An optional externally-defined AllowPeerFunc determines whether to allow or deny messages from specific peers.

Messages from separate peers are handled concurrently, and multiple messages from the same peer are handled serially. If a handler is busy handling a message, and more messages arrive from the same peer, then the last message replaces the previous unhandled message to avoid having to maintain queues of messages. Handlers do not have persistent goroutines, but start a new goroutine to handle a single message.
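The "last message replaces the previous unhandled message" behavior amounts to a single pending slot per peer instead of a queue. The sketch below models one such handler with the standard library only; the handler type and its fields are hypothetical stand-ins and not the library's internal types, and strings stand in for announce messages.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a per-peer message handler.
// It keeps at most one unhandled message.
type handler struct {
	mu      sync.Mutex
	busy    bool
	pending *string // newest unhandled message, if any
	handle  func(msg string)
	wg      sync.WaitGroup
}

// deliver starts a goroutine for msg if the handler is idle. If the
// handler is busy, msg replaces any older unhandled message.
func (h *handler) deliver(msg string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.busy {
		h.pending = &msg
		return
	}
	h.busy = true
	h.wg.Add(1)
	go h.run(msg)
}

// run handles a single message, then starts a new goroutine for the
// pending message if one arrived in the meantime.
func (h *handler) run(msg string) {
	defer h.wg.Done()
	h.handle(msg)

	h.mu.Lock()
	defer h.mu.Unlock()
	if h.pending == nil {
		h.busy = false
		return
	}
	next := *h.pending
	h.pending = nil
	h.wg.Add(1)
	go h.run(next)
}

// demoLatestWins delivers m2 and m3 while m1 is still being handled and
// returns the messages that were actually handled, in order.
func demoLatestWins() []string {
	var (
		mu      sync.Mutex
		handled []string
	)
	started := make(chan struct{}, 1)
	release := make(chan struct{})

	h := &handler{}
	h.handle = func(msg string) {
		if msg == "m1" {
			started <- struct{}{} // signal that m1 is being handled
			<-release             // keep the handler busy
		}
		mu.Lock()
		handled = append(handled, msg)
		mu.Unlock()
	}

	h.deliver("m1")
	<-started
	h.deliver("m2") // stored as pending
	h.deliver("m3") // replaces m2
	close(release)
	h.wg.Wait()
	return handled
}

func main() {
	fmt.Println(demoLatestWins())
}
```

Only m1 and m3 are handled: m2 is overwritten while the handler is busy, so no queue has to be maintained.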

Example ¶
dstHost, _ := libp2p.New()

dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstLnkSys := makeLinkSystem(dstStore)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet", nil)
if err != nil {
	panic(err)
}
defer sub.Close()

// Connections must be made after Subscriber is created, because the
// gossip pubsub must be created before connections are made.  Otherwise,
// the connecting hosts will not see the destination host has pubsub and
// messages will not get published.
dstPeerInfo := dstHost.Peerstore().PeerInfo(dstHost.ID())
if err = srcHost.Connect(context.Background(), dstPeerInfo); err != nil {
	panic(err)
}

watcher, cancelWatcher := sub.OnSyncFinished()
defer cancelWatcher()

for syncFin := range watcher {
	fmt.Println("Finished sync to", syncFin.Cid, "with peer:", syncFin.PeerID)
}

func NewSubscriber ¶

func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, dss ipld.Node, options ...Option) (*Subscriber, error)

NewSubscriber creates a new Subscriber that processes pubsub messages.

func (*Subscriber) Announce ¶

func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerID peer.ID, peerAddrs []multiaddr.Multiaddr) error

Announce handles a direct announce message, that is, one that did not arrive over pubsub. If the Receiver is configured to do so, the message is resent over pubsub with the original peerID encoded into the message extra data.

func (*Subscriber) Close ¶

func (s *Subscriber) Close() error

Close shuts down the Subscriber.

func (*Subscriber) GetLatestSync ¶

func (s *Subscriber) GetLatestSync(peerID peer.ID) ipld.Link

GetLatestSync returns the latest synced CID for the specified peer. If there is no handler for the peer, then nil is returned. This does not mean that no data is synced with that peer; it means that the Subscriber does not know about it. Calling Sync() first may be necessary.

func (*Subscriber) HttpPeerStore ¶

func (s *Subscriber) HttpPeerStore() peerstore.Peerstore

HttpPeerStore returns the subscriber's HTTP peer store.

func (*Subscriber) OnSyncFinished ¶

func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)

OnSyncFinished creates a channel that receives change notifications, and adds that channel to the list of notification channels.

Calling the returned cancel function removes the notification channel from the list of channels to be notified on changes, and it closes the channel to allow any reading goroutines to stop waiting on the channel.
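The register, notify, and cancel lifecycle described above follows a common Go pattern. The following is a minimal sketch of that pattern, assuming hypothetical notifier and event types in place of Subscriber and SyncFinished; the buffer size and the once-only cancel are choices of this sketch, not documented behavior of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for SyncFinished.
type event struct {
	Peer string
	Cid  string
}

// notifier is a hypothetical stand-in for the Subscriber's list of
// notification channels.
type notifier struct {
	mu    sync.Mutex
	chans map[chan event]struct{}
}

func newNotifier() *notifier {
	return &notifier{chans: make(map[chan event]struct{})}
}

// onEvent registers a new channel and returns it with a cancel function.
// Cancel removes the channel from the list and closes it, so a reader
// ranging over the channel stops. Calling cancel more than once is safe.
func (n *notifier) onEvent() (<-chan event, func()) {
	ch := make(chan event, 1)
	n.mu.Lock()
	n.chans[ch] = struct{}{}
	n.mu.Unlock()

	var once sync.Once
	cancel := func() {
		once.Do(func() {
			n.mu.Lock()
			defer n.mu.Unlock()
			delete(n.chans, ch)
			close(ch)
		})
	}
	return ch, cancel
}

// publish sends ev to every registered channel. In this sketch a reader
// that does not drain its channel blocks publish.
func (n *notifier) publish(ev event) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for ch := range n.chans {
		ch <- ev
	}
}

// demoCancel publishes one event before cancelling and one after, then
// counts the events the reader received before its loop ended.
func demoCancel() (received int) {
	n := newNotifier()
	ch, cancel := n.onEvent()
	n.publish(event{Peer: "peerA", Cid: "cid1"})
	cancel()
	cancel() // a second call is a no-op
	n.publish(event{Peer: "peerA", Cid: "cid2"}) // no channels are registered
	for range ch {
		received++
	}
	return received
}

func main() {
	fmt.Println("events received:", demoCancel())
}
```

The range loop in demoCancel terminates only because cancel closes the channel, which is why a reader should range over the channel rather than receive in an unconditional loop.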

func (*Subscriber) RemoveHandler ¶

func (s *Subscriber) RemoveHandler(peerID peer.ID) bool

RemoveHandler removes a handler for a publisher.

func (*Subscriber) SetAllowPeer ¶

func (s *Subscriber) SetAllowPeer(allowPeer announce.AllowPeerFunc)

SetAllowPeer configures Subscriber with a function to evaluate whether to allow or reject messages from a peer. Setting nil removes any filtering and allows messages from all peers. Calling SetAllowPeer replaces any previously configured AllowPeerFunc.

func (*Subscriber) SetLatestSync ¶

func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error

SetLatestSync sets the latest synced CID for a specified peer. If there is no handler for the peer, then one is created without consulting any AllowPeerFunc.

func (*Subscriber) Sync ¶

func (s *Subscriber) Sync(ctx context.Context, peerID peer.ID, nextCid cid.Cid, sel ipld.Node, peerAddr multiaddr.Multiaddr, opts ...SyncOption) (cid.Cid, error)

Sync performs a one-off explicit sync with the given peer for a specific CID and updates the latest synced link to it. Completing sync may take a significant amount of time, so Sync should generally be run in its own goroutine.

If given cid.Undef, the latest root CID is queried from the peer directly and used instead. Note that in an event where there is no latest root, i.e. querying the latest CID returns cid.Undef, this function returns cid.Undef with nil error.

The latest synced CID is returned when this sync is complete. Any OnSyncFinished readers will also get a SyncFinished when the sync succeeds, but only if syncing to the latest, using `cid.Undef`, and using the default selector. This is because when specifying a CID, it is usually for an entries sync, not an advertisements sync.

It is the responsibility of the caller to make sure the given CID appears after the latest sync in order to avoid re-syncing content that may have previously been synced.

The selector sequence, sel, can optionally be specified to customize the selection sequence during traversal. If unspecified, the default selector sequence is used.

Note that the selector sequence is wrapped with a selector logic that will stop traversal when the latest synced link is reached. Therefore, it must only specify the selection sequence itself.

See: ExploreRecursiveWithStopNode.

type SyncFinished ¶

type SyncFinished struct {
	// Cid is the CID identifying the link that finished and is now the latest
	// sync for a specific peer.
	Cid cid.Cid
	// PeerID identifies the peer this SyncFinished event pertains to.
	PeerID peer.ID
	// A list of cids that this sync acquired. In order from latest to oldest.
	// The latest cid will always be at the beginning.
	SyncedCids []cid.Cid
}

SyncFinished notifies an OnSyncFinished reader that a specified peer completed a sync. The channel receives events from providers that are manually synced to the latest, as well as those auto-discovered.

type SyncOption ¶

type SyncOption func(*syncCfg)

func AlwaysUpdateLatest ¶

func AlwaysUpdateLatest() SyncOption

func ScopedBlockHook ¶

func ScopedBlockHook(hook BlockHookFunc) SyncOption

ScopedBlockHook is the equivalent of BlockHook option but only applied to a single sync. If not specified, the Subscriber BlockHook option is used instead. Specifying the ScopedBlockHook will override the Subscriber level BlockHook for the current sync.

Note that calls to SegmentSyncActions from the block hook will have no impact if segmented sync is disabled. See: BlockHook, SegmentDepthLimit, ScopedSegmentDepthLimit.

func ScopedRateLimiter ¶

func ScopedRateLimiter(l *rate.Limiter) SyncOption

ScopedRateLimiter sets a rate limiter to use for a single sync. If not specified, the Subscriber rateLimiterFor function is used to get a rate limiter for the sync.

func ScopedSegmentDepthLimit ¶

func ScopedSegmentDepthLimit(depth int64) SyncOption

ScopedSegmentDepthLimit is the equivalent of SegmentDepthLimit option but only applied to a single sync. If not specified, the Subscriber SegmentDepthLimit option is used instead.

Note that for segmented sync to function at least one of BlockHook or ScopedBlockHook must be set. See: SegmentDepthLimit.

type Syncer ¶

type Syncer interface {
	GetHead(context.Context) (cid.Cid, error)
	Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
}

Syncer is the interface used to sync with a data source.

Directories ¶

Path Synopsis
Package mautil provides multiaddr utility functions.
p2p
