diff --git a/crates/sc-proof-of-time/src/source/gossip.rs b/crates/sc-proof-of-time/src/source/gossip.rs index ad9d9edce4..c8399161e6 100644 --- a/crates/sc-proof-of-time/src/source/gossip.rs +++ b/crates/sc-proof-of-time/src/source/gossip.rs @@ -180,17 +180,16 @@ where let mut incoming_unverified_messages = incoming_unverified_messages.fuse(); loop { - let gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)); + let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse(); + futures::select! { - message = incoming_unverified_messages.next() => { - if let Some((sender, proof)) = message { - self.handle_proof_candidate(sender, proof).await; - } + (sender, proof) = incoming_unverified_messages.select_next_some() => { + self.handle_proof_candidate(sender, proof).await; }, message = self.to_gossip_receiver.select_next_some() => { self.handle_to_gossip_messages(message).await }, - _ = gossip_engine_poll.fuse() => { + _ = gossip_engine_poll => { error!("Gossip engine has terminated"); return; } diff --git a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs index 666bd2ac11..1a52c19166 100644 --- a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs +++ b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs @@ -12,6 +12,7 @@ use sp_core::twox_256; use sp_messenger::messages::ChainId; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use std::collections::{BTreeMap, HashSet}; +use std::future::poll_fn; use std::pin::pin; use std::sync::Arc; @@ -124,7 +125,7 @@ fn topic() -> Block::Hash { impl GossipWorker { /// Starts the Gossip message worker. pub async fn run(mut self) { - let mut incoming_cross_chain_messages = pin!(self + let incoming_cross_chain_messages = pin!(self .gossip_engine .lock() .messages_for(topic::()) @@ -133,27 +134,26 @@ impl GossipWorker { .ok() .map(|msg| (notification.sender, msg)) })); + let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse(); loop { let engine = self.gossip_engine.clone(); - let gossip_engine = futures::future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse(); futures::select! { - cross_chain_message = incoming_cross_chain_messages.next().fuse() => { + cross_chain_message = incoming_cross_chain_messages.next() => { if let Some((maybe_peer, msg)) = cross_chain_message { tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id); self.handle_cross_chain_message(msg, maybe_peer); } }, - cross_chain_message = self.gossip_msg_stream.next().fuse() => { - if let Some(msg) = cross_chain_message { - tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id); - self.handle_cross_chain_message(msg, None); - } + msg = self.gossip_msg_stream.select_next_some() => { + tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id); + self.handle_cross_chain_message(msg, None); } - _ = gossip_engine.fuse() => { + _ = gossip_engine => { tracing::error!(target: LOG_TARGET, "Gossip engine has terminated."); return; } diff --git a/domains/client/subnet-gossip/src/worker.rs b/domains/client/subnet-gossip/src/worker.rs index d1d44e7ceb..5b05833d13 100644 --- a/domains/client/subnet-gossip/src/worker.rs +++ b/domains/client/subnet-gossip/src/worker.rs @@ -2,11 +2,12 @@ use crate::{ topic, BundleFor, BundleReceiver, GossipMessage, GossipMessageHandler, GossipValidator, LOG_TARGET, }; -use futures::{future, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_network_gossip::GossipEngine; use sp_runtime::traits::Block as BlockT; +use std::future::poll_fn; use std::pin::pin; use std::sync::Arc; @@ -50,20 +51,21 @@ where } pub(super) async fn run(mut self) { - let mut incoming = pin!(self + let incoming = pin!(self .gossip_engine .lock() .messages_for(topic::()) .filter_map(|notification| async move { GossipMessage::::decode(&mut ¬ification.message[..]).ok() })); + let mut incoming = incoming.fuse(); loop { let engine = self.gossip_engine.clone(); - let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse(); futures::select! { - gossip_message = incoming.next().fuse() => { + gossip_message = incoming.next() => { if let Some(message) = gossip_message { tracing::debug!(target: LOG_TARGET, ?message, "Rebroadcasting an executor gossip message"); match message { @@ -73,12 +75,10 @@ where return } } - bundle = self.bundle_receiver.next().fuse() => { - if let Some(bundle) = bundle { - self.gossip_bundle(bundle); - } + bundle = self.bundle_receiver.select_next_some() => { + self.gossip_bundle(bundle); } - _ = gossip_engine.fuse() => { + _ = gossip_engine => { tracing::error!(target: LOG_TARGET, "Gossip engine has terminated."); return; }