Skip to content

Commit

Permalink
feat(modp2p): Ping and ConnectionState APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Dec 9, 2024
1 parent c5008b6 commit 1e3b12e
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 0 deletions.
74 changes: 74 additions & 0 deletions nodebuilder/p2p/cmd/p2p.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cmd

import (
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma2 "github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"time"

cmdnode "github.com/celestiaorg/celestia-node/cmd"
)
Expand Down Expand Up @@ -35,6 +37,8 @@ func init() {
bandwidthForProtocolCmd,
pubsubPeersCmd,
pubsubTopicsCmd,
connectionInfoCmd,
pingCmd,
)
}

Expand Down Expand Up @@ -599,3 +603,73 @@ var pubsubTopicsCmd = &cobra.Command{
return cmdnode.PrintOutput(topics, err, formatter)
},
}

var connectionInfoCmd = &cobra.Command{
Use: "connection-state [peerID]",
Short: "Gets connection info for a given peer ID",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := cmdnode.ParseClientFromCtx(cmd.Context())
if err != nil {
return err
}
defer client.Close()

pid, err := peer.Decode(args[0])
if err != nil {
return err
}

infos, err := client.P2P.ConnectionState(cmd.Context(), pid)
return cmdnode.PrintOutput(infos, err, func(i interface{}) interface{} {
type state struct {
Info network.ConnectionState
NumStreams int
Direction string
Opened string
Limited bool
}

states := i.([]p2p.ConnectionState)
infos := make([]state, len(states))
for i, s := range states {
infos[i] = state{
Info: s.Info,
NumStreams: s.NumStreams,
Direction: s.Direction.String(),
Opened: s.Opened.Format("2006-01-02 15:04:05"),
Limited: s.Limited,
}
}

if len(infos) == 1 {
return infos[0]
} else {
return infos
}
})
},
}

var pingCmd = &cobra.Command{
Use: "ping [peerID]",
Short: "Pings given peer and tell how much time that took or errors",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := cmdnode.ParseClientFromCtx(cmd.Context())
if err != nil {
return err
}
defer client.Close()

pid, err := peer.Decode(args[0])
if err != nil {
return err
}

pingDuration, err := client.P2P.Ping(cmd.Context(), pid)
return cmdnode.PrintOutput(pingDuration, err, func(i interface{}) interface{} {
return i.(time.Duration).String()
})
},
}
32 changes: 32 additions & 0 deletions nodebuilder/p2p/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions nodebuilder/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package p2p
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"reflect"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
libhost "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -18,6 +20,21 @@ import (

var _ Module = (*API)(nil)

// ConnectionState holds information about a connection.
type ConnectionState struct {
Info network.ConnectionState
// NumStreams is the number of streams on the connection.
NumStreams int
// Direction specifies whether this is an inbound or an outbound connection.
Direction network.Direction
// Opened is the timestamp when this connection was opened.
Opened time.Time
// Limited indicates that this connection is Limited. It maybe limited by
// bytes or time. In practice, this is a connection formed over a circuit v2
// relay.
Limited bool
}

// Module represents all accessible methods related to the node's p2p
// host / operations.
//
Expand All @@ -39,6 +56,9 @@ type Module interface {
ClosePeer(ctx context.Context, id peer.ID) error
// Connectedness returns a state signaling connection capabilities.
Connectedness(ctx context.Context, id peer.ID) (network.Connectedness, error)
// ConnectionState returns information about each *active* connection to the peer.
// NOTE: At most cases there should be only a single connection.
ConnectionState(ctx context.Context, id peer.ID) ([]ConnectionState, error)
// NATStatus returns the current NAT status.
NATStatus(context.Context) (network.Reachability, error)

Expand Down Expand Up @@ -80,6 +100,9 @@ type Module interface {
PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error)
// PubSubTopics reports current PubSubTopics the node participates in.
PubSubTopics(ctx context.Context) ([]string, error)

// Ping pings the selected peer and returns time it took or error.
Ping(ctx context.Context, peer peer.ID) (time.Duration, error)
}

// module contains all components necessary to access information and
Expand Down Expand Up @@ -205,6 +228,32 @@ func (m *module) PubSubTopics(_ context.Context) ([]string, error) {
return m.ps.GetTopics(), nil
}

func (m *module) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) {
res := <-ping.Ping(ctx, m.host, peer) // context is handled for us
return res.RTT, res.Error
}

func (m *module) ConnectionState(_ context.Context, peer peer.ID) ([]ConnectionState, error) {
cons := m.host.Network().ConnsToPeer(peer)
if len(cons) == 0 {
return nil, fmt.Errorf("no connections to peer %s", peer)
}

conInfos := make([]ConnectionState, len(cons))
for i, con := range cons {
stat := con.Stat()
conInfos[i] = ConnectionState{
Info: con.ConnState(),
NumStreams: stat.NumStreams,
Direction: stat.Direction,
Opened: stat.Opened,
Limited: stat.Limited,
}
}

return conInfos, nil
}

// API is a wrapper around Module for the RPC.
//
//nolint:dupl
Expand All @@ -229,6 +278,8 @@ type API struct {
ResourceState func(context.Context) (rcmgr.ResourceManagerStat, error) `perm:"admin"`
PubSubPeers func(ctx context.Context, topic string) ([]peer.ID, error) `perm:"admin"`
PubSubTopics func(ctx context.Context) ([]string, error) `perm:"admin"`
Ping func(ctx context.Context, peer peer.ID) (time.Duration, error) `perm:"admin"`
ConnectionState func(context.Context, peer.ID) ([]ConnectionState, error) `perm:"admin"`
}
}

Expand Down Expand Up @@ -307,3 +358,11 @@ func (api *API) PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error
func (api *API) PubSubTopics(ctx context.Context) ([]string, error) {
return api.Internal.PubSubTopics(ctx)
}

func (api *API) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) {
return api.Internal.Ping(ctx, peer)
}

func (api *API) ConnectionState(ctx context.Context, peer peer.ID) ([]ConnectionState, error) {
return api.Internal.ConnectionState(ctx, peer)
}
5 changes: 5 additions & 0 deletions nodebuilder/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func TestP2PModule_Host(t *testing.T) {

connectedness, err := mgr.Connectedness(ctx, peer.ID())
require.NoError(t, err)

infos, err := mgr.ConnectionState(ctx, peer.ID())
require.NoError(t, err)
require.GreaterOrEqual(t, len(infos), 1)

assert.Equal(t, host.Network().Connectedness(peer.ID()), connectedness)
// now disconnect using manager and check for connectedness match again
assert.NoError(t, mgr.ClosePeer(ctx, peer.ID()))
Expand Down

0 comments on commit 1e3b12e

Please sign in to comment.