refactor(malus-collator): move malicious behavior logic from script to Collator implementation
sw10pa committed Jan 7, 2025
1 parent 0c0d625 commit e430a58
Showing 2 changed files with 166 additions and 158 deletions.
155 changes: 153 additions & 2 deletions polkadot/parachain/test-parachains/undying/collator/src/lib.rs
@@ -19,18 +19,26 @@
use codec::{Decode, Encode};
use futures::channel::oneshot;
use futures_timer::Delay;
use polkadot_cli::ProvideRuntimeApi;
use polkadot_node_primitives::{
maybe_compress_pov, Collation, CollationResult, CollationSecondedSignal, CollatorFn,
MaybeCompressedPoV, PoV, Statement, UpwardMessages,
MaybeCompressedPoV, PoV, Statement, SubmitCollationParams, UpwardMessages,
};
use polkadot_primitives::{CollatorId, CollatorPair, Hash};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_primitives::{
vstaging::{ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET},
CollatorId, CollatorPair, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption,
};
use polkadot_service::{Backend, Handle, HeaderBackend, NewFull, ParachainHost};
use sp_core::Pair;

use std::{
collections::HashMap,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::Duration,
};
use test_parachain_undying::{
@@ -337,6 +345,149 @@ impl Collator {
}
}
}

pub fn send_same_collations_to_all_assigned_cores(
&self,
full_node: &NewFull,
mut overseer_handle: Handle,
para_id: ParaId,
) {
let client = full_node.client.clone();
let backend = full_node.backend.clone();

let collation_function =
self.create_collation_function(full_node.task_manager.spawn_handle());

full_node
.task_manager
.spawn_handle()
.spawn("malus-undying-collator", None, async move {
// In each iteration, build a collation and submit
// it to all cores assigned to the parachain.
loop {
let relay_parent = backend.blockchain().info().best_hash;

// Get the list of cores assigned to the parachain.
let claim_queue = match client.runtime_api().claim_queue(relay_parent) {
Ok(claim_queue) =>
if claim_queue.is_empty() {
log::info!(target: LOG_TARGET, "Claim queue is empty.");
continue;
} else {
claim_queue
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query claim queue runtime API: {error:?}"
);
continue;
},
};

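// The default offset points at the head of each core's claim queue,
// i.e. the soonest upcoming claims.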
let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);

let scheduled_cores: Vec<CoreIndex> = claim_queue
.iter()
.filter_map(move |(core_index, paras)| {
Some((*core_index, *paras.get(claim_queue_offset.0 as usize)?))
})
.filter_map(|(core_index, core_para_id)| {
(core_para_id == para_id).then_some(core_index)
})
.collect();

if scheduled_cores.is_empty() {
log::info!(target: LOG_TARGET, "No cores are scheduled for this parachain.");
continue;
}

// Fetch validation data for the collation.
let validation_data = match client.runtime_api().persisted_validation_data(
relay_parent,
para_id,
OccupiedCoreAssumption::Included,
) {
Ok(Some(validation_data)) => validation_data,
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"Persisted validation data is None."
);
continue;
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query persisted validation data runtime API: {error:?}"
);
continue;
},
};

// Generate the collation.
let collation =
match collation_function(relay_parent, &validation_data).await {
Some(collation) => collation,
None => {
log::warn!(
target: LOG_TARGET,
"Collation result is None."
);
continue;
},
}
.collation;

// Fetch the validation code hash.
let validation_code_hash = match client.runtime_api().validation_code_hash(
relay_parent,
para_id,
OccupiedCoreAssumption::Included,
) {
Ok(Some(validation_code_hash)) => validation_code_hash,
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"Validation code hash is None."
);
continue;
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query validation code hash runtime API: {error:?}"
);
continue;
},
};

// Submit the same collation for all assigned cores.
for core_index in &scheduled_cores {
let submit_collation_params = SubmitCollationParams {
relay_parent,
collation: collation.clone(),
parent_head: validation_data.parent_head.clone(),
validation_code_hash,
result_sender: None,
core_index: *core_index,
};

overseer_handle
.send_msg(
CollationGenerationMessage::SubmitCollation(
submit_collation_params,
),
"Collator",
)
.await;
}

// Wait before repeating the process. Use an async-aware delay
// so the executor thread is not blocked while waiting.
Delay::new(Duration::from_secs(6)).await;
}
});
}
}

use sp_core::traits::SpawnNamed;
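For readers skimming the commit, the core-selection step above can be isolated as follows. This is a minimal, self-contained sketch: it uses plain u32 stand-ins for CoreIndex and ParaId, a Vec per core in place of the runtime's actual claim-queue structure, and hypothetical example values, so it illustrates the filtering logic rather than the real claim_queue runtime API.

use std::collections::BTreeMap;

// Stand-ins for the real polkadot_primitives types.
type CoreIndex = u32;
type ParaId = u32;

/// Select every core whose claim queue, at the given offset, is assigned
/// to `para_id` (mirrors the filter_map chain in the function above).
fn scheduled_cores(
    claim_queue: &BTreeMap<CoreIndex, Vec<ParaId>>,
    offset: usize,
    para_id: ParaId,
) -> Vec<CoreIndex> {
    claim_queue
        .iter()
        // Keep cores whose queue has an entry at `offset`.
        .filter_map(|(core_index, paras)| Some((*core_index, *paras.get(offset)?)))
        // Keep only cores assigned to our parachain.
        .filter_map(|(core_index, core_para_id)| (core_para_id == para_id).then_some(core_index))
        .collect()
}

fn main() {
    let mut claim_queue = BTreeMap::new();
    claim_queue.insert(0, vec![100, 100]); // core 0: para 100 at offsets 0 and 1
    claim_queue.insert(1, vec![200]); // core 1: para 200
    claim_queue.insert(2, vec![100]); // core 2: para 100
    // With offset 0, para 100 is scheduled on cores 0 and 2.
    assert_eq!(scheduled_cores(&claim_queue, 0, 100), vec![0, 2]);
}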
169 changes: 13 additions & 156 deletions polkadot/parachain/test-parachains/undying/collator/src/main.rs
@@ -16,23 +16,17 @@

//! Collator for the `Undying` test parachain.
use polkadot_cli::{Error, ProvideRuntimeApi, Result};
use polkadot_node_primitives::{CollationGenerationConfig, SubmitCollationParams};
use polkadot_cli::{Error, Result};
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_primitives::{
vstaging::{ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET},
CoreIndex, Id as ParaId, OccupiedCoreAssumption,
};
use polkadot_service::{Backend, HeaderBackend, ParachainHost};
use polkadot_primitives::{self, Id as ParaId};
use sc_cli::{Error as SubstrateCliError, SubstrateCli};
use sp_core::hexdisplay::HexDisplay;
use std::{
fs,
io::{self, Write},
thread::sleep,
time::Duration,
};
use test_parachain_undying_collator::{Collator, LOG_TARGET};
use test_parachain_undying_collator::Collator;

mod cli;
use cli::Cli;
@@ -111,6 +105,7 @@ fn main() -> Result<()> {
.map_err(|e| e.to_string())?;
let mut overseer_handle = full_node
.overseer_handle
.clone()
.expect("Overseer handle should be initialized for collators");

let genesis_head_hex =
@@ -126,8 +121,8 @@

let config = CollationGenerationConfig {
key: collator.collator_key(),
// Set collation function to None if it is a malicious collator
// and submit collations manually later.
// If the collator is malicious, disable the collation function
// (set to None) and manually handle collation submission later.
collator: if cli.run.malus {
None
} else {
Expand All @@ -146,151 +141,13 @@ fn main() -> Result<()> {
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await;

// Check if it is a malicious collator.
// If the collator is malicious, simulate malicious behavior
// by sending the same collations to all assigned cores.
if cli.run.malus {
let client = full_node.client.clone();
let backend = full_node.backend.clone();

let collation_function =
collator.create_collation_function(full_node.task_manager.spawn_handle());

full_node.task_manager.spawn_handle().spawn(
"malus-undying-collator",
None,
async move {
loop {
let relay_parent = backend.blockchain().info().best_hash;

// Get all assigned cores for the given parachain.
let claim_queue =
match client.runtime_api().claim_queue(relay_parent) {
Ok(claim_queue) =>
if claim_queue.is_empty() {
log::info!(target: LOG_TARGET, "Claim queue is empty.");
continue;
} else {
claim_queue
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query claim queue runtime API: {error:?}"
);
continue;
},
};

let claim_queue_offset =
ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);

let scheduled_cores: Vec<CoreIndex> = claim_queue
.iter()
.filter_map(move |(core_index, paras)| {
Some((
*core_index,
*paras.get(claim_queue_offset.0 as usize)?,
))
})
.filter_map(|(core_index, core_para_id)| {
(core_para_id == para_id).then_some(core_index)
})
.collect();

if scheduled_cores.is_empty() {
println!("Scheduled cores is empty.");
continue;
}

// Get the collation.
let validation_data =
match client.runtime_api().persisted_validation_data(
relay_parent,
para_id,
OccupiedCoreAssumption::Included,
) {
Ok(Some(validation_data)) => validation_data,
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"Persisted validation data is None."
);
continue;
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query persisted validation data runtime API: {error:?}"
);
continue;
},
};

let collation = match collation_function(
relay_parent,
&validation_data,
)
.await
{
Some(collation) => collation,
None => {
log::warn!(
target: LOG_TARGET,
"Collation result is None."
);
continue;
},
}
.collation;

// Get validation code hash.
let validation_code_hash =
match client.runtime_api().validation_code_hash(
relay_parent,
para_id,
OccupiedCoreAssumption::Included,
) {
Ok(Some(validation_code_hash)) => validation_code_hash,
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"Validation code hash is None."
);
continue;
},
Err(error) => {
log::error!(
target: LOG_TARGET,
"Failed to query validation code hash runtime API: {error:?}"
);
continue;
},
};

// Submit the same collation for each assigned core.
for core_index in &scheduled_cores {
let submit_collation_params = SubmitCollationParams {
relay_parent,
collation: collation.clone(),
parent_head: validation_data.parent_head.clone(),
validation_code_hash,
result_sender: None,
core_index: *core_index,
};

overseer_handle
.send_msg(
CollationGenerationMessage::SubmitCollation(
submit_collation_params,
),
"Collator",
)
.await;
}

// Wait before submitting the next collation.
sleep(Duration::from_secs(6 as u64));
}
},
collator.send_same_collations_to_all_assigned_cores(
&full_node,
overseer_handle,
para_id,
);
}


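Finally, the essence of the malicious behavior this commit relocates is the fan-out of a single collation to many cores. The sketch below is self-contained and uses hypothetical stand-in types (not the real polkadot_node_primitives SubmitCollationParams) to show just that duplication pattern:

// Minimal sketch of the fan-out in send_same_collations_to_all_assigned_cores:
// one collation, cloned and submitted once per assigned core.
#[derive(Clone, Debug, PartialEq)]
struct Collation(Vec<u8>);

#[derive(Debug)]
struct SubmitCollationParams {
    core_index: u32,
    collation: Collation,
}

fn submit_to_all_cores(collation: &Collation, cores: &[u32]) -> Vec<SubmitCollationParams> {
    cores
        .iter()
        .map(|core_index| SubmitCollationParams {
            core_index: *core_index,
            // The same collation payload is duplicated for every core.
            collation: collation.clone(),
        })
        .collect()
}

fn main() {
    let collation = Collation(vec![0xde, 0xad]);
    let submissions = submit_to_all_cores(&collation, &[0, 2]);
    assert_eq!(submissions.len(), 2);
    // Every core receives an identical collation.
    assert_eq!(submissions[0].collation, submissions[1].collation);
}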