From 6b3a5806d5ed27cfeafe3a0b83c780054cc0dba9 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 14 Aug 2020 14:43:20 -0400 Subject: [PATCH 1/6] Revert "Revert "Peer Diversity in the Routing Table (#658)"" This reverts commit c98518490cc773f6b81133fe7a8e2808fa37e4e2. --- dht.go | 27 +++++++- dht_options.go | 20 +++++- dht_test.go | 1 - dual/dual.go | 10 +++ go.mod | 2 +- go.sum | 18 ++--- rt_diversity_filter.go | 100 +++++++++++++++++++++++++++ rt_diversity_filter_test.go | 87 +++++++++++++++++++++++ rtrefresh/rt_refresh_manager_test.go | 4 +- 9 files changed, 249 insertions(+), 20 deletions(-) create mode 100644 rt_diversity_filter.go create mode 100644 rt_diversity_filter_test.go diff --git a/dht.go b/dht.go index acd451980..45a461cc3 100644 --- a/dht.go +++ b/dht.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p-kad-dht/rtrefresh" kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" @@ -115,6 +116,7 @@ type IpfsDHT struct { queryPeerFilter QueryFilterFunc routingTablePeerFilter RouteTableFilterFunc + rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter autoRefresh bool @@ -271,7 +273,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { beta: cfg.resiliency, queryPeerFilter: cfg.queryPeerFilter, routingTablePeerFilter: cfg.routingTable.peerFilter, - fixLowPeersChan: make(chan struct{}, 1), + rtPeerDiversityFilter: cfg.routingTable.diversityFilter, + + fixLowPeersChan: make(chan struct{}, 1), } var maxLastSuccessfulOutboundThreshold time.Duration @@ -346,7 +350,21 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr } func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { - rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) + // make a Routing Table Diversity Filter + var filter *peerdiversity.Filter + if dht.rtPeerDiversityFilter != nil { + df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int { + return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p)) + }) + + if err != nil { + return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err) + } + + filter = df + } + + rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter) cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -368,6 +386,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho return rt, err } +// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table. 
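For illustration, a minimal sketch of consuming the stats accessor introduced just below; the logDiversityStats helper and its package are hypothetical and not part of this patch:

    package dhtdebug

    import (
        "fmt"

        dht "github.com/libp2p/go-libp2p-kad-dht"
    )

    // logDiversityStats is a hypothetical helper that dumps the per-CPL IP-group
    // composition of the routing table; only GetRoutingTableDiversityStats itself
    // comes from this patch.
    func logDiversityStats(d *dht.IpfsDHT) {
        for _, s := range d.GetRoutingTableDiversityStats() {
            fmt.Printf("routing table diversity: %+v\n", s)
        }
    }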
+func (d *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + return d.routingTable.GetDiversityStats() +} + // Mode allows introspection of the operation mode of the DHT func (dht *IpfsDHT) Mode() ModeOpt { return dht.auto diff --git a/dht_options.go b/dht_options.go index d08d119db..191081fcf 100644 --- a/dht_options.go +++ b/dht_options.go @@ -4,15 +4,18 @@ import ( "fmt" "time" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-kad-dht/providers" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-ipns" ) // ModeOpt describes what mode the dht should operate in @@ -56,6 +59,7 @@ type config struct { latencyTolerance time.Duration checkInterval time.Duration peerFilter RouteTableFilterFunc + diversityFilter peerdiversity.PeerIPGroupFilter } // set to true if we're operating in v1 dht compatible mode @@ -403,3 +407,13 @@ func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option { return nil } } + +// RoutingTablePeerDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used +// to construct the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. +func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option { + return func(c *config) error { + c.routingTable.diversityFilter = pg + return nil + } +} diff --git a/dht_test.go b/dht_test.go index 3d7f5838b..cb487087d 100644 --- a/dht_test.go +++ b/dht_test.go @@ -2056,7 +2056,6 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) { require.NoError(t, dht1.host.Network().ClosePeer(dht2.self)) dht1.routingTable.RemovePeer(dht2.self) require.NotContains(t, dht2.self, dht1.routingTable.ListPeers()) - require.Eventually(t, func() bool { return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" && dht1.routingTable.Find(bootstrapcons[0].self) != "" diff --git a/dual/dual.go b/dual/dual.go index c8e487cae..0b4317999 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -4,6 +4,7 @@ package dual import ( "context" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -50,6 +51,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) wanOpts := append(options, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, 2, 3)), ) wan, err := dht.New(ctx, h, wanOpts...) if err != nil { @@ -93,6 +95,14 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { return dht.LAN.Provide(ctx, key, announce) } +// GetRoutingTableDiversityStats fetches the Routing Table Diversity Stats. 
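To make the new RoutingTablePeerDiversityFilter option above concrete, here is a minimal sketch of supplying a custom peerdiversity.PeerIPGroupFilter; the loggingFilter type is hypothetical and purely permissive, whereas a real implementation would enforce per-group limits the way rt_diversity_filter.go later in this patch does:

    package dhtexample

    import (
        "context"
        "fmt"

        "github.com/libp2p/go-libp2p-core/host"
        "github.com/libp2p/go-libp2p-core/peer"
        dht "github.com/libp2p/go-libp2p-kad-dht"
        "github.com/libp2p/go-libp2p-kbucket/peerdiversity"
        ma "github.com/multiformats/go-multiaddr"
    )

    // loggingFilter is a hypothetical PeerIPGroupFilter that admits every peer
    // and only records group membership changes.
    type loggingFilter struct {
        h host.Host
    }

    func (f *loggingFilter) Allow(g peerdiversity.PeerGroupInfo) bool { return true }

    func (f *loggingFilter) Increment(g peerdiversity.PeerGroupInfo) {
        fmt.Println("peer added to IP group", g.IPGroupKey, "at cpl", g.Cpl)
    }

    func (f *loggingFilter) Decrement(g peerdiversity.PeerGroupInfo) {
        fmt.Println("peer removed from IP group", g.IPGroupKey, "at cpl", g.Cpl)
    }

    func (f *loggingFilter) PeerAddresses(p peer.ID) []ma.Multiaddr {
        // mirror what the built-in filter does: report addresses of live connections
        cs := f.h.Network().ConnsToPeer(p)
        addrs := make([]ma.Multiaddr, 0, len(cs))
        for _, c := range cs {
            addrs = append(addrs, c.RemoteMultiaddr())
        }
        return addrs
    }

    // newFilteredDHT wires the filter in via the option added in this patch.
    func newFilteredDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
        return dht.New(ctx, h, dht.RoutingTablePeerDiversityFilter(&loggingFilter{h: h}))
    }

The resulting per-CPL composition can then be inspected with the GetRoutingTableDiversityStats accessors added in this patch.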
+func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + if dht.WANActive() { + return dht.WAN.GetRoutingTableDiversityStats() + } + return nil +} + // FindProvidersAsync searches for peers who are able to provide a given key func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { reqCtx, cancel := context.WithCancel(ctx) diff --git a/go.mod b/go.mod index adff013a2..b3a719456 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.10.0 github.com/libp2p/go-libp2p-core v0.6.1 - github.com/libp2p/go-libp2p-kbucket v0.4.2 + github.com/libp2p/go-libp2p-kbucket v0.4.6 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-record v0.1.3 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 diff --git a/go.sum b/go.sum index 64032e786..6272365e5 100644 --- a/go.sum +++ b/go.sum @@ -80,7 +80,6 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -91,7 +90,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= @@ -126,7 +124,6 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= @@ -218,6 +215,8 @@ github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwn github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= 
+github.com/libp2p/go-cidranger v1.0.0 h1:EHrUPBAmseAjEKl+aqaonm0u6eZ04+Cvw2UejIhg3lE= +github.com/libp2p/go-cidranger v1.0.0/go.mod h1:66eNgiCUVEUSDYy3J2db4qc4hA7QyTyTqiF1n3CgSvA= github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD21CIEy5eYd1Hlp0juHY0= github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc= github.com/libp2p/go-conn-security-multistream v0.2.0 h1:uNiDjS58vrvJTg9jO6bySd1rMKejieG7v45ekqHbZ1M= @@ -236,6 +235,8 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM= github.com/libp2p/go-libp2p v0.10.0 h1:7ooOvK1wi8eLpyTppy8TeH43UHy5uI75GAHGJxenUi0= github.com/libp2p/go-libp2p v0.10.0/go.mod h1:yBJNpb+mGJdgrwbKAKrhPU0u3ogyNFTfjJ6bdM+Q/G8= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200606034824-1b967eb41be7 h1:nf8vB9BDuXCSIPq2dykC6+zRhToq/M7Mib02w23ifwQ= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200606034824-1b967eb41be7/go.mod h1:bLVf7QeUbm5OTzlsVDVzIBpWCWuYWv4TTvgcAgSyyck= github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE= github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI= github.com/libp2p/go-libp2p-autonat v0.2.1/go.mod h1:MWtAhV5Ko1l6QBsHQNSuM6b1sRkXrpk0/LqCr+vCVxI= @@ -272,7 +273,6 @@ github.com/libp2p/go-libp2p-core v0.5.4/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM= github.com/libp2p/go-libp2p-core v0.5.6/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= github.com/libp2p/go-libp2p-core v0.5.7/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= -github.com/libp2p/go-libp2p-core v0.6.0 h1:u03qofNYTBN+yVg08PuAKylZogVf0xcTEeM8skGf+ak= github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= github.com/libp2p/go-libp2p-core v0.6.1 h1:XS+Goh+QegCDojUZp00CaPMfiEADCrLjNZskWE7pvqs= github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= @@ -282,8 +282,8 @@ github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMT github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= github.com/libp2p/go-libp2p-discovery v0.4.0 h1:dK78UhopBk48mlHtRCzbdLm3q/81g77FahEBTjcqQT8= github.com/libp2p/go-libp2p-discovery v0.4.0/go.mod h1:bZ0aJSrFc/eX2llP0ryhb1kpgkPyTo23SJ5b7UQCMh4= -github.com/libp2p/go-libp2p-kbucket v0.4.2 h1:wg+VPpCtY61bCasGRexCuXOmEmdKjN+k1w+JtTwu9gA= -github.com/libp2p/go-libp2p-kbucket v0.4.2/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= +github.com/libp2p/go-libp2p-kbucket v0.4.6 h1:09hF4woC+3ZNl3k5flPErUV5pXKq34LBhvupjn4qjW4= +github.com/libp2p/go-libp2p-kbucket v0.4.6/go.mod h1:12pr3Yg2qtBdwRjffaGJTa995rrMQxBk5a+KncHqTn4= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -304,6 +304,7 @@ github.com/libp2p/go-libp2p-peerstore v0.2.0/go.mod h1:N2l3eVIeAitSg3Pi2ipSrJYnq github.com/libp2p/go-libp2p-peerstore v0.2.1/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= 
github.com/libp2p/go-libp2p-peerstore v0.2.3/go.mod h1:K8ljLdFn590GMttg/luh4caB/3g0vKuY01psze0upRw= +github.com/libp2p/go-libp2p-peerstore v0.2.4 h1:jU9S4jYN30kdzTpDAR7SlHUD+meDUjTODh4waLWF1ws= github.com/libp2p/go-libp2p-peerstore v0.2.4/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes9v7In4+Qq3U= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= @@ -382,7 +383,6 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs= github.com/libp2p/go-reuseport-transport v0.0.3 h1:zzOeXnTooCkRvoH+bSXEfXhn76+LAiwoneM0gnXjF2M= github.com/libp2p/go-reuseport-transport v0.0.3/go.mod h1:Spv+MPft1exxARzP2Sruj2Wb5JSyHNncjf1Oi2dEbzM= -github.com/libp2p/go-sockaddr v0.0.2 h1:tCuXfpA9rq7llM/v834RKc/Xvovy/AqM9kHvTV/jY/Q= github.com/libp2p/go-sockaddr v0.0.2/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/libp2p/go-sockaddr v0.1.0 h1:Y4s3/jNoryVRKEBrkJ576F17CPOaMIzUeCsg7dlTDj0= github.com/libp2p/go-sockaddr v0.1.0/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= @@ -503,7 +503,6 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= @@ -584,7 +583,6 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= -github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= @@ -720,7 +718,6 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216052735-49a3e744a425 h1:VvQyQJN0tSuecqgcIxMWnnfG5kSmgy9KZR9sW3W5QeA= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -756,7 +753,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go new file mode 100644 index 000000000..35d9021ef --- /dev/null +++ b/rt_diversity_filter.go @@ -0,0 +1,100 @@ +package dht + +import ( + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + ma "github.com/multiformats/go-multiaddr" + "sync" +) + +var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil) + +type rtPeerIPGroupFilter struct { + mu sync.RWMutex + h host.Host + + maxPerCpl int + maxForTable int + + cplIpGroupCount map[int]map[peerdiversity.PeerIPGroupKey]int + tableIpGroupCount map[peerdiversity.PeerIPGroupKey]int +} + +// NewRTPeerDiversityFilter constructs the `PeerIPGroupFilter` that will be used to configure +// the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. 
+func NewRTPeerDiversityFilter(h host.Host, maxPerCpl, maxForTable int) *rtPeerIPGroupFilter { + return &rtPeerIPGroupFilter{ + h: h, + + maxPerCpl: maxPerCpl, + maxForTable: maxForTable, + + cplIpGroupCount: make(map[int]map[peerdiversity.PeerIPGroupKey]int), + tableIpGroupCount: make(map[peerdiversity.PeerIPGroupKey]int), + } + +} + +func (r *rtPeerIPGroupFilter) Allow(g peerdiversity.PeerGroupInfo) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + key := g.IPGroupKey + cpl := g.Cpl + + if r.tableIpGroupCount[key] >= r.maxForTable { + + return false + } + + c, ok := r.cplIpGroupCount[cpl] + allow := !ok || c[key] < r.maxPerCpl + return allow +} + +func (r *rtPeerIPGroupFilter) Increment(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] + 1 + if _, ok := r.cplIpGroupCount[cpl]; !ok { + r.cplIpGroupCount[cpl] = make(map[peerdiversity.PeerIPGroupKey]int) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] + 1 +} + +func (r *rtPeerIPGroupFilter) Decrement(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] - 1 + if r.tableIpGroupCount[key] == 0 { + delete(r.tableIpGroupCount, key) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] - 1 + if r.cplIpGroupCount[cpl][key] == 0 { + delete(r.cplIpGroupCount[cpl], key) + } + if len(r.cplIpGroupCount[cpl]) == 0 { + delete(r.cplIpGroupCount, cpl) + } +} + +func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr { + cs := r.h.Network().ConnsToPeer(p) + addr := make([]ma.Multiaddr, 0, len(cs)) + for _, c := range cs { + addr = append(addr, c.RemoteMultiaddr()) + } + return addr +} diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go new file mode 100644 index 000000000..1843e3604 --- /dev/null +++ b/rt_diversity_filter_test.go @@ -0,0 +1,87 @@ +package dht + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + + "github.com/stretchr/testify/require" +) + +func TestRTPeerDiversityFilter(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 2, 3) + + // table should only have 2 for each prefix per cpl + key := "key" + g := peerdiversity.PeerGroupInfo{Cpl: 1, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g)) + r.Increment(g) + require.True(t, r.Allow(g)) + r.Increment(g) + require.False(t, r.Allow(g)) + + // table should ONLY have 3 for a Prefix + key = "random" + g2 := peerdiversity.PeerGroupInfo{Cpl: 2, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 3 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 4 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + require.False(t, r.Allow(g2)) + + // remove a peer with a prefix and it works + r.Decrement(g2) + require.True(t, r.Allow(g2)) + r.Increment(g2) + + // and then it dosen't work again + require.False(t, r.Allow(g2)) +} + +func TestRoutingTableEndToEnd(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 2, 3) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", 
blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + // only 3 peers per prefix for a cpl + d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d2) + waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) + + d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d3) + waitForWellFormedTables(t, []*IpfsDHT{d}, 2, 2, 1*time.Second) + + d4 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d4) + waitForWellFormedTables(t, []*IpfsDHT{d}, 3, 3, 1*time.Second) + + d5 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connectNoSync(t, ctx, d, d5) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 3) +} diff --git a/rtrefresh/rt_refresh_manager_test.go b/rtrefresh/rt_refresh_manager_test.go index d384aa432..355e4cf4b 100644 --- a/rtrefresh/rt_refresh_manager_test.go +++ b/rtrefresh/rt_refresh_manager_test.go @@ -53,7 +53,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { // when 2*gapcpl < maxCpl // gap is 2 and max is 10 - rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r := &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl := uint(2) @@ -77,7 +77,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { } // when 2 * (gapcpl + 1) > maxCpl - rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r = &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl = uint(6) From 1bfc87844d831dd69b3e4bbc8456f541c06e798f Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 10 Jun 2020 11:57:39 +0530 Subject: [PATCH 2/6] Feat/reduce churn (#668) * reduce churn --- dht.go | 81 +++++++++++++++-- dht_bootstrap_test.go | 131 +++++++++++++++++++++++++++ dht_options.go | 15 ++- rtrefresh/rt_refresh_manager.go | 12 ++- rtrefresh/rt_refresh_manager_test.go | 6 +- 5 files changed, 229 insertions(+), 16 deletions(-) diff --git a/dht.go b/dht.go index 45a461cc3..4254e2904 100644 --- a/dht.go +++ b/dht.go @@ -41,6 +41,8 @@ import ( var ( logger = logging.Logger("dht") baseLogger = logger.Desugar() + + rtFreezeTimeout = 1 * time.Minute ) const ( @@ -67,6 +69,11 @@ const ( protectedBuckets = 2 ) +type addPeerRTReq struct { + p peer.ID + queryPeer bool +} + // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { @@ -133,6 +140,11 @@ type IpfsDHT struct { enableProviders, enableValues bool fixLowPeersChan chan struct{} + + addPeerToRTChan chan addPeerRTReq + refreshFinishedCh chan struct{} + + rtFreezeTimeout time.Duration } // Assert that IPFS assumptions about interfaces aren't broken. 
These aren't a @@ -209,7 +221,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) go dht.persistRTPeersInPeerStore() // listens to the fix low peers chan and tries to fix the Routing Table - dht.proc.Go(dht.fixLowPeersRoutine) + if !cfg.disableFixLowPeers { + dht.proc.Go(dht.fixLowPeersRoutine) + } + + dht.proc.Go(dht.rtPeerLoop) return dht, nil } @@ -276,6 +292,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { rtPeerDiversityFilter: cfg.routingTable.diversityFilter, fixLowPeersChan: make(chan struct{}, 1), + + addPeerToRTChan: make(chan addPeerRTReq), + refreshFinishedCh: make(chan struct{}), } var maxLastSuccessfulOutboundThreshold time.Duration @@ -324,6 +343,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { } dht.ProviderManager = pm + dht.rtFreezeTimeout = rtFreezeTimeout + return dht, nil } @@ -344,7 +365,8 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr queryFnc, cfg.routingTable.refreshQueryTimeout, cfg.routingTable.refreshInterval, - maxLastSuccessfulOutboundThreshold) + maxLastSuccessfulOutboundThreshold, + dht.refreshFinishedCh) return r, err } @@ -577,6 +599,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { return dht.datastore.Put(mkDsKey(key), data) } +func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { + bootstrapCount := 0 + isBootsrapping := false + var timerCh <-chan time.Time + + for { + select { + case <-timerCh: + dht.routingTable.MarkAllPeersIrreplaceable() + case addReq := <-dht.addPeerToRTChan: + prevSize := dht.routingTable.Size() + if prevSize == 0 { + isBootsrapping = true + bootstrapCount = 0 + timerCh = nil + } + newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping) + if err != nil { + // peer not added. + continue + } + if !newlyAdded && addReq.queryPeer { + // the peer is already in our RT, but we just successfully queried it and so let's give it a + // bump on the query time so we don't ping it too soon for a liveliness check. + dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now()) + } + case <-dht.refreshFinishedCh: + bootstrapCount = bootstrapCount + 1 + if bootstrapCount == 2 { + timerCh = time.NewTimer(dht.rtFreezeTimeout).C + } + + old := isBootsrapping + isBootsrapping = false + if old { + dht.rtRefreshManager.RefreshNoWait() + } + + case <-proc.Closing(): + return + } + } +} + // peerFound signals the routingTable that we've found a peer that // might support the DHT protocol. // If we have a connection a peer but no exchange of a query RPC -> @@ -598,16 +664,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { if err != nil { logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err) } else if b { - newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer) - if err != nil { - // peer not added. + select { + case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}: + case <-dht.ctx.Done(): return } - if !newlyAdded && queryPeer { - // the peer is already in our RT, but we just successfully queried it and so let's give it a - // bump on the query time so we don't ping it too soon for a liveliness check. 
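Taken together, the churn-reduction flow added in this commit is: peerFound no longer calls TryAddPeer directly but sends an addPeerRTReq on addPeerToRTChan; the new rtPeerLoop consumes those requests, passing the bootstrapping flag to TryAddPeer while the table is being repopulated from empty; each completed refresh signals refreshFinishedCh, and after the second one a timer of rtFreezeTimeout (one minute by default) is armed, after which MarkAllPeersIrreplaceable pins the current entries so that later candidates can no longer displace them.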
- dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) - } } } diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index f6bf447b0..42bd48cc7 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) { } require.Empty(t, dfmap) } + +func TestBootstrappersReplacable(t *testing.T) { + old := rtFreezeTimeout + rtFreezeTimeout = 100 * time.Millisecond + defer func() { + rtFreezeTimeout = old + }() + ctx := context.Background() + d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2)) + defer d.host.Close() + defer d.Close() + + var d1 *IpfsDHT + var d2 *IpfsDHT + + // d1 & d2 have a cpl of 0 + for { + d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 { + break + } + } + + for { + d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 { + break + } + } + defer d1.host.Close() + defer d1.Close() + + defer d2.host.Close() + defer d2.Close() + + connect(t, ctx, d, d1) + connect(t, ctx, d, d2) + require.Len(t, d.routingTable.ListPeers(), 2) + + // d3 & d4 with cpl=0 will go in as d1 & d2 are replacable. + var d3 *IpfsDHT + var d4 *IpfsDHT + + for { + d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 { + break + } + } + + for { + d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 { + break + } + } + + defer d3.host.Close() + defer d3.Close() + defer d4.host.Close() + defer d4.Close() + + connect(t, ctx, d, d3) + connect(t, ctx, d, d4) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // do couple of refreshes and wait for the Routing Table to be "frozen". + <-d.RefreshRoutingTable() + <-d.RefreshRoutingTable() + time.Sleep(1 * time.Second) + + // adding d5 fails because RT is frozen + var d5 *IpfsDHT + for { + d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) + if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 { + break + } + } + defer d5.host.Close() + defer d5.Close() + + connectNoSync(t, ctx, d, d5) + time.Sleep(500 * time.Millisecond) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // Let's empty the routing table + for _, p := range d.routingTable.ListPeers() { + d.routingTable.RemovePeer(p) + } + require.Len(t, d.routingTable.ListPeers(), 0) + + // adding d1 & d2 works now because there is space in the Routing Table + require.NoError(t, d.host.Network().ClosePeer(d1.self)) + require.NoError(t, d.host.Network().ClosePeer(d2.self)) + connect(t, ctx, d, d1) + connect(t, ctx, d, d2) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d1.self) + require.Contains(t, d.routingTable.ListPeers(), d2.self) + + // adding d3 & d4 also works because the RT is not frozen. 
+ require.NoError(t, d.host.Network().ClosePeer(d3.self)) + require.NoError(t, d.host.Network().ClosePeer(d4.self)) + connect(t, ctx, d, d3) + connect(t, ctx, d, d4) + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) + + // run refreshes and freeze the RT + <-d.RefreshRoutingTable() + <-d.RefreshRoutingTable() + time.Sleep(1 * time.Second) + // cant add d1 & d5 because RT is frozen. + require.NoError(t, d.host.Network().ClosePeer(d1.self)) + require.NoError(t, d.host.Network().ClosePeer(d5.self)) + connectNoSync(t, ctx, d, d1) + connectNoSync(t, ctx, d, d5) + d.peerFound(ctx, d5.self, true) + d.peerFound(ctx, d1.self, true) + time.Sleep(1 * time.Second) + + require.Len(t, d.routingTable.ListPeers(), 2) + require.Contains(t, d.routingTable.ListPeers(), d3.self) + require.Contains(t, d.routingTable.ListPeers(), d4.self) +} diff --git a/dht_options.go b/dht_options.go index 191081fcf..0c072d9ac 100644 --- a/dht_options.go +++ b/dht_options.go @@ -2,6 +2,7 @@ package dht import ( "fmt" + "testing" "time" "github.com/libp2p/go-libp2p-core/host" @@ -63,8 +64,9 @@ type config struct { } // set to true if we're operating in v1 dht compatible mode - v1CompatibleMode bool - bootstrapPeers []peer.AddrInfo + v1CompatibleMode bool + bootstrapPeers []peer.AddrInfo + disableFixLowPeers bool } func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true } @@ -417,3 +419,12 @@ func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option return nil } } + +// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT. +// This is ONLY for tests. +func disableFixLowPeersRoutine(t *testing.T) Option { + return func(c *config) error { + c.disableFixLowPeers = true + return nil + } +} diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index 7da01908c..6798f9e0f 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -49,6 +49,8 @@ type RtRefreshManager struct { successfulOutboundQueryGracePeriod time.Duration triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to. 
+ + refreshDoneCh chan struct{} // write to this channel after every refresh } func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool, @@ -56,7 +58,8 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool refreshQueryFnc func(ctx context.Context, key string) error, refreshQueryTimeout time.Duration, refreshInterval time.Duration, - successfulOutboundQueryGracePeriod time.Duration) (*RtRefreshManager, error) { + successfulOutboundQueryGracePeriod time.Duration, + refreshDoneCh chan struct{}) (*RtRefreshManager, error) { ctx, cancel := context.WithCancel(context.Background()) return &RtRefreshManager{ @@ -75,6 +78,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool successfulOutboundQueryGracePeriod: successfulOutboundQueryGracePeriod, triggerRefresh: make(chan *triggerRefreshReq), + refreshDoneCh: refreshDoneCh, }, nil } @@ -235,6 +239,12 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error { } } + select { + case r.refreshDoneCh <- struct{}{}: + case <-r.ctx.Done(): + return r.ctx.Err() + } + return merr } diff --git a/rtrefresh/rt_refresh_manager_test.go b/rtrefresh/rt_refresh_manager_test.go index 355e4cf4b..b9b598830 100644 --- a/rtrefresh/rt_refresh_manager_test.go +++ b/rtrefresh/rt_refresh_manager_test.go @@ -38,7 +38,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { p, err := rt.GenRandPeerID(uint(u)) require.NoError(t, err) - b, err := rt.TryAddPeer(p, true) + b, err := rt.TryAddPeer(p, true, false) require.True(t, b) require.NoError(t, err) return nil @@ -60,7 +60,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { lastCpl := 2 * (icpl + 1) p, err := rt.GenRandPeerID(10) require.NoError(t, err) - b, _ := rt.TryAddPeer(p, true) + b, _ := rt.TryAddPeer(p, true, false) require.True(t, b) r.refreshQueryFnc = qFuncWithIgnore(rt, icpl) require.NoError(t, r.doRefresh(true)) @@ -83,7 +83,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { icpl = uint(6) p, err = rt.GenRandPeerID(10) require.NoError(t, err) - b, _ = rt.TryAddPeer(p, true) + b, _ = rt.TryAddPeer(p, true, false) require.True(t, b) r.refreshQueryFnc = qFuncWithIgnore(rt, icpl) require.NoError(t, r.doRefresh(true)) From 07d0e9ab3c24ae4ae232aaa9e616efc98c7423b8 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 11 Jun 2020 00:20:51 +0530 Subject: [PATCH 3/6] refactor: Refactoring components from Peer Diversity for Queries (#664) --- dual/dual.go | 7 +++- qpeerset/qpeerset.go | 56 ++++++++++--------------- qpeerset/qpeerset_test.go | 18 ++++---- query.go | 82 ++++++++++++++++++++----------------- routing.go | 2 +- rt_diversity_filter.go | 5 ++- rt_diversity_filter_test.go | 62 ++++++++++++++++++++++++++-- 7 files changed, 144 insertions(+), 88 deletions(-) diff --git a/dual/dual.go b/dual/dual.go index 0b4317999..efb5353e8 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -42,6 +42,11 @@ var ( _ routing.ValueStore = (*DHT)(nil) ) +var ( + maxPrefixCountPerCpl = 2 + maxPrefixCount = 3 +) + // New creates a new DualDHT instance. Options provided are forwarded on to the two concrete // IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. 
@@ -51,7 +56,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) wanOpts := append(options, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), - dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, 2, 3)), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)), ) wan, err := dht.New(ctx, h, wanOpts...) if err != nil { diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index 971b5489d..b940a0430 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -121,53 +121,39 @@ func (qp *QueryPeerset) GetReferrer(p peer.ID) peer.ID { return qp.all[qp.find(p)].referredBy } -// NumWaiting returns the number of peers in state PeerWaiting. -func (qp *QueryPeerset) NumWaiting() int { - return len(qp.GetWaitingPeers()) -} - -// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order. -func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerWaiting { - result = append(result, p.id) - } +// GetClosestNInStates returns the closest to the key peers, which are in one of the given states. +// It returns n peers or less, if fewer peers meet the condition. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestNInStates(n int, states ...PeerState) (result []peer.ID) { + qp.sort() + m := make(map[PeerState]struct{}, len(states)) + for i := range states { + m[states[i]] = struct{}{} } - return -} -// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable. -// It returns count peers or less, if fewer peers meet the condition. -func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { - qp.sort() for _, p := range qp.all { - if p.state != PeerUnreachable { + if _, ok := m[p.state]; ok { result = append(result, p.id) } } - if len(result) >= count { - return result[:count] + if len(result) >= n { + return result[:n] } return result } -// NumHeard returns the number of peers in state PeerHeard. -func (qp *QueryPeerset) NumHeard() int { - return len(qp.GetHeardPeers()) +// GetClosestInStates returns the peers, which are in one of the given states. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestInStates(states ...PeerState) (result []peer.ID) { + return qp.GetClosestNInStates(len(qp.all), states...) } -// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order. -func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerHeard { - result = append(result, p.id) - } - } - return +// NumHeard returns the number of peers in state PeerHeard. +func (qp *QueryPeerset) NumHeard() int { + return len(qp.GetClosestInStates(PeerHeard)) } -// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key. -func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) { - qp.sort() - return qp.GetHeardPeers() +// NumWaiting returns the number of peers in state PeerWaiting. 
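For readers tracking the qpeerset refactor: GetClosestNotUnreachable(n) becomes GetClosestNInStates(n, PeerHeard, PeerWaiting, PeerQueried), while GetSortedHeard, GetHeardPeers and GetWaitingPeers collapse into GetClosestInStates(PeerHeard) and GetClosestInStates(PeerWaiting), both of which return peers sorted by distance to the key; the query.go changes later in this commit apply these substitutions.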
+func (qp *QueryPeerset) NumWaiting() int { + return len(qp.GetClosestInStates(PeerWaiting)) } diff --git a/qpeerset/qpeerset_test.go b/qpeerset/qpeerset_test.go index 4d0f7db0a..bd40413af 100644 --- a/qpeerset/qpeerset_test.go +++ b/qpeerset/qpeerset_test.go @@ -56,31 +56,31 @@ func TestQPeerSet(t *testing.T) { // add peer4 require.True(t, qp.TryAdd(peer4, oracle)) - cl := qp.GetClosestNotUnreachable(2) + cl := qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(3) + cl = qp.GetClosestNInStates(3, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4}, cl) // mark as unreachable & try to get it qp.SetState(peer4, PeerUnreachable) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer2}, cl) // add peer1 require.True(t, qp.TryAdd(peer1, oracle)) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1}, cl) - cl = qp.GetClosestNotUnreachable(2) + cl = qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1, peer2}, cl) // mark as waiting and assert qp.SetState(peer2, PeerWaiting) - require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers()) + require.Equal(t, []peer.ID{peer2}, qp.GetClosestInStates(PeerWaiting)) - require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers()) + require.Equal(t, []peer.ID{peer1}, qp.GetClosestInStates(PeerHeard)) require.True(t, qp.TryAdd(peer3, oracle)) - require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard()) + require.Equal(t, []peer.ID{peer3, peer1}, qp.GetClosestInStates(PeerHeard)) require.Equal(t, 2, qp.NumHeard()) } diff --git a/query.go b/query.go index 88a1b63e6..47d87df05 100644 --- a/query.go +++ b/query.go @@ -8,13 +8,12 @@ import ( "sync" "time" - "github.com/google/uuid" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/google/uuid" "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" ) @@ -223,7 +222,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { // extract the top K not unreachable peers var peers []peer.ID peerState := make(map[peer.ID]qpeerset.PeerState) - qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize) + qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range qp { state := q.queryPeers.GetState(p) peerState[p] = state @@ -251,10 +250,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { } type queryUpdate struct { - cause peer.ID - heard []peer.ID - queried []peer.ID - unreachable []peer.ID + cause peer.ID + queried []peer.ID + heard []peer.ID + unreachable []peer.ID + queryDuration time.Duration } @@ -279,8 +279,14 @@ func (q *query) run() { q.terminate(pathCtx, cancelPath, LookupCancelled) } + // calculate the maximum number of queries we could be spawning. 
+ // Note: NumWaiting will be updated in spawnQuery + maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() + // termination is triggered on end-of-lookup conditions or starvation of unused peers - if ready, reason := q.isReadyToTerminate(); ready { + // it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers. + ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn) + if ready { q.terminate(pathCtx, cancelPath, reason) } @@ -288,29 +294,15 @@ func (q *query) run() { return } - // if all "threads" are busy, wait until someone finishes - if q.queryPeers.NumWaiting() >= alpha { - continue - } - - // spawn new queries, up to the parallelism allowance - // calculate the maximum number of queries we could be spawning. - // Note: NumWaiting will be updated in spawnQuery - maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() // try spawning the queries, if there are no available peers to query then we won't spawn them - for j := 0; j < maxNumQueriesToSpawn; j++ { - q.spawnQuery(pathCtx, cause, ch) + for _, p := range qPeers { + q.spawnQuery(pathCtx, cause, p, ch) } } } // spawnQuery starts one query, if an available heard peer is found -func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) { - peers := q.queryPeers.GetSortedHeard() - if len(peers) == 0 { - return - } - +func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) { PublishLookupEvent(ctx, NewLookupEvent( q.dht.self, @@ -318,39 +310,52 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryU q.key, NewLookupUpdateEvent( cause, - q.queryPeers.GetReferrer(peers[0]), - nil, // heard - []peer.ID{peers[0]}, // waiting - nil, // queried - nil, // unreachable + q.queryPeers.GetReferrer(queryPeer), + nil, // heard + []peer.ID{queryPeer}, // waiting + nil, // queried + nil, // unreachable ), nil, nil, ), ) - q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) + q.queryPeers.SetState(queryPeer, qpeerset.PeerWaiting) q.waitGroup.Add(1) - go q.queryPeer(ctx, ch, peers[0]) + go q.queryPeer(ctx, ch, queryPeer) } -func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) { +func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) { // give the application logic a chance to terminate if q.stopFn() { - return true, LookupStopped + return true, LookupStopped, nil } if q.isStarvationTermination() { - return true, LookupStarvation + return true, LookupStarvation, nil } if q.isLookupTermination() { - return true, LookupCompleted + return true, LookupCompleted, nil } - return false, -1 + + // The peers we query next should be ones that we have only Heard about. + var peersToQuery []peer.ID + peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard) + count := 0 + for _, p := range peers { + peersToQuery = append(peersToQuery, p) + count++ + if count == nPeersToQuery { + break + } + } + + return false, -1, peersToQuery } // From the set of all nodes that are not unreachable, // if the closest beta nodes are all queried, the lookup can terminate. 
func (q *query) isLookupTermination() bool { - peers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta) + peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range peers { if q.queryPeers.GetState(p) != qpeerset.PeerQueried { return false @@ -478,6 +483,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { if p == q.dht.self { // don't add self. continue } + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerUnreachable) } else { diff --git a/routing.go b/routing.go index b57e0ae84..f31e9a5b2 100644 --- a/routing.go +++ b/routing.go @@ -667,7 +667,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, // Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol // and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT // server peer and is not identified as such is a bug. - dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard + dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting) break } } diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go index 35d9021ef..06c3116e6 100644 --- a/rt_diversity_filter.go +++ b/rt_diversity_filter.go @@ -1,11 +1,14 @@ package dht import ( + "sync" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + ma "github.com/multiformats/go-multiaddr" - "sync" ) var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil) diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go index 1843e3604..d439e863a 100644 --- a/rt_diversity_filter_test.go +++ b/rt_diversity_filter_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p-kbucket/peerdiversity" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" @@ -51,10 +52,64 @@ func TestRTPeerDiversityFilter(t *testing.T) { require.False(t, r.Allow(g2)) } -func TestRoutingTableEndToEnd(t *testing.T) { +func TestRoutingTableEndToEndMaxPerCpl(t *testing.T) { ctx := context.Background() h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) - r := NewRTPeerDiversityFilter(h, 2, 3) + r := NewRTPeerDiversityFilter(h, 1, 2) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + var d2 *IpfsDHT + var d3 *IpfsDHT + + for { + d2 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d2.self)) == 1 { + break + } + } + + for { + d3 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d3.self)) == 1 { + break + } + } + + // d2 will be allowed in the Routing table but + // d3 will not be allowed. 
+ connectNoSync(t, ctx, d, d2) + require.Eventually(t, func() bool { + return d.routingTable.Find(d2.self) != "" + }, 1*time.Second, 100*time.Millisecond) + + connectNoSync(t, ctx, d, d3) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) == "") + + // it works after removing d2 + d.routingTable.RemovePeer(d2.self) + b, err := d.routingTable.TryAddPeer(d3.self, true, false) + require.NoError(t, err) + require.True(t, b) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) != "") +} + +func TestRoutingTableEndToEndMaxPerTable(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 100, 3) d, err := New( ctx, @@ -67,7 +122,7 @@ func TestRoutingTableEndToEnd(t *testing.T) { ) require.NoError(t, err) - // only 3 peers per prefix for a cpl + // only 3 peers per prefix for the table. d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) connect(t, ctx, d, d2) waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) @@ -84,4 +139,5 @@ func TestRoutingTableEndToEnd(t *testing.T) { connectNoSync(t, ctx, d, d5) time.Sleep(1 * time.Second) require.Len(t, d.routingTable.ListPeers(), 3) + require.True(t, d.routingTable.Find(d5.self) == "") } From 20f111c9a905fa5981d6555f15ec8257e1445644 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 14 Jul 2020 20:22:09 -0700 Subject: [PATCH 4/6] feat: allow passing custom dual-dht options (#671) This is a BREAKING CHANGE for the dual DHT but it should be a relatively minor one. Co-authored-by: Petar Maymounkov --- dual/dual.go | 55 +++++++++++++++++++++++++++++++++++++++++++---- dual/dual_test.go | 2 +- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/dual/dual.go b/dual/dual.go index efb5353e8..a291abb06 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -4,7 +4,7 @@ package dual import ( "context" - "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + "fmt" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" helper "github.com/libp2p/go-libp2p-routing-helpers" ma "github.com/multiformats/go-multiaddr" @@ -47,13 +48,59 @@ var ( maxPrefixCount = 3 ) +type config struct { + wan, lan []dht.Option +} + +func (cfg *config) apply(opts ...Option) error { + for i, o := range opts { + if err := o(cfg); err != nil { + return fmt.Errorf("dual dht option %d failed: %w", i, err) + } + } + return nil +} + +// Option is an option used to configure the Dual DHT. +type Option func(*config) error + +// WanDHTOption applies the given DHT options to the WAN DHT. +func WanDHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.wan = append(c.wan, opts...) + return nil + } +} + +// LanDHTOption applies the given DHT options to the LAN DHT. +func LanDHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.lan = append(c.lan, opts...) + return nil + } +} + +// DHTOption applies the given DHT options to both the WAN and the LAN DHTs. +func DHTOption(opts ...dht.Option) Option { + return func(c *config) error { + c.lan = append(c.lan, opts...) + c.wan = append(c.wan, opts...) + return nil + } +} + // New creates a new DualDHT instance. 
Options provided are forwarded on to the two concrete // IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. // Note: query or routing table functional options provided as arguments to this function // will be overriden by this constructor. -func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) { - wanOpts := append(options, +func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) { + var cfg config + err := cfg.apply(options...) + if err != nil { + return nil, err + } + wanOpts := append(cfg.wan, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)), @@ -65,7 +112,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) // Unless overridden by user supplied options, the LAN DHT should default // to 'AutoServer' mode. - lanOpts := append(options, + lanOpts := append(cfg.lan, dht.ProtocolExtension(LanExtension), dht.QueryFilter(dht.PrivateQueryFilter), dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), diff --git a/dual/dual_test.go b/dual/dual_test.go index dbd4aab12..323ff1169 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -90,7 +90,7 @@ func setupDHT(ctx context.Context, t *testing.T, options ...dht.Option) *DHT { d, err := New( ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), - append(baseOpts, options...)..., + append([]Option{DHTOption(baseOpts...)}, DHTOption(options...))..., ) if err != nil { t.Fatal(err) From 57a49f15d90bd1ba44e6ca89c0f8a5cc4033bd66 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 13 Aug 2020 11:28:39 -0400 Subject: [PATCH 5/6] cleanup: fixed a few linter/vetting errors --- dht.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/dht.go b/dht.go index 4254e2904..8b5ff9ce9 100644 --- a/dht.go +++ b/dht.go @@ -387,6 +387,10 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho } rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter) + if err != nil { + return nil, err + } + cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -409,8 +413,8 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho } // GetRoutingTableDiversityStats returns the diversity stats for the Routing Table. -func (d *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { - return d.routingTable.GetDiversityStats() +func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + return dht.routingTable.GetDiversityStats() } // Mode allows introspection of the operation mode of the DHT @@ -541,19 +545,19 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) // Perhaps we were given closer peers peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers()) - if record := pmes.GetRecord(); record != nil { + if rec := pmes.GetRecord(); rec != nil { // Success! We were given the value logger.Debug("got value") // make sure record is valid. 
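Because the previous commit changes dual.New to take dual.Option values instead of raw dht.Option values, existing callers need a small adjustment; a minimal sketch of the new call shape, with a hypothetical newDualDHT helper and purely illustrative option choices:

    package dhtexample

    import (
        "context"

        "github.com/libp2p/go-libp2p-core/host"
        dht "github.com/libp2p/go-libp2p-kad-dht"
        "github.com/libp2p/go-libp2p-kad-dht/dual"
    )

    // newDualDHT shows the post-change call shape: dht.Options are wrapped in
    // dual.DHTOption, dual.WanDHTOption or dual.LanDHTOption rather than being
    // passed to dual.New directly. The particular options used here are only
    // illustrative.
    func newDualDHT(ctx context.Context, h host.Host) (*dual.DHT, error) {
        return dual.New(ctx, h,
            dual.DHTOption(dht.Mode(dht.ModeAuto)),  // applied to both the WAN and LAN DHTs
            dual.WanDHTOption(dht.BucketSize(20)),   // applied to the WAN DHT only
        )
    }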
- err = dht.Validator.Validate(string(record.GetKey()), record.GetValue()) + err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()) if err != nil { logger.Debug("received invalid record (discarded)") // return a sentinal to signify an invalid record was received err = errInvalidRecord - record = new(recpb.Record) + rec = new(recpb.Record) } - return record, peers, err + return rec, peers, err } if len(peers) > 0 { From 5729f0a194642f523e362a21b65a74dd817c345b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 14 Aug 2020 16:46:40 -0400 Subject: [PATCH 6/6] fix: do not refresh during address changes if autorefresh is disabled testing: add test-specific option to force processing of Host address changes even when AutoRefresh is disabled --- dht.go | 5 +++++ dht_bootstrap_test.go | 2 +- dht_options.go | 19 ++++++++++++++++--- subscriber_notifee.go | 4 +++- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/dht.go b/dht.go index 8b5ff9ce9..a7052f44a 100644 --- a/dht.go +++ b/dht.go @@ -145,6 +145,9 @@ type IpfsDHT struct { refreshFinishedCh chan struct{} rtFreezeTimeout time.Duration + + // configuration variables for tests + testAddressUpdateProcessing bool } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -187,6 +190,8 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.Validator = cfg.validator + dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing + dht.auto = cfg.mode switch cfg.mode { case ModeAuto, ModeClient: diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index 42bd48cc7..8ea9c3609 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -15,7 +15,7 @@ import ( func TestSelfWalkOnAddressChange(t *testing.T) { ctx := context.Background() // create three DHT instances with auto refresh disabled - d1 := setupDHT(ctx, t, false, DisableAutoRefresh()) + d1 := setupDHT(ctx, t, false, DisableAutoRefresh(), forceAddressUpdateProcessing(t)) d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) diff --git a/dht_options.go b/dht_options.go index 0c072d9ac..6edce9dd6 100644 --- a/dht_options.go +++ b/dht_options.go @@ -64,9 +64,12 @@ type config struct { } // set to true if we're operating in v1 dht compatible mode - v1CompatibleMode bool - bootstrapPeers []peer.AddrInfo - disableFixLowPeers bool + v1CompatibleMode bool + bootstrapPeers []peer.AddrInfo + + // test specific config options + disableFixLowPeers bool + testAddressUpdateProcessing bool } func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true } @@ -428,3 +431,13 @@ func disableFixLowPeersRoutine(t *testing.T) Option { return nil } } + +// forceAddressUpdateProcessing forces the DHT to handle changes to the hosts addresses. +// This occurs even when AutoRefresh has been disabled. +// This is ONLY for tests. +func forceAddressUpdateProcessing(t *testing.T) Option { + return func(c *config) error { + c.testAddressUpdateProcessing = true + return nil + } +} diff --git a/subscriber_notifee.go b/subscriber_notifee.go index fc5ed2296..75e9cbe83 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -85,7 +85,9 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { // with our new address to all peers we are connected to. However, we might not necessarily be connected // to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way // to to forge connections with those matter. 
- dht.rtRefreshManager.RefreshNoWait() + if dht.autoRefresh || dht.testAddressUpdateProcessing { + dht.rtRefreshManager.RefreshNoWait() + } case event.EvtPeerProtocolsUpdated: handlePeerChangeEvent(dht, evt.Peer) case event.EvtPeerIdentificationCompleted: