Skip to content

Commit

Permalink
Merge branch 'main' into greg/config2
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed May 15, 2024
2 parents bdc621f + a2192b7 commit fb27e15
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 39 deletions.
68 changes: 45 additions & 23 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -85,6 +85,7 @@ where
timers: ActorRef<TimersMsg>,
msg_queue: VecDeque<Msg<Ctx>>,
validator_set: Ctx::ValidatorSet,
connected_peers: BTreeSet<PeerId>,
}

impl<Ctx> Consensus<Ctx>
Expand Down Expand Up @@ -149,24 +150,13 @@ where
myself: ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
) -> Result<(), ractor::ActorProcessingErr> {
match event {
GossipEvent::Listening(addr) => {
info!("Listening on {addr}");
}
GossipEvent::PeerConnected(peer_id) => {
info!("Connected to peer {peer_id}");
}
GossipEvent::PeerDisconnected(peer_id) => {
info!("Disconnected from peer {peer_id}");
}
GossipEvent::Message(from, Channel::Consensus, data) => {
let from = PeerId::new(from.to_string());
let msg = NetworkMsg::from_network_bytes(data).unwrap();
if let GossipEvent::Message(from, Channel::Consensus, data) = event {
let from = PeerId::new(from.to_string());
let msg = NetworkMsg::from_network_bytes(data).unwrap();

info!("Received message from peer {from}: {msg:?}");
info!("Received message from peer {from}: {msg:?}");

self.handle_network_msg(from, msg, myself, state).await?;
}
self.handle_network_msg(from, msg, myself, state).await?;
}

Ok(())
Expand Down Expand Up @@ -582,6 +572,7 @@ where
timers,
msg_queue: VecDeque::new(),
validator_set: self.params.initial_validator_set.clone(),
connected_peers: BTreeSet::new(),
})
}

Expand Down Expand Up @@ -668,12 +659,43 @@ where
}

Msg::GossipEvent(event) => {
if state.driver.round() == Round::Nil {
debug!("Received gossip event at round -1, queuing for later");
state.msg_queue.push_back(Msg::GossipEvent(event));
} else {
self.handle_gossip_event(event.as_ref(), myself, state)
.await?;
match event.as_ref() {
GossipEvent::Listening(addr) => {
info!("Listening on {addr}");
}

GossipEvent::PeerConnected(peer_id) => {
info!("Connected to peer {peer_id}");

state.connected_peers.insert(PeerId::new(peer_id));

if state.connected_peers.len() == state.validator_set.count() - 1 {
info!(
"Enough peers {} connected to start consensus",
state.connected_peers.len()
);

myself.cast(Msg::StartHeight(state.driver.height()))?;
}
}

GossipEvent::PeerDisconnected(peer_id) => {
info!("Disconnected from peer {peer_id}");

state.connected_peers.retain(|p| p != &PeerId::new(peer_id));

// TODO: pause/stop consensus, if necessary
}

_ => {
if state.driver.round() == Round::Nil {
debug!("Received gossip event at round -1, queuing for later");
state.msg_queue.push_back(Msg::GossipEvent(event));
} else {
self.handle_gossip_event(event.as_ref(), myself, state)
.await?;
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion code/actors/tests/actor_gossip_n3f1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn one_node_fails_to_start() {
TestNode::correct(15),
TestNode::correct(10),
],
4,
0,
);

run_test(nodes).await
Expand Down
13 changes: 6 additions & 7 deletions code/actors/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use malachite_common::{Round, VotingPower};
use malachite_test::utils::make_validators;
use malachite_test::{Height, PrivateKey, Validator, ValidatorSet, Value};

use malachite_actors::node::Msg;
use malachite_actors::util::make_node_actor;

pub const SEED: u64 = 42;
Expand Down Expand Up @@ -94,7 +93,11 @@ pub async fn run_test<const N: usize>(test: Test<N>) {

let mut handles = Vec::with_capacity(N);

for (v, sk) in &test.vals_and_keys {
for i in 0..N {
if test.nodes[i].faults.contains(&Fault::NoStart) {
continue;
}
let (v, sk) = &test.vals_and_keys[i];
let (tx_decision, rx_decision) = mpsc::channel(HEIGHTS as usize);

let node = tokio::spawn(make_node_actor(
Expand All @@ -119,11 +122,7 @@ pub async fn run_test<const N: usize>(test: Test<N>) {
let mut actors = Vec::with_capacity(nodes.len());
let mut rxs = Vec::with_capacity(nodes.len());

for (actor, node_test, rx) in nodes {
if node_test.start_node() {
actor.cast(Msg::Start).unwrap();
}

for (actor, _, rx) in nodes {
actors.push(actor);
rxs.push(rx);
}
Expand Down
7 changes: 0 additions & 7 deletions code/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::time::Duration;

use malachite_actors::node::Msg;
use malachite_actors::util::make_node_actor;
use malachite_node::config::Config;
use malachite_test::{Address, PrivateKey, ValidatorSet};
Expand Down Expand Up @@ -67,10 +64,6 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});

tokio::time::sleep(Duration::from_secs(1)).await;

actor.cast(Msg::Start)?;

while let Some((height, round, value)) = rx_decision.recv().await {
info!(
"[{}] Decision at height {height} and round {round}: {value:?}",
Expand Down
2 changes: 2 additions & 0 deletions code/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
rustdoc::private_intra_doc_links,
variant_size_differences
)]
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::panic))]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

Expand Down
2 changes: 2 additions & 0 deletions code/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
rustdoc::private_intra_doc_links,
variant_size_differences
)]
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::panic))]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

Expand Down
2 changes: 2 additions & 0 deletions code/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

mod msg;
Expand Down
2 changes: 1 addition & 1 deletion code/network/src/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::str::FromStr;

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PeerId(String);

Expand Down
2 changes: 2 additions & 0 deletions code/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

pub mod config;
2 changes: 2 additions & 0 deletions code/round/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
rustdoc::private_intra_doc_links,
variant_size_differences
)]
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::panic))]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

Expand Down
2 changes: 2 additions & 0 deletions code/test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![forbid(unsafe_code)]
#![deny(trivial_casts, trivial_numeric_casts)]
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

mod address;
Expand Down
2 changes: 2 additions & 0 deletions code/vote/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
rustdoc::private_intra_doc_links,
variant_size_differences
)]
// For coverage on nightly
#![allow(unexpected_cfgs)]
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::panic))]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

Expand Down

0 comments on commit fb27e15

Please sign in to comment.