diff --git a/dht.go b/dht.go index c7a9dd71d..36db75512 100644 --- a/dht.go +++ b/dht.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p-kad-dht/rtrefresh" kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" @@ -40,6 +41,8 @@ import ( var ( logger = logging.Logger("dht") baseLogger = logger.Desugar() + + rtFreezeTimeout = 1 * time.Minute ) const ( @@ -66,6 +69,11 @@ const ( protectedBuckets = 2 ) +type addPeerRTReq struct { + p peer.ID + queryPeer bool +} + // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { @@ -115,6 +123,7 @@ type IpfsDHT struct { queryPeerFilter QueryFilterFunc routingTablePeerFilter RouteTableFilterFunc + rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter autoRefresh bool @@ -131,6 +140,14 @@ type IpfsDHT struct { enableProviders, enableValues bool fixLowPeersChan chan struct{} + + addPeerToRTChan chan addPeerRTReq + refreshFinishedCh chan struct{} + + rtFreezeTimeout time.Duration + + // configuration variables for tests + testAddressUpdateProcessing bool } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -173,6 +190,8 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.Validator = cfg.validator + dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing + dht.auto = cfg.mode switch cfg.mode { case ModeAuto, ModeClient: @@ -207,7 +226,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) go dht.persistRTPeersInPeerStore() // listens to the fix low peers chan and tries to fix the Routing Table - dht.proc.Go(dht.fixLowPeersRoutine) + if !cfg.disableFixLowPeers { + dht.proc.Go(dht.fixLowPeersRoutine) + } + + dht.proc.Go(dht.rtPeerLoop) return dht, nil } @@ -271,7 +294,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { beta: cfg.resiliency, queryPeerFilter: cfg.queryPeerFilter, routingTablePeerFilter: cfg.routingTable.peerFilter, - fixLowPeersChan: make(chan struct{}, 1), + rtPeerDiversityFilter: cfg.routingTable.diversityFilter, + + fixLowPeersChan: make(chan struct{}, 1), + + addPeerToRTChan: make(chan addPeerRTReq), + refreshFinishedCh: make(chan struct{}), } var maxLastSuccessfulOutboundThreshold time.Duration @@ -320,6 +348,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { } dht.ProviderManager = pm + dht.rtFreezeTimeout = rtFreezeTimeout + return dht, nil } @@ -340,13 +370,32 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr queryFnc, cfg.routingTable.refreshQueryTimeout, cfg.routingTable.refreshInterval, - maxLastSuccessfulOutboundThreshold) + maxLastSuccessfulOutboundThreshold, + dht.refreshFinishedCh) return r, err } func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { - rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) + // make a Routing Table Diversity Filter + var filter *peerdiversity.Filter + if dht.rtPeerDiversityFilter != nil { + df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int { + return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p)) + }) + + if err != nil { + return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err) + } + + filter = df + } + + rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter) + if err != nil { + return nil, err + } + cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -368,6 +417,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho return rt, err } +// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table. +func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + return dht.routingTable.GetDiversityStats() +} + // Mode allows introspection of the operation mode of the DHT func (dht *IpfsDHT) Mode() ModeOpt { return dht.auto @@ -496,19 +550,19 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) // Perhaps we were given closer peers peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers()) - if record := pmes.GetRecord(); record != nil { + if rec := pmes.GetRecord(); rec != nil { // Success! We were given the value logger.Debug("got value") // make sure record is valid. - err = dht.Validator.Validate(string(record.GetKey()), record.GetValue()) + err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()) if err != nil { logger.Debug("received invalid record (discarded)") // return a sentinal to signify an invalid record was received err = errInvalidRecord - record = new(recpb.Record) + rec = new(recpb.Record) } - return record, peers, err + return rec, peers, err } if len(peers) > 0 { @@ -554,6 +608,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { return dht.datastore.Put(mkDsKey(key), data) } +func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { + bootstrapCount := 0 + isBootsrapping := false + var timerCh <-chan time.Time + + for { + select { + case <-timerCh: + dht.routingTable.MarkAllPeersIrreplaceable() + case addReq := <-dht.addPeerToRTChan: + prevSize := dht.routingTable.Size() + if prevSize == 0 { + isBootsrapping = true + bootstrapCount = 0 + timerCh = nil + } + newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping) + if err != nil { + // peer not added. + continue + } + if !newlyAdded && addReq.queryPeer { + // the peer is already in our RT, but we just successfully queried it and so let's give it a + // bump on the query time so we don't ping it too soon for a liveliness check. + dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now()) + } + case <-dht.refreshFinishedCh: + bootstrapCount = bootstrapCount + 1 + if bootstrapCount == 2 { + timerCh = time.NewTimer(dht.rtFreezeTimeout).C + } + + old := isBootsrapping + isBootsrapping = false + if old { + dht.rtRefreshManager.RefreshNoWait() + } + + case <-proc.Closing(): + return + } + } +} + // peerFound signals the routingTable that we've found a peer that // might support the DHT protocol. // If we have a connection a peer but no exchange of a query RPC -> @@ -575,16 +673,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { if err != nil { logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err) } else if b { - newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer) - if err != nil { - // peer not added. + select { + case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}: + case <-dht.ctx.Done(): return } - if !newlyAdded && queryPeer { - // the peer is already in our RT, but we just successfully queried it and so let's give it a - // bump on the query time so we don't ping it too soon for a liveliness check. - dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) - } } } diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index f6bf447b0..8ea9c3609 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -15,7 +15,7 @@ import ( func TestSelfWalkOnAddressChange(t *testing.T) { ctx := context.Background() // create three DHT instances with auto refresh disabled - d1 := setupDHT(ctx, t, false, DisableAutoRefresh()) + d1 := setupDHT(ctx, t, false, DisableAutoRefresh(), forceAddressUpdateProcessing(t)) d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) @@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) { } require.Empty(t, dfmap) } + +func TestBootstrappersReplacable(t *testing.T) { + old := rtFreezeTimeout + rtFreezeTimeout = 100 * time.Millisecond + defer func() { + rtFreezeTimeout = old + }() + ctx := context.Background() + d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2)) + defer d.host.Close() + defer d.Close() + + var d1 *IpfsDHT + var d2 *IpfsDHT + + // d1 & d2 have a cpl of 0 + for { + d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 { + break + } + } + + for { + d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 { + break + } + } + defer d1.host.Close() + defer d1.Close() + + defer d2.host.Close() + defer d2.Close() + + connect(t, ctx, d, d1) + connect(t, ctx, d, d2) + require.Len(t, d.routingTable.ListPeers(), 2) + + // d3 & d4 with cpl=0 will go in as d1 & d2 are replacable. + var d3 *IpfsDHT + var d4 *IpfsDHT + + for { + d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 { + break + } + } + + for { + d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 { + break + } + } + + defer d3.host.Close() + defer d3.Close() + defer d4.host.Close() + defer d4.Close() + + connect(t, ctx, d, d3) + connect(t, ctx, d, d4) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // do couple of refreshes and wait for the Routing Table to be "frozen". + <-d.RefreshRoutingTable() + <-d.RefreshRoutingTable() + time.Sleep(1 * time.Second) + + // adding d5 fails because RT is frozen + var d5 *IpfsDHT + for { + d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 { + break + } + } + defer d5.host.Close() + defer d5.Close() + + connectNoSync(t, ctx, d, d5) + time.Sleep(500 * time.Millisecond) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // Let's empty the routing table + for _, p := range d.routingTable.ListPeers() { + d.routingTable.RemovePeer(p) + } + require.Len(t, d.routingTable.ListPeers(), 0) + + // adding d1 & d2 works now because there is space in the Routing Table + require.NoError(t, d.host.Network().ClosePeer(d1.self)) + require.NoError(t, d.host.Network().ClosePeer(d2.self)) + connect(t, ctx, d, d1) + connect(t, ctx, d, d2) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d1.self) + require.Contains(t, d.routingTable.ListPeers(), d2.self) + + // adding d3 & d4 also works because the RT is not frozen. + require.NoError(t, d.host.Network().ClosePeer(d3.self)) + require.NoError(t, d.host.Network().ClosePeer(d4.self)) + connect(t, ctx, d, d3) + connect(t, ctx, d, d4) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // run refreshes and freeze the RT + <-d.RefreshRoutingTable() + <-d.RefreshRoutingTable() + time.Sleep(1 * time.Second) + // cant add d1 & d5 because RT is frozen. + require.NoError(t, d.host.Network().ClosePeer(d1.self)) + require.NoError(t, d.host.Network().ClosePeer(d5.self)) + connectNoSync(t, ctx, d, d1) + connectNoSync(t, ctx, d, d5) + d.peerFound(ctx, d5.self, true) + d.peerFound(ctx, d1.self, true) + time.Sleep(1 * time.Second) + + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) +} diff --git a/dht_options.go b/dht_options.go index d08d119db..6edce9dd6 100644 --- a/dht_options.go +++ b/dht_options.go @@ -2,17 +2,21 @@ package dht import ( "fmt" + "testing" "time" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-kad-dht/providers" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-ipns" ) // ModeOpt describes what mode the dht should operate in @@ -56,11 +60,16 @@ type config struct { latencyTolerance time.Duration checkInterval time.Duration peerFilter RouteTableFilterFunc + diversityFilter peerdiversity.PeerIPGroupFilter } // set to true if we're operating in v1 dht compatible mode v1CompatibleMode bool bootstrapPeers []peer.AddrInfo + + // test specific config options + disableFixLowPeers bool + testAddressUpdateProcessing bool } func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true } @@ -403,3 +412,32 @@ func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option { return nil } } + +// RoutingTablePeerDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used +// to construct the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. +func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option { + return func(c *config) error { + c.routingTable.diversityFilter = pg + return nil + } +} + +// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT. +// This is ONLY for tests. +func disableFixLowPeersRoutine(t *testing.T) Option { + return func(c *config) error { + c.disableFixLowPeers = true + return nil + } +} + +// forceAddressUpdateProcessing forces the DHT to handle changes to the hosts addresses. +// This occurs even when AutoRefresh has been disabled. +// This is ONLY for tests. +func forceAddressUpdateProcessing(t *testing.T) Option { + return func(c *config) error { + c.testAddressUpdateProcessing = true + return nil + } +} diff --git a/dht_test.go b/dht_test.go index 3d7f5838b..cb487087d 100644 --- a/dht_test.go +++ b/dht_test.go @@ -2056,7 +2056,6 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) { require.NoError(t, dht1.host.Network().ClosePeer(dht2.self)) dht1.routingTable.RemovePeer(dht2.self) require.NotContains(t, dht2.self, dht1.routingTable.ListPeers()) - require.Eventually(t, func() bool { return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" && dht1.routingTable.Find(bootstrapcons[0].self) != "" diff --git a/dual/dual.go b/dual/dual.go index c8e487cae..a291abb06 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -4,6 +4,7 @@ package dual import ( "context" + "fmt" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" helper "github.com/libp2p/go-libp2p-routing-helpers" ma "github.com/multiformats/go-multiaddr" @@ -41,15 +43,67 @@ var ( _ routing.ValueStore = (*DHT)(nil) ) +var ( + maxPrefixCountPerCpl = 2 + maxPrefixCount = 3 +) + +type config struct { + wan, lan []dht.Option +} + +func (cfg *config) apply(opts ...Option) error { + for i, o := range opts { + if err := o(cfg); err != nil { + return fmt.Errorf("dual dht option %d failed: %w", i, err) + } + } + return nil +} + +// Option is an option used to configure the Dual DHT. +type Option func(*config) error + +// WanDHTOption applies the given DHT options to the WAN DHT. +func WanDHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.wan = append(c.wan, opts...) + return nil + } +} + +// LanDHTOption applies the given DHT options to the LAN DHT. +func LanDHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.lan = append(c.lan, opts...) + return nil + } +} + +// DHTOption applies the given DHT options to both the WAN and the LAN DHTs. +func DHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.lan = append(c.lan, opts...) + c.wan = append(c.wan, opts...) + return nil + } +} + // New creates a new DualDHT instance. Options provided are forwarded on to the two concrete // IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. // Note: query or routing table functional options provided as arguments to this function // will be overriden by this constructor. -func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) { - wanOpts := append(options, +func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) { + var cfg config + err := cfg.apply(options...) + if err != nil { + return nil, err + } + wanOpts := append(cfg.wan, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)), ) wan, err := dht.New(ctx, h, wanOpts...) if err != nil { @@ -58,7 +112,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) // Unless overridden by user supplied options, the LAN DHT should default // to 'AutoServer' mode. - lanOpts := append(options, + lanOpts := append(cfg.lan, dht.ProtocolExtension(LanExtension), dht.QueryFilter(dht.PrivateQueryFilter), dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), @@ -93,6 +147,14 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { return dht.LAN.Provide(ctx, key, announce) } +// GetRoutingTableDiversityStats fetches the Routing Table Diversity Stats. +func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + if dht.WANActive() { + return dht.WAN.GetRoutingTableDiversityStats() + } + return nil +} + // FindProvidersAsync searches for peers who are able to provide a given key func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { reqCtx, cancel := context.WithCancel(ctx) diff --git a/dual/dual_test.go b/dual/dual_test.go index dbd4aab12..323ff1169 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -90,7 +90,7 @@ func setupDHT(ctx context.Context, t *testing.T, options ...dht.Option) *DHT { d, err := New( ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), - append(baseOpts, options...)..., + append([]Option{DHTOption(baseOpts...)}, DHTOption(options...))..., ) if err != nil { t.Fatal(err) diff --git a/go.mod b/go.mod index ce95183c6..ae2d43460 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.10.0 github.com/libp2p/go-libp2p-core v0.6.1 - github.com/libp2p/go-libp2p-kbucket v0.4.2 + github.com/libp2p/go-libp2p-kbucket v0.4.6 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-record v0.1.3 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 diff --git a/go.sum b/go.sum index 64032e786..6272365e5 100644 --- a/go.sum +++ b/go.sum @@ -80,7 +80,6 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -91,7 +90,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= @@ -126,7 +124,6 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= @@ -218,6 +215,8 @@ github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwn github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-cidranger v1.0.0 h1:EHrUPBAmseAjEKl+aqaonm0u6eZ04+Cvw2UejIhg3lE= +github.com/libp2p/go-cidranger v1.0.0/go.mod h1:66eNgiCUVEUSDYy3J2db4qc4hA7QyTyTqiF1n3CgSvA= github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD21CIEy5eYd1Hlp0juHY0= github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc= github.com/libp2p/go-conn-security-multistream v0.2.0 h1:uNiDjS58vrvJTg9jO6bySd1rMKejieG7v45ekqHbZ1M= @@ -236,6 +235,8 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM= github.com/libp2p/go-libp2p v0.10.0 h1:7ooOvK1wi8eLpyTppy8TeH43UHy5uI75GAHGJxenUi0= github.com/libp2p/go-libp2p v0.10.0/go.mod h1:yBJNpb+mGJdgrwbKAKrhPU0u3ogyNFTfjJ6bdM+Q/G8= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200606034824-1b967eb41be7 h1:nf8vB9BDuXCSIPq2dykC6+zRhToq/M7Mib02w23ifwQ= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200606034824-1b967eb41be7/go.mod h1:bLVf7QeUbm5OTzlsVDVzIBpWCWuYWv4TTvgcAgSyyck= github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE= github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI= github.com/libp2p/go-libp2p-autonat v0.2.1/go.mod h1:MWtAhV5Ko1l6QBsHQNSuM6b1sRkXrpk0/LqCr+vCVxI= @@ -272,7 +273,6 @@ github.com/libp2p/go-libp2p-core v0.5.4/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM= github.com/libp2p/go-libp2p-core v0.5.6/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= github.com/libp2p/go-libp2p-core v0.5.7/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= -github.com/libp2p/go-libp2p-core v0.6.0 h1:u03qofNYTBN+yVg08PuAKylZogVf0xcTEeM8skGf+ak= github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= github.com/libp2p/go-libp2p-core v0.6.1 h1:XS+Goh+QegCDojUZp00CaPMfiEADCrLjNZskWE7pvqs= github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= @@ -282,8 +282,8 @@ github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMT github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= github.com/libp2p/go-libp2p-discovery v0.4.0 h1:dK78UhopBk48mlHtRCzbdLm3q/81g77FahEBTjcqQT8= github.com/libp2p/go-libp2p-discovery v0.4.0/go.mod h1:bZ0aJSrFc/eX2llP0ryhb1kpgkPyTo23SJ5b7UQCMh4= -github.com/libp2p/go-libp2p-kbucket v0.4.2 h1:wg+VPpCtY61bCasGRexCuXOmEmdKjN+k1w+JtTwu9gA= -github.com/libp2p/go-libp2p-kbucket v0.4.2/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= +github.com/libp2p/go-libp2p-kbucket v0.4.6 h1:09hF4woC+3ZNl3k5flPErUV5pXKq34LBhvupjn4qjW4= +github.com/libp2p/go-libp2p-kbucket v0.4.6/go.mod h1:12pr3Yg2qtBdwRjffaGJTa995rrMQxBk5a+KncHqTn4= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -304,6 +304,7 @@ github.com/libp2p/go-libp2p-peerstore v0.2.0/go.mod h1:N2l3eVIeAitSg3Pi2ipSrJYnq github.com/libp2p/go-libp2p-peerstore v0.2.1/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= github.com/libp2p/go-libp2p-peerstore v0.2.3/go.mod h1:K8ljLdFn590GMttg/luh4caB/3g0vKuY01psze0upRw= +github.com/libp2p/go-libp2p-peerstore v0.2.4 h1:jU9S4jYN30kdzTpDAR7SlHUD+meDUjTODh4waLWF1ws= github.com/libp2p/go-libp2p-peerstore v0.2.4/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes9v7In4+Qq3U= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= @@ -382,7 +383,6 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs= github.com/libp2p/go-reuseport-transport v0.0.3 h1:zzOeXnTooCkRvoH+bSXEfXhn76+LAiwoneM0gnXjF2M= github.com/libp2p/go-reuseport-transport v0.0.3/go.mod h1:Spv+MPft1exxARzP2Sruj2Wb5JSyHNncjf1Oi2dEbzM= -github.com/libp2p/go-sockaddr v0.0.2 h1:tCuXfpA9rq7llM/v834RKc/Xvovy/AqM9kHvTV/jY/Q= github.com/libp2p/go-sockaddr v0.0.2/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/libp2p/go-sockaddr v0.1.0 h1:Y4s3/jNoryVRKEBrkJ576F17CPOaMIzUeCsg7dlTDj0= github.com/libp2p/go-sockaddr v0.1.0/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= @@ -503,7 +503,6 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= @@ -584,7 +583,6 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= -github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= @@ -720,7 +718,6 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216052735-49a3e744a425 h1:VvQyQJN0tSuecqgcIxMWnnfG5kSmgy9KZR9sW3W5QeA= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -756,7 +753,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index 971b5489d..b940a0430 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -121,53 +121,39 @@ func (qp *QueryPeerset) GetReferrer(p peer.ID) peer.ID { return qp.all[qp.find(p)].referredBy } -// NumWaiting returns the number of peers in state PeerWaiting. -func (qp *QueryPeerset) NumWaiting() int { - return len(qp.GetWaitingPeers()) -} - -// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order. -func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerWaiting { - result = append(result, p.id) - } +// GetClosestNInStates returns the closest to the key peers, which are in one of the given states. +// It returns n peers or less, if fewer peers meet the condition. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestNInStates(n int, states ...PeerState) (result []peer.ID) { + qp.sort() + m := make(map[PeerState]struct{}, len(states)) + for i := range states { + m[states[i]] = struct{}{} } - return -} -// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable. -// It returns count peers or less, if fewer peers meet the condition. -func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { - qp.sort() for _, p := range qp.all { - if p.state != PeerUnreachable { + if _, ok := m[p.state]; ok { result = append(result, p.id) } } - if len(result) >= count { - return result[:count] + if len(result) >= n { + return result[:n] } return result } -// NumHeard returns the number of peers in state PeerHeard. -func (qp *QueryPeerset) NumHeard() int { - return len(qp.GetHeardPeers()) +// GetClosestInStates returns the peers, which are in one of the given states. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestInStates(states ...PeerState) (result []peer.ID) { + return qp.GetClosestNInStates(len(qp.all), states...) } -// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order. -func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerHeard { - result = append(result, p.id) - } - } - return +// NumHeard returns the number of peers in state PeerHeard. +func (qp *QueryPeerset) NumHeard() int { + return len(qp.GetClosestInStates(PeerHeard)) } -// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key. -func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) { - qp.sort() - return qp.GetHeardPeers() +// NumWaiting returns the number of peers in state PeerWaiting. +func (qp *QueryPeerset) NumWaiting() int { + return len(qp.GetClosestInStates(PeerWaiting)) } diff --git a/qpeerset/qpeerset_test.go b/qpeerset/qpeerset_test.go index 4d0f7db0a..bd40413af 100644 --- a/qpeerset/qpeerset_test.go +++ b/qpeerset/qpeerset_test.go @@ -56,31 +56,31 @@ func TestQPeerSet(t *testing.T) { // add peer4 require.True(t, qp.TryAdd(peer4, oracle)) - cl := qp.GetClosestNotUnreachable(2) + cl := qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(3) + cl = qp.GetClosestNInStates(3, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4}, cl) // mark as unreachable & try to get it qp.SetState(peer4, PeerUnreachable) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer2}, cl) // add peer1 require.True(t, qp.TryAdd(peer1, oracle)) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1}, cl) - cl = qp.GetClosestNotUnreachable(2) + cl = qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1, peer2}, cl) // mark as waiting and assert qp.SetState(peer2, PeerWaiting) - require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers()) + require.Equal(t, []peer.ID{peer2}, qp.GetClosestInStates(PeerWaiting)) - require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers()) + require.Equal(t, []peer.ID{peer1}, qp.GetClosestInStates(PeerHeard)) require.True(t, qp.TryAdd(peer3, oracle)) - require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard()) + require.Equal(t, []peer.ID{peer3, peer1}, qp.GetClosestInStates(PeerHeard)) require.Equal(t, 2, qp.NumHeard()) } diff --git a/query.go b/query.go index 88a1b63e6..47d87df05 100644 --- a/query.go +++ b/query.go @@ -8,13 +8,12 @@ import ( "sync" "time" - "github.com/google/uuid" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/google/uuid" "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" ) @@ -223,7 +222,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { // extract the top K not unreachable peers var peers []peer.ID peerState := make(map[peer.ID]qpeerset.PeerState) - qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize) + qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range qp { state := q.queryPeers.GetState(p) peerState[p] = state @@ -251,10 +250,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { } type queryUpdate struct { - cause peer.ID - heard []peer.ID - queried []peer.ID - unreachable []peer.ID + cause peer.ID + queried []peer.ID + heard []peer.ID + unreachable []peer.ID + queryDuration time.Duration } @@ -279,8 +279,14 @@ func (q *query) run() { q.terminate(pathCtx, cancelPath, LookupCancelled) } + // calculate the maximum number of queries we could be spawning. + // Note: NumWaiting will be updated in spawnQuery + maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() + // termination is triggered on end-of-lookup conditions or starvation of unused peers - if ready, reason := q.isReadyToTerminate(); ready { + // it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers. + ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn) + if ready { q.terminate(pathCtx, cancelPath, reason) } @@ -288,29 +294,15 @@ func (q *query) run() { return } - // if all "threads" are busy, wait until someone finishes - if q.queryPeers.NumWaiting() >= alpha { - continue - } - - // spawn new queries, up to the parallelism allowance - // calculate the maximum number of queries we could be spawning. - // Note: NumWaiting will be updated in spawnQuery - maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() // try spawning the queries, if there are no available peers to query then we won't spawn them - for j := 0; j < maxNumQueriesToSpawn; j++ { - q.spawnQuery(pathCtx, cause, ch) + for _, p := range qPeers { + q.spawnQuery(pathCtx, cause, p, ch) } } } // spawnQuery starts one query, if an available heard peer is found -func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) { - peers := q.queryPeers.GetSortedHeard() - if len(peers) == 0 { - return - } - +func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) { PublishLookupEvent(ctx, NewLookupEvent( q.dht.self, @@ -318,39 +310,52 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryU q.key, NewLookupUpdateEvent( cause, - q.queryPeers.GetReferrer(peers[0]), - nil, // heard - []peer.ID{peers[0]}, // waiting - nil, // queried - nil, // unreachable + q.queryPeers.GetReferrer(queryPeer), + nil, // heard + []peer.ID{queryPeer}, // waiting + nil, // queried + nil, // unreachable ), nil, nil, ), ) - q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) + q.queryPeers.SetState(queryPeer, qpeerset.PeerWaiting) q.waitGroup.Add(1) - go q.queryPeer(ctx, ch, peers[0]) + go q.queryPeer(ctx, ch, queryPeer) } -func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) { +func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) { // give the application logic a chance to terminate if q.stopFn() { - return true, LookupStopped + return true, LookupStopped, nil } if q.isStarvationTermination() { - return true, LookupStarvation + return true, LookupStarvation, nil } if q.isLookupTermination() { - return true, LookupCompleted + return true, LookupCompleted, nil } - return false, -1 + + // The peers we query next should be ones that we have only Heard about. + var peersToQuery []peer.ID + peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard) + count := 0 + for _, p := range peers { + peersToQuery = append(peersToQuery, p) + count++ + if count == nPeersToQuery { + break + } + } + + return false, -1, peersToQuery } // From the set of all nodes that are not unreachable, // if the closest beta nodes are all queried, the lookup can terminate. func (q *query) isLookupTermination() bool { - peers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta) + peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range peers { if q.queryPeers.GetState(p) != qpeerset.PeerQueried { return false @@ -478,6 +483,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { if p == q.dht.self { // don't add self. continue } + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerUnreachable) } else { diff --git a/routing.go b/routing.go index 472094882..4d6ae990c 100644 --- a/routing.go +++ b/routing.go @@ -665,7 +665,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, // Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol // and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT // server peer and is not identified as such is a bug. - dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard + dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting) break } } diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go new file mode 100644 index 000000000..06c3116e6 --- /dev/null +++ b/rt_diversity_filter.go @@ -0,0 +1,103 @@ +package dht + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + + ma "github.com/multiformats/go-multiaddr" +) + +var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil) + +type rtPeerIPGroupFilter struct { + mu sync.RWMutex + h host.Host + + maxPerCpl int + maxForTable int + + cplIpGroupCount map[int]map[peerdiversity.PeerIPGroupKey]int + tableIpGroupCount map[peerdiversity.PeerIPGroupKey]int +} + +// NewRTPeerDiversityFilter constructs the `PeerIPGroupFilter` that will be used to configure +// the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. +func NewRTPeerDiversityFilter(h host.Host, maxPerCpl, maxForTable int) *rtPeerIPGroupFilter { + return &rtPeerIPGroupFilter{ + h: h, + + maxPerCpl: maxPerCpl, + maxForTable: maxForTable, + + cplIpGroupCount: make(map[int]map[peerdiversity.PeerIPGroupKey]int), + tableIpGroupCount: make(map[peerdiversity.PeerIPGroupKey]int), + } + +} + +func (r *rtPeerIPGroupFilter) Allow(g peerdiversity.PeerGroupInfo) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + key := g.IPGroupKey + cpl := g.Cpl + + if r.tableIpGroupCount[key] >= r.maxForTable { + + return false + } + + c, ok := r.cplIpGroupCount[cpl] + allow := !ok || c[key] < r.maxPerCpl + return allow +} + +func (r *rtPeerIPGroupFilter) Increment(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] + 1 + if _, ok := r.cplIpGroupCount[cpl]; !ok { + r.cplIpGroupCount[cpl] = make(map[peerdiversity.PeerIPGroupKey]int) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] + 1 +} + +func (r *rtPeerIPGroupFilter) Decrement(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] - 1 + if r.tableIpGroupCount[key] == 0 { + delete(r.tableIpGroupCount, key) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] - 1 + if r.cplIpGroupCount[cpl][key] == 0 { + delete(r.cplIpGroupCount[cpl], key) + } + if len(r.cplIpGroupCount[cpl]) == 0 { + delete(r.cplIpGroupCount, cpl) + } +} + +func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr { + cs := r.h.Network().ConnsToPeer(p) + addr := make([]ma.Multiaddr, 0, len(cs)) + for _, c := range cs { + addr = append(addr, c.RemoteMultiaddr()) + } + return addr +} diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go new file mode 100644 index 000000000..d439e863a --- /dev/null +++ b/rt_diversity_filter_test.go @@ -0,0 +1,143 @@ +package dht + +import ( + "context" + "testing" + "time" + + kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + + "github.com/stretchr/testify/require" +) + +func TestRTPeerDiversityFilter(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 2, 3) + + // table should only have 2 for each prefix per cpl + key := "key" + g := peerdiversity.PeerGroupInfo{Cpl: 1, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g)) + r.Increment(g) + require.True(t, r.Allow(g)) + r.Increment(g) + require.False(t, r.Allow(g)) + + // table should ONLY have 3 for a Prefix + key = "random" + g2 := peerdiversity.PeerGroupInfo{Cpl: 2, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 3 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 4 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + require.False(t, r.Allow(g2)) + + // remove a peer with a prefix and it works + r.Decrement(g2) + require.True(t, r.Allow(g2)) + r.Increment(g2) + + // and then it dosen't work again + require.False(t, r.Allow(g2)) +} + +func TestRoutingTableEndToEndMaxPerCpl(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 1, 2) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + var d2 *IpfsDHT + var d3 *IpfsDHT + + for { + d2 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d2.self)) == 1 { + break + } + } + + for { + d3 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d3.self)) == 1 { + break + } + } + + // d2 will be allowed in the Routing table but + // d3 will not be allowed. + connectNoSync(t, ctx, d, d2) + require.Eventually(t, func() bool { + return d.routingTable.Find(d2.self) != "" + }, 1*time.Second, 100*time.Millisecond) + + connectNoSync(t, ctx, d, d3) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) == "") + + // it works after removing d2 + d.routingTable.RemovePeer(d2.self) + b, err := d.routingTable.TryAddPeer(d3.self, true, false) + require.NoError(t, err) + require.True(t, b) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) != "") +} + +func TestRoutingTableEndToEndMaxPerTable(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 100, 3) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + // only 3 peers per prefix for the table. + d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d2) + waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) + + d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d3) + waitForWellFormedTables(t, []*IpfsDHT{d}, 2, 2, 1*time.Second) + + d4 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d4) + waitForWellFormedTables(t, []*IpfsDHT{d}, 3, 3, 1*time.Second) + + d5 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connectNoSync(t, ctx, d, d5) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 3) + require.True(t, d.routingTable.Find(d5.self) == "") +} diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index 68f73159e..1e3dbaeab 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -50,6 +50,8 @@ type RtRefreshManager struct { successfulOutboundQueryGracePeriod time.Duration triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to. + + refreshDoneCh chan struct{} // write to this channel after every refresh } func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool, @@ -57,7 +59,8 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool refreshQueryFnc func(ctx context.Context, key string) error, refreshQueryTimeout time.Duration, refreshInterval time.Duration, - successfulOutboundQueryGracePeriod time.Duration) (*RtRefreshManager, error) { + successfulOutboundQueryGracePeriod time.Duration, + refreshDoneCh chan struct{}) (*RtRefreshManager, error) { ctx, cancel := context.WithCancel(context.Background()) return &RtRefreshManager{ @@ -76,6 +79,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool successfulOutboundQueryGracePeriod: successfulOutboundQueryGracePeriod, triggerRefresh: make(chan *triggerRefreshReq), + refreshDoneCh: refreshDoneCh, }, nil } @@ -236,6 +240,12 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error { } } + select { + case r.refreshDoneCh <- struct{}{}: + case <-r.ctx.Done(): + return r.ctx.Err() + } + return merr } diff --git a/rtrefresh/rt_refresh_manager_test.go b/rtrefresh/rt_refresh_manager_test.go index d384aa432..b9b598830 100644 --- a/rtrefresh/rt_refresh_manager_test.go +++ b/rtrefresh/rt_refresh_manager_test.go @@ -38,7 +38,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { p, err := rt.GenRandPeerID(uint(u)) require.NoError(t, err) - b, err := rt.TryAddPeer(p, true) + b, err := rt.TryAddPeer(p, true, false) require.True(t, b) require.NoError(t, err) return nil @@ -53,14 +53,14 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { // when 2*gapcpl < maxCpl // gap is 2 and max is 10 - rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r := &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl := uint(2) lastCpl := 2 * (icpl + 1) p, err := rt.GenRandPeerID(10) require.NoError(t, err) - b, _ := rt.TryAddPeer(p, true) + b, _ := rt.TryAddPeer(p, true, false) require.True(t, b) r.refreshQueryFnc = qFuncWithIgnore(rt, icpl) require.NoError(t, r.doRefresh(true)) @@ -77,13 +77,13 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { } // when 2 * (gapcpl + 1) > maxCpl - rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r = &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl = uint(6) p, err = rt.GenRandPeerID(10) require.NoError(t, err) - b, _ = rt.TryAddPeer(p, true) + b, _ = rt.TryAddPeer(p, true, false) require.True(t, b) r.refreshQueryFnc = qFuncWithIgnore(rt, icpl) require.NoError(t, r.doRefresh(true)) diff --git a/subscriber_notifee.go b/subscriber_notifee.go index fc5ed2296..75e9cbe83 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -85,7 +85,9 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { // with our new address to all peers we are connected to. However, we might not necessarily be connected // to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way // to to forge connections with those matter. - dht.rtRefreshManager.RefreshNoWait() + if dht.autoRefresh || dht.testAddressUpdateProcessing { + dht.rtRefreshManager.RefreshNoWait() + } case event.EvtPeerProtocolsUpdated: handlePeerChangeEvent(dht, evt.Peer) case event.EvtPeerIdentificationCompleted: