-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] feat: conntracker #3032
Draft
MarcoPolo
wants to merge
6
commits into
master
Choose a base branch
from
marco/conntracker-service
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
370f60d
feat(host): Add conntracker to track connections and their protocols
MarcoPolo b6efcf2
FIXME: remove this commit
MarcoPolo 2c22768
wip
MarcoPolo 3ba2089
remove unused const
MarcoPolo 915400e
fix: identify: push should not dial a new connection
MarcoPolo ff25249
fix test
MarcoPolo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"github.com/libp2p/go-libp2p/core/transport" | ||
"github.com/libp2p/go-libp2p/p2p/host/autonat" | ||
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" | ||
"github.com/libp2p/go-libp2p/p2p/host/conntracker" | ||
"github.com/libp2p/go-libp2p/p2p/host/eventbus" | ||
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager" | ||
"github.com/libp2p/go-libp2p/p2p/host/relaysvc" | ||
|
@@ -74,6 +75,7 @@ type BasicHost struct { | |
// keep track of resources we need to wait on before shutting down | ||
refCount sync.WaitGroup | ||
|
||
connTracker *conntracker.ConnTracker | ||
network network.Network | ||
psManager *pstoremanager.PeerstoreManager | ||
mux *msmux.MultistreamMuxer[protocol.ID] | ||
|
@@ -92,6 +94,7 @@ type BasicHost struct { | |
emitters struct { | ||
evtLocalProtocolsUpdated event.Emitter | ||
evtLocalAddrsUpdated event.Emitter | ||
evtProtoNegotiation event.Emitter | ||
} | ||
|
||
addrChangeChan chan struct{} | ||
|
@@ -170,6 +173,8 @@ type HostOpts struct { | |
DisableIdentifyAddressDiscovery bool | ||
EnableAutoNATv2 bool | ||
AutoNATv2Dialer host.Host | ||
|
||
ConnTracker *conntracker.ConnTracker | ||
} | ||
|
||
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. | ||
|
@@ -185,7 +190,21 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { | |
if err != nil { | ||
return nil, err | ||
} | ||
|
||
hostCtx, cancel := context.WithCancel(context.Background()) | ||
|
||
// FIXME: | ||
// Remove this. Hack to try the tests quickly | ||
if opts.ConnTracker == nil { | ||
opts.ConnTracker = &conntracker.ConnTracker{} | ||
opts.ConnTracker.Start(opts.EventBus, n) | ||
go func() { | ||
<-hostCtx.Done() | ||
opts.ConnTracker.Stop() | ||
|
||
}() | ||
} | ||
|
||
h := &BasicHost{ | ||
network: n, | ||
psManager: psManager, | ||
|
@@ -197,6 +216,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { | |
ctx: hostCtx, | ||
ctxCancel: cancel, | ||
disableSignedPeerRecord: opts.DisableSignedPeerRecord, | ||
connTracker: opts.ConnTracker, | ||
} | ||
|
||
h.updateLocalIpAddr() | ||
|
@@ -207,6 +227,9 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { | |
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { | ||
return nil, err | ||
} | ||
if h.emitters.evtProtoNegotiation, err = h.eventbus.Emitter(&event.EvtProtocolNegotiationSuccess{}); err != nil { | ||
return nil, err | ||
} | ||
|
||
if !h.disableSignedPeerRecord { | ||
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) | ||
|
@@ -685,6 +708,80 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { | |
}) | ||
} | ||
|
||
func (h *BasicHost) newConnFromConnTracker(ctx context.Context, p peer.ID) (conntracker.ConnWithMeta, error) { | ||
connFilter := conntracker.NoLimitedConnFilter | ||
if canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx); canUseLimitedConn { | ||
connFilter = nil | ||
} | ||
|
||
// requiredProtos is nil because we will fallback to MSS to negotiate the | ||
// protocol if none of our prefrred protocols were broadcasted by identify. | ||
var requiredProtos []protocol.ID | ||
|
||
// Do we have a conn? | ||
// TODO: for both usages of conntracker, use a sort fn that sorts by more streams. | ||
conn, err := h.connTracker.GetBestConn(ctx, p, conntracker.GetBestConnOpts{ | ||
OneOf: requiredProtos, | ||
FilterFn: connFilter, | ||
WaitForIdentify: true, | ||
AllowNoConn: true, | ||
}) | ||
if err == nil { | ||
return conn, nil | ||
} | ||
|
||
var errCh chan error | ||
if nodial, _ := network.GetNoDial(ctx); !nodial { | ||
errCh = make(chan error, 1) | ||
go func() { | ||
err := h.Connect(ctx, peer.AddrInfo{ID: p}) | ||
if err != nil { | ||
select { | ||
case errCh <- err: | ||
default: | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// Wait for a connection that works for us | ||
connChan, err := h.connTracker.GetBestConnChan(ctx, p, conntracker.GetBestConnOpts{ | ||
OneOf: requiredProtos, | ||
FilterFn: connFilter, | ||
WaitForIdentify: true, // Old behavior | ||
}) | ||
if err != nil { | ||
return conntracker.ConnWithMeta{}, err | ||
} | ||
Comment on lines
+734
to
+755
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this pattern: Dial the peer through swarm, then wait on a conntracker chan. Letting the conn tracker dial it also seems bad. Maybe, just do |
||
|
||
select { | ||
case <-ctx.Done(): | ||
return conntracker.ConnWithMeta{}, ctx.Err() | ||
case <-h.ctx.Done(): | ||
return conntracker.ConnWithMeta{}, h.ctx.Err() | ||
case err = <-errCh: | ||
return conntracker.ConnWithMeta{}, err | ||
case connWithMeta := <-connChan: | ||
return connWithMeta, nil | ||
} | ||
} | ||
|
||
func (h *BasicHost) newStreamWithConnTracker(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, protocol.ID, error) { | ||
var preferredProto protocol.ID | ||
connWithMeta, err := h.newConnFromConnTracker(ctx, p) | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
for _, proto := range pids { | ||
if connWithMeta.SupportsProtocol(proto) { | ||
preferredProto = proto | ||
break | ||
} | ||
} | ||
s, err := connWithMeta.NewStream(ctx) | ||
return s, preferredProto, err | ||
} | ||
|
||
// NewStream opens a new stream to given peer p, and writes a p2p/protocol | ||
// header with given protocol.ID. If there is no connection to p, attempts | ||
// to create one. If ProtocolID is "", writes no header. | ||
|
@@ -698,56 +795,71 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I | |
} | ||
} | ||
|
||
// If the caller wants to prevent the host from dialing, it should use the NoDial option. | ||
if nodial, _ := network.GetNoDial(ctx); !nodial { | ||
err := h.Connect(ctx, peer.AddrInfo{ID: p}) | ||
var ( | ||
err error | ||
s network.Stream | ||
preferredProto protocol.ID | ||
) | ||
|
||
if h.connTracker != nil { | ||
s, preferredProto, err = h.newStreamWithConnTracker(ctx, p, pids...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
s, err := h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p) | ||
if err != nil { | ||
// TODO: It would be nicer to get the actual error from the swarm, | ||
// but this will require some more work. | ||
if errors.Is(err, network.ErrNoConn) { | ||
return nil, errors.New("connection failed") | ||
} else { | ||
// If the caller wants to prevent the host from dialing, it should use the NoDial option. | ||
if nodial, _ := network.GetNoDial(ctx); !nodial { | ||
err := h.Connect(ctx, peer.AddrInfo{ID: p}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
return nil, fmt.Errorf("failed to open stream: %w", err) | ||
} | ||
defer func() { | ||
if strErr != nil && s != nil { | ||
s.Reset() | ||
|
||
s, err = h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p) | ||
if err != nil { | ||
// TODO: It would be nicer to get the actual error from the swarm, | ||
// but this will require some more work. | ||
if errors.Is(err, network.ErrNoConn) { | ||
return nil, errors.New("connection failed") | ||
} | ||
return nil, fmt.Errorf("failed to open stream: %w", err) | ||
} | ||
}() | ||
defer func() { | ||
if strErr != nil && s != nil { | ||
s.Reset() | ||
} | ||
}() | ||
|
||
// Wait for any in-progress identifies on the connection to finish. This | ||
// is faster than negotiating. | ||
// | ||
// If the other side doesn't support identify, that's fine. This will | ||
// just be a no-op. | ||
select { | ||
case <-h.ids.IdentifyWait(s.Conn()): | ||
case <-ctx.Done(): | ||
return nil, fmt.Errorf("identify failed to complete: %w", ctx.Err()) | ||
} | ||
// Wait for any in-progress identifies on the connection to finish. This | ||
// is faster than negotiating. | ||
// | ||
// If the other side doesn't support identify, that's fine. This will | ||
// just be a no-op. | ||
select { | ||
case <-h.ids.IdentifyWait(s.Conn()): | ||
case <-ctx.Done(): | ||
return nil, fmt.Errorf("identify failed to complete: %w", ctx.Err()) | ||
} | ||
|
||
pref, err := h.preferredProtocol(p, pids) | ||
if err != nil { | ||
return nil, err | ||
preferredProto, err = h.preferredProtocol(p, pids) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
if pref != "" { | ||
if err := s.SetProtocol(pref); err != nil { | ||
if preferredProto != "" { | ||
if err := s.SetProtocol(preferredProto); err != nil { | ||
return nil, err | ||
} | ||
lzcon := msmux.NewMSSelect(s, pref) | ||
lzcon := msmux.NewMSSelect(s, preferredProto) | ||
return &streamWrapper{ | ||
Stream: s, | ||
rw: lzcon, | ||
}, nil | ||
} | ||
|
||
// Fallback to MultiStreamSelect. | ||
|
||
// Negotiate the protocol in the background, obeying the context. | ||
var selected protocol.ID | ||
errCh := make(chan error, 1) | ||
|
@@ -771,6 +883,8 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I | |
return nil, err | ||
} | ||
_ = h.Peerstore().AddProtocols(p, selected) // adding the protocol to the peerstore isn't critical | ||
h.emitters.evtProtoNegotiation.Emit(event.EvtProtocolNegotiationSuccess{Peer: p, Conn: s.Conn(), Protocol: selected}) | ||
|
||
return s, nil | ||
} | ||
|
||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can users of basic host get access to the ConnTracker?