diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 965087e90..46371bf25 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -1,5 +1,6 @@ use crate::AppContext; use std::collections::HashMap; +use tokio::spawn; use topos_core::uci::{Certificate, SubnetId}; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_tce_api::RuntimeError; @@ -20,79 +21,82 @@ impl AppContext { self.delivery_latency .insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer()); - _ = match self - .validator_store - .insert_pending_certificate(&certificate) - .await - { - Ok(Some(pending_id)) => { - let certificate_id = certificate.id; - debug!( - "Certificate {} from subnet {} has been inserted into pending pool", - certificate_id, certificate.source_subnet_id - ); + let validator_store = self.validator_store.clone(); + let double_echo = self.tce_cli.get_double_echo_channel(); - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: true, - cert: *certificate, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo for {}", - certificate_id + spawn(async move { + _ = match validator_store + .insert_pending_certificate(&certificate) + .await + { + Ok(Some(pending_id)) => { + let certificate_id = certificate.id; + debug!( + "Certificate {} from subnet {} has been inserted into pending pool", + certificate_id, certificate.source_subnet_id ); - sender.send(Err(RuntimeError::CommunicationError( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo" - .to_string(), - ))) - } else { - sender.send(Ok(PendingResult::InPending(pending_id))) + if double_echo + .send(DoubleEchoCommand::Broadcast { + need_gossip: true, + cert: *certificate, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo for {}", + certificate_id + ); + + sender.send(Err(RuntimeError::CommunicationError( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo" + .to_string(), + ))) + } else { + sender.send(Ok(PendingResult::InPending(pending_id))) + } } - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into precedence pool \ - waiting for {}", - certificate.id, certificate.source_subnet_id, certificate.prev_id - ); - sender.send(Ok(PendingResult::AwaitPrecedence)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has already been added to the pending pool, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyPending)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has already been delivered, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyDelivered)) - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - certificate.id, error - ); + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into precedence \ + pool waiting for {}", + certificate.id, certificate.source_subnet_id, certificate.prev_id + ); + sender.send(Ok(PendingResult::AwaitPrecedence)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has already been added to the pending pool, \ + skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyPending)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has already been delivered, skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyDelivered)) + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + certificate.id, error + ); - sender.send(Err(error.into())) - } - }; + sender.send(Err(error.into())) + } + }; + }); } ApiEvent::GetSourceHead { subnet_id, sender } => { diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index acfdb569b..993c2c328 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -22,183 +22,185 @@ impl AppContext { &evt ); - if let NetEvent::Gossip { data, from } = evt { - if let Ok(DoubleEchoRequest { - request: Some(double_echo_request), - }) = DoubleEchoRequest::decode(&data[..]) - { - match double_echo_request { - double_echo_request::Request::Gossip(Gossip { - certificate: Some(certificate), - }) => match uci::Certificate::try_from(certificate) { - Ok(cert) => { - if let hash_map::Entry::Vacant(entry) = - self.delivery_latency.entry(cert.id) - { - entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); - } - info!( - "Received certificate {} from GossipSub from {}", - cert.id, from - ); - - match self.validator_store.insert_pending_certificate(&cert).await { - Ok(Some(pending_id)) => { - let certificate_id = cert.id; - debug!( - "Certificate {} has been inserted into pending pool", - certificate_id - ); - - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: false, - cert, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command \ - to double echo for {}", - certificate_id - ); - } - } - - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into \ - precedence pool waiting for {}", - cert.id, cert.source_subnet_id, cert.prev_id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has been already added to the pending \ - pool, skipping", - cert.id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has been already delivered, skipping", - cert.id - ); - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - cert.id, error - ); - } - } - } - Err(e) => { - error!("Failed to parse the received Certificate: {e}"); - } - }, - double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from \ - Echo: {e}" - ); - e - }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the ValidatorId {validator_id} from Echo: {e}" - ); - e - }); - - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Echo message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - - if let Err(e) = channel - .send(DoubleEchoCommand::Echo { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await - { - error!("Unable to pass received Echo message: {:?}", e); - } - } else { - error!("Unable to process Echo message due to invalid data"); - } - }); - } - double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from \ - Ready: {e}" - ); - e - }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the ValidatorId {validator_id} from Ready: \ - {e}" - ); - e - }); - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Ready message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - if let Err(e) = channel - .send(DoubleEchoCommand::Ready { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await - { - error!("Unable to pass received Ready message: {:?}", e); - } - } else { - error!("Unable to process Ready message due to invalid data"); - } - }); - } - _ => {} - } - } - } + // if let NetEvent::Gossip { data, from } = evt { + // if let Ok(DoubleEchoRequest { + // request: Some(double_echo_request), + // }) = DoubleEchoRequest::decode(&data[..]) + // { + // match double_echo_request { + // double_echo_request::Request::Gossip(Gossip { + // certificate: Some(certificate), + // }) => match uci::Certificate::try_from(certificate) { + // Ok(cert) => { + // if let hash_map::Entry::Vacant(entry) = + // self.delivery_latency.entry(cert.id) + // { + // entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); + // } + // info!( + // "Received certificate {} from GossipSub from {}", + // cert.id, from + // ); + // + // let validator_store = self.validator_store.clone(); + // let double_echo = self.tce_cli.get_double_echo_channel(); + // spawn(async move { + // match validator_store.insert_pending_certificate(&cert).await { + // Ok(Some(pending_id)) => { + // let certificate_id = cert.id; + // debug!( + // "Certificate {} has been inserted into pending pool", + // certificate_id + // ); + // + // if double_echo + // .send(DoubleEchoCommand::Broadcast { + // need_gossip: false, + // cert, + // pending_id, + // }) + // .await + // .is_err() + // { + // error!( + // "Unable to send DoubleEchoCommand::Broadcast \ + // command to double echo for {}", + // certificate_id + // ); + // } + // } + // + // Ok(None) => { + // debug!( + // "Certificate {} from subnet {} has been inserted into \ + // precedence pool waiting for {}", + // cert.id, cert.source_subnet_id, cert.prev_id + // ); + // } + // Err(StorageError::InternalStorage( + // InternalStorageError::CertificateAlreadyPending, + // )) => { + // debug!( + // "Certificate {} has been already added to the pending \ + // pool, skipping", + // cert.id + // ); + // } + // Err(StorageError::InternalStorage( + // InternalStorageError::CertificateAlreadyExists, + // )) => { + // debug!( + // "Certificate {} has been already delivered, skipping", + // cert.id + // ); + // } + // Err(error) => { + // error!( + // "Unable to insert pending certificate {}: {}", + // cert.id, error + // ); + // } + // } + // }); + // } + // Err(e) => { + // error!("Failed to parse the received Certificate: {e}"); + // } + // }, + // double_echo_request::Request::Echo(Echo { + // certificate_id: Some(certificate_id), + // signature: Some(signature), + // validator_id: Some(validator_id), + // }) => { + // let channel = self.tce_cli.get_double_echo_channel(); + // spawn(async move { + // let certificate_id = certificate_id.clone().try_into().map_err(|e| { + // error!( + // "Failed to parse the CertificateId {certificate_id} from \ + // Echo: {e}" + // ); + // e + // }); + // let validator_id = validator_id.clone().try_into().map_err(|e| { + // error!( + // "Failed to parse the ValidatorId {validator_id} from Echo: {e}" + // ); + // e + // }); + // + // if let (Ok(certificate_id), Ok(validator_id)) = + // (certificate_id, validator_id) + // { + // trace!( + // "Received Echo message, certificate_id: {certificate_id}, \ + // validator_id: {validator_id} from: {from}", + // certificate_id = certificate_id, + // validator_id = validator_id + // ); + // + // if let Err(e) = channel + // .send(DoubleEchoCommand::Echo { + // signature: signature.into(), + // certificate_id, + // validator_id, + // }) + // .await + // { + // error!("Unable to pass received Echo message: {:?}", e); + // } + // } else { + // error!("Unable to process Echo message due to invalid data"); + // } + // }); + // } + // double_echo_request::Request::Ready(Ready { + // certificate_id: Some(certificate_id), + // signature: Some(signature), + // validator_id: Some(validator_id), + // }) => { + // let channel = self.tce_cli.get_double_echo_channel(); + // spawn(async move { + // let certificate_id = certificate_id.clone().try_into().map_err(|e| { + // error!( + // "Failed to parse the CertificateId {certificate_id} from \ + // Ready: {e}" + // ); + // e + // }); + // let validator_id = validator_id.clone().try_into().map_err(|e| { + // error!( + // "Failed to parse the ValidatorId {validator_id} from Ready: \ + // {e}" + // ); + // e + // }); + // if let (Ok(certificate_id), Ok(validator_id)) = + // (certificate_id, validator_id) + // { + // trace!( + // "Received Ready message, certificate_id: {certificate_id}, \ + // validator_id: {validator_id} from: {from}", + // certificate_id = certificate_id, + // validator_id = validator_id + // ); + // if let Err(e) = channel + // .send(DoubleEchoCommand::Ready { + // signature: signature.into(), + // certificate_id, + // validator_id, + // }) + // .await + // { + // error!("Unable to pass received Ready message: {:?}", e); + // } + // } else { + // error!("Unable to process Ready message due to invalid data"); + // } + // }); + // } + // _ => {} + // } + // } + // } } } diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 278a13c0a..c516a7dc5 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -1,3 +1,4 @@ +use tokio::spawn; use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_tce_broadcast::event::ProtocolEvents; use tracing::{error, info, warn}; @@ -14,20 +15,22 @@ impl AppContext { ProtocolEvents::Gossip { cert } => { let cert_id = cert.id; - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Gossip(Gossip { - certificate: Some(cert.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Gossip(Gossip { + certificate: Some(cert.into()), + })), + }; - info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_GOSSIP, request) - .await - { - error!("Unable to send Gossip: {e}"); - } + info!("Sending Gossip for certificate {}", cert_id); + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_GOSSIP, request) + .await + { + error!("Unable to send Gossip: {e}"); + } + }); } ProtocolEvents::Echo { @@ -35,22 +38,21 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - // Send echo message - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + // Send echo message + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Echo(Echo { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_ECHO, request) - .await - { - error!("Unable to send Echo: {e}"); - } + if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await { + error!("Unable to send Echo: {e}"); + } + }); } ProtocolEvents::Ready { @@ -58,21 +60,23 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Ready(Ready { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_READY, request) - .await - { - error!("Unable to send Ready: {e}"); - } + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_READY, request) + .await + { + error!("Unable to send Ready: {e}"); + } + }); } ProtocolEvents::BroadcastFailed { certificate_id } => { warn!("Broadcast failed for certificate {certificate_id}")