From a92d4e7aea27185e8f11613903808ee9ea54cbc2 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 18 Oct 2023 19:31:48 +0530 Subject: [PATCH] address review comments --- core/transport/transport.go | 8 +-- p2p/net/swarm/dial_ranker.go | 106 +++++++++++++++++++----------- p2p/net/swarm/dial_ranker_test.go | 26 +++++++- p2p/net/swarm/dial_worker.go | 14 ++-- p2p/net/swarm/dial_worker_test.go | 13 ++-- p2p/transport/tcp/tcp.go | 10 +-- p2p/transport/tcp/tcp_test.go | 4 +- 7 files changed, 119 insertions(+), 62 deletions(-) diff --git a/core/transport/transport.go b/core/transport/transport.go index 8809c268dc..d56a3cff06 100644 --- a/core/transport/transport.go +++ b/core/transport/transport.go @@ -140,9 +140,9 @@ const ( UpdateKindDialFailed DialUpdateKind = iota // UpdateKindDialSuccessful indicates dial succeeded. UpdateKindDialSuccessful - // UpdateKindTCPConnectionEstablished indicates successful completion of the TCP 3-way + // UpdateKindHandshakeProgressed indicates successful completion of the TCP 3-way // handshake - UpdateKindTCPConnectionEstablished + UpdateKindHandshakeProgressed ) func (k DialUpdateKind) String() string { @@ -151,8 +151,8 @@ func (k DialUpdateKind) String() string { return "DialFailed" case UpdateKindDialSuccessful: return "DialSuccessful" - case UpdateKindTCPConnectionEstablished: - return "TCPConnectionEstablished" + case UpdateKindHandshakeProgressed: + return "UpdateKindHandshakeProgressed" default: return fmt.Sprintf("DialUpdateKind", k) } diff --git a/p2p/net/swarm/dial_ranker.go b/p2p/net/swarm/dial_ranker.go index a0799df428..e08d87a1fb 100644 --- a/p2p/net/swarm/dial_ranker.go +++ b/p2p/net/swarm/dial_ranker.go @@ -58,8 +58,19 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // 3. If a QUIC or WebTransport address is present, TCP addresses dials are delayed relative to the last QUIC dial: // We prefer to end up with a QUIC connection. 
For public addresses, the delay introduced is 250ms (PublicTCPDelay), // and for private addresses 30ms (PrivateTCPDelay). +// 4. For the TCP addresses we follow a strategy similar to QUIC with an optimisation for handling the long TCP +// handshake time described in 6. If both IPv6 TCP and IPv4 TCP addresses are present, we do a Happy Eyeballs +// style ranking. First dial the IPv6 TCP address with the lowest port. After this, dial the IPv4 TCP address +// with the lowest port delayed by 250ms (PublicTCPDelay) for public addresses, and 30ms (PrivateTCPDelay) +// for local addresses. After this we dial all the rest of the addresses delayed by 250ms (PublicTCPDelay) for +// public addresses, and 30ms (PrivateTCPDelay) for local addresses. +// 5. If only one of TCP IPv6 or TCP IPv4 addresses is present, dial the TCP address with the lowest port +// first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public +// addresses, and 30ms (PrivateTCPDelay) for local addresses. +// 6. When a TCP socket to a public address is connected and awaiting security and muxer upgrade, we stop new +// dials for PublicTCPDelay to allow for the upgrade to complete. // -// We dial lowest ports first for QUIC addresses as they are more likely to be the listen port. +// We dial lowest ports first as they are more likely to be the listen port. func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { relay, addrs := filterAddrs(addrs, isRelayAddr) pvt, addrs := filterAddrs(addrs, manet.IsPrivateAddr) @@ -88,43 +99,55 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // addresses relative to direct addresses. func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration, offset time.Duration) []network.AddrDelay { + if len(addrs) == 0 { + return nil + } sort.Slice(addrs, func(i, j int) bool { return score(addrs[i]) < score(addrs[j]) }) - // If the first address is (QUIC, IPv6), make the second address (QUIC, IPv4). 
+ // addrs is now sorted by (Transport, IPVersion). Reorder addrs for happy eyeballs dialing. + // For QUIC and TCP, if we have both IPv6 and IPv4 addresses, move the + // highest priority IPv4 address to the second position. happyEyeballsQUIC := false happyEyeballsTCP := false - if len(addrs) > 0 { + // tcpStartIdx is the index of the first TCP Address + var tcpStartIdx int + { + i := 0 + // If the first QUIC address is IPv6 move the first QUIC IPv4 address to second position if isQUICAddr(addrs[0]) && isProtocolAddr(addrs[0], ma.P_IP6) { - for i := 1; i < len(addrs); i++ { - if isQUICAddr(addrs[i]) && isProtocolAddr(addrs[i], ma.P_IP4) { - // make IPv4 address the second element - if i > 1 { - a := addrs[i] - copy(addrs[2:], addrs[1:i]) + for j := 1; j < len(addrs); j++ { + if isQUICAddr(addrs[j]) && isProtocolAddr(addrs[j], ma.P_IP4) { + // The first QUIC IPv4 address is at position j + // Move the jth element to position 1, shifting the affected elements + if j > 1 { + a := addrs[j] + copy(addrs[2:], addrs[1:j]) addrs[1] = a } happyEyeballsQUIC = true + i = j + 1 break } } } - // idx is the index of the first tcp address - idx := 0 - for i, a := range addrs { - if isProtocolAddr(a, ma.P_TCP) { - idx = i + + for tcpStartIdx = i; tcpStartIdx < len(addrs); tcpStartIdx++ { + if isProtocolAddr(addrs[tcpStartIdx], ma.P_TCP) { break } } - if isProtocolAddr(addrs[idx], ma.P_TCP) && isProtocolAddr(addrs[idx], ma.P_IP6) { - for i := idx + 1; i < len(addrs); i++ { - if isProtocolAddr(addrs[i], ma.P_TCP) && isProtocolAddr(addrs[i], ma.P_IP4) { - // make IPv4 address the second element - if i > idx+1 { - a := addrs[i] - copy(addrs[idx+2:], addrs[idx+1:i]) - addrs[idx+1] = a + + // If the first TCP address is IPv6 move the first TCP IPv4 address to second position + if tcpStartIdx < len(addrs) && isProtocolAddr(addrs[tcpStartIdx], ma.P_IP6) { + for j := tcpStartIdx + 1; j < len(addrs); j++ { + if isProtocolAddr(addrs[j], ma.P_TCP) && isProtocolAddr(addrs[j], ma.P_IP4) { + // First 
TCP IPv4 address is at position j, move it to position tcpStartIdx+1 + // which is the second priority TCP address + if j > tcpStartIdx+1 { + a := addrs[j] + copy(addrs[tcpStartIdx+2:], addrs[tcpStartIdx+1:j]) + addrs[tcpStartIdx+1] = a } happyEyeballsTCP = true break @@ -134,33 +157,42 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D } res := make([]network.AddrDelay, 0, len(addrs)) - - var tcpIdx int - var totalTCPDelay time.Duration + var tcpFirstDialDelay time.Duration for i, addr := range addrs { var delay time.Duration switch { case isQUICAddr(addr): + // We dial an IPv6 address, then after quicDelay an IPv4 + // address, then after a further quicDelay we dial rest of the addresses. if i == 1 { delay = quicDelay } - if i > 1 && happyEyeballsQUIC { - delay = 2 * quicDelay - } else if i > 1 { - delay = quicDelay + if i > 1 { + // If we have happy eyeballs for QUIC, dials after the second position + // will be delayed by 2*quicDelay + if happyEyeballsQUIC { + delay = 2 * quicDelay + } else { + delay = quicDelay + } } - totalTCPDelay = delay + tcpDelay + tcpFirstDialDelay = delay + tcpDelay case isProtocolAddr(addr, ma.P_TCP): - if tcpIdx == 1 { + // We dial an IPv6 address, then after tcpDelay an IPv4 + // address, then after a further tcpDelay we dial rest of the addresses. 
+ if i == tcpStartIdx+1 { delay = tcpDelay } - if tcpIdx > 1 && happyEyeballsTCP { - delay = 2 * tcpDelay - } else if tcpIdx > 1 { - delay = tcpDelay + if i > tcpStartIdx+1 { + // If we have happy eyeballs for TCP, dials after the second position + // will be delayed by 2*tcpDelay + if happyEyeballsTCP { + delay = 2 * tcpDelay + } else { + delay = tcpDelay + } } - tcpIdx++ - delay += totalTCPDelay + delay += tcpFirstDialDelay } res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay}) } diff --git a/p2p/net/swarm/dial_ranker_test.go b/p2p/net/swarm/dial_ranker_test.go index 1964764b8f..5ef3cc27f1 100644 --- a/p2p/net/swarm/dial_ranker_test.go +++ b/p2p/net/swarm/dial_ranker_test.go @@ -161,7 +161,7 @@ func TestDelayRankerTCPDelay(t *testing.T) { t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/") t1v6 := ma.StringCast("/ip6/1::2/tcp/1") t2 := ma.StringCast("/ip4/1.2.3.4/tcp/2") - t3 := ma.StringCast("/ip6/1::3/tcp/3") + t3 := ma.StringCast("/ip4/1.2.3.4/tcp/3") testCase := []struct { name string @@ -193,6 +193,25 @@ func TestDelayRankerTCPDelay(t *testing.T) { {Addr: t2, Delay: 3 * PublicQUICDelay}, }, }, + { + name: "quic-ip4-with-tcp-ipv4", + addrs: []ma.Multiaddr{q1v1, t2, t3, t1}, + output: []network.AddrDelay{ + {Addr: q1v1, Delay: 0}, + {Addr: t1, Delay: PublicTCPDelay}, + {Addr: t2, Delay: 2 * PublicQUICDelay}, + {Addr: t3, Delay: 2 * PublicTCPDelay}, + }, + }, + { + name: "quic-ip4-with-two-tcp", + addrs: []ma.Multiaddr{q1v1, t1v6, t2}, + output: []network.AddrDelay{ + {Addr: q1v1, Delay: 0}, + {Addr: t1v6, Delay: PublicTCPDelay}, + {Addr: t2, Delay: 2 * PublicTCPDelay}, + }, + }, { name: "tcp-ip4-ip6", addrs: []ma.Multiaddr{t1, t2, t1v6, t3}, @@ -203,6 +222,11 @@ func TestDelayRankerTCPDelay(t *testing.T) { {Addr: t3, Delay: 2 * PublicTCPDelay}, }, }, + { + name: "empty", + addrs: []ma.Multiaddr{}, + output: []network.AddrDelay{}, + }, } for _, tc := range testCase { t.Run(tc.name, func(t *testing.T) { diff --git a/p2p/net/swarm/dial_worker.go 
b/p2p/net/swarm/dial_worker.go index e3a9efe06f..6d6dcf65a2 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -64,8 +64,8 @@ type addrDial struct { createdAt time.Time // dialRankingDelay is the delay in dialing this address introduced by the ranking logic dialRankingDelay time.Duration - // expectedTCPUpgTime is the expected time by which security upgrade will complete - expectedTCPUpgTime time.Time + // expectedTCPUpgradeTime is the expected time by which security upgrade will complete + expectedTCPUpgradeTime time.Time } // dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial to a @@ -134,8 +134,8 @@ func (w *dialWorker) loop() { } else { resetTime := startTime.Add(dq.top().Delay) for _, ad := range w.trackedDials { - if !ad.expectedTCPUpgTime.IsZero() && ad.expectedTCPUpgTime.After(resetTime) { - resetTime = ad.expectedTCPUpgTime + if !ad.expectedTCPUpgradeTime.IsZero() && ad.expectedTCPUpgradeTime.After(resetTime) { + resetTime = ad.expectedTCPUpgradeTime } } dialTimer.Reset(resetTime) @@ -325,17 +325,17 @@ loop: // TCP Connection has been established. Wait for connection upgrade on this address // before making new dials. 
- if res.Kind == tpt.UpdateKindTCPConnectionEstablished { + if res.Kind == tpt.UpdateKindHandshakeProgressed { // Only wait for public addresses to complete dialing since private dials // are quick any way if manet.IsPublicAddr(res.Addr) { - ad.expectedTCPUpgTime = w.cl.Now().Add(PublicTCPDelay) + ad.expectedTCPUpgradeTime = w.cl.Now().Add(PublicTCPDelay) } scheduleNextDial() continue } dialsInFlight-- - ad.expectedTCPUpgTime = time.Time{} + ad.expectedTCPUpgradeTime = time.Time{} if res.Conn != nil { // we got a connection, add it to the swarm conn, err := w.s.addConn(res.Conn, network.DirOutbound) diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index fd98660075..e70bc3ce07 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -1098,7 +1098,7 @@ func TestDialWorkerLoopTCPConnUpgradeWait(t *testing.T) { rankerCalled := make(chan struct{}) s1.dialRanker = func(addrs []ma.Multiaddr) []network.AddrDelay { - close(rankerCalled) + defer close(rankerCalled) return []network.AddrDelay{{Addr: a1, Delay: 0}, {Addr: a2, Delay: 100 * time.Millisecond}} } @@ -1116,25 +1116,26 @@ func TestDialWorkerLoopTCPConnUpgradeWait(t *testing.T) { // Wait a bit to let the loop make the dial attempt to a1 time.Sleep(1 * time.Second) // Send conn established for a1 - worker.resch <- transport.DialUpdate{Kind: transport.UpdateKindTCPConnectionEstablished, Addr: a1} + worker.resch <- transport.DialUpdate{Kind: transport.UpdateKindHandshakeProgressed, Addr: a1} // Dial to a2 shouldn't happen even if a2 is scheduled to dial by now - cl.AdvanceBy(290 * time.Millisecond) + cl.AdvanceBy(200 * time.Millisecond) select { case r := <-resch: t.Fatalf("didn't expect any event on resch %s %s", r.err, r.conn) case <-time.After(500 * time.Millisecond): } + // Dial to a2 should happen now // This number is high because there's a race between this goroutine advancing the clock // and the worker loop goroutine processing the 
TCPConnectionEstablished event. // In case it processes the event after the previous clock advancement we need to wait - // 290 + 300ms. - cl.AdvanceBy(600 * time.Millisecond) + // 2 * PublicTCPDelay. + cl.AdvanceBy(2 * PublicTCPDelay) select { case r := <-resch: require.NoError(t, r.err) require.NotNil(t, r.conn) - case <-time.After(5 * time.Second): + case <-time.After(3 * time.Second): t.Errorf("expected a fail response") } } diff --git a/p2p/transport/tcp/tcp.go b/p2p/transport/tcp/tcp.go index f10da4f933..d52bb96019 100644 --- a/p2p/transport/tcp/tcp.go +++ b/p2p/transport/tcp/tcp.go @@ -180,14 +180,14 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) return t.DialWithUpdates(ctx, raddr, p, nil) } -func (t *TcpTransport) DialWithUpdates(ctx context.Context, raddr ma.Multiaddr, p peer.ID, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) { +func (t *TcpTransport) DialWithUpdates(ctx context.Context, raddr ma.Multiaddr, p peer.ID, updateChan chan<- transport.DialUpdate) (transport.CapableConn, error) { connScope, err := t.rcmgr.OpenConnection(network.DirOutbound, true, raddr) if err != nil { log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err) return nil, err } - c, err := t.dialWithScope(ctx, raddr, p, connScope, updCh) + c, err := t.dialWithScope(ctx, raddr, p, connScope, updateChan) if err != nil { connScope.Done() return nil, err @@ -195,7 +195,7 @@ func (t *TcpTransport) DialWithUpdates(ctx context.Context, raddr ma.Multiaddr, return c, nil } -func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) { +func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope, updateChan chan<- transport.DialUpdate) (transport.CapableConn, error) { if err := 
connScope.SetPeer(p); err != nil { log.Debugw("resource manager blocked outgoing connection for peer", "peer", p, "addr", raddr, "error", err) return nil, err @@ -217,9 +217,9 @@ func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p return nil, err } } - if updCh != nil { + if updateChan != nil { select { - case updCh <- transport.DialUpdate{Kind: transport.UpdateKindTCPConnectionEstablished, Addr: raddr}: + case updateChan <- transport.DialUpdate{Kind: transport.UpdateKindHandshakeProgressed, Addr: raddr}: default: // It is better to skip the update than to delay upgrading the connection } diff --git a/p2p/transport/tcp/tcp_test.go b/p2p/transport/tcp/tcp_test.go index b207f6018b..d96c34317b 100644 --- a/p2p/transport/tcp/tcp_test.go +++ b/p2p/transport/tcp/tcp_test.go @@ -168,7 +168,7 @@ func TestDialWithUpdates(t *testing.T) { updCh := make(chan transport.DialUpdate, 1) conn, err := tb.DialWithUpdates(context.Background(), ln.Multiaddr(), peerA, updCh) upd := <-updCh - require.Equal(t, upd.Kind, transport.UpdateKindTCPConnectionEstablished) + require.Equal(t, upd.Kind, transport.UpdateKindHandshakeProgressed) require.NotNil(t, conn) require.NoError(t, err) @@ -191,7 +191,7 @@ func TestDialWithUpdates(t *testing.T) { // This dial will fail as acceptAndClose will not upgrade the connection conn, err = tb.DialWithUpdates(context.Background(), li.Multiaddr(), peerA, updCh) upd = <-updCh - require.Equal(t, upd.Kind, transport.UpdateKindTCPConnectionEstablished) + require.Equal(t, upd.Kind, transport.UpdateKindHandshakeProgressed) require.Nil(t, conn) require.Error(t, err) }