Skip to content

Commit

Permalink
Merge pull request #2310 from subspace/remove-extra-boxing
Browse files Browse the repository at this point in the history
Remove extra boxing around `Unpin` requirement and fix fused future usage
  • Loading branch information
nazar-pc authored Dec 11, 2023
2 parents 744df6f + 0a2b00d commit e36e0fd
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 120 deletions.
32 changes: 15 additions & 17 deletions crates/sc-proof-of-time/src/source/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::{HashMap, VecDeque};
use std::future::poll_fn;
use std::hash::{Hash, Hasher};
use std::num::{NonZeroU32, NonZeroUsize};
use std::pin::pin;
use std::sync::{atomic, Arc};
use subspace_core_primitives::{PotCheckpoints, PotSeed, SlotNumber};
use tracing::{debug, error, trace, warn};
Expand Down Expand Up @@ -167,31 +168,28 @@ where
/// should be running on a dedicated thread.
pub async fn run(mut self) {
let message_receiver = self.engine.lock().messages_for(self.topic);
let mut incoming_unverified_messages = Box::pin(
message_receiver
.filter_map(|notification| async move {
notification.sender.map(|sender| {
let proof = GossipProof::decode(&mut notification.message.as_ref())
.expect("Only valid messages get here; qed");

(sender, proof)
})
let incoming_unverified_messages =
pin!(message_receiver.filter_map(|notification| async move {
notification.sender.map(|sender| {
let proof = GossipProof::decode(&mut notification.message.as_ref())
.expect("Only valid messages get here; qed");

(sender, proof)
})
.fuse(),
);
}));
let mut incoming_unverified_messages = incoming_unverified_messages.fuse();

loop {
let gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx));
let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse();

futures::select! {
message = incoming_unverified_messages.next() => {
if let Some((sender, proof)) = message {
self.handle_proof_candidate(sender, proof).await;
}
(sender, proof) = incoming_unverified_messages.select_next_some() => {
self.handle_proof_candidate(sender, proof).await;
},
message = self.to_gossip_receiver.select_next_some() => {
self.handle_to_gossip_messages(message).await
},
_ = gossip_engine_poll.fuse() => {
_ = gossip_engine_poll => {
error!("Gossip engine has terminated");
return;
}
Expand Down
15 changes: 7 additions & 8 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
Expand Down Expand Up @@ -602,7 +603,7 @@ where
// event handlers
drop(readers_and_pieces);

let farm_fut = run_future_in_dedicated_thread(
let farm_fut = pin!(run_future_in_dedicated_thread(
move || async move {
while let Some(result) = single_disk_farms_stream.next().await {
let id = result?;
Expand All @@ -612,26 +613,24 @@ where
anyhow::Ok(())
},
"farmer-farm".to_string(),
)?;
let mut farm_fut = Box::pin(farm_fut).fuse();
)?);

let networking_fut = run_future_in_dedicated_thread(
let networking_fut = pin!(run_future_in_dedicated_thread(
move || async move { node_runner.run().await },
"farmer-networking".to_string(),
)?;
let mut networking_fut = Box::pin(networking_fut).fuse();
)?);

futures::select!(
// Signal future
_ = signal.fuse() => {},

// Farm future
result = farm_fut => {
result = farm_fut.fuse() => {
result??;
},

// Node runner future
_ = networking_fut => {
_ = networking_fut.fuse() => {
info!("Node runner exited.")
},
);
Expand Down
29 changes: 13 additions & 16 deletions crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,21 @@ pub(crate) fn raise_fd_limit() {
#[cfg(unix)]
pub(crate) async fn shutdown_signal() {
use futures::FutureExt;
use std::pin::pin;

futures::future::select(
Box::pin(
signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGINT, shutting down farmer...");
}),
),
Box::pin(
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGTERM, shutting down farmer...");
}),
),
pin!(signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGINT, shutting down farmer...");
}),),
pin!(signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGTERM, shutting down farmer...");
}),),
)
.await;
}
Expand Down
47 changes: 25 additions & 22 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fs::File;
use std::io;
use std::num::{NonZeroU16, NonZeroUsize};
use std::ops::Range;
use std::pin::pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -289,29 +290,31 @@ where
let mut sector = Vec::new();
let mut sector_metadata = Vec::new();

let plot_sector_fut = encode_sector::<PosTable>(
downloaded_sector,
EncodeSectorOptions {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
encoding_semaphore: Some(encoding_semaphore),
table_generator: &mut table_generator,
},
);

let plotted_sector = Handle::current().block_on(async {
select! {
plotting_result = Box::pin(plot_sector_fut).fuse() => {
plotting_result.map_err(PlottingError::from)
let plotted_sector = {
let plot_sector_fut = pin!(encode_sector::<PosTable>(
downloaded_sector,
EncodeSectorOptions {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
encoding_semaphore: Some(encoding_semaphore),
table_generator: &mut table_generator,
},
));

Handle::current().block_on(async {
select! {
plotting_result = plot_sector_fut.fuse() => {
plotting_result.map_err(PlottingError::from)
}
_ = stop_receiver.recv().fuse() => {
Err(PlottingError::FarmIsShuttingDown)
}
}
_ = stop_receiver.recv().fuse() => {
Err(PlottingError::FarmIsShuttingDown)
}
}
})?;
})?
};

Ok((sector, sector_metadata, table_generator, plotted_sector))
})
Expand Down
30 changes: 14 additions & 16 deletions crates/subspace-networking/examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,21 @@ async fn get_peer(peer_id: PeerId, node: Node) {

#[cfg(unix)]
pub(crate) async fn shutdown_signal() {
use std::pin::pin;

futures::future::select(
Box::pin(
signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGINT, shutting down farmer...");
}),
),
Box::pin(
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGTERM, shutting down farmer...");
}),
),
pin!(signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGINT, shutting down farmer...");
}),),
pin!(signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Setting signal handlers must never fail")
.recv()
.map(|_| {
tracing::info!("Received SIGTERM, shutting down farmer...");
}),),
)
.await;
}
Expand Down
6 changes: 1 addition & 5 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async fn test_async_handler_works_with_pending_internal_future() {

let (node_2, mut node_runner_2) = crate::construct(config_2).unwrap();

let bootstrap_fut = Box::pin({
tokio::spawn({
let node = node_2.clone();

async move {
Expand All @@ -240,10 +240,6 @@ async fn test_async_handler_works_with_pending_internal_future() {
}
});

tokio::spawn(async move {
bootstrap_fut.await;
});

tokio::spawn(async move {
node_runner_2.run().await;
});
Expand Down
36 changes: 18 additions & 18 deletions domains/client/cross-domain-message-gossip/src/gossip_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use sp_core::twox_256;
use sp_messenger::messages::ChainId;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::{BTreeMap, HashSet};
use std::future::poll_fn;
use std::pin::pin;
use std::sync::Arc;

const LOG_TARGET: &str = "cross_chain_gossip_worker";
Expand Down Expand Up @@ -123,37 +125,35 @@ fn topic<Block: BlockT>() -> Block::Hash {
impl<Block: BlockT, Network> GossipWorker<Block, Network> {
/// Starts the Gossip message worker.
pub async fn run(mut self) {
let mut incoming_cross_chain_messages = Box::pin(
self.gossip_engine
.lock()
.messages_for(topic::<Block>())
.filter_map(|notification| async move {
Message::decode(&mut &notification.message[..])
.ok()
.map(|msg| (notification.sender, msg))
}),
);
let incoming_cross_chain_messages = pin!(self
.gossip_engine
.lock()
.messages_for(topic::<Block>())
.filter_map(|notification| async move {
Message::decode(&mut &notification.message[..])
.ok()
.map(|msg| (notification.sender, msg))
}));
let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();

loop {
let engine = self.gossip_engine.clone();
let gossip_engine = futures::future::poll_fn(|cx| engine.lock().poll_unpin(cx));
let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();

futures::select! {
cross_chain_message = incoming_cross_chain_messages.next().fuse() => {
cross_chain_message = incoming_cross_chain_messages.next() => {
if let Some((maybe_peer, msg)) = cross_chain_message {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, maybe_peer);
}
},

cross_chain_message = self.gossip_msg_stream.next().fuse() => {
if let Some(msg) = cross_chain_message {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, None);
}
msg = self.gossip_msg_stream.select_next_some() => {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, None);
}

_ = gossip_engine.fuse() => {
_ = gossip_engine => {
tracing::error!(target: LOG_TARGET, "Gossip engine has terminated.");
return;
}
Expand Down
5 changes: 3 additions & 2 deletions domains/client/domain-operator/src/domain_worker_starter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use sp_domains_fraud_proof::FraudProofApi;
use sp_messenger::MessengerApi;
use sp_runtime::traits::NumberFor;
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::pin::pin;
use std::sync::Arc;
use subspace_runtime_primitives::Balance;
use tracing::{info, Instrument};
Expand Down Expand Up @@ -138,8 +139,8 @@ pub(super) async fn start_worker<
.boxed()
}
};
let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream);
let mut acknowledgement_sender_stream = Box::pin(acknowledgement_sender_stream);
let mut new_slot_notification_stream = pin!(new_slot_notification_stream);
let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
loop {
tokio::select! {
// Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream`
Expand Down
Loading

0 comments on commit e36e0fd

Please sign in to comment.