From 6a1c92e189c0d0215c1085415dd1b94eefcf97b2 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 14 Jun 2023 09:30:26 +0200 Subject: [PATCH 1/5] fix: leaking go routines --- dht.go | 43 ++++++++++++++++++------ dht_net.go | 5 --- dht_options.go | 9 +++++ go.mod | 34 +++++++++---------- go.sum | 69 ++++++++++++++++++++------------------- internal/config/config.go | 34 ++++++++++--------- 6 files changed, 112 insertions(+), 82 deletions(-) diff --git a/dht.go b/dht.go index e0273a824..88f0e899b 100644 --- a/dht.go +++ b/dht.go @@ -125,6 +125,9 @@ type IpfsDHT struct { // timeout for the lookupCheck operation lookupCheckTimeout time.Duration + // number of concurrent lookupCheck operations + lookupCheckCapacity int + lookupChecksLk sync.Mutex // A function returning a set of bootstrap peers to fallback on if all other attempts to fix // the routing table fail (or, e.g., this is the first time this node is @@ -296,6 +299,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err bucketSize: cfg.BucketSize, alpha: cfg.Concurrency, beta: cfg.Resiliency, + lookupCheckCapacity: cfg.LookupCheckConcurrency, queryPeerFilter: cfg.QueryPeerFilter, routingTablePeerFilter: cfg.RoutingTable.PeerFilter, rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter, @@ -658,8 +662,8 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { // it fails to answer, it isn't added to the routingTable. 
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { // if the peer is already in the routing table or the appropriate bucket is - // already full, don't try to add the new peer.ID - if dht.routingTable.Find(p) != "" || !dht.routingTable.UsefulPeer(p) { + // already full, don't try to add the new peer.ID + if !dht.routingTable.UsefulNewPeer(p) { return } @@ -669,17 +673,36 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err) } else if b { - livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout) - defer cancel() - - // performing a FIND_NODE query - if err := dht.lookupCheck(livelinessCtx, p); err != nil { - logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err) + // check if the maximal number of concurrent lookup checks is reached + dht.lookupChecksLk.Lock() + if dht.lookupCheckCapacity == 0 { + dht.lookupChecksLk.Unlock() + // drop the new peer.ID if the maximal number of concurrent lookup + // checks is reached return } + dht.lookupCheckCapacity-- + dht.lookupChecksLk.Unlock() + + go func() { + livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout) + defer cancel() + + // performing a FIND_NODE query + err := dht.lookupCheck(livelinessCtx, p) + + dht.lookupChecksLk.Lock() + dht.lookupCheckCapacity++ + dht.lookupChecksLk.Unlock() + + if err != nil { + logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err) + return + } - // if the FIND_NODE succeeded, the peer is considered as valid - dht.validPeerFound(ctx, p) + // if the FIND_NODE succeeded, the peer is considered as valid + dht.validPeerFound(ctx, p) + }() } } diff --git a/dht_net.go b/dht_net.go index f1eedd5d9..9f3fbf01f 100644 --- a/dht_net.go +++ b/dht_net.go @@ -110,11 +110,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { return false } - // a peer has queried us, 
let's add it to RT. A new go routine is required - // because we can't block the stream handler until the remote peer answers - // our query. - go dht.peerFound(dht.ctx, mPeer) - if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil { c.Write(zap.String("from", mPeer.String()), zap.Int32("type", int32(req.GetType())), diff --git a/dht_options.go b/dht_options.go index 208786a61..eb1b90899 100644 --- a/dht_options.go +++ b/dht_options.go @@ -194,6 +194,15 @@ func Resiliency(beta int) Option { } } +// LookupInterval configures maximal number of go routines that can be used to +// perform a lookup check operation, before adding a new node to the routing table. +func LookupCheckConcurrency(n int) Option { + return func(c *dhtcfg.Config) error { + c.LookupCheckConcurrency = n + return nil + } +} + // MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record") // from the time its received. This does not apply to any other forms of validity that // the record may contain. 
diff --git a/go.mod b/go.mod index 0467adade..28aef81ab 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-log v1.0.5 github.com/jbenet/goprocess v0.1.4 - github.com/libp2p/go-libp2p v0.27.5 - github.com/libp2p/go-libp2p-kbucket v0.6.1 + github.com/libp2p/go-libp2p v0.27.6 + github.com/libp2p/go-libp2p-kbucket v0.6.2 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/libp2p/go-libp2p-testing v0.12.0 @@ -37,7 +37,7 @@ require ( ) require ( - github.com/benbjohnson/clock v1.3.0 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect @@ -55,15 +55,15 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b // indirect + github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/huin/goupnp v1.1.0 // indirect + github.com/huin/goupnp v1.2.0 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipld/go-ipld-prime v0.20.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.16.4 // indirect + github.com/klauspost/compress v1.16.5 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect @@ -71,12 +71,12 @@ require ( github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect - github.com/libp2p/go-reuseport v0.2.0 // indirect + 
github.com/libp2p/go-reuseport v0.3.0 // indirect github.com/libp2p/go-yamux/v4 v4.0.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/miekg/dns v1.1.53 // indirect + github.com/miekg/dns v1.1.54 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect github.com/minio/sha256-simd v1.0.1 // indirect @@ -86,7 +86,7 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/onsi/ginkgo/v2 v2.9.2 // indirect + github.com/onsi/ginkgo/v2 v2.9.7 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect @@ -94,29 +94,29 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-19 v0.3.2 // indirect github.com/quic-go/qtls-go1-20 v0.2.2 // indirect github.com/quic-go/quic-go v0.33.0 // indirect - github.com/quic-go/webtransport-go v0.5.2 // indirect + github.com/quic-go/webtransport-go v0.5.3 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/dig v1.16.1 // indirect + go.uber.org/dig v1.17.0 // 
indirect go.uber.org/fx v1.19.2 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.9.0 // indirect + golang.org/x/crypto v0.10.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect - golang.org/x/tools v0.7.0 // indirect + golang.org/x/sync v0.2.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/text v0.10.0 // indirect + golang.org/x/tools v0.9.1 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect diff --git a/go.sum b/go.sum index 158ec8621..d0b0404fd 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,9 @@ github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBA github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -160,8 +161,8 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF 
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= -github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b h1:Qcx5LM0fSiks9uCyFZwDBUasd3lxd1RM0GYpL+Li5o4= -github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 h1:hR7/MlvK23p6+lIw9SN1TigNLn9ZnF3W4SYRKq2gAHs= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -189,8 +190,8 @@ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= -github.com/huin/goupnp v1.1.0 h1:gEe0Dp/lZmPZiDFzJJaOfUpOvv2MKUkoBX8lDrn9vKU= -github.com/huin/goupnp v1.1.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY= +github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY= @@ -243,8 
+244,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= -github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -270,15 +271,15 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.27.5 h1:KwA7pXKXpz8hG6Cr1fMA7UkgleogcwQj0sxl5qquWRg= -github.com/libp2p/go-libp2p v0.27.5/go.mod h1:oMfQGTb9CHnrOuSM6yMmyK2lXz3qIhnkn2+oK3B1Y2g= +github.com/libp2p/go-libp2p v0.27.6 h1:KmGU5kskCaaerm53heqzfGOlrW2z8icZ+fnyqgrZs38= +github.com/libp2p/go-libp2p v0.27.6/go.mod h1:oMfQGTb9CHnrOuSM6yMmyK2lXz3qIhnkn2+oK3B1Y2g= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= 
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= -github.com/libp2p/go-libp2p-kbucket v0.6.1 h1:Y/NIvALuY5/fJlOpaJor9Azg4eor15JskGs9Lb2EhH0= -github.com/libp2p/go-libp2p-kbucket v0.6.1/go.mod h1:dvWO707Oq/vhMVuUhyfLkw0QsOrJFETepbNfpVHSELI= +github.com/libp2p/go-libp2p-kbucket v0.6.2 h1:JI5MDqQJJS9wmjs1/2fmmnToSxrepV4ScqwizXCCH4E= +github.com/libp2p/go-libp2p-kbucket v0.6.2/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= @@ -298,8 +299,8 @@ github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9t github.com/libp2p/go-netroute v0.2.1/go.mod h1:hraioZr0fhBjG0ZRXJJ6Zj2IVEVNx6tDTFQfSmcq7mQ= github.com/libp2p/go-openssl v0.0.3/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= -github.com/libp2p/go-reuseport v0.2.0 h1:18PRvIMlpY6ZK85nIAicSBuXXvrYoSw3dsBAR7zc560= -github.com/libp2p/go-reuseport v0.2.0/go.mod h1:bvVho6eLMm6Bz5hmU0LYN3ixd3nPPvtIlaURZZgOY4k= +github.com/libp2p/go-reuseport v0.3.0 h1:iiZslO5byUYZEg9iCwJGf5h+sf1Agmqx2V2FDjPyvUw= +github.com/libp2p/go-reuseport v0.3.0/go.mod h1:laea40AimhtfEqysZ71UpYj4S+R9VpH8PgqLo7L+SwI= github.com/libp2p/go-sockaddr v0.0.2/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/libp2p/go-yamux/v4 v4.0.0 h1:+Y80dV2Yx/kv7Y7JKu0LECyVdMXm1VUoko+VQ9rBfZQ= github.com/libp2p/go-yamux/v4 v4.0.0/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4= @@ -323,8 +324,8 @@ 
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= -github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw= -github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= +github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c/go.mod h1:0SQS9kMwD2VsyFEB++InYyBJroV/FRmBgcydeSUcJms= github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b h1:z78hV3sbSMAUoyUMM0I83AUIT6Hu17AWfgjzIbtrYFc= @@ -386,10 +387,10 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo/v2 v2.9.2 h1:BA2GMJOtfGAfagzYtrAlufIP0lq6QERkFmHLMLPwFSU= -github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts= +github.com/onsi/ginkgo/v2 v2.9.7 h1:06xGQy5www2oN160RtEZoTvnP2sPhEfePYmCDc2szss= +github.com/onsi/ginkgo/v2 v2.9.7/go.mod h1:cxrmXWykAwTwhQsJOPfdIDiJ+l2RYq7U8hFU+M/1uw0= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= +github.com/onsi/gomega v1.27.7 
h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -411,8 +412,8 @@ github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= @@ -427,8 +428,8 @@ github.com/quic-go/qtls-go1-20 v0.2.2 h1:WLOPx6OY/hxtTxKV1Zrq20FtXtDEkeY00CGQm8G github.com/quic-go/qtls-go1-20 v0.2.2/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= github.com/quic-go/quic-go v0.33.0 h1:ItNoTDN/Fm/zBlq769lLJc8ECe9gYaW40veHCCco7y0= github.com/quic-go/quic-go v0.33.0/go.mod h1:YMuhaAV9/jIu0XclDXwZPAsP/2Kgr5yMYhe9oxhhOFA= -github.com/quic-go/webtransport-go v0.5.2 h1:GA6Bl6oZY+g/flt00Pnu0XtivSD8vukOu3lYhJjnGEk= -github.com/quic-go/webtransport-go v0.5.2/go.mod 
h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= +github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU= +github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -529,8 +530,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8= -go.uber.org/dig v1.16.1/go.mod h1:557JTAUZT5bUK0SvCwikmLPPtdQhfvLYtO5tJgQSbnk= +go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI= +go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU= go.uber.org/fx v1.19.2 h1:SyFgYQFr1Wl0AYstE8vyYIzP4bFz2URrScjwC4cwUvY= go.uber.org/fx v1.19.2/go.mod h1:43G1VcqSzbIv77y00p1DRAsyZS8WdzuYdhZXmEUkMyQ= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= @@ -559,8 +560,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod 
h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= @@ -611,8 +612,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -640,15 +641,15 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 
h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= +golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -672,8 +673,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= -golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= +golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= +golang.org/x/tools 
v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/config/config.go b/internal/config/config.go index c6d922ca0..4bfca3452 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,20 +32,21 @@ type RouteTableFilterFunc func(dht interface{}, p peer.ID) bool // Config is a structure containing all the options that can be used when constructing a DHT. type Config struct { - Datastore ds.Batching - Validator record.Validator - ValidatorChanged bool // if true implies that the validator has been changed and that Defaults should not be used - Mode ModeOpt - ProtocolPrefix protocol.ID - V1ProtocolOverride protocol.ID - BucketSize int - Concurrency int - Resiliency int - MaxRecordAge time.Duration - EnableProviders bool - EnableValues bool - ProviderStore providers.ProviderStore - QueryPeerFilter QueryFilterFunc + Datastore ds.Batching + Validator record.Validator + ValidatorChanged bool // if true implies that the validator has been changed and that Defaults should not be used + Mode ModeOpt + ProtocolPrefix protocol.ID + V1ProtocolOverride protocol.ID + BucketSize int + Concurrency int + Resiliency int + MaxRecordAge time.Duration + EnableProviders bool + EnableValues bool + ProviderStore providers.ProviderStore + QueryPeerFilter QueryFilterFunc + LookupCheckConcurrency int RoutingTable struct { RefreshQueryTimeout time.Duration @@ -112,8 +113,8 @@ var Defaults = func(o *Config) error { o.EnableValues = true o.QueryPeerFilter = EmptyQueryFilter - o.RoutingTable.LatencyTolerance = time.Minute - o.RoutingTable.RefreshQueryTimeout = 1 * time.Minute + o.RoutingTable.LatencyTolerance = 10 * 
time.Second + o.RoutingTable.RefreshQueryTimeout = 10 * time.Second o.RoutingTable.RefreshInterval = 10 * time.Minute o.RoutingTable.AutoRefresh = true o.RoutingTable.PeerFilter = EmptyRTFilter @@ -123,6 +124,7 @@ var Defaults = func(o *Config) error { o.BucketSize = defaultBucketSize o.Concurrency = 10 o.Resiliency = 3 + o.LookupCheckConcurrency = 256 // MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4. o.OptimisticProvideJobsPoolSize = 60 From 701721fa1144041df596a39cac15a9eb1d310bf3 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 15 Jun 2023 02:48:16 +0200 Subject: [PATCH 2/5] fix: bump kbucket for abba bug --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 28aef81ab..a237e6026 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/ipfs/go-log v1.0.5 github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-libp2p v0.27.6 - github.com/libp2p/go-libp2p-kbucket v0.6.2 + github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/libp2p/go-libp2p-testing v0.12.0 diff --git a/go.sum b/go.sum index d0b0404fd..a1b9ccc54 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNz github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= -github.com/libp2p/go-libp2p-kbucket v0.6.2 h1:JI5MDqQJJS9wmjs1/2fmmnToSxrepV4ScqwizXCCH4E= -github.com/libp2p/go-libp2p-kbucket v0.6.2/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= +github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e 
h1:E5ZSk+DxxsXoszjKZYtZBJoQ6LnwGCjq4zVIINZaaT4= +github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= From cbe39cd7dfc6783002f087bc4ddf9ac1ce43b158 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 15 Jun 2023 02:47:17 +0200 Subject: [PATCH 3/5] refactor: remove goprocess --- dht.go | 214 ++++++++++++++-------------- dht_bootstrap_test.go | 4 +- dht_test.go | 73 +++++----- fullrt/dht.go | 5 +- go.mod | 4 +- handlers_test.go | 2 +- internal/net/message_manager.go | 2 +- pb/protocol_messenger.go | 6 + providers/providers_manager.go | 210 ++++++++++++++------------- providers/providers_manager_test.go | 20 +-- query.go | 2 +- rtrefresh/rt_refresh_manager.go | 17 +-- subscriber_notifee.go | 149 +++++++------------ 13 files changed, 328 insertions(+), 380 deletions(-) diff --git a/dht.go b/dht.go index 88f0e899b..da3e74d83 100644 --- a/dht.go +++ b/dht.go @@ -33,11 +33,10 @@ import ( "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" "github.com/multiformats/go-base32" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/tag" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -92,13 +91,12 @@ type IpfsDHT struct { Validator record.Validator - ctx context.Context - proc goprocess.Process + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup protoMessenger *pb.ProtocolMessenger - msgSender pb.MessageSender - - plk sync.Mutex + msgSender pb.MessageSenderWithDisconnect stripedPutLocks [256]sync.Mutex @@ -187,7 +185,7 @@ func New(ctx context.Context, h host.Host, 
options ...Option) (*IpfsDHT, error) return nil, err } - dht, err := makeDHT(ctx, h, cfg) + dht, err := makeDHT(h, cfg) if err != nil { return nil, fmt.Errorf("failed to create DHT, err=%s", err) } @@ -225,30 +223,27 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) } // register for event bus and network notifications - sn, err := newSubscriberNotifiee(dht) - if err != nil { + if err := dht.startNetworkSubscriber(); err != nil { return nil, err } - dht.proc.Go(sn.subscribe) - // handle providers - if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok { - dht.proc.AddChild(mgr.Process()) - } // go-routine to make sure we ALWAYS have RT peer addresses in the peerstore // since RT membership is decoupled from connectivity go dht.persistRTPeersInPeerStore() - dht.proc.Go(dht.rtPeerLoop) + dht.rtPeerLoop() // Fill routing table with currently connected peers that are DHT servers - dht.plk.Lock() for _, p := range dht.host.Network().Peers() { - dht.peerFound(dht.ctx, p) + dht.peerFound(p) } - dht.plk.Unlock() - dht.proc.Go(dht.populatePeers) + dht.rtRefreshManager.Start() + + // listens to the fix low peers chan and tries to fix the Routing Table + if !dht.disableFixLowPeers { + dht.runFixLowPeersLoop() + } return dht, nil } @@ -275,7 +270,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT return dht } -func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) { +func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) { var protocols, serverProtocols []protocol.ID v1proto := cfg.ProtocolPrefix + kad1 @@ -346,26 +341,19 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err } // rt refresh manager - rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold) + dht.rtRefreshManager, err = makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold) if err != nil { return nil, 
fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err) } - dht.rtRefreshManager = rtRefresh - - // create a DHT proc with the given context - dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { - return rtRefresh.Close() - }) // create a tagged context derived from the original context - ctxTags := dht.newContextWithLocalTags(ctx) // the DHT context should be done when the process is closed - dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc) + dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(context.Background())) if cfg.ProviderStore != nil { dht.providerStore = cfg.ProviderStore } else { - dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore) + dht.providerStore, err = providers.NewProviderManager(h.ID(), dht.peerstore, cfg.Datastore) if err != nil { return nil, fmt.Errorf("initializing default provider manager (%v)", err) } @@ -468,42 +456,32 @@ func (dht *IpfsDHT) Mode() ModeOpt { return dht.auto } -func (dht *IpfsDHT) populatePeers(_ goprocess.Process) { - if !dht.disableFixLowPeers { - dht.fixLowPeers(dht.ctx) - } +// runFixLowPeersLoop manages simultaneous requests to fixLowPeers +func (dht *IpfsDHT) runFixLowPeersLoop() { + dht.wg.Add(1) + go func() { + defer dht.wg.Done() - if err := dht.rtRefreshManager.Start(); err != nil { - logger.Error(err) - } + dht.fixLowPeers() - // listens to the fix low peers chan and tries to fix the Routing Table - if !dht.disableFixLowPeers { - dht.proc.Go(dht.fixLowPeersRoutine) - } - -} + ticker := time.NewTicker(periodicBootstrapInterval) + defer ticker.Stop() -// fixLowPeersRouting manages simultaneous requests to fixLowPeers -func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { - ticker := time.NewTicker(periodicBootstrapInterval) - defer ticker.Stop() + for { + select { + case <-dht.fixLowPeersChan: + case <-ticker.C: + case <-dht.ctx.Done(): + return + } - for { - select { - case <-dht.fixLowPeersChan: - 
case <-ticker.C: - case <-proc.Closing(): - return + dht.fixLowPeers() } - - dht.fixLowPeers(dht.Context()) - } - + }() } // fixLowPeers tries to get more peers into the routing table if we're below the threshold -func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { +func (dht *IpfsDHT) fixLowPeers() { if dht.routingTable.Size() > minRTRefreshThreshold { return } @@ -511,7 +489,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { // we try to add all peers we are connected to to the Routing Table // in case they aren't already there. for _, p := range dht.host.Network().Peers() { - dht.peerFound(ctx, p) + dht.peerFound(p) } // TODO Active Bootstrapping @@ -528,7 +506,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { found := 0 for _, i := range rand.Perm(len(bootstrapPeers)) { ai := bootstrapPeers[i] - err := dht.Host().Connect(ctx, ai) + err := dht.Host().Connect(dht.ctx, ai) if err == nil { found++ } else { @@ -613,54 +591,59 @@ func (dht *IpfsDHT) putLocal(ctx context.Context, key string, rec *recpb.Record) return dht.datastore.Put(ctx, mkDsKey(key), data) } -func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { - bootstrapCount := 0 - isBootsrapping := false - var timerCh <-chan time.Time +func (dht *IpfsDHT) rtPeerLoop() { + dht.wg.Add(1) + go func() { + defer dht.wg.Done() + + var bootstrapCount uint + var isBootsrapping bool + var timerCh <-chan time.Time + + for { + select { + case <-timerCh: + dht.routingTable.MarkAllPeersIrreplaceable() + case p := <-dht.addPeerToRTChan: + if dht.routingTable.Size() == 0 { + isBootsrapping = true + bootstrapCount = 0 + timerCh = nil + } + // queryPeer set to true as we only try to add queried peers to the RT + newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping) + if err != nil { + // peer not added. 
+ continue + } + if !newlyAdded { + // the peer is already in our RT, but we just successfully queried it and so let's give it a + // bump on the query time so we don't ping it too soon for a liveliness check. + dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) + } + case <-dht.refreshFinishedCh: + bootstrapCount = bootstrapCount + 1 + if bootstrapCount == 2 { + timerCh = time.NewTimer(dht.rtFreezeTimeout).C + } - for { - select { - case <-timerCh: - dht.routingTable.MarkAllPeersIrreplaceable() - case p := <-dht.addPeerToRTChan: - if dht.routingTable.Size() == 0 { - isBootsrapping = true - bootstrapCount = 0 - timerCh = nil - } - // queryPeer set to true as we only try to add queried peers to the RT - newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping) - if err != nil { - // peer not added. - continue - } - if !newlyAdded { - // the peer is already in our RT, but we just successfully queried it and so let's give it a - // bump on the query time so we don't ping it too soon for a liveliness check. - dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) - } - case <-dht.refreshFinishedCh: - bootstrapCount = bootstrapCount + 1 - if bootstrapCount == 2 { - timerCh = time.NewTimer(dht.rtFreezeTimeout).C - } + old := isBootsrapping + isBootsrapping = false + if old { + dht.rtRefreshManager.RefreshNoWait() + } - old := isBootsrapping - isBootsrapping = false - if old { - dht.rtRefreshManager.RefreshNoWait() + case <-dht.ctx.Done(): + return } - - case <-proc.Closing(): - return } - } + }() } // peerFound verifies whether the found peer advertises DHT protocols // and probe it to make sure it answers DHT queries as expected. If // it fails to answer, it isn't added to the routingTable. 
-func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { +func (dht *IpfsDHT) peerFound(p peer.ID) { // if the peer is already in the routing table or the appropriate bucket is // already full, don't try to add the new peer.ID if !dht.routingTable.UsefulNewPeer(p) { @@ -685,7 +668,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { dht.lookupChecksLk.Unlock() go func() { - livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout) + livelinessCtx, cancel := context.WithTimeout(dht.ctx, dht.lookupCheckTimeout) defer cancel() // performing a FIND_NODE query @@ -701,14 +684,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { } // if the FIND_NODE succeeded, the peer is considered as valid - dht.validPeerFound(ctx, p) + dht.validPeerFound(p) }() } } // validPeerFound signals the routingTable that we've found a peer that // supports the DHT protocol, and just answered correctly to a DHT FindPeers -func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) { +func (dht *IpfsDHT) validPeerFound(p peer.ID) { if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil { c.Write(zap.String("peer", p.String())) } @@ -852,11 +835,6 @@ func (dht *IpfsDHT) Context() context.Context { return dht.ctx } -// Process returns the DHT's process. -func (dht *IpfsDHT) Process() goprocess.Process { - return dht.proc -} - // RoutingTable returns the DHT's routingTable. func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable { return dht.routingTable @@ -864,7 +842,25 @@ func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable { // Close calls Process Close. 
func (dht *IpfsDHT) Close() error { - return dht.proc.Close() + dht.cancel() + dht.wg.Wait() + + var wg sync.WaitGroup + closes := [...]func() error{ + dht.rtRefreshManager.Close, + dht.providerStore.Close, + } + var errors [len(closes)]error + wg.Add(len(errors)) + for i, c := range closes { + go func(i int, c func() error) { + defer wg.Done() + errors[i] = c() + }(i, c) + } + wg.Wait() + + return multierr.Combine(errors[:]...) } func mkDsKey(s string) ds.Key { diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index 9dd496c0a..e2236f5a1 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) { require.NoError(t, d.host.Network().ClosePeer(d5.self)) connectNoSync(t, ctx, d, d1) connectNoSync(t, ctx, d, d5) - d.peerFound(ctx, d5.self) - d.peerFound(ctx, d1.self) + d.peerFound(d5.self) + d.peerFound(d1.self) time.Sleep(1 * time.Second) require.Len(t, d.routingTable.ListPeers(), 2) diff --git a/dht_test.go b/dht_test.go index 5f028d540..ab02b8869 100644 --- a/dht_test.go +++ b/dht_test.go @@ -166,9 +166,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { t.Fatal("peers setup incorrectly: no local address") } - a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL) - pi := peer.AddrInfo{ID: idB} - if err := a.host.Connect(ctx, pi); err != nil { + if err := a.host.Connect(ctx, peer.AddrInfo{ID: idB, Addrs: addrB}); err != nil { t.Fatal(err) } } @@ -273,6 +271,7 @@ func TestValueGetSet(t *testing.T) { defer dhts[i].host.Close() } + t.Log("before connect") connect(t, ctx, dhts[0], dhts[1]) t.Log("adding value on: ", dhts[0].self) @@ -291,13 +290,13 @@ func TestValueGetSet(t *testing.T) { if err != nil { t.Fatal(err) } + t.Log("after get value") if string(val) != "world" { t.Fatalf("Expected 'world' got '%s'", string(val)) } - // late connect - + t.Log("late connect") connect(t, ctx, dhts[2], dhts[0]) connect(t, ctx, dhts[2], dhts[1]) @@ -320,6 +319,7 @@ func 
TestValueGetSet(t *testing.T) { t.Fatalf("Expected 'world' got '%s'", string(val)) } + t.Log("very late connect") for _, d := range dhts[:3] { connect(t, ctx, dhts[3], d) } @@ -610,25 +610,6 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i // test "well-formed-ness" (>= minPeers peers in every routing table) t.Helper() - checkTables := func() bool { - totalPeers := 0 - for _, dht := range dhts { - rtlen := dht.routingTable.Size() - totalPeers += rtlen - if minPeers > 0 && rtlen < minPeers { - // t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) - return false - } - } - actualAvgPeers := totalPeers / len(dhts) - t.Logf("avg rt size: %d", actualAvgPeers) - if avgPeers > 0 && actualAvgPeers < avgPeers { - t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers) - return false - } - return true - } - timeoutA := time.After(timeout) for { select { @@ -636,7 +617,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i t.Errorf("failed to reach well-formed routing tables after %s", timeout) return case <-time.After(5 * time.Millisecond): - if checkTables() { + if checkForWellFormedTablesOnce(t, dhts, minPeers, avgPeers) { // succeeded return } @@ -644,6 +625,26 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i } } +func checkForWellFormedTablesOnce(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int) bool { + t.Helper() + totalPeers := 0 + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + totalPeers += rtlen + if minPeers > 0 && rtlen < minPeers { + t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) + return false + } + } + actualAvgPeers := totalPeers / len(dhts) + t.Logf("avg rt size: %d", actualAvgPeers) + if avgPeers > 0 && actualAvgPeers < avgPeers { + t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers) + return false + } + return true +} + func 
printRoutingTables(dhts []*IpfsDHT) { // the routing tables should be full now. let's inspect them. fmt.Printf("checking routing table of %d\n", len(dhts)) @@ -679,24 +680,16 @@ func TestRefresh(t *testing.T) { <-time.After(100 * time.Millisecond) // bootstrap a few times until we get good tables. t.Logf("bootstrapping them so they find each other %d", nDHTs) - ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second) - defer cancelT() - for ctxT.Err() == nil { - bootstrap(t, ctxT, dhts) + for { + bootstrap(t, ctx, dhts) - // wait a bit. - select { - case <-time.After(50 * time.Millisecond): - continue // being explicit - case <-ctxT.Done(): - return + if checkForWellFormedTablesOnce(t, dhts, 7, 10) { + break } - } - - waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second) - cancelT() + time.Sleep(time.Microsecond * 50) + } if u.Debug { // the routing tables should be full now. let's inspect them. @@ -2121,7 +2114,7 @@ func TestBootstrapPeersFunc(t *testing.T) { bootstrapPeersB = []peer.AddrInfo{addrA} lock.Unlock() - dhtB.fixLowPeers(ctx) + dhtB.fixLowPeers() require.NotEqual(t, 0, len(dhtB.host.Network().Peers())) } diff --git a/fullrt/dht.go b/fullrt/dht.go index f1a26e70a..3b0cd3e94 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -151,7 +151,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful ctx, cancel := context.WithCancel(context.Background()) self := h.ID() - pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...) + pm, err := providers.NewProviderManager(self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...) 
if err != nil { cancel() return nil, err @@ -355,9 +355,8 @@ func (dht *FullRT) runCrawler(ctx context.Context) { func (dht *FullRT) Close() error { dht.cancel() - err := dht.ProviderManager.Process().Close() dht.wg.Wait() - return err + return dht.ProviderManager.Close() } func (dht *FullRT) Bootstrap(ctx context.Context) error { diff --git a/go.mod b/go.mod index a237e6026..fa233ef4a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-log v1.0.5 - github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-libp2p v0.27.6 github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e github.com/libp2p/go-libp2p-record v0.2.0 @@ -32,6 +31,7 @@ require ( go.opencensus.io v0.24.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 gonum.org/v1/gonum v0.13.0 ) @@ -63,6 +63,7 @@ require ( github.com/ipld/go-ipld-prime v0.20.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/compress v1.16.5 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/koron/go-ssdp v0.0.4 // indirect @@ -108,7 +109,6 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.0 // indirect go.uber.org/fx v1.19.2 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.10.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/mod v0.10.0 // indirect diff --git a/handlers_test.go b/handlers_test.go index d829e38b1..35959df62 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) { panic(err) } - d.peerFound(ctx, id) + d.peerFound(id) peers = append(peers, id) a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i)) 
diff --git a/internal/net/message_manager.go b/internal/net/message_manager.go index f04dd0889..7908be2bd 100644 --- a/internal/net/message_manager.go +++ b/internal/net/message_manager.go @@ -43,7 +43,7 @@ type messageSenderImpl struct { protocols []protocol.ID } -func NewMessageSenderImpl(h host.Host, protos []protocol.ID) pb.MessageSender { +func NewMessageSenderImpl(h host.Host, protos []protocol.ID) pb.MessageSenderWithDisconnect { return &messageSenderImpl{ host: h, strmap: make(map[peer.ID]*peerMessageSender), diff --git a/pb/protocol_messenger.go b/pb/protocol_messenger.go index e175dde10..48aba3e35 100644 --- a/pb/protocol_messenger.go +++ b/pb/protocol_messenger.go @@ -45,6 +45,12 @@ func NewProtocolMessenger(msgSender MessageSender, opts ...ProtocolMessengerOpti return pm, nil } +type MessageSenderWithDisconnect interface { + MessageSender + + OnDisconnect(context.Context, peer.ID) +} + // MessageSender handles sending wire protocol messages to a given peer type MessageSender interface { // SendRequest sends a peer a message and waits for its response diff --git a/providers/providers_manager.go b/providers/providers_manager.go index f2a7ad17c..b7f1d7d90 100644 --- a/providers/providers_manager.go +++ b/providers/providers_manager.go @@ -4,7 +4,9 @@ import ( "context" "encoding/binary" "fmt" + "io" "strings" + "sync" "time" lru "github.com/hashicorp/golang-lru/simplelru" @@ -12,8 +14,6 @@ import ( "github.com/ipfs/go-datastore/autobatch" dsq "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p-kad-dht/internal" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -45,6 +45,7 @@ var log = logging.Logger("providers") type ProviderStore interface { AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error) + 
io.Closer } // ProviderManager adds and pulls providers out of the datastore, @@ -59,9 +60,12 @@ type ProviderManager struct { newprovs chan *addProv getprovs chan *getProv - proc goprocess.Process cleanupInterval time.Duration + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } var _ ProviderStore = (*ProviderManager)(nil) @@ -109,7 +113,7 @@ type getProv struct { } // NewProviderManager constructor -func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { +func NewProviderManager(local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { pm := new(ProviderManager) pm.self = local pm.getprovs = make(chan *getProv) @@ -125,117 +129,121 @@ func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peersto if err := pm.applyOptions(opts...); err != nil { return nil, err } - pm.proc = goprocessctx.WithContext(ctx) - pm.proc.Go(func(proc goprocess.Process) { pm.run(ctx, proc) }) + pm.ctx, pm.cancel = context.WithCancel(context.Background()) + pm.run() return pm, nil } -// Process returns the ProviderManager process -func (pm *ProviderManager) Process() goprocess.Process { - return pm.proc -} +func (pm *ProviderManager) run() { + pm.wg.Add(1) + go func() { + defer pm.wg.Done() -func (pm *ProviderManager) run(ctx context.Context, proc goprocess.Process) { - var ( - gcQuery dsq.Results - gcQueryRes <-chan dsq.Result - gcSkip map[string]struct{} - gcTime time.Time - gcTimer = time.NewTimer(pm.cleanupInterval) - ) - - defer func() { - gcTimer.Stop() - if gcQuery != nil { - // don't really care if this fails. 
- _ = gcQuery.Close() - } - if err := pm.dstore.Flush(ctx); err != nil { - log.Error("failed to flush datastore: ", err) - } - }() + var gcQuery dsq.Results + gcTimer := time.NewTimer(pm.cleanupInterval) - for { - select { - case np := <-pm.newprovs: - err := pm.addProv(np.ctx, np.key, np.val) - if err != nil { - log.Error("error adding new providers: ", err) - continue + defer func() { + gcTimer.Stop() + if gcQuery != nil { + // don't really care if this fails. + _ = gcQuery.Close() } - if gcSkip != nil { - // we have an gc, tell it to skip this provider - // as we've updated it since the GC started. - gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{} + if err := pm.dstore.Flush(context.Background()); err != nil { + log.Error("failed to flush datastore: ", err) } - case gp := <-pm.getprovs: - provs, err := pm.getProvidersForKey(gp.ctx, gp.key) - if err != nil && err != ds.ErrNotFound { - log.Error("error reading providers: ", err) - } - - // set the cap so the user can't append to this. - gp.resp <- provs[0:len(provs):len(provs)] - case res, ok := <-gcQueryRes: - if !ok { - if err := gcQuery.Close(); err != nil { - log.Error("failed to close provider GC query: ", err) + }() + + var gcQueryRes <-chan dsq.Result + var gcSkip map[string]struct{} + var gcTime time.Time + for { + select { + case np := <-pm.newprovs: + err := pm.addProv(np.ctx, np.key, np.val) + if err != nil { + log.Error("error adding new providers: ", err) + continue + } + if gcSkip != nil { + // we have an gc, tell it to skip this provider + // as we've updated it since the GC started. 
+ gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{} + } + case gp := <-pm.getprovs: + provs, err := pm.getProvidersForKey(gp.ctx, gp.key) + if err != nil && err != ds.ErrNotFound { + log.Error("error reading providers: ", err) } - gcTimer.Reset(pm.cleanupInterval) - // cleanup GC round - gcQueryRes = nil - gcSkip = nil - gcQuery = nil - continue - } - if res.Error != nil { - log.Error("got error from GC query: ", res.Error) - continue - } - if _, ok := gcSkip[res.Key]; ok { - // We've updated this record since starting the - // GC round, skip it. - continue - } + // set the cap so the user can't append to this. + gp.resp <- provs[0:len(provs):len(provs)] + case res, ok := <-gcQueryRes: + if !ok { + if err := gcQuery.Close(); err != nil { + log.Error("failed to close provider GC query: ", err) + } + gcTimer.Reset(pm.cleanupInterval) + + // cleanup GC round + gcQueryRes = nil + gcSkip = nil + gcQuery = nil + continue + } + if res.Error != nil { + log.Error("got error from GC query: ", res.Error) + continue + } + if _, ok := gcSkip[res.Key]; ok { + // We've updated this record since starting the + // GC round, skip it. 
+ continue + } - // check expiration time - t, err := readTimeValue(res.Value) - switch { - case err != nil: - // couldn't parse the time - log.Error("parsing providers record from disk: ", err) - fallthrough - case gcTime.Sub(t) > ProvideValidity: - // or expired - err = pm.dstore.Delete(ctx, ds.RawKey(res.Key)) - if err != nil && err != ds.ErrNotFound { - log.Error("failed to remove provider record from disk: ", err) + // check expiration time + t, err := readTimeValue(res.Value) + switch { + case err != nil: + // couldn't parse the time + log.Error("parsing providers record from disk: ", err) + fallthrough + case gcTime.Sub(t) > ProvideValidity: + // or expired + err = pm.dstore.Delete(pm.ctx, ds.RawKey(res.Key)) + if err != nil && err != ds.ErrNotFound { + log.Error("failed to remove provider record from disk: ", err) + } } - } - case gcTime = <-gcTimer.C: - // You know the wonderful thing about caches? You can - // drop them. - // - // Much faster than GCing. - pm.cache.Purge() - - // Now, kick off a GC of the datastore. - q, err := pm.dstore.Query(ctx, dsq.Query{ - Prefix: ProvidersKeyPrefix, - }) - if err != nil { - log.Error("provider record GC query failed: ", err) - continue + case gcTime = <-gcTimer.C: + // You know the wonderful thing about caches? You can + // drop them. + // + // Much faster than GCing. + pm.cache.Purge() + + // Now, kick off a GC of the datastore. 
+ q, err := pm.dstore.Query(pm.ctx, dsq.Query{ + Prefix: ProvidersKeyPrefix, + }) + if err != nil { + log.Error("provider record GC query failed: ", err) + continue + } + gcQuery = q + gcQueryRes = q.Next() + gcSkip = make(map[string]struct{}) + case <-pm.ctx.Done(): + return } - gcQuery = q - gcQueryRes = q.Next() - gcSkip = make(map[string]struct{}) - case <-proc.Closing(): - return } - } + }() +} + +func (pm *ProviderManager) Close() error { + pm.cancel() + pm.wg.Wait() + return nil } // AddProvider adds a provider diff --git a/providers/providers_manager_test.go b/providers/providers_manager_test.go index ba238a59e..e830929ef 100644 --- a/providers/providers_manager_test.go +++ b/providers/providers_manager_test.go @@ -31,7 +31,7 @@ func TestProviderManager(t *testing.T) { if err != nil { t.Fatal(err) } - p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore())) + p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore())) if err != nil { t.Fatal(err) } @@ -60,7 +60,7 @@ func TestProviderManager(t *testing.T) { t.Fatalf("Should have got 3 providers, got %d", len(resp)) } - p.proc.Close() + p.Close() } func TestProvidersDatastore(t *testing.T) { @@ -77,11 +77,11 @@ func TestProvidersDatastore(t *testing.T) { t.Fatal(err) } - p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore())) + p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore())) if err != nil { t.Fatal(err) } - defer p.proc.Close() + defer p.Close() friend := peer.ID("friend") var mhs []mh.Multihash @@ -166,7 +166,7 @@ func TestProvidesExpire(t *testing.T) { if err != nil { t.Fatal(err) } - p, err := NewProviderManager(ctx, mid, ps, ds) + p, err := NewProviderManager(mid, ps, ds) if err != nil { t.Fatal(err) } @@ -216,7 +216,7 @@ func TestProvidesExpire(t *testing.T) { time.Sleep(time.Second / 2) // Stop to prevent data races - p.Process().Close() + p.Close() if p.cache.Len() != 0 { t.Fatal("providers map not 
cleaned up") @@ -278,11 +278,11 @@ func TestLargeProvidersSet(t *testing.T) { t.Fatal(err) } - p, err := NewProviderManager(ctx, mid, ps, dstore) + p, err := NewProviderManager(mid, ps, dstore) if err != nil { t.Fatal(err) } - defer p.proc.Close() + defer p.Close() var mhs []mh.Multihash for i := 0; i < 1000; i++ { @@ -318,7 +318,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { t.Fatal(err) } - pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore())) + pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore())) if err != nil { t.Fatal(err) } @@ -347,7 +347,7 @@ func TestWriteUpdatesCache(t *testing.T) { t.Fatal(err) } - pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore())) + pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore())) if err != nil { t.Fatal(err) } diff --git a/query.go b/query.go index c8b07d650..524269aec 100644 --- a/query.go +++ b/query.go @@ -446,7 +446,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID queryDuration := time.Since(startQuery) // query successful, try to add to RT - q.dht.validPeerFound(q.dht.ctx, p) + q.dht.validPeerFound(p) // process new peers saw := []peer.ID{} diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index d08983702..6de69b026 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -31,10 +31,9 @@ type triggerRefreshReq struct { } type RtRefreshManager struct { - ctx context.Context - cancel context.CancelFunc - refcount sync.WaitGroup - closeOnce sync.Once + ctx context.Context + cancel context.CancelFunc + refcount sync.WaitGroup // peerId of this DHT peer i.e. self peerId. 
h host.Host @@ -89,17 +88,14 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool }, nil } -func (r *RtRefreshManager) Start() error { +func (r *RtRefreshManager) Start() { r.refcount.Add(1) go r.loop() - return nil } func (r *RtRefreshManager) Close() error { - r.closeOnce.Do(func() { - r.cancel() - r.refcount.Wait() - }) + r.cancel() + r.refcount.Wait() return nil } @@ -117,6 +113,7 @@ func (r *RtRefreshManager) Refresh(force bool) <-chan error { case r.triggerRefresh <- &triggerRefreshReq{respCh: resp, forceCplRefresh: force}: case <-r.ctx.Done(): resp <- r.ctx.Err() + close(resp) } }() diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 23c21ffb9..759db76c6 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -1,27 +1,15 @@ package dht import ( - "context" "fmt" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/eventbus" - - "github.com/jbenet/goprocess" - ma "github.com/multiformats/go-multiaddr" ) -// subscriberNotifee implements network.Notifee and also manages the subscriber to the event bus. We consume peer -// identification events to trigger inclusion in the routing table, and we consume Disconnected events to eject peers -// from it. -type subscriberNotifee struct { - dht *IpfsDHT - subs event.Subscription -} - -func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { +func (dht *IpfsDHT) startNetworkSubscriber() error { bufSize := eventbus.BufSize(256) evts := []interface{}{ @@ -35,6 +23,9 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { // register for event bus notifications for when our local address/addresses change so we can // advertise those to the network new(event.EvtLocalAddressesUpdated), + + // we want to know when we are disconnecting from other peers. 
+ new(event.EvtPeerConnectednessChanged), } // register for event bus local routability changes in order to trigger switching between client and server modes @@ -45,61 +36,57 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { subs, err := dht.host.EventBus().Subscribe(evts, bufSize) if err != nil { - return nil, fmt.Errorf("dht could not subscribe to eventbus events; err: %s", err) + return fmt.Errorf("dht could not subscribe to eventbus events: %w", err) } - nn := &subscriberNotifee{ - dht: dht, - subs: subs, - } - - // register for network notifications - dht.host.Network().Notify(nn) - - return nn, nil -} - -func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { - dht := nn.dht - defer dht.host.Network().StopNotify(nn) - defer nn.subs.Close() - - for { - select { - case e, more := <-nn.subs.Out(): - if !more { - return - } + dht.wg.Add(1) + go func() { + defer dht.wg.Done() + defer subs.Close() - switch evt := e.(type) { - case event.EvtLocalAddressesUpdated: - // when our address changes, we should proactively tell our closest peers about it so - // we become discoverable quickly. The Identify protocol will push a signed peer record - // with our new address to all peers we are connected to. However, we might not necessarily be connected - // to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way - // to to forge connections with those matter. 
- if dht.autoRefresh || dht.testAddressUpdateProcessing { - dht.rtRefreshManager.RefreshNoWait() + for { + select { + case e, more := <-subs.Out(): + if !more { + return } - case event.EvtPeerProtocolsUpdated: - handlePeerChangeEvent(dht, evt.Peer) - case event.EvtPeerIdentificationCompleted: - handlePeerChangeEvent(dht, evt.Peer) - case event.EvtLocalReachabilityChanged: - if dht.auto == ModeAuto || dht.auto == ModeAutoServer { - handleLocalReachabilityChangedEvent(dht, evt) - } else { - // something has gone really wrong if we get an event we did not subscribe to - logger.Errorf("received LocalReachabilityChanged event that was not subscribed to") + + switch evt := e.(type) { + case event.EvtLocalAddressesUpdated: + // when our address changes, we should proactively tell our closest peers about it so + // we become discoverable quickly. The Identify protocol will push a signed peer record + // with our new address to all peers we are connected to. However, we might not necessarily be connected + // to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way + // to to forge connections with those matter. 
+ if dht.autoRefresh || dht.testAddressUpdateProcessing { + dht.rtRefreshManager.RefreshNoWait() + } + case event.EvtPeerProtocolsUpdated: + handlePeerChangeEvent(dht, evt.Peer) + case event.EvtPeerIdentificationCompleted: + handlePeerChangeEvent(dht, evt.Peer) + case event.EvtPeerConnectednessChanged: + if evt.Connectedness != network.Connected { + dht.msgSender.OnDisconnect(dht.ctx, evt.Peer) + } + case event.EvtLocalReachabilityChanged: + if dht.auto == ModeAuto || dht.auto == ModeAutoServer { + handleLocalReachabilityChangedEvent(dht, evt) + } else { + // something has gone really wrong if we get an event we did not subscribe to + logger.Errorf("received LocalReachabilityChanged event that was not subscribed to") + } + default: + // something has gone really wrong if we get an event for another type + logger.Errorf("got wrong type from subscription: %T", e) } - default: - // something has gone really wrong if we get an event for another type - logger.Errorf("got wrong type from subscription: %T", e) + case <-dht.ctx.Done(): + return } - case <-proc.Closing(): - return } - } + }() + + return nil } func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { @@ -108,7 +95,7 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { logger.Errorf("could not check peerstore for protocol support: err: %s", err) return } else if valid { - dht.peerFound(dht.ctx, p) + dht.peerFound(p) dht.fixRTIfNeeded() } else { dht.peerStoppedDHT(p) @@ -153,41 +140,3 @@ func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) { return dht.routingTablePeerFilter == nil || dht.routingTablePeerFilter(dht, p), nil } - -type disconnector interface { - OnDisconnect(ctx context.Context, p peer.ID) -} - -func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) { - dht := nn.dht - - ms, ok := dht.msgSender.(disconnector) - if !ok { - return - } - - select { - case <-dht.Process().Closing(): - return - default: - } - - p := v.RemotePeer() - - // Lock and check to see if we're still 
connected. We lock to make sure - // we don't concurrently process a connect event. - dht.plk.Lock() - defer dht.plk.Unlock() - if dht.host.Network().Connectedness(p) == network.Connected { - // We're still connected. - return - } - - ms.OnDisconnect(dht.Context(), p) -} - -func (nn *subscriberNotifee) Connected(network.Network, network.Conn) {} -func (nn *subscriberNotifee) OpenedStream(network.Network, network.Stream) {} -func (nn *subscriberNotifee) ClosedStream(network.Network, network.Stream) {} -func (nn *subscriberNotifee) Listen(network.Network, ma.Multiaddr) {} -func (nn *subscriberNotifee) ListenClose(network.Network, ma.Multiaddr) {} From 2eee8535129ae0743865fec68161bbf46cabfbc7 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 15 Jun 2023 08:13:56 +0200 Subject: [PATCH 4/5] fix: decrease test noise, update kbucket and fix fixRTIfNeeded --- dht.go | 5 ++++- dht_test.go | 6 +----- go.mod | 2 +- go.sum | 4 ++-- subscriber_notifee.go | 1 - 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dht.go b/dht.go index da3e74d83..ac69db38f 100644 --- a/dht.go +++ b/dht.go @@ -616,7 +616,10 @@ func (dht *IpfsDHT) rtPeerLoop() { // peer not added. continue } - if !newlyAdded { + if newlyAdded { + // peer was added to the RT, it can now be fixed if needed. + dht.fixRTIfNeeded() + } else { // the peer is already in our RT, but we just successfully queried it and so let's give it a // bump on the query time so we don't ping it too soon for a liveliness check. 
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) diff --git a/dht_test.go b/dht_test.go index ab02b8869..b643adbdd 100644 --- a/dht_test.go +++ b/dht_test.go @@ -271,7 +271,6 @@ func TestValueGetSet(t *testing.T) { defer dhts[i].host.Close() } - t.Log("before connect") connect(t, ctx, dhts[0], dhts[1]) t.Log("adding value on: ", dhts[0].self) @@ -290,13 +289,11 @@ func TestValueGetSet(t *testing.T) { if err != nil { t.Fatal(err) } - t.Log("after get value") if string(val) != "world" { t.Fatalf("Expected 'world' got '%s'", string(val)) } - t.Log("late connect") connect(t, ctx, dhts[2], dhts[0]) connect(t, ctx, dhts[2], dhts[1]) @@ -319,7 +316,6 @@ func TestValueGetSet(t *testing.T) { t.Fatalf("Expected 'world' got '%s'", string(val)) } - t.Log("very late connect") for _, d := range dhts[:3] { connect(t, ctx, dhts[3], d) } @@ -632,7 +628,7 @@ func checkForWellFormedTablesOnce(t *testing.T, dhts []*IpfsDHT, minPeers, avgPe rtlen := dht.routingTable.Size() totalPeers += rtlen if minPeers > 0 && rtlen < minPeers { - t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) + //t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) return false } } diff --git a/go.mod b/go.mod index fa233ef4a..08e54b503 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-log v1.0.5 github.com/libp2p/go-libp2p v0.27.6 - github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e + github.com/libp2p/go-libp2p-kbucket v0.6.3 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/libp2p/go-libp2p-testing v0.12.0 diff --git a/go.sum b/go.sum index a1b9ccc54..8ac139ffc 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNz github.com/libp2p/go-libp2p-core v0.2.4/go.mod 
h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= -github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e h1:E5ZSk+DxxsXoszjKZYtZBJoQ6LnwGCjq4zVIINZaaT4= -github.com/libp2p/go-libp2p-kbucket v0.6.3-0.20230615004129-e99cd472ed1e/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= +github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0= +github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 759db76c6..c1eb69387 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -96,7 +96,6 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { return } else if valid { dht.peerFound(p) - dht.fixRTIfNeeded() } else { dht.peerStoppedDHT(p) } From 5bbf6ca474697448160313ad5a1a1b8b7e76e19d Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 15 Jun 2023 02:56:51 +0200 Subject: [PATCH 5/5] chore: release v0.24.1 --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index ee0e5814d..353be1642 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.24.0" + "version": "v0.24.1" }