Skip to content

Commit

Permalink
Merge pull request #687 from libp2p/feat/merge-dht-hardening-0.7
Browse files Browse the repository at this point in the history
Hardening Improvements: RT diversity and decreased RT churn
  • Loading branch information
aschmahmann authored Aug 18, 2020
2 parents 0e93285 + 5729f0a commit e788ffc
Show file tree
Hide file tree
Showing 17 changed files with 696 additions and 127 deletions.
125 changes: 109 additions & 16 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"

Expand All @@ -40,6 +41,8 @@ import (
var (
logger = logging.Logger("dht")
baseLogger = logger.Desugar()

rtFreezeTimeout = 1 * time.Minute
)

const (
Expand All @@ -66,6 +69,11 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -115,6 +123,7 @@ type IpfsDHT struct {

queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc
rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter

autoRefresh bool

Expand All @@ -131,6 +140,14 @@ type IpfsDHT struct {
enableProviders, enableValues bool

fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration

// configuration variables for tests
testAddressUpdateProcessing bool
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -173,6 +190,8 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)

dht.Validator = cfg.validator

dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing

dht.auto = cfg.mode
switch cfg.mode {
case ModeAuto, ModeClient:
Expand Down Expand Up @@ -207,7 +226,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
go dht.persistRTPeersInPeerStore()

// listens to the fix low peers chan and tries to fix the Routing Table
dht.proc.Go(dht.fixLowPeersRoutine)
if !cfg.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}

dht.proc.Go(dht.rtPeerLoop)

return dht, nil
}
Expand Down Expand Up @@ -271,7 +294,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
beta: cfg.resiliency,
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}, 1),
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),
}

var maxLastSuccessfulOutboundThreshold time.Duration
Expand Down Expand Up @@ -320,6 +348,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

return dht, nil
}

Expand All @@ -340,13 +370,32 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
})

if err != nil {
return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
}

filter = df
}

rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
if err != nil {
return nil, err
}

cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -368,6 +417,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
return rt, err
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
}

// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
Expand Down Expand Up @@ -496,19 +550,19 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

if record := pmes.GetRecord(); record != nil {
if rec := pmes.GetRecord(); rec != nil {
// Success! We were given the value
logger.Debug("got value")

// make sure record is valid.
err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
if err != nil {
logger.Debug("received invalid record (discarded)")
// return a sentinal to signify an invalid record was received
err = errInvalidRecord
record = new(recpb.Record)
rec = new(recpb.Record)
}
return record, peers, err
return rec, peers, err
}

if len(peers) > 0 {
Expand Down Expand Up @@ -554,6 +608,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootsrapping := false
var timerCh <-chan time.Time

for {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}

old := isBootsrapping
isBootsrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
}

case <-proc.Closing():
return
}
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
Expand All @@ -575,16 +673,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
return
}
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
}
}

Expand Down
133 changes: 132 additions & 1 deletion dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestSelfWalkOnAddressChange(t *testing.T) {
ctx := context.Background()
// create three DHT instances with auto refresh disabled
d1 := setupDHT(ctx, t, false, DisableAutoRefresh())
d1 := setupDHT(ctx, t, false, DisableAutoRefresh(), forceAddressUpdateProcessing(t))
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())

Expand Down Expand Up @@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) {
}
require.Empty(t, dfmap)
}

func TestBootstrappersReplacable(t *testing.T) {
old := rtFreezeTimeout
rtFreezeTimeout = 100 * time.Millisecond
defer func() {
rtFreezeTimeout = old
}()
ctx := context.Background()
d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))
defer d.host.Close()
defer d.Close()

var d1 *IpfsDHT
var d2 *IpfsDHT

// d1 & d2 have a cpl of 0
for {
d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 {
break
}
}

for {
d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 {
break
}
}
defer d1.host.Close()
defer d1.Close()

defer d2.host.Close()
defer d2.Close()

connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)

// d3 & d4 with cpl=0 will go in as d1 & d2 are replacable.
var d3 *IpfsDHT
var d4 *IpfsDHT

for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 {
break
}
}

for {
d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 {
break
}
}

defer d3.host.Close()
defer d3.Close()
defer d4.host.Close()
defer d4.Close()

connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// do couple of refreshes and wait for the Routing Table to be "frozen".
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)

// adding d5 fails because RT is frozen
var d5 *IpfsDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
break
}
}
defer d5.host.Close()
defer d5.Close()

connectNoSync(t, ctx, d, d5)
time.Sleep(500 * time.Millisecond)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// Let's empty the routing table
for _, p := range d.routingTable.ListPeers() {
d.routingTable.RemovePeer(p)
}
require.Len(t, d.routingTable.ListPeers(), 0)

// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d1.self)
require.Contains(t, d.routingTable.ListPeers(), d2.self)

// adding d3 & d4 also works because the RT is not frozen.
require.NoError(t, d.host.Network().ClosePeer(d3.self))
require.NoError(t, d.host.Network().ClosePeer(d4.self))
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// run refreshes and freeze the RT
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// cant add d1 & d5 because RT is frozen.
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
}
Loading

0 comments on commit e788ffc

Please sign in to comment.