diff --git a/dht.go b/dht.go index b599cb6bd..26429728a 100644 --- a/dht.go +++ b/dht.go @@ -138,7 +138,8 @@ type IpfsDHT struct { // networks). enableProviders, enableValues bool - fixLowPeersChan chan struct{} + disableFixLowPeers bool + fixLowPeersChan chan struct{} addPeerToRTChan chan addPeerRTReq refreshFinishedCh chan struct{} @@ -186,6 +187,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.maxRecordAge = cfg.maxRecordAge dht.enableProviders = cfg.enableProviders dht.enableValues = cfg.enableValues + dht.disableFixLowPeers = cfg.disableFixLowPeers dht.Validator = cfg.validator @@ -216,19 +218,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) // handle providers dht.proc.AddChild(dht.ProviderManager.Process()) - if err := dht.rtRefreshManager.Start(); err != nil { - return nil, err - } + dht.proc.Go(dht.populatePeers) // go-routine to make sure we ALWAYS have RT peer addresses in the peerstore // since RT membership is decoupled from connectivity go dht.persistRTPeersInPeerStore() - // listens to the fix low peers chan and tries to fix the Routing Table - if !cfg.disableFixLowPeers { - dht.proc.Go(dht.fixLowPeersRoutine) - } - dht.proc.Go(dht.rtPeerLoop) return dht, nil @@ -417,7 +412,23 @@ func (dht *IpfsDHT) Mode() ModeOpt { return dht.auto } -// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold +func (dht *IpfsDHT) populatePeers(_ goprocess.Process) { + if !dht.disableFixLowPeers { + dht.fixLowPeers(dht.ctx) + } + + if err := dht.rtRefreshManager.Start(); err != nil { + logger.Error(err) + } + + // listens to the fix low peers chan and tries to fix the Routing Table + if !dht.disableFixLowPeers { + dht.proc.Go(dht.fixLowPeersRoutine) + } + +} + +// fixLowPeersRouting manages simultaneous requests to fixLowPeers func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { ticker := time.NewTicker(periodicBootstrapInterval) defer ticker.Stop() @@ -430,62 +441,67 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { return } - if dht.routingTable.Size() > minRTRefreshThreshold { - continue - } + dht.fixLowPeers(dht.Context()) + } - // we try to add all peers we are connected to to the Routing Table - // in case they aren't already there. - for _, p := range dht.host.Network().Peers() { - dht.peerFound(dht.Context(), p, false) - } +} - // TODO Active Bootstrapping - // We should first use non-bootstrap peers we knew of from previous - // snapshots of the Routing Table before we connect to the bootstrappers. - // See https://github.com/libp2p/go-libp2p-kad-dht/issues/387. - if dht.routingTable.Size() == 0 { - if len(dht.bootstrapPeers) == 0 { - // No point in continuing, we have no peers! - continue - } +// fixLowPeers tries to get more peers into the routing table if we're below the threshold +func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { + if dht.routingTable.Size() > minRTRefreshThreshold { + return + } - found := 0 - for _, i := range rand.Perm(len(dht.bootstrapPeers)) { - ai := dht.bootstrapPeers[i] - err := dht.Host().Connect(dht.Context(), ai) - if err == nil { - found++ - } else { - logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err) - } + // we try to add all peers we are connected to to the Routing Table + // in case they aren't already there. + for _, p := range dht.host.Network().Peers() { + dht.peerFound(ctx, p, false) + } - // Wait for two bootstrap peers, or try them all. - // - // Why two? In theory, one should be enough - // normally. However, if the network were to - // restart and everyone connected to just one - // bootstrapper, we'll end up with a mostly - // partitioned network. - // - // So we always bootstrap with two random peers. - if found == maxNBoostrappers { - break - } - } + // TODO Active Bootstrapping + // We should first use non-bootstrap peers we knew of from previous + // snapshots of the Routing Table before we connect to the bootstrappers. + // See https://github.com/libp2p/go-libp2p-kad-dht/issues/387. + if dht.routingTable.Size() == 0 { + if len(dht.bootstrapPeers) == 0 { + // No point in continuing, we have no peers! + return } - // if we still don't have peers in our routing table(probably because Identify hasn't completed), - // there is no point in triggering a Refresh. - if dht.routingTable.Size() == 0 { - continue - } + found := 0 + for _, i := range rand.Perm(len(dht.bootstrapPeers)) { + ai := dht.bootstrapPeers[i] + err := dht.Host().Connect(ctx, ai) + if err == nil { + found++ + } else { + logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err) + } - if dht.autoRefresh { - dht.rtRefreshManager.RefreshNoWait() + // Wait for two bootstrap peers, or try them all. + // + // Why two? In theory, one should be enough + // normally. However, if the network were to + // restart and everyone connected to just one + // bootstrapper, we'll end up with a mostly + // partitioned network. + // + // So we always bootstrap with two random peers. + if found == maxNBoostrappers { + break + } } } + // if we still don't have peers in our routing table(probably because Identify hasn't completed), + // there is no point in triggering a Refresh. + if dht.routingTable.Size() == 0 { + return + } + + if dht.autoRefresh { + dht.rtRefreshManager.RefreshNoWait() + } } // TODO This is hacky, horrible and the programmer needs to have his mother called a hamster. diff --git a/dht_bootstrap.go b/dht_bootstrap.go index e7b5edb96..72d133af5 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -58,6 +58,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo { // Bootstrap tells the DHT to get into a bootstrapped state satisfying the // IpfsRouter interface. func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { + dht.fixRTIfNeeded() dht.rtRefreshManager.RefreshNoWait() return nil }