Skip to content

Commit

Permalink
feat: p2p status message
Browse files Browse the repository at this point in the history
  • Loading branch information
Vid201 committed Nov 3, 2024
1 parent 0395f24 commit 33fd007
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 33 deletions.
19 changes: 17 additions & 2 deletions crates/grpc/src/uopool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,24 @@ where
info!("Starting p2p mode without bootnodes");
}

let mut p2p_network = Network::new(config.clone(), mempool_channels)
// fetch latest block information for p2p
let latest_block_number = eth_client
.get_block_number()
.await
.expect("p2p network init failed");
.expect("get block number failed (needed for p2p)");
let latest_block_hash = eth_client
.get_block(latest_block_number)
.await
.expect("get block hash failed (needed for p2p)")
.expect("get block hash failed (needed for p2p)");

let mut p2p_network = Network::new(
config.clone(),
(latest_block_hash.hash.unwrap_or_default(), latest_block_number.as_u64()),
mempool_channels,
)
.await
.expect("p2p network init failed");

tokio::spawn(async move {
loop {
Expand Down
18 changes: 18 additions & 0 deletions crates/mempool/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,31 @@ where

pub fn register_block_updates(&self, mut block_stream: BlockStream) {
let mut uopool = self.uopool();
let network = self.network.clone();
tokio::spawn(async move {
while let Some(hash) = block_stream.next().await {
if let Ok(hash) = hash {
let h: H256 = hash;
let _ = Self::handle_block_update(h, &mut uopool)
.await
.map_err(|e| warn!("Failed to handle block update: {:?}", e));

// update p2p latest block info
if let Some(ref network) = network {
if let Ok(block_number) =
uopool.entry_point.eth_client().get_block_number().await.map_err(|e| {
warn!("Failed to get block number: {:?}", e);
e
})
{
let _ = network
.unbounded_send(NetworkMessage::NewBlock {
block_hash: hash,
block_number: block_number.as_u64(),
})
.map_err(|e| warn!("Failed to send new block message: {:?}", e));
}
}
}
}
});
Expand Down
17 changes: 17 additions & 0 deletions crates/mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub trait UserOperationAct:
AddRemoveUserOp + UserOperationOp + ClearOp + Send + Sync + DynClone
{
}

dyn_clone::clone_trait_object!(UserOperationAct);
impl<T> UserOperationAct for T where
T: AddRemoveUserOp + UserOperationOp + ClearOp + Send + Sync + Clone
Expand All @@ -328,6 +329,7 @@ pub trait UserOperationAddrAct:
AddRemoveUserOpHash + UserOperationAddrOp + ClearOp + Send + Sync + DynClone
{
}

dyn_clone::clone_trait_object!(UserOperationAddrAct);
impl<T> UserOperationAddrAct for T where
T: AddRemoveUserOpHash + UserOperationAddrOp + ClearOp + Send + Sync + Clone
Expand All @@ -338,6 +340,7 @@ pub trait UserOperationCodeHashAct:
UserOperationCodeHashOp + ClearOp + Send + Sync + DynClone
{
}

dyn_clone::clone_trait_object!(UserOperationCodeHashAct);
impl<T> UserOperationCodeHashAct for T where
T: UserOperationCodeHashOp + ClearOp + Send + Sync + Clone
Expand Down Expand Up @@ -366,6 +369,7 @@ impl Mempool {
user_operations_code_hashes,
}
}

pub fn add(&mut self, uo: UserOperation) -> Result<UserOperationHash, MempoolErrorKind> {
let (sender, factory, paymaster) = uo.get_entities();
let uo_hash = uo.hash;
Expand All @@ -379,12 +383,14 @@ impl Mempool {
}
Ok(uo_hash)
}

pub fn get(
&self,
uo_hash: &UserOperationHash,
) -> Result<Option<UserOperation>, MempoolErrorKind> {
self.user_operations.get_by_uo_hash(uo_hash)
}

pub fn get_all_by_sender(&self, addr: &Address) -> Vec<UserOperation> {
let uos_by_sender = self.user_operations_by_sender.get_all_by_address(addr);
uos_by_sender
Expand All @@ -393,12 +399,15 @@ impl Mempool {
.flatten()
.collect()
}

pub fn get_number_by_sender(&self, addr: &Address) -> usize {
self.user_operations_by_sender.get_number_by_address(addr)
}

pub fn get_number_by_entity(&self, addr: &Address) -> usize {
self.user_operations_by_entity.get_number_by_address(addr)
}

pub fn get_prev_by_sender(&self, uo: &UserOperation) -> Option<UserOperation> {
self.user_operations_by_sender
.get_all_by_address(&uo.sender)
Expand All @@ -408,22 +417,26 @@ impl Mempool {
.filter(|uo_prev| uo_prev.nonce == uo.nonce)
.max_by_key(|uo_prev| uo_prev.max_priority_fee_per_gas)
}

pub fn has_code_hashes(&self, uo_hash: &UserOperationHash) -> Result<bool, MempoolErrorKind> {
self.user_operations_code_hashes.has_code_hashes(uo_hash)
}

pub fn set_code_hashes(
&mut self,
uo_hash: &UserOperationHash,
hashes: Vec<CodeHash>,
) -> Result<(), MempoolErrorKind> {
self.user_operations_code_hashes.set_code_hashes(uo_hash, hashes)
}

pub fn get_code_hashes(
&self,
uo_hash: &UserOperationHash,
) -> Result<Vec<CodeHash>, MempoolErrorKind> {
self.user_operations_code_hashes.get_code_hashes(uo_hash)
}

pub fn remove(&mut self, uo_hash: &UserOperationHash) -> Result<bool, MempoolErrorKind> {
let uo = if let Some(user_op) = self.user_operations.get_by_uo_hash(uo_hash)? {
user_op
Expand All @@ -449,6 +462,7 @@ impl Mempool {

Ok(true)
}

pub fn remove_by_entity(&mut self, entity: &Address) -> Result<(), MempoolErrorKind> {
let uos = self.user_operations_by_entity.get_all_by_address(entity);

Expand All @@ -458,13 +472,16 @@ impl Mempool {

Ok(())
}

// Get UserOperations sorted by max_priority_fee_per_gas without dup sender
pub fn get_sorted(&self) -> Result<Vec<UserOperation>, MempoolErrorKind> {
self.user_operations.get_sorted()
}

pub fn get_all(&self) -> Result<Vec<UserOperation>, MempoolErrorKind> {
self.user_operations.get_all()
}

pub fn clear(&mut self) {
self.user_operations.clear();
self.user_operations_by_sender.clear();
Expand Down
2 changes: 1 addition & 1 deletion crates/mempool/src/validate/sanity/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<M: Middleware> SanityCheck<M> for Sender {
(!code.is_empty() && !uo.init_code.is_empty())
{
return Err(SanityError::Sender {
inner: format!("sender {0} is an existing contract, or the initCode {1} is not empty (but not both)", uo.sender, uo.init_code),
inner: format!("sender {0:?} is an existing contract, or the initCode {1} is not empty (but not both)", uo.sender, uo.init_code),
});
}

Expand Down
12 changes: 3 additions & 9 deletions crates/p2p/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::rpc::{
methods::{
GoodbyeReason, MetaData, MetaDataRequest, Ping, PooledUserOpHashesRequest,
PooledUserOpHashesResponse, PooledUserOpsByHashRequest, PooledUserOpsByHashResponse,
RPCResponse, StatusMessage,
RPCResponse, Status,
},
outbound::OutboundRequest,
protocol::{InboundRequest, Protocol, ProtocolId},
Expand Down Expand Up @@ -80,10 +80,8 @@ impl Decoder for SSZSnappyInboundCodec {
let mut buffer = vec![];
snap::read::FrameDecoder::<&[u8]>::new(src).read_to_end(&mut buffer)?;

trace!("Inbound request buffer {:?}", buffer);

let request = match self.protocol.protocol {
Protocol::Status => InboundRequest::Status(StatusMessage::deserialize(&buffer)?),
Protocol::Status => InboundRequest::Status(Status::deserialize(&buffer)?),
Protocol::Goodbye => InboundRequest::Goodbye(GoodbyeReason::deserialize(&buffer)?),
Protocol::Ping => InboundRequest::Ping(Ping::deserialize(&buffer)?),
Protocol::MetaData => InboundRequest::MetaData(MetaDataRequest::deserialize(&buffer)?),
Expand Down Expand Up @@ -156,8 +154,6 @@ impl Decoder for SSZSnappyOutboundCodec {
type Error = Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
trace!("Outbound response buffer {:?}", src);

// response_chunk ::= <result> | <encoding-dependent-header> | <encoded-payload>

// TODO: response chunks
Expand All @@ -173,9 +169,7 @@ impl Decoder for SSZSnappyOutboundCodec {
snap::read::FrameDecoder::<&[u8]>::new(src).read_to_end(&mut decompressed_data)?;

let response = match self.protocol.protocol {
Protocol::Status => {
RPCResponse::Status(StatusMessage::deserialize(&decompressed_data)?)
}
Protocol::Status => RPCResponse::Status(Status::deserialize(&decompressed_data)?),
Protocol::Goodbye => {
RPCResponse::Goodbye(GoodbyeReason::deserialize(&decompressed_data)?)
}
Expand Down
4 changes: 0 additions & 4 deletions crates/p2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ impl RPCHandler {
bytes.clear();
codec.encode(response, &mut bytes)?;

trace!("Sending {:?} bytes", bytes.len());

socket.write_all(&bytes).await?;
socket.close().await?;

Expand Down Expand Up @@ -228,8 +226,6 @@ impl RPCHandler {
let mut codec = SSZSnappyOutboundCodec::new(protocol_id);
codec.encode(request, &mut bytes)?;

trace!("Sending {:?} bytes", bytes.len());

socket.write_all(&bytes).await?;
socket.close().await?;

Expand Down
10 changes: 5 additions & 5 deletions crates/p2p/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub struct MetaData {
}

#[derive(ssz_rs_derive::Serializable, Clone, Debug, PartialEq, Default)]
pub struct StatusMessage {
chain_id: u64,
block_hash: [u8; 32],
block_number: u64,
pub struct Status {
pub chain_id: u64,
pub block_hash: [u8; 32],
pub block_number: u64,
}

#[derive(Clone, Debug, PartialEq, Default)]
Expand Down Expand Up @@ -127,7 +127,7 @@ pub struct RequestId(pub(crate) u64);

#[derive(Debug, Clone, PartialEq)]
pub enum RPCResponse {
Status(StatusMessage),
Status(Status),
Goodbye(GoodbyeReason),
Pong(Ping),
MetaData(MetaData),
Expand Down
4 changes: 2 additions & 2 deletions crates/p2p/src/rpc/outbound.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
methods::{
GoodbyeReason, MetaDataRequest, Ping, PooledUserOpHashesRequest,
PooledUserOpsByHashRequest, StatusMessage,
PooledUserOpsByHashRequest, Status,
},
protocol::{InboundRequest, Protocol, ProtocolId},
};
Expand All @@ -10,7 +10,7 @@ use libp2p::{core::UpgradeInfo, OutboundUpgrade, Stream};

#[derive(Debug, Clone, PartialEq)]
pub enum OutboundRequest {
Status(StatusMessage),
Status(Status),
Goodbye(GoodbyeReason),
Ping(Ping),
MetaData(MetaDataRequest),
Expand Down
4 changes: 2 additions & 2 deletions crates/p2p/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
methods::{
GoodbyeReason, MetaDataRequest, Ping, PooledUserOpHashesRequest,
PooledUserOpsByHashRequest, StatusMessage,
PooledUserOpsByHashRequest, Status,
},
outbound::OutboundRequest,
};
Expand Down Expand Up @@ -99,7 +99,7 @@ impl Display for Encoding {

#[derive(Debug, Clone, PartialEq)]
pub enum InboundRequest {
Status(StatusMessage),
Status(Status),
Goodbye(GoodbyeReason),
Ping(Ping),
MetaData(MetaDataRequest),
Expand Down
32 changes: 29 additions & 3 deletions crates/p2p/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
},
peer_manager::{PeerManager, PeerManagerEvent},
rpc::{
methods::{MetaData, MetaDataRequest, Ping, RPCResponse, RequestId},
methods::{MetaData, MetaDataRequest, Ping, RPCResponse, RequestId, Status},
outbound::OutboundRequest,
protocol::InboundRequest,
RPCEvent, RPC,
Expand All @@ -32,7 +32,7 @@ use crate::{
};
use alloy_chains::Chain;
use discv5::Enr;
use ethers::types::Address;
use ethers::types::{Address, H256};
use futures::channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot::Sender,
Expand Down Expand Up @@ -129,7 +129,11 @@ impl From<Network> for Swarm<Behaviour> {
}

impl Network {
pub async fn new(config: Config, mempool_channels: Vec<MempoolChannel>) -> eyre::Result<Self> {
pub async fn new(
config: Config,
latest_block: (H256, u64),
mempool_channels: Vec<MempoolChannel>,
) -> eyre::Result<Self> {
// Handle private key
let key = if let Some(key) = load_private_key_from_file(&config.node_key_file) {
key
Expand Down Expand Up @@ -205,6 +209,8 @@ impl Network {
metadata,
trusted_peers,
config.chain_spec.clone(),
latest_block.0,
latest_block.1,
))
};

Expand Down Expand Up @@ -274,6 +280,14 @@ impl Network {
self.network_globals.local_metadata()
}

pub fn status(&self) -> Status {
Status {
chain_id: self.network_globals.chain_spec().chain.id(),
block_hash: *self.network_globals.latest_block_hash().as_fixed_bytes(),
block_number: self.network_globals.latest_block_number(),
}
}

/// handle gossipsub event
fn handle_gossipsub_event(&self, event: Box<gossipsub::Event>) -> Option<NetworkEvent> {
match *event {
Expand Down Expand Up @@ -357,6 +371,11 @@ impl Network {
.expect("channel should exist");
None
}
InboundRequest::Status(_status) => {
// TODO: verify status message
sender.send(RPCResponse::Status(self.status())).expect("channel should exist");
None
}
InboundRequest::Goodbye(_) => None,
_ => Some(NetworkEvent::RequestMessage { peer_id, request, sender }),
},
Expand Down Expand Up @@ -475,6 +494,13 @@ impl Network {
}
}
}
NetworkMessage::NewBlock { block_hash, block_number } => {
let mut latest_block_hash = self.network_globals.latest_block_hash.write();
*latest_block_hash = block_hash;
let mut latest_block_number =
self.network_globals.latest_block_number.write();
*latest_block_number = block_number;
}
_ => {}
}
}
Expand Down
Loading

0 comments on commit 33fd007

Please sign in to comment.