Skip to content

Commit

Permalink
networking: Update connected-peers tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Oct 9, 2023
1 parent fdddc4a commit 2ba4630
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
mod handler;

#[cfg(not(windows))] // TODO: Restore tests on windows after changing the waiting algorithm
#[cfg(test)]
mod tests;

Expand Down
46 changes: 40 additions & 6 deletions crates/subspace-networking/src/protocols/connected_peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
use libp2p::{yamux, Swarm};
use libp2p_swarm_test::SwarmExt;
use std::time::Duration;
use tokio::time::sleep;
use tokio::sync;

#[derive(Debug, Clone)]
struct ConnectedPeersInstance;
Expand All @@ -25,6 +25,30 @@ const DECISION_TIMEOUT: Duration = Duration::from_millis(900);
#[cfg(windows)]
const LONG_DELAY: Duration = Duration::from_millis(3000);

#[cfg(not(windows))]
fn get_delay_signal_channel() -> sync::mpsc::UnboundedReceiver<()> {
let (tx, rx) = sync::mpsc::unbounded_channel::<()>();
tokio::spawn(async move {
// send a signal after the waiting
tokio::time::sleep(LONG_DELAY).await;
tx.send(()).unwrap();
});

rx
}

#[cfg(windows)]
fn get_delay_signal_channel() -> sync::mpsc::UnboundedReceiver<()> {
let (tx, rx) = sync::mpsc::unbounded_channel::<()>();

std::thread::spawn(move || {
std::thread::sleep(LONG_DELAY);
tx.send(()).unwrap();
});

rx
}

#[tokio::test()]
async fn test_connection_breaks_after_timeout_without_decision() {
let mut peer1 = new_ephemeral(
Expand All @@ -46,11 +70,13 @@ async fn test_connection_breaks_after_timeout_without_decision() {
peer2.listen().await;
peer1.connect(&mut peer2).await;

let mut delay_signal = get_delay_signal_channel();

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
_ = delay_signal.recv().fuse() => {
break;
}
}
Expand Down Expand Up @@ -89,11 +115,13 @@ async fn test_connection_decision() {
.behaviour_mut()
.update_keep_alive_status(*peer1.local_peer_id(), true);

let mut delay_signal = get_delay_signal_channel();

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
_ = delay_signal.recv().fuse() => {
break;
}
}
Expand Down Expand Up @@ -132,11 +160,13 @@ async fn test_connection_decision_symmetry() {
.behaviour_mut()
.update_keep_alive_status(*peer1.local_peer_id(), false);

let mut delay_signal = get_delay_signal_channel();

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
_ = delay_signal.recv().fuse() => {
break;
}
}
Expand All @@ -161,14 +191,16 @@ async fn test_new_peer_request() {

peer1.listen().await;

let mut delay_signal = get_delay_signal_channel();

loop {
select! {
event = peer1.next_swarm_event().fuse() => {
if matches!(event, SwarmEvent::Behaviour(ConnectedPeersEvent::<ConnectedPeersInstance>::NewDialingCandidatesRequested(..))){
break;
}
},
_ = sleep(LONG_DELAY).fuse() => {
_ = delay_signal.recv().fuse() => {
panic!("No new peers requests.");
}
}
Expand Down Expand Up @@ -237,12 +269,14 @@ async fn test_target_connected_peer_limit_number() {
.behaviour_mut()
.update_keep_alive_status(*peer2.local_peer_id(), true);

let mut delay_signal = get_delay_signal_channel();

loop {
select! {
_ = peer1.next_swarm_event().fuse() => {},
_ = peer2.next_swarm_event().fuse() => {},
_ = peer3.next_swarm_event().fuse() => {},
_ = sleep(LONG_DELAY).fuse() => {
_ = delay_signal.recv().fuse() => {
break;
}
}
Expand Down

0 comments on commit 2ba4630

Please sign in to comment.