From 2b114a2c9f77c358dad610d807476749b6ce2bf0 Mon Sep 17 00:00:00 2001 From: sukun Date: Fri, 29 Nov 2024 15:27:18 +0530 Subject: [PATCH] autorelay: send addresses on eventbus; dont wrap address factory --- config/config.go | 34 ++------ core/event/addrs.go | 5 ++ p2p/host/autorelay/addrsplosion.go | 15 +--- p2p/host/autorelay/autorelay.go | 37 +++----- p2p/host/autorelay/autorelay_test.go | 70 ++++++++++++++- p2p/host/autorelay/relay.go | 19 ---- p2p/host/autorelay/relay_finder.go | 126 +++++++++++++++++---------- p2p/host/basic/basic_host.go | 40 ++++++++- 8 files changed, 220 insertions(+), 126 deletions(-) delete mode 100644 p2p/host/autorelay/relay.go diff --git a/config/config.go b/config/config.go index f3ea35855a..baf5c3f273 100644 --- a/config/config.go +++ b/config/config.go @@ -439,6 +439,8 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery, EnableAutoNATv2: cfg.EnableAutoNATv2, AutoNATv2Dialer: autonatv2Dialer, + EnableAutoRelay: cfg.EnableAutoRelay, + AutoRelayOpts: cfg.AutoRelayOpts, }) if err != nil { return nil, err @@ -518,28 +520,6 @@ func (cfg *Config) NewNode() (host.Host, error) { ) } - // enable autorelay - fxopts = append(fxopts, - fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) error { - if cfg.EnableAutoRelay { - if !cfg.DisableMetrics { - mt := autorelay.WithMetricsTracer( - autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) - mtOpts := []autorelay.Option{mt} - cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...) - } - - ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) - if err != nil { - return err - } - lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close)) - return nil - } - return nil - }), - ) - var bh *bhost.BasicHost fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho })) fxopts = append(fxopts, fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) { @@ -554,12 +534,10 @@ func (cfg *Config) NewNode() (host.Host, error) { fxopts = append(fxopts, cfg.UserFxOptions...) app := fx.New(fxopts...) - if err := app.Start(context.Background()); err != nil { - return nil, err + if app.Err() != nil { + return nil, fmt.Errorf("failed to create host: %w", app.Err()) } - if err := cfg.addAutoNAT(bh); err != nil { - app.Stop(context.Background()) if cfg.Routing != nil { rh.Close() } else { @@ -568,6 +546,10 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, err } + if err := app.Start(context.Background()); err != nil { + return nil, err + } + if cfg.Routing != nil { return &closableRoutedHost{App: app, RoutedHost: rh}, nil } diff --git a/core/event/addrs.go b/core/event/addrs.go index 312a2fad56..d0b578352e 100644 --- a/core/event/addrs.go +++ b/core/event/addrs.go @@ -81,3 +81,8 @@ type EvtLocalAddressesUpdated struct { // wrapped in a record.Envelope and signed by the Host's private key. SignedPeerRecord *record.Envelope } + +// EvtAutoRelayAddrsUpdated is sent by the autorelay when the node's relay addresses are updated +type EvtAutoRelayAddrs struct { + RelayAddrs []ma.Multiaddr +} diff --git a/p2p/host/autorelay/addrsplosion.go b/p2p/host/autorelay/addrsplosion.go index 710dab1491..13e6274b71 100644 --- a/p2p/host/autorelay/addrsplosion.go +++ b/p2p/host/autorelay/addrsplosion.go @@ -9,6 +9,9 @@ import ( // This function cleans up a relay's address set to remove private addresses and curtail // addrsplosion. +// TODO: Remove this, we don't need this. The current method tries to select the +// best address for the relay. Instead we should rely on the addresses provided by the +// relay in response to the reservation request. func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr { var public, private []ma.Multiaddr @@ -17,7 +20,7 @@ func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr { continue } - if manet.IsPublicAddr(a) || isDNSAddr(a) { + if manet.IsPublicAddr(a) { public = append(public, a) continue } @@ -51,16 +54,6 @@ func isRelayAddr(a ma.Multiaddr) bool { return isRelay } -func isDNSAddr(a ma.Multiaddr) bool { - if first, _ := ma.SplitFirst(a); first != nil { - switch first.Protocol().Code { - case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR: - return true - } - } - return false -} - // we have addrsplosion if for some protocol we advertise multiple ports on // the same base address. func hasAddrsplosion(addrs []ma.Multiaddr) bool { diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index b31302098d..3ba685f658 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -3,16 +3,16 @@ package autorelay import ( "context" "errors" + "fmt" "sync" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" - basic "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/eventbus" + ma "github.com/multiformats/go-multiaddr" logging "github.com/ipfs/go-log/v2" - ma "github.com/multiformats/go-multiaddr" ) var log = logging.Logger("autorelay") @@ -22,8 +22,6 @@ type AutoRelay struct { ctx context.Context ctxCancel context.CancelFunc - conf *config - mx sync.Mutex status network.Reachability @@ -34,9 +32,9 @@ type AutoRelay struct { metricsTracer MetricsTracer } -func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { +func NewAutoRelay(host host.Host, opts ...Option) (*AutoRelay, error) { r := &AutoRelay{ - host: bhost, + host: host, status: network.ReachabilityUnknown, } conf := defaultConfig @@ -46,29 +44,20 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { } } r.ctx, r.ctxCancel = context.WithCancel(context.Background()) - r.conf = &conf - r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf) - r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer} - - // Update the host address factory to use autorelay addresses if we're private - // - // TODO: Don't update host address factory. Instead send our relay addresses on the eventbus. - // The host can decide how to handle those. - addrF := bhost.AddrsFactory - bhost.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { - addrs = addrF(addrs) - r.mx.Lock() - defer r.mx.Unlock() - - if r.status != network.ReachabilityPrivate { - return addrs - } - return r.relayFinder.relayAddrs(addrs) + rf, err := newRelayFinder(host, &conf) + if err != nil { + return nil, fmt.Errorf("failed to create autorelay: %w", err) } + r.relayFinder = rf + r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer} return r, nil } +func (r *AutoRelay) RelayAddrs() []ma.Multiaddr { + return r.relayFinder.RelayAddrs() +} + func (r *AutoRelay) Start() { r.refCount.Add(1) go func() { diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 410f8cb6dc..a7fdcd39d5 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -3,12 +3,14 @@ package autorelay_test import ( "context" "fmt" + "slices" "strings" "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -96,7 +98,10 @@ func newRelay(t *testing.T) host.Host { saddr := addr.String() if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") - addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) + // .internal is classified as a public address as users + // are free to map this dns to a public ip address for + // use within a LAN + addrs[i] = ma.StringCast("/dns/libp2p.internal" + addrNoIP) } } return addrs @@ -517,3 +522,66 @@ func TestNoBusyLoop0MinInterval(t *testing.T) { val := atomic.LoadUint64(&calledTimes) require.Less(t, val, uint64(2)) } +func TestAutoRelayAddrsEvent(t *testing.T) { + cl := newMockClock() + r1, r2 := newRelay(t), newRelay(t) + t.Cleanup(func() { + r1.Close() + r2.Close() + }) + + relayFromP2PAddr := func(a ma.Multiaddr) peer.ID { + r, c := ma.SplitLast(a) + if c.Protocol().Code != ma.P_CIRCUIT { + return "" + } + if id, err := peer.IDFromP2PAddr(r); err == nil { + return id + } + return "" + } + + checkPeersExist := func(addrs []ma.Multiaddr, peers ...peer.ID) bool { + for _, p := range peers { + if !slices.ContainsFunc(addrs, func(a ma.Multiaddr) bool { return relayFromP2PAddr(a) == p }) { + return false + } + } + return true + } + peerChan := make(chan peer.AddrInfo, 3) + h := newPrivateNode(t, + func(context.Context, int) <-chan peer.AddrInfo { + return peerChan + }, + autorelay.WithClock(cl), + autorelay.WithMinCandidates(1), + autorelay.WithMaxCandidates(10), + autorelay.WithNumRelays(3), + autorelay.WithBootDelay(1*time.Second), + autorelay.WithMinInterval(time.Hour), + ) + defer h.Close() + + sub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrs)) + require.NoError(t, err) + + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + cl.AdvanceBy(time.Second) + + require.Eventually(t, func() bool { + e := <-sub.Out() + if !checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID()) { + return false + } + if checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r2.ID()) { + return false + } + return true + }, 5*time.Second, 50*time.Millisecond) + peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()} + require.Eventually(t, func() bool { + e := <-sub.Out() + return checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID(), r2.ID()) + }, 5*time.Second, 50*time.Millisecond) +} diff --git a/p2p/host/autorelay/relay.go b/p2p/host/autorelay/relay.go deleted file mode 100644 index 2ae5bf240c..0000000000 --- a/p2p/host/autorelay/relay.go +++ /dev/null @@ -1,19 +0,0 @@ -package autorelay - -import ( - ma "github.com/multiformats/go-multiaddr" -) - -// Filter filters out all relay addresses. -// -// Deprecated: It is trivial for a user to implement this if they need this. -func Filter(addrs []ma.Multiaddr) []ma.Multiaddr { - raddrs := make([]ma.Multiaddr, 0, len(addrs)) - for _, addr := range addrs { - if isRelayAddr(addr) { - continue - } - raddrs = append(raddrs, addr) - } - return raddrs -} diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 13f8c63e6b..d4fabddaa0 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -1,25 +1,27 @@ package autorelay import ( + "bytes" "context" "errors" "fmt" "math/rand" + "slices" "sync" "time" "golang.org/x/sync/errgroup" "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - basic "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/eventbus" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" ) const protoIDv2 = circuitv2_proto.ProtoIDv2Hop @@ -47,7 +49,7 @@ type candidate struct { // relayFinder is a Host that uses relays for connectivity when a NAT is detected. type relayFinder struct { bootTime time.Time - host *basic.BasicHost + host host.Host conf *config @@ -72,29 +74,34 @@ type relayFinder struct { relayUpdated chan struct{} - relayMx sync.Mutex - relays map[peer.ID]*circuitv2.Reservation - - cachedAddrs []ma.Multiaddr - cachedAddrsExpiry time.Time + relayMx sync.Mutex + relays map[peer.ID]*circuitv2.Reservation + cachedAddrs []ma.Multiaddr // A channel that triggers a run of `runScheduledWork`. triggerRunScheduledWork chan struct{} metricsTracer MetricsTracer + + emitter event.Emitter } var errAlreadyRunning = errors.New("relayFinder already running") -func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) *relayFinder { - if peerSource == nil { +func newRelayFinder(host host.Host, conf *config) (*relayFinder, error) { + if conf.peerSource == nil { panic("Can not create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`") } + emitter, err := host.EventBus().Emitter(new(event.EvtAutoRelayAddrs)) + if err != nil { + return nil, err + } + return &relayFinder{ bootTime: conf.clock.Now(), host: host, conf: conf, - peerSource: peerSource, + peerSource: conf.peerSource, candidates: make(map[peer.ID]*candidate), backoff: make(map[peer.ID]time.Time), candidateFound: make(chan struct{}, 1), @@ -104,7 +111,8 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) relays: make(map[peer.ID]*circuitv2.Reservation), relayUpdated: make(chan struct{}, 1), metricsTracer: &wrappedMetricsTracer{conf.metricsTracer}, - } + emitter: emitter, + }, nil } type scheduledWorkTimes struct { @@ -213,11 +221,24 @@ func (rf *relayFinder) background(ctx context.Context) { func (rf *relayFinder) clearCachedAddrsAndSignalAddressChange() { rf.relayMx.Lock() - rf.cachedAddrs = nil + oldAddrs := rf.cachedAddrs + rf.cachedAddrs = rf.relayAddrsUnlocked() + newAddrs := rf.cachedAddrs rf.relayMx.Unlock() - rf.host.SignalAddressChange() - rf.metricsTracer.RelayAddressUpdated() + rf.metricsTracer.RelayAddressCount(len(rf.cachedAddrs)) + if haveAddrsDiff(newAddrs, oldAddrs) { + log.Debug("relay addresses updated") + rf.metricsTracer.RelayAddressUpdated() + rf.emitter.Emit(event.EvtAutoRelayAddrs{RelayAddrs: newAddrs}) + } +} + +// RelayAddrs returns the node's relay addresses +func (rf *relayFinder) RelayAddrs() []ma.Multiaddr { + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + return rf.cachedAddrs } func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time { @@ -473,11 +494,16 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR } // wait for identify to complete in at least one conn so that we can check the supported protocols + hi, ok := rf.host.(interface{ IDService() identify.IDService }) + if !ok { + // if we don't have identify, assume the peer supports relay. + return true, nil + } ready := make(chan struct{}, 1) for _, conn := range conns { go func(conn network.Conn) { select { - case <-rf.host.IDService().IdentifyWait(conn): + case <-hi.IDService().IdentifyWait(conn): select { case ready <- struct{}{}: default: @@ -707,31 +733,13 @@ func (rf *relayFinder) selectCandidates() []*candidate { return candidates } -// This function is computes the NATed relay addrs when our status is private: -// - The public addrs are removed from the address set. -// - The non-public addrs are included verbatim so that peers behind the same NAT/firewall -// can still dial us directly. -// - On top of those, we add the relay-specific addrs for the relays to which we are -// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr -// through which we can be dialed. -func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - rf.relayMx.Lock() - defer rf.relayMx.Unlock() - - if rf.cachedAddrs != nil && rf.conf.clock.Now().Before(rf.cachedAddrsExpiry) { - return rf.cachedAddrs - } - +// This function computes the relay addrs when our status is private. +// The returned addresses are of the for /p2p/relay-id/p2p-circuit. +// +// caller must hold the relay lock +func (rf *relayFinder) relayAddrsUnlocked() []ma.Multiaddr { raddrs := make([]ma.Multiaddr, 0, 4*len(rf.relays)+4) - // only keep private addrs from the original addr set - for _, addr := range addrs { - if manet.IsPrivateAddr(addr) { - raddrs = append(raddrs, addr) - } - } - - // add relay specific addrs to the list relayAddrCnt := 0 for p := range rf.relays { addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p)) @@ -742,11 +750,10 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { raddrs = append(raddrs, pub) } } - - rf.cachedAddrs = raddrs - rf.cachedAddrsExpiry = rf.conf.clock.Now().Add(30 * time.Second) - - rf.metricsTracer.RelayAddressCount(relayAddrCnt) + if relayAddrCnt > 100 { + slices.SortStableFunc(raddrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) + raddrs = raddrs[:100] + } return raddrs } @@ -808,3 +815,34 @@ func (rf *relayFinder) resetMetrics() { rf.metricsTracer.RelayAddressCount(0) rf.metricsTracer.ScheduledWorkUpdated(&scheduledWorkTimes{}) } + +func haveAddrsDiff(a, b []ma.Multiaddr) bool { + if len(a) != len(b) { + return true + } + for _, aa := range a { + found := false + for _, bb := range b { + if aa.Equal(bb) { + found = true + break + } + } + if !found { + return true + } + } + for _, bb := range b { + found := false + for _, aa := range a { + if aa.Equal(bb) { + found = true + break + } + } + if !found { + return true + } + } + return false +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index a85d1978d7..5e2b6c2440 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/autonat" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" @@ -109,6 +110,8 @@ type BasicHost struct { autoNat autonat.AutoNAT autonatv2 *autonatv2.AutoNAT + addrSub event.Subscription + autorelay *autorelay.AutoRelay } var _ host.Host = (*BasicHost)(nil) @@ -170,6 +173,9 @@ type HostOpts struct { DisableIdentifyAddressDiscovery bool EnableAutoNATv2 bool AutoNATv2Dialer host.Host + + EnableAutoRelay bool + AutoRelayOpts []autorelay.Option } // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. @@ -185,6 +191,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if err != nil { return nil, err } + + addrSub, err := opts.EventBus.Subscribe(new(event.EvtAutoRelayAddrs)) + if err != nil { + return nil, err + } hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ network: n, @@ -197,6 +208,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, + addrSub: addrSub, } h.updateLocalIpAddr() @@ -328,6 +340,21 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { } } + if opts.EnableAutoRelay { + if opts.EnableMetrics { + mt := autorelay.WithMetricsTracer( + autorelay.NewMetricsTracer(autorelay.WithRegisterer(opts.PrometheusRegisterer))) + mtOpts := []autorelay.Option{mt} + opts.AutoRelayOpts = append(mtOpts, opts.AutoRelayOpts...) + } + + ar, err := autorelay.NewAutoRelay(h, opts.AutoRelayOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create autorelay: %w", err) + } + h.autorelay = ar + } + n.SetStreamHandler(h.newStreamHandler) // register to be notified when the network's listen addrs change, @@ -430,6 +457,9 @@ func (h *BasicHost) Start() { h.psManager.Start() h.refCount.Add(1) h.ids.Start() + if h.autorelay != nil { + h.autorelay.Start() + } if h.autonatv2 != nil { err := h.autonatv2.Start() if err != nil { @@ -841,8 +871,13 @@ func (h *BasicHost) ConnManager() connmgr.ConnManager { // When used with AutoRelay, and if the host is not publicly reachable, // this will only have host's private, relay, and no public addresses. func (h *BasicHost) Addrs() []ma.Multiaddr { + addrs := h.AllAddrs() // Make a copy. Consumers can modify the slice elements - addrs := slices.Clone(h.AddrsFactory(h.AllAddrs())) + if h.autoNat != nil && h.autorelay != nil && h.autoNat.Status() == network.ReachabilityPrivate { + addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) + addrs = append(addrs, h.autorelay.RelayAddrs()...) + } + addrs = slices.Clone(h.AddrsFactory(addrs)) // Add certhashes for the addresses provided by the user via address factory. return h.addCertHashes(ma.Unique(addrs)) } @@ -1106,6 +1141,9 @@ func (h *BasicHost) Close() error { if h.autonatv2 != nil { h.autonatv2.Close() } + if h.autorelay != nil { + h.autorelay.Close() + } _ = h.emitters.evtLocalProtocolsUpdated.Close() _ = h.emitters.evtLocalAddrsUpdated.Close()