From e430a580e06a86128ddff7dfe22c58820a751ebb Mon Sep 17 00:00:00 2001 From: Stephane Gurgenidze Date: Thu, 2 Jan 2025 13:42:33 +0400 Subject: [PATCH] refactor(malus-collator): move malicious behavior logic from script to Collator implementation --- .../undying/collator/src/lib.rs | 155 +++++++++++++++- .../undying/collator/src/main.rs | 169 ++---------------- 2 files changed, 166 insertions(+), 158 deletions(-) diff --git a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs index bf8536adbc4f..72b83ad7a738 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs @@ -19,18 +19,26 @@ use codec::{Decode, Encode}; use futures::channel::oneshot; use futures_timer::Delay; +use polkadot_cli::ProvideRuntimeApi; use polkadot_node_primitives::{ maybe_compress_pov, Collation, CollationResult, CollationSecondedSignal, CollatorFn, - MaybeCompressedPoV, PoV, Statement, UpwardMessages, + MaybeCompressedPoV, PoV, Statement, SubmitCollationParams, UpwardMessages, }; -use polkadot_primitives::{CollatorId, CollatorPair, Hash}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; +use polkadot_primitives::{ + vstaging::{ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET}, + CollatorId, CollatorPair, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption, +}; +use polkadot_service::{Backend, Handle, HeaderBackend, NewFull, ParachainHost}; use sp_core::Pair; + use std::{ collections::HashMap, sync::{ atomic::{AtomicU32, Ordering}, Arc, Mutex, }, + thread::sleep, time::Duration, }; use test_parachain_undying::{ @@ -337,6 +345,149 @@ impl Collator { } } } + + pub fn send_same_collations_to_all_assigned_cores( + &self, + full_node: &NewFull, + mut overseer_handle: Handle, + para_id: ParaId, + ) { + let client = full_node.client.clone(); + let backend = full_node.backend.clone(); + + let collation_function = + self.create_collation_function(full_node.task_manager.spawn_handle()); + + full_node + .task_manager + .spawn_handle() + .spawn("malus-undying-collator", None, async move { + // In each iteration, build a collation and submit + // it to all cores assigned to the parachain. + loop { + let relay_parent = backend.blockchain().info().best_hash; + + // Get the list of cores assigned to the parachain. + let claim_queue = match client.runtime_api().claim_queue(relay_parent) { + Ok(claim_queue) => + if claim_queue.is_empty() { + log::info!(target: LOG_TARGET, "Claim queue is empty."); + continue; + } else { + claim_queue + }, + Err(error) => { + log::error!( + target: LOG_TARGET, + "Failed to query claim queue runtime API: {error:?}" + ); + continue; + }, + }; + + let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET); + + let scheduled_cores: Vec = claim_queue + .iter() + .filter_map(move |(core_index, paras)| { + Some((*core_index, *paras.get(claim_queue_offset.0 as usize)?)) + }) + .filter_map(|(core_index, core_para_id)| { + (core_para_id == para_id).then_some(core_index) + }) + .collect(); + + if scheduled_cores.is_empty() { + println!("Scheduled cores is empty."); + continue; + } + + // Fetch validation data for the collation. + let validation_data = match client.runtime_api().persisted_validation_data( + relay_parent, + para_id, + OccupiedCoreAssumption::Included, + ) { + Ok(Some(validation_data)) => validation_data, + Ok(None) => { + log::warn!( + target: LOG_TARGET, + "Persisted validation data is None." + ); + continue; + }, + Err(error) => { + log::error!( + target: LOG_TARGET, + "Failed to query persisted validation data runtime API: {error:?}" + ); + continue; + }, + }; + + // Generate the collation. + let collation = + match collation_function(relay_parent, &validation_data).await { + Some(collation) => collation, + None => { + log::warn!( + target: LOG_TARGET, + "Collation result is None." + ); + continue; + }, + } + .collation; + + // Fetch the validation code hash. + let validation_code_hash = match client.runtime_api().validation_code_hash( + relay_parent, + para_id, + OccupiedCoreAssumption::Included, + ) { + Ok(Some(validation_code_hash)) => validation_code_hash, + Ok(None) => { + log::warn!( + target: LOG_TARGET, + "Validation code hash is None." + ); + continue; + }, + Err(error) => { + log::error!( + target: LOG_TARGET, + "Failed to query validation code hash runtime API: {error:?}" + ); + continue; + }, + }; + + // Submit the same collation for all assigned cores. + for core_index in &scheduled_cores { + let submit_collation_params = SubmitCollationParams { + relay_parent, + collation: collation.clone(), + parent_head: validation_data.parent_head.clone(), + validation_code_hash, + result_sender: None, + core_index: *core_index, + }; + + overseer_handle + .send_msg( + CollationGenerationMessage::SubmitCollation( + submit_collation_params, + ), + "Collator", + ) + .await; + } + + // Wait before repeating the process. + sleep(Duration::from_secs(6 as u64)); + } + }); + } } use sp_core::traits::SpawnNamed; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 6c80fdc66fa1..1f3addccf467 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -16,23 +16,17 @@ //! Collator for the `Undying` test parachain. -use polkadot_cli::{Error, ProvideRuntimeApi, Result}; -use polkadot_node_primitives::{CollationGenerationConfig, SubmitCollationParams}; +use polkadot_cli::{Error, Result}; +use polkadot_node_primitives::CollationGenerationConfig; use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage}; -use polkadot_primitives::{ - vstaging::{ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET}, - CoreIndex, Id as ParaId, OccupiedCoreAssumption, -}; -use polkadot_service::{Backend, HeaderBackend, ParachainHost}; +use polkadot_primitives::{self, Id as ParaId}; use sc_cli::{Error as SubstrateCliError, SubstrateCli}; use sp_core::hexdisplay::HexDisplay; use std::{ fs, io::{self, Write}, - thread::sleep, - time::Duration, }; -use test_parachain_undying_collator::{Collator, LOG_TARGET}; +use test_parachain_undying_collator::Collator; mod cli; use cli::Cli; @@ -111,6 +105,7 @@ fn main() -> Result<()> { .map_err(|e| e.to_string())?; let mut overseer_handle = full_node .overseer_handle + .clone() .expect("Overseer handle should be initialized for collators"); let genesis_head_hex = @@ -126,8 +121,8 @@ fn main() -> Result<()> { let config = CollationGenerationConfig { key: collator.collator_key(), - // Set collation function to None if it is a malicious collator - // and submit collations manually later. + // If the collator is malicious, disable the collation function + // (set to None) and manually handle collation submission later. collator: if cli.run.malus { None } else { @@ -146,151 +141,13 @@ fn main() -> Result<()> { .send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator") .await; - // Check if it is a malicious collator. + // If the collator is malicious, simulate malicious behavior + // by sending the same collations to all assigned cores. if cli.run.malus { - let client = full_node.client.clone(); - let backend = full_node.backend.clone(); - - let collation_function = - collator.create_collation_function(full_node.task_manager.spawn_handle()); - - full_node.task_manager.spawn_handle().spawn( - "malus-undying-collator", - None, - async move { - loop { - let relay_parent = backend.blockchain().info().best_hash; - - // Get all assigned cores for the given parachain. - let claim_queue = - match client.runtime_api().claim_queue(relay_parent) { - Ok(claim_queue) => - if claim_queue.is_empty() { - log::info!(target: LOG_TARGET, "Claim queue is empty."); - continue; - } else { - claim_queue - }, - Err(error) => { - log::error!( - target: LOG_TARGET, - "Failed to query claim queue runtime API: {error:?}" - ); - continue; - }, - }; - - let claim_queue_offset = - ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET); - - let scheduled_cores: Vec = claim_queue - .iter() - .filter_map(move |(core_index, paras)| { - Some(( - *core_index, - *paras.get(claim_queue_offset.0 as usize)?, - )) - }) - .filter_map(|(core_index, core_para_id)| { - (core_para_id == para_id).then_some(core_index) - }) - .collect(); - - if scheduled_cores.is_empty() { - println!("Scheduled cores is empty."); - continue; - } - - // Get the collation. - let validation_data = - match client.runtime_api().persisted_validation_data( - relay_parent, - para_id, - OccupiedCoreAssumption::Included, - ) { - Ok(Some(validation_data)) => validation_data, - Ok(None) => { - log::warn!( - target: LOG_TARGET, - "Persisted validation data is None." - ); - continue; - }, - Err(error) => { - log::error!( - target: LOG_TARGET, - "Failed to query persisted validation data runtime API: {error:?}" - ); - continue; - }, - }; - - let collation = match collation_function( - relay_parent, - &validation_data, - ) - .await - { - Some(collation) => collation, - None => { - log::warn!( - target: LOG_TARGET, - "Collation result is None." - ); - continue; - }, - } - .collation; - - // Get validation code hash. - let validation_code_hash = - match client.runtime_api().validation_code_hash( - relay_parent, - para_id, - OccupiedCoreAssumption::Included, - ) { - Ok(Some(validation_code_hash)) => validation_code_hash, - Ok(None) => { - log::warn!( - target: LOG_TARGET, - "Validation code hash is None." - ); - continue; - }, - Err(error) => { - log::error!( - target: LOG_TARGET, - "Failed to query validation code hash runtime API: {error:?}" - ); - continue; - }, - }; - - // Submit the same collation for each assigned core. - for core_index in &scheduled_cores { - let submit_collation_params = SubmitCollationParams { - relay_parent, - collation: collation.clone(), - parent_head: validation_data.parent_head.clone(), - validation_code_hash, - result_sender: None, - core_index: *core_index, - }; - - overseer_handle - .send_msg( - CollationGenerationMessage::SubmitCollation( - submit_collation_params, - ), - "Collator", - ) - .await; - } - - // Wait before submitting the next collation. - sleep(Duration::from_secs(6 as u64)); - } - }, + collator.send_same_collations_to_all_assigned_cores( + &full_node, + overseer_handle, + para_id, ); }