Skip to content

Commit

Permalink
Fix fused future usage
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Dec 11, 2023
1 parent eb199bd commit 8440b5e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 24 deletions.
11 changes: 5 additions & 6 deletions crates/sc-proof-of-time/src/source/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,16 @@ where
let mut incoming_unverified_messages = incoming_unverified_messages.fuse();

loop {
let gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx));
let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse();

futures::select! {
message = incoming_unverified_messages.next() => {
if let Some((sender, proof)) = message {
self.handle_proof_candidate(sender, proof).await;
}
(sender, proof) = incoming_unverified_messages.select_next_some() => {
self.handle_proof_candidate(sender, proof).await;
},
message = self.to_gossip_receiver.select_next_some() => {
self.handle_to_gossip_messages(message).await
},
_ = gossip_engine_poll.fuse() => {
_ = gossip_engine_poll => {
error!("Gossip engine has terminated");
return;
}
Expand Down
18 changes: 9 additions & 9 deletions domains/client/cross-domain-message-gossip/src/gossip_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use sp_core::twox_256;
use sp_messenger::messages::ChainId;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::{BTreeMap, HashSet};
use std::future::poll_fn;
use std::pin::pin;
use std::sync::Arc;

Expand Down Expand Up @@ -124,7 +125,7 @@ fn topic<Block: BlockT>() -> Block::Hash {
impl<Block: BlockT, Network> GossipWorker<Block, Network> {
/// Starts the Gossip message worker.
pub async fn run(mut self) {
let mut incoming_cross_chain_messages = pin!(self
let incoming_cross_chain_messages = pin!(self
.gossip_engine
.lock()
.messages_for(topic::<Block>())
Expand All @@ -133,27 +134,26 @@ impl<Block: BlockT, Network> GossipWorker<Block, Network> {
.ok()
.map(|msg| (notification.sender, msg))
}));
let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();

loop {
let engine = self.gossip_engine.clone();
let gossip_engine = futures::future::poll_fn(|cx| engine.lock().poll_unpin(cx));
let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();

futures::select! {
cross_chain_message = incoming_cross_chain_messages.next().fuse() => {
cross_chain_message = incoming_cross_chain_messages.next() => {
if let Some((maybe_peer, msg)) = cross_chain_message {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, maybe_peer);
}
},

cross_chain_message = self.gossip_msg_stream.next().fuse() => {
if let Some(msg) = cross_chain_message {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, None);
}
msg = self.gossip_msg_stream.select_next_some() => {
tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
self.handle_cross_chain_message(msg, None);
}

_ = gossip_engine.fuse() => {
_ = gossip_engine => {
tracing::error!(target: LOG_TARGET, "Gossip engine has terminated.");
return;
}
Expand Down
18 changes: 9 additions & 9 deletions domains/client/subnet-gossip/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use crate::{
topic, BundleFor, BundleReceiver, GossipMessage, GossipMessageHandler, GossipValidator,
LOG_TARGET,
};
use futures::{future, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_network_gossip::GossipEngine;
use sp_runtime::traits::Block as BlockT;
use std::future::poll_fn;
use std::pin::pin;
use std::sync::Arc;

Expand Down Expand Up @@ -50,20 +51,21 @@ where
}

pub(super) async fn run(mut self) {
let mut incoming = pin!(self
let incoming = pin!(self
.gossip_engine
.lock()
.messages_for(topic::<Block>())
.filter_map(|notification| async move {
GossipMessage::<CBlock, Block>::decode(&mut &notification.message[..]).ok()
}));
let mut incoming = incoming.fuse();

loop {
let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));
let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();

futures::select! {
gossip_message = incoming.next().fuse() => {
gossip_message = incoming.next() => {
if let Some(message) = gossip_message {
tracing::debug!(target: LOG_TARGET, ?message, "Rebroadcasting an executor gossip message");
match message {
Expand All @@ -73,12 +75,10 @@ where
return
}
}
bundle = self.bundle_receiver.next().fuse() => {
if let Some(bundle) = bundle {
self.gossip_bundle(bundle);
}
bundle = self.bundle_receiver.select_next_some() => {
self.gossip_bundle(bundle);
}
_ = gossip_engine.fuse() => {
_ = gossip_engine => {
tracing::error!(target: LOG_TARGET, "Gossip engine has terminated.");
return;
}
Expand Down

0 comments on commit 8440b5e

Please sign in to comment.