Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Oct 18, 2023
1 parent f835493 commit a92d4e7
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 62 deletions.
8 changes: 4 additions & 4 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ const (
UpdateKindDialFailed DialUpdateKind = iota
// UpdateKindDialSuccessful indicates dial succeeded.
UpdateKindDialSuccessful
// UpdateKindTCPConnectionEstablished indicates successful completion of the TCP 3-way
// UpdateKindHandshakeProgressed indicates successful completion of the TCP 3-way
// handshake
UpdateKindTCPConnectionEstablished
UpdateKindHandshakeProgressed
)

func (k DialUpdateKind) String() string {
Expand All @@ -151,8 +151,8 @@ func (k DialUpdateKind) String() string {
return "DialFailed"
case UpdateKindDialSuccessful:
return "DialSuccessful"
case UpdateKindTCPConnectionEstablished:
return "TCPConnectionEstablished"
case UpdateKindHandshakeProgressed:
return "UpdateKindHandshakeProgressed"
default:
return fmt.Sprintf("DialUpdateKind<Unknown-%d>", k)
}
Expand Down
106 changes: 69 additions & 37 deletions p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,19 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// 3. If a QUIC or WebTransport address is present, TCP addresses dials are delayed relative to the last QUIC dial:
// We prefer to end up with a QUIC connection. For public addresses, the delay introduced is 250ms (PublicTCPDelay),
// and for private addresses 30ms (PrivateTCPDelay).
// 4. For the TCP addresses we follow a strategy similar to QUIC with an optimisation for handling the long TCP
// handshake time described in 6. If both IPv6 TCP and IPv4 TCP addresses are present, we do a Happy Eyeballs
// style ranking. First dial the IPv6 TCP address with the lowest port. After this, dial the IPv4 TCP address
// with the lowest port delayed by 250ms (PublicTCPDelay) for public addresses, and 30ms (PrivateTCPDelay)
// for local addresses. After this we dial all the rest of the addresses delayed by 250ms (PublicTCPDelay) for
// public addresses, and 30ms (PrivateTCPDelay) for local addresses.
// 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port
// first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public
// addresses, and 30ms (PrivateTCPDelay) for local addresses.
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay
// to allow for the upgrade to complete.
//
// We dial lowest ports first for QUIC addresses as they are more likely to be the listen port.
// We dial lowest ports first as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
relay, addrs := filterAddrs(addrs, isRelayAddr)
pvt, addrs := filterAddrs(addrs, manet.IsPrivateAddr)
Expand Down Expand Up @@ -88,43 +99,55 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// addresses relative to direct addresses.
func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration,
offset time.Duration) []network.AddrDelay {
if len(addrs) == 0 {
return nil
}

sort.Slice(addrs, func(i, j int) bool { return score(addrs[i]) < score(addrs[j]) })

// If the first address is (QUIC, IPv6), make the second address (QUIC, IPv4).
// addrs is now sorted by (Transport, IPVersion). Reorder addrs for happy eyeballs dialing.
// For QUIC and TCP, if we have both IPv6 and IPv4 addresses, move the
// highest priority IPv4 address to the second position.
happyEyeballsQUIC := false
happyEyeballsTCP := false
if len(addrs) > 0 {
// tcpStartIdx is the index of the first TCP Address
var tcpStartIdx int
{
i := 0
// If the first QUIC address is IPv6 move the first QUIC IPv4 address to second position
if isQUICAddr(addrs[0]) && isProtocolAddr(addrs[0], ma.P_IP6) {
for i := 1; i < len(addrs); i++ {
if isQUICAddr(addrs[i]) && isProtocolAddr(addrs[i], ma.P_IP4) {
// make IPv4 address the second element
if i > 1 {
a := addrs[i]
copy(addrs[2:], addrs[1:i])
for j := 1; j < len(addrs); j++ {
if isQUICAddr(addrs[j]) && isProtocolAddr(addrs[j], ma.P_IP4) {
// The first IPv4 address is at position i
// Move the ith element at position 1 shifting the affected elements
if j > 1 {
a := addrs[j]
copy(addrs[2:], addrs[1:j])
addrs[1] = a
}
happyEyeballsQUIC = true
i = j + 1
break
}
}
}
// idx is the index of the first tcp address
idx := 0
for i, a := range addrs {
if isProtocolAddr(a, ma.P_TCP) {
idx = i

for tcpStartIdx = i; tcpStartIdx < len(addrs); tcpStartIdx++ {
if isProtocolAddr(addrs[tcpStartIdx], ma.P_TCP) {
break
}
}
if isProtocolAddr(addrs[idx], ma.P_TCP) && isProtocolAddr(addrs[idx], ma.P_IP6) {
for i := idx + 1; i < len(addrs); i++ {
if isProtocolAddr(addrs[i], ma.P_TCP) && isProtocolAddr(addrs[i], ma.P_IP4) {
// make IPv4 address the second element
if i > idx+1 {
a := addrs[i]
copy(addrs[idx+2:], addrs[idx+1:i])
addrs[idx+1] = a

// If the first TCP address is IPv6 move the first TCP IPv4 address to second position
if tcpStartIdx < len(addrs) && isProtocolAddr(addrs[tcpStartIdx], ma.P_IP6) {
for j := tcpStartIdx + 1; j < len(addrs); j++ {
if isProtocolAddr(addrs[j], ma.P_TCP) && isProtocolAddr(addrs[j], ma.P_IP4) {
// First TCP IPv4 address is at position j, move it to position tcpStartIdx+1
// which is the second priority TCP address
if j > tcpStartIdx+1 {
a := addrs[j]
copy(addrs[tcpStartIdx+2:], addrs[tcpStartIdx+1:j])
addrs[tcpStartIdx+1] = a
}
happyEyeballsTCP = true
break
Expand All @@ -134,33 +157,42 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
}

res := make([]network.AddrDelay, 0, len(addrs))

var tcpIdx int
var totalTCPDelay time.Duration
var tcpFirstDialDelay time.Duration
for i, addr := range addrs {
var delay time.Duration
switch {
case isQUICAddr(addr):
// We dial an IPv6 address, then after quicDelay an IPv4
// address, then after a further quicDelay we dial rest of the addresses.
if i == 1 {
delay = quicDelay
}
if i > 1 && happyEyeballsQUIC {
delay = 2 * quicDelay
} else if i > 1 {
delay = quicDelay
if i > 1 {
// If we have happy eyeballs for QUIC, dials after the second position
// will be delayed by 2*quicDelay
if happyEyeballsQUIC {
delay = 2 * quicDelay
} else {
delay = quicDelay
}
}
totalTCPDelay = delay + tcpDelay
tcpFirstDialDelay = delay + tcpDelay
case isProtocolAddr(addr, ma.P_TCP):
if tcpIdx == 1 {
// We dial an IPv6 address, then after tcpDelay an IPv4
// address, then after a further tcpDelay we dial rest of the addresses.
if i == tcpStartIdx+1 {
delay = tcpDelay
}
if tcpIdx > 1 && happyEyeballsTCP {
delay = 2 * tcpDelay
} else if tcpIdx > 1 {
delay = tcpDelay
if i > tcpStartIdx+1 {
// If we have happy eyeballs for TCP, dials after the second position
// will be delayed by 2*tcpDelay
if happyEyeballsTCP {
delay = 2 * tcpDelay
} else {
delay = tcpDelay
}
}
tcpIdx++
delay += totalTCPDelay
delay += tcpFirstDialDelay
}
res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay})
}
Expand Down
26 changes: 25 additions & 1 deletion p2p/net/swarm/dial_ranker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestDelayRankerTCPDelay(t *testing.T) {
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
t1v6 := ma.StringCast("/ip6/1::2/tcp/1")
t2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
t3 := ma.StringCast("/ip6/1::3/tcp/3")
t3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")

testCase := []struct {
name string
Expand Down Expand Up @@ -193,6 +193,25 @@ func TestDelayRankerTCPDelay(t *testing.T) {
{Addr: t2, Delay: 3 * PublicQUICDelay},
},
},
{
name: "quic-ip4-with-tcp-ipv4",
addrs: []ma.Multiaddr{q1v1, t2, t3, t1},
output: []network.AddrDelay{
{Addr: q1v1, Delay: 0},
{Addr: t1, Delay: PublicTCPDelay},
{Addr: t2, Delay: 2 * PublicQUICDelay},
{Addr: t3, Delay: 2 * PublicTCPDelay},
},
},
{
name: "quic-ip4-with-two-tcp",
addrs: []ma.Multiaddr{q1v1, t1v6, t2},
output: []network.AddrDelay{
{Addr: q1v1, Delay: 0},
{Addr: t1v6, Delay: PublicTCPDelay},
{Addr: t2, Delay: 2 * PublicTCPDelay},
},
},
{
name: "tcp-ip4-ip6",
addrs: []ma.Multiaddr{t1, t2, t1v6, t3},
Expand All @@ -203,6 +222,11 @@ func TestDelayRankerTCPDelay(t *testing.T) {
{Addr: t3, Delay: 2 * PublicTCPDelay},
},
},
{
name: "empty",
addrs: []ma.Multiaddr{},
output: []network.AddrDelay{},
},
}
for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
Expand Down
14 changes: 7 additions & 7 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type addrDial struct {
createdAt time.Time
// dialRankingDelay is the delay in dialing this address introduced by the ranking logic
dialRankingDelay time.Duration
// expectedTCPUpgTime is the expected time by which security upgrade will complete
expectedTCPUpgTime time.Time
// expectedTCPUpgradeTime is the expected time by which security upgrade will complete
expectedTCPUpgradeTime time.Time
}

// dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial to a
Expand Down Expand Up @@ -134,8 +134,8 @@ func (w *dialWorker) loop() {
} else {
resetTime := startTime.Add(dq.top().Delay)
for _, ad := range w.trackedDials {
if !ad.expectedTCPUpgTime.IsZero() && ad.expectedTCPUpgTime.After(resetTime) {
resetTime = ad.expectedTCPUpgTime
if !ad.expectedTCPUpgradeTime.IsZero() && ad.expectedTCPUpgradeTime.After(resetTime) {
resetTime = ad.expectedTCPUpgradeTime
}
}
dialTimer.Reset(resetTime)
Expand Down Expand Up @@ -325,17 +325,17 @@ loop:

// TCP Connection has been established. Wait for connection upgrade on this address
// before making new dials.
if res.Kind == tpt.UpdateKindTCPConnectionEstablished {
if res.Kind == tpt.UpdateKindHandshakeProgressed {
// Only wait for public addresses to complete dialing since private dials
// are quick any way
if manet.IsPublicAddr(res.Addr) {
ad.expectedTCPUpgTime = w.cl.Now().Add(PublicTCPDelay)
ad.expectedTCPUpgradeTime = w.cl.Now().Add(PublicTCPDelay)
}
scheduleNextDial()
continue
}
dialsInFlight--
ad.expectedTCPUpgTime = time.Time{}
ad.expectedTCPUpgradeTime = time.Time{}
if res.Conn != nil {
// we got a connection, add it to the swarm
conn, err := w.s.addConn(res.Conn, network.DirOutbound)
Expand Down
13 changes: 7 additions & 6 deletions p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ func TestDialWorkerLoopTCPConnUpgradeWait(t *testing.T) {

rankerCalled := make(chan struct{})
s1.dialRanker = func(addrs []ma.Multiaddr) []network.AddrDelay {
close(rankerCalled)
defer close(rankerCalled)
return []network.AddrDelay{{Addr: a1, Delay: 0}, {Addr: a2, Delay: 100 * time.Millisecond}}
}

Expand All @@ -1116,25 +1116,26 @@ func TestDialWorkerLoopTCPConnUpgradeWait(t *testing.T) {
// Wait a bit to let the loop make the dial attempt to a1
time.Sleep(1 * time.Second)
// Send conn established for a1
worker.resch <- transport.DialUpdate{Kind: transport.UpdateKindTCPConnectionEstablished, Addr: a1}
worker.resch <- transport.DialUpdate{Kind: transport.UpdateKindHandshakeProgressed, Addr: a1}
// Dial to a2 shouldn't happen even if a2 is scheduled to dial by now
cl.AdvanceBy(290 * time.Millisecond)
cl.AdvanceBy(200 * time.Millisecond)
select {
case r := <-resch:
t.Fatalf("didn't expect any event on resch %s %s", r.err, r.conn)
case <-time.After(500 * time.Millisecond):
}

// Dial to a2 should happen now
// This number is high because there's a race between this goroutine advancing the clock
// and the worker loop goroutine processing the TCPConnectionEstablished event.
// In case it processes the event after the previous clock advancement we need to wait
// 290 + 300ms.
cl.AdvanceBy(600 * time.Millisecond)
// 2 * PublicTCPDelay.
cl.AdvanceBy(2 * PublicTCPDelay)
select {
case r := <-resch:
require.NoError(t, r.err)
require.NotNil(t, r.conn)
case <-time.After(5 * time.Second):
case <-time.After(3 * time.Second):
t.Errorf("expected a fail response")
}
}
10 changes: 5 additions & 5 deletions p2p/transport/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,22 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
return t.DialWithUpdates(ctx, raddr, p, nil)
}

func (t *TcpTransport) DialWithUpdates(ctx context.Context, raddr ma.Multiaddr, p peer.ID, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) {
func (t *TcpTransport) DialWithUpdates(ctx context.Context, raddr ma.Multiaddr, p peer.ID, updateChan chan<- transport.DialUpdate) (transport.CapableConn, error) {
connScope, err := t.rcmgr.OpenConnection(network.DirOutbound, true, raddr)
if err != nil {
log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err)
return nil, err
}

c, err := t.dialWithScope(ctx, raddr, p, connScope, updCh)
c, err := t.dialWithScope(ctx, raddr, p, connScope, updateChan)
if err != nil {
connScope.Done()
return nil, err
}
return c, nil
}

func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) {
func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope, updateChan chan<- transport.DialUpdate) (transport.CapableConn, error) {
if err := connScope.SetPeer(p); err != nil {
log.Debugw("resource manager blocked outgoing connection for peer", "peer", p, "addr", raddr, "error", err)
return nil, err
Expand All @@ -217,9 +217,9 @@ func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p
return nil, err
}
}
if updCh != nil {
if updateChan != nil {
select {
case updCh <- transport.DialUpdate{Kind: transport.UpdateKindTCPConnectionEstablished, Addr: raddr}:
case updateChan <- transport.DialUpdate{Kind: transport.UpdateKindHandshakeProgressed, Addr: raddr}:
default:
// It is better to skip the update than to delay upgrading the connection
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestDialWithUpdates(t *testing.T) {
updCh := make(chan transport.DialUpdate, 1)
conn, err := tb.DialWithUpdates(context.Background(), ln.Multiaddr(), peerA, updCh)
upd := <-updCh
require.Equal(t, upd.Kind, transport.UpdateKindTCPConnectionEstablished)
require.Equal(t, upd.Kind, transport.UpdateKindHandshakeProgressed)
require.NotNil(t, conn)
require.NoError(t, err)

Expand All @@ -191,7 +191,7 @@ func TestDialWithUpdates(t *testing.T) {
// This dial will fail as acceptAndClose will not upgrade the connection
conn, err = tb.DialWithUpdates(context.Background(), li.Multiaddr(), peerA, updCh)
upd = <-updCh
require.Equal(t, upd.Kind, transport.UpdateKindTCPConnectionEstablished)
require.Equal(t, upd.Kind, transport.UpdateKindHandshakeProgressed)
require.Nil(t, conn)
require.Error(t, err)
}
Expand Down

0 comments on commit a92d4e7

Please sign in to comment.