exchange

package
v1.54.9
Published: Apr 4, 2024 License: MIT Imports: 46 Imported by: 0

Documentation

Overview

Example (PoolExchangeDagBetweenPoolNodes)

Example_poolExchangeDagBetweenPoolNodes starts up a pool with two nodes, stores a sample DAG on one node, and fetches it via GraphSync from the other node.

server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random source (via generateIdentity below) so the
// example produces deterministic output.

// Instantiate the first node in the pool
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
h2, err := libp2p.New(libp2p.Identity(generateIdentity(2)))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Connect n1 to n2.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool; node 2 will pull it from n1.
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n2) in the pool; n2 will push it to n1.
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by asking only for the root link.
// Because the Pull implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG, link: " + n1RootLink.String())
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG, link: " + n1leafLink.String())
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by pushing only the root link.
// Because the Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until the background push finishes
// by periodically checking whether the link is present on node 1.
for {
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s", buf.String())
}
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    leaf: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX stored IPLD data with links:
    root: bafyr4iapi76lbx5zfcjkolrvqbj4fl4jk2uvuz5obwa7xhbjwq3wupgwhm
    leaf: bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty
exchanging by Pull...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"oneLeafLink":{"/":"bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"},"that":42}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"this":true}
exchanging by Push...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyr4iapi76lbx5zfcjkolrvqbj4fl4jk2uvuz5obwa7xhbjwq3wupgwhm
    to 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"anotherLeafLink":{"/":"bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty"},"this":24}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty
    to 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"that":false}
Example (ProvideAfterPull)
server := startMockServer("127.0.0.1:4001")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random source (via generateIdentity below) so the
// example produces deterministic output.

// Instantiate the first node in the pool
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
h2, err := libp2p.New(libp2p.Identity(generateIdentity(2)))
if err != nil {
	panic(err)
}
n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
h3, err := libp2p.New(libp2p.Identity(generateIdentity(3)))
if err != nil {
	panic(err)
}
n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Instantiate the fourth node, which is not in the pool
h4, err := libp2p.New(libp2p.Identity(generateIdentity(4)))
if err != nil {
	panic(err)
}
n4, err := blox.New(
	blox.WithPoolName("0"),
	blox.WithTopicName("0"),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithBlockchainEndPoint("127.0.0.1:4001"),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n4.Start(ctx); err != nil {
	panic(err)
}
defer n4.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we do not connect n2 to n3; they should discover
// each other via the pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h1.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h2.ID())

fmt.Printf("Finally %s peerstore contains >=3 nodes:\n", h3.ID())

// Manually add h4 as it is not in the same pool
h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
// Manually add h4 as it is not in the same pool
h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
// Manually add h4 as it is not in the same pool
h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL)
if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}

// Wait until the fourth node discovers the others
for {
	if len(h4.Peerstore().Peers()) >= 4 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Store a link on h1 and find providers from h2

// Generate a sample DAG and store it on node 1 (n1) in the pool; node 2 will pull it from n1.
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n2) in the pool; n2 will push it to n1.
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n2RootLink, n2leafLink)

//n1.UpdateDhtPeers(h2.Peerstore().Peers())
//n2.UpdateDhtPeers(h1.Peerstore().Peers())

err = n1.ProvideLinkByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht")
	panic(err)
}
peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht")
	panic(err)
}
// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist1 {
	fmt.Printf("Found %s on %s\n", n1RootLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}

err = n1.PingDht(h3.ID())
if err != nil {
	fmt.Print("Error happened in PingDht")
	panic(err)
}

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by asking only for the root link.
// Because the Pull implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the leaf of the sample DAG stored on node 2 to node 1, then announce
// it on the DHT so that other pool members can discover its providers.
if err := n2.Push(ctx, h1.ID(), n2leafLink); err != nil {
	panic(err)
}
err = n1.ProvideLinkByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht n1")
	panic(err)
}
err = n2.ProvideLinkByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht n2")
	panic(err)
}

peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht3")
	panic(err)
}

// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist3 {
	fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
Instantiated node in pool 0 with ID: 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q
Finally 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains >=3 nodes:
Finally 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains >=3 nodes:
Finally 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains >=3 nodes:
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    leaf: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX stored IPLD data with links:
    root: bafyr4iapi76lbx5zfcjkolrvqbj4fl4jk2uvuz5obwa7xhbjwq3wupgwhm
    leaf: bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty
Found bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
exchanging by Pull...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"oneLeafLink":{"/":"bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"},"that":42}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"this":true}
exchanging by Push...
Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX

Constants

const (
	FxExchangeProtocolID = "/fx.land/exchange/0.0.3"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigUpdater added in v1.14.0

type ConfigUpdater func([]peer.ID) error
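
A ConfigUpdater can be supplied via WithUpdateConfig; it is presumably invoked with the updated authorized-peer list so the application can persist it. A minimal sketch of such an updater follows; the function name, file path, and on-disk format are illustrative assumptions, not part of this package:

import (
	"os"
	"strings"

	"github.com/libp2p/go-libp2p/core/peer"
)

// persistPeers writes the received peer list to disk, one peer ID per line.
func persistPeers(peers []peer.ID) error {
	lines := make([]string, 0, len(peers))
	for _, p := range peers {
		lines = append(lines, p.String())
	}
	return os.WriteFile("authorized_peers.txt", []byte(strings.Join(lines, "\n")), 0o600)
}

It can then be wired in with WithUpdateConfig(persistPeers).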

type Exchange

type Exchange interface {
	Start(context.Context) error
	Push(context.Context, peer.ID, ipld.Link) error
	// Pull recursively traverses the DAG rooted at the given link and syncs all of its blocks, including the root, from the given peer ID.
	Pull(context.Context, peer.ID, ipld.Link) error
	// PullBlock pulls the single block associated with the given link from the given peer ID.
	PullBlock(context.Context, peer.ID, ipld.Link) error
	SetAuth(context.Context, peer.ID, peer.ID, bool) error
	Shutdown(context.Context) error
	IpniNotifyLink(link ipld.Link)
	FindProvidersIpni(l ipld.Link, relays []string) ([]peer.AddrInfo, error)
}
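
As a rough sketch of the intended flow (ex1 and ex2 stand for started Exchange instances on two connected libp2p hosts h1 and h2, and rootLink for a link stored on h1's link system; all of these names are assumptions for illustration, and the full setup is shown in the examples above):

// Allow h2 to exchange DAGs with h1.
if err := ex1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
// Recursively fetch the DAG rooted at rootLink from h1.
if err := ex2.Pull(ctx, h1.ID(), rootLink); err != nil {
	panic(err)
}
// Alternatively, fetch only the root block without traversing its links.
if err := ex2.PullBlock(ctx, h1.ID(), rootLink); err != nil {
	panic(err)
}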

type FxExchange added in v0.5.4

type FxExchange struct {
	// contains filtered or unexported fields
}

func NewFxExchange added in v0.5.4

func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, error)
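
A minimal construction sketch, assuming the module's import path github.com/functionland/go-fula/exchange; backing the exchange with go-ipld-prime's in-memory memstore is an illustrative choice, not a requirement of the API:

ctx := context.Background()
h, err := libp2p.New()
if err != nil {
	panic(err)
}
// Build an IPLD link system over an in-memory block store
// (cidlink and memstore are from github.com/ipld/go-ipld-prime).
ls := cidlink.DefaultLinkSystem()
store := &memstore.Store{}
ls.SetReadStorage(store)
ls.SetWriteStorage(store)

ex, err := exchange.NewFxExchange(h, ls,
	exchange.WithAuthorizer(h.ID()), // let this host configure authorization
)
if err != nil {
	panic(err)
}
if err := ex.Start(ctx); err != nil {
	panic(err)
}
defer ex.Shutdown(ctx)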

func (*FxExchange) FindProvidersDht added in v1.16.0

func (e *FxExchange) FindProvidersDht(l ipld.Link) ([]peer.AddrInfo, error)

func (*FxExchange) FindProvidersIpni added in v1.29.0

func (e *FxExchange) FindProvidersIpni(l ipld.Link, relays []string) ([]peer.AddrInfo, error)

func (*FxExchange) GetAuth added in v1.0.0

func (e *FxExchange) GetAuth(ctx context.Context) (peer.ID, error)

func (*FxExchange) GetAuthorizedPeers added in v1.14.0

func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error)
func (e *FxExchange) IpniNotifyLink(link ipld.Link)

func (*FxExchange) PingDht added in v1.16.0

func (e *FxExchange) PingDht(p peer.ID) error

func (*FxExchange) ProvideDht added in v1.16.0

func (e *FxExchange) ProvideDht(l ipld.Link) error

func (*FxExchange) Pull added in v0.5.4

func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*FxExchange) PullBlock added in v1.30.0

func (e *FxExchange) PullBlock(ctx context.Context, from peer.ID, l ipld.Link) error

func (*FxExchange) Push added in v0.5.4

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error

func (*FxExchange) PutValueDht added in v1.16.0

func (e *FxExchange) PutValueDht(ctx context.Context, key string, val string) error

func (*FxExchange) SearchValueDht added in v1.16.0

func (e *FxExchange) SearchValueDht(ctx context.Context, key string) (string, error)
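
PutValueDht and SearchValueDht appear to round-trip a record through the pool's DHT. A hedged sketch, assuming e1 and e2 are started FxExchange instances on connected hosts sharing the same DHT provider options, and that the key belongs to a namespace the DHT is configured to validate (the key and value here are illustrative only):

if err := e1.PutValueDht(ctx, "/fula/example-key", "example-value"); err != nil {
	panic(err)
}
val, err := e2.SearchValueDht(ctx, "/fula/example-key")
if err != nil {
	panic(err)
}
fmt.Println(val) // "example-value", once the record has propagated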

func (*FxExchange) SetAuth added in v0.8.3

func (e *FxExchange) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*FxExchange) Shutdown added in v0.5.4

func (e *FxExchange) Shutdown(ctx context.Context) error

func (*FxExchange) Start added in v0.5.4

func (e *FxExchange) Start(ctx context.Context) error

func (*FxExchange) UpdateDhtPeers added in v1.16.0

func (e *FxExchange) UpdateDhtPeers(peers []peer.ID) error

type MultihashResult added in v1.29.0

type MultihashResult struct {
	Multihash       string           `json:"Multihash"`
	ProviderResults []ProviderResult `json:"ProviderResults"`
}

type NoopExchange added in v0.5.4

type NoopExchange struct{}
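
NoopExchange satisfies the Exchange interface with do-nothing methods, which makes it a convenient stand-in where exchange is disabled. A trivial sketch (somePeer and someLink are assumed values):

var ex exchange.Exchange = exchange.NoopExchange{}
// The no-op implementation transfers nothing; the call is expected to be harmless.
if err := ex.Push(ctx, somePeer, someLink); err != nil {
	panic(err)
}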

func (NoopExchange) FindProvidersIpni added in v1.29.0

func (n NoopExchange) FindProvidersIpni(l ipld.Link, relays []string) ([]peer.AddrInfo, error)
func (n NoopExchange) IpniNotifyLink(l ipld.Link)

func (NoopExchange) Pull added in v0.5.4

func (n NoopExchange) Pull(_ context.Context, from peer.ID, l ipld.Link) error

func (NoopExchange) PullBlock added in v1.30.0

func (n NoopExchange) PullBlock(_ context.Context, from peer.ID, l ipld.Link) error

func (NoopExchange) Push added in v0.5.4

func (n NoopExchange) Push(_ context.Context, to peer.ID, l ipld.Link) error

func (NoopExchange) SetAuth added in v0.8.3

func (n NoopExchange) SetAuth(_ context.Context, on peer.ID, subject peer.ID, allow bool) error

func (NoopExchange) Shutdown added in v0.5.4

func (n NoopExchange) Shutdown(context.Context) error

func (NoopExchange) Start added in v0.5.4

func (n NoopExchange) Start(context.Context) error

type Option added in v0.8.3

type Option func(*options) error

func WithAllowTransientConnection added in v0.8.4

func WithAllowTransientConnection(t bool) Option

func WithAuthorizedPeers added in v1.14.0

func WithAuthorizedPeers(l []peer.ID) Option

func WithAuthorizer added in v0.8.3

func WithAuthorizer(a peer.ID) Option

WithAuthorizer sets the peer ID that has permission to configure DAG exchange authorization. By default, authorization is disabled.
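
For instance, a sketch where a dedicated admin identity is the only peer allowed to change authorization (admin, h, and ls are assumed to exist):

ex, err := exchange.NewFxExchange(h, ls,
	exchange.WithAuthorizer(admin),
)
if err != nil {
	panic(err)
}
// From here on, only the admin peer is expected to be able to change
// authorization via SetAuth.
if err := ex.Start(ctx); err != nil {
	panic(err)
}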

func WithDhtProviderOptions added in v1.16.0

func WithDhtProviderOptions(d ...dht.Option) Option

func WithIPFSApi added in v1.46.0

func WithIPFSApi(ipfsApi iface.CoreAPI) Option

func WithIPFSNode added in v1.46.0

func WithIPFSNode(ipfsNode *core.IpfsNode) Option

func WithIpniGetEndPoint added in v1.29.0

func WithIpniGetEndPoint(l string) Option

func WithIpniProviderEngineOptions added in v1.0.0

func WithIpniProviderEngineOptions(e ...engine.Option) Option

func WithIpniPublishChanBuffer added in v1.0.0

func WithIpniPublishChanBuffer(s int) Option

func WithIpniPublishDisabled added in v1.0.0

func WithIpniPublishDisabled(d bool) Option

func WithIpniPublishInterval added in v1.0.0

func WithIpniPublishInterval(t time.Duration) Option

func WithIpniPublishMaxBatchSize added in v1.0.0

func WithIpniPublishMaxBatchSize(s int) Option

func WithMaxPushRate added in v1.30.0

func WithMaxPushRate(r int) Option

WithMaxPushRate sets the maximum number of CIDs pushed per second. Defaults to 5 per second.
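
For example, to lower the rate below the default (a sketch; h and ls are assumed to exist):

ex, err := exchange.NewFxExchange(h, ls,
	exchange.WithMaxPushRate(2), // push at most 2 CIDs per second
)
if err != nil {
	panic(err)
}
if err := ex.Start(ctx); err != nil {
	panic(err)
}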

func WithPoolHostMode added in v1.53.0

func WithPoolHostMode(n bool) Option

func WithUpdateConfig added in v1.14.0

func WithUpdateConfig(updateConfig ConfigUpdater) Option

func WithWg added in v1.21.0

func WithWg(wg *sync.WaitGroup) Option

type ProviderResult added in v1.29.0

type ProviderResult struct {
	ContextID string `json:"ContextID"`
}

type PutContentRequest added in v1.29.0

type PutContentRequest struct {
	Mutlihashes []multihash.Multihash
}
