Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: add message_id on the receiver side
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Mar 22, 2024
1 parent 3ce420a commit d1dda03
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_GOSSIP,
message: data,
source,
message_id,
},
)))
}
Expand All @@ -267,6 +268,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_ECHO,
message: data,
source,
message_id,
},
)))
}
Expand All @@ -276,6 +278,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_READY,
message: data,
source,
message_id,
},
)))
}
Expand Down
8 changes: 7 additions & 1 deletion crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use libp2p::gossipsub::MessageId;
use libp2p::{identify, kad, PeerId};

use crate::behaviour::{grpc, HealthStatus};
Expand All @@ -10,6 +11,7 @@ pub enum GossipEvent {
source: Option<PeerId>,
topic: &'static str,
message: Vec<u8>,
message_id: MessageId,
},
}

Expand Down Expand Up @@ -50,7 +52,11 @@ impl From<void::Void> for ComposedEvent {
#[derive(Debug)]
pub enum Event {
/// An event emitted when a gossip message is received
Gossip { from: PeerId, data: Vec<u8> },
Gossip {
from: PeerId,
data: Vec<u8>,
message_id: MessageId,
},
/// An event emitted when the p2p layer becomes healthy
Healthy,
/// An event emitted when the p2p layer becomes unhealthy
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl EventHandler<GossipEvent> for Runtime {
source: Some(source),
message,
topic,
message_id,
} = event
{
if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER {
Expand All @@ -34,6 +35,7 @@ impl EventHandler<GossipEvent> for Runtime {
.send(Event::Gossip {
from: source,
data: message,
message_id: message_id.clone(),
})
.await
{
Expand All @@ -53,6 +55,7 @@ impl EventHandler<GossipEvent> for Runtime {
.send(Event::Gossip {
from: source,
data: message,
message_id: message_id.clone(),
})
.await
{
Expand Down
19 changes: 16 additions & 3 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ impl AppContext {
&evt
);

if let NetEvent::Gossip { data, from } = evt {
if let NetEvent::Gossip {
data,
from,
message_id,
} = evt
{
if let Ok(DoubleEchoRequest {
request: Some(double_echo_request),
}) = DoubleEchoRequest::decode(&data[..])
Expand All @@ -38,8 +43,8 @@ impl AppContext {
entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer());
}
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
"Received certificate {} from GossipSub from {} with message id: {}",
cert.id, from, message_id,
);

match self.validator_store.insert_pending_certificate(&cert).await {
Expand Down Expand Up @@ -110,6 +115,10 @@ impl AppContext {
signature: Some(signature),
validator_id: Some(validator_id),
}) => {
debug!(
"Received Echo message, certificate_id: {} with message id: {}",
certificate_id, message_id
);
let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
Expand Down Expand Up @@ -156,6 +165,10 @@ impl AppContext {
signature: Some(signature),
validator_id: Some(validator_id),
}) => {
debug!(
"Received Ready message, certificate_id: {} with message id: {}",
certificate_id, message_id
);
let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
Expand Down

0 comments on commit d1dda03

Please sign in to comment.