diff --git a/dht.go b/dht.go index 26429728a..df5a43148 100644 --- a/dht.go +++ b/dht.go @@ -218,14 +218,21 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) // handle providers dht.proc.AddChild(dht.ProviderManager.Process()) - dht.proc.Go(dht.populatePeers) - // go-routine to make sure we ALWAYS have RT peer addresses in the peerstore // since RT membership is decoupled from connectivity go dht.persistRTPeersInPeerStore() dht.proc.Go(dht.rtPeerLoop) + // Fill routing table with currently connected peers that are DHT servers + dht.plk.Lock() + for _, p := range dht.host.Network().Peers() { + dht.peerFound(dht.ctx, p, false) + } + dht.plk.Unlock() + + dht.proc.Go(dht.populatePeers) + return dht, nil } diff --git a/dht_options.go b/dht_options.go index c4b08c03a..820145451 100644 --- a/dht_options.go +++ b/dht_options.go @@ -64,7 +64,7 @@ type config struct { diversityFilter peerdiversity.PeerIPGroupFilter } - bootstrapPeers []peer.AddrInfo + bootstrapPeers []peer.AddrInfo // test specific config options disableFixLowPeers bool diff --git a/dht_test.go b/dht_test.go index a20f5c90c..85dcc22f6 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1871,12 +1871,12 @@ func TestV1ProtocolOverride(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") ) - d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") ) + d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto")) + d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto")) d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2")) d4 := setupDHT(ctx, t, false) - dhts := []*IpfsDHT{d1,d2,d3,d4} + dhts := []*IpfsDHT{d1, d2, d3, d4} for i, dout := range dhts { for _, din := range dhts[i+1:] { @@ -1893,7 +1893,7 @@ func TestV1ProtocolOverride(t *testing.T) { t.Fatal("should have one peer in the routing table") } - if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0{ + if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0 { t.Fatal("should have an empty routing table") } } @@ -2023,3 +2023,73 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) { rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != "" }, 5*time.Second, 500*time.Millisecond) } + +func TestPreconnectedNodes(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // If this test fails it may hang so set a timeout + ctx, cancel = context.WithTimeout(ctx, time.Second*10) + defer cancel() + + opts := []Option{ + testPrefix, + DisableAutoRefresh(), + Mode(ModeServer), + } + + // Create hosts + h1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + h2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + + // Setup first DHT + d1, err := New(ctx, h1, opts...) + if err != nil { + t.Fatal(err) + } + + // Connect the first host to the second + if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + t.Fatal(err) + } + + // Wait until we know identify has completed by checking for supported protocols + // TODO: Is this needed? Could we do h2.Connect(h1) and that would wait for identify to complete. + for { + h1Protos, err := h2.Peerstore().SupportsProtocols(h1.ID(), d1.protocolsStrs...) + if err != nil { + t.Fatal(err) + } + + if len(h1Protos) > 0 { + break + } + + select { + case <-time.After(time.Millisecond * 100): + case <-ctx.Done(): + t.Fatal("test hung") + } + } + + // Setup the second DHT + d2, err := New(ctx, h2, opts...) + if err != nil { + t.Fatal(err) + } + + // See if it works + peerCh, err := d2.GetClosestPeers(ctx, "testkey") + if err != nil { + t.Fatal(err) + } + + select { + case p := <-peerCh: + if p == h1.ID() { + break + } + t.Fatal("could not find peer") + case <-ctx.Done(): + t.Fatal("test hung") + } +} diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 75e9cbe83..8211d25de 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -56,13 +56,6 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { // register for network notifications dht.host.Network().Notify(nn) - // Fill routing table with currently connected peers that are DHT servers - dht.plk.Lock() - defer dht.plk.Unlock() - for _, p := range dht.host.Network().Peers() { - dht.peerFound(dht.ctx, p, false) - } - return nn, nil }