diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index cad50fef3e32..240c1ca1f8b2 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -212,31 +212,6 @@ impl BlockCollection { ready } - /// Returns the block header of the first block that is ready for importing. - /// `from` is the maximum block number for the start of the range that we are interested in. - /// The function will return None if the first block ready is higher than `from`. - /// The logic is structured to be consistent with ready_blocks(). - pub fn first_ready_block_header(&self, from: NumberFor) -> Option { - let mut prev = from; - for (&start, range_data) in &self.blocks { - if start > prev { - break - } - - match range_data { - BlockRangeState::Complete(blocks) => { - let len = (blocks.len() as u32).into(); - prev = start + len; - if let Some(BlockData { block, .. }) = blocks.first() { - return block.header.clone() - } - }, - _ => continue, - } - } - None - } - pub fn clear_queued(&mut self, hash: &B::Hash) { if let Some((from, to)) = self.queued_blocks.remove(hash) { let mut block_num = from; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index a291da4a90d5..10eaa2450518 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1405,27 +1405,8 @@ where /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { - let start_block = self.best_queued_number + One::one(); - - // Verify that the parent of the first available block is in the chain. - // If not, we are downloading from a fork. In this case, wait until - // the start block has a parent on chain. - let parent_on_chain = - self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| { - std::matches!( - self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown), - BlockStatus::InChainWithState | - BlockStatus::InChainPruned | - BlockStatus::Queued - ) - }); - - if !parent_on_chain { - return vec![] - } - self.blocks - .ready_blocks(start_block) + .ready_blocks(self.best_queued_number + One::one()) .into_iter() .map(|block_data| { let justifications = block_data @@ -3383,380 +3364,4 @@ mod test { pending_responses.remove(&peers[1]); assert_eq!(pending_responses.len(), 0); } - - #[test] - fn syncs_fork_with_partial_response_extends_tip() { - sp_tracing::try_init_simple(); - - // Set up: the two chains share the first 15 blocks before - // diverging. The other(canonical) chain fork is longer. - let max_blocks_per_request = 64; - let common_ancestor = 15; - let non_canonical_chain_length = common_ancestor + 3; - let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; - - let (_chain_sync_network_provider, chain_sync_network_handle) = - NetworkServiceProvider::new(); - let mut client = Arc::new(TestClientBuilder::new().build()); - - // Blocks on the non-canonical chain. - let non_canonical_blocks = (0..non_canonical_chain_length) - .map(|_| build_block(&mut client, None, false)) - .collect::>(); - - // Blocks on the canonical chain. - let canonical_blocks = { - let mut client = Arc::new(TestClientBuilder::new().build()); - let common_blocks = non_canonical_blocks[..common_ancestor as usize] - .into_iter() - .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) - .cloned() - .collect::>(); - - common_blocks - .into_iter() - .chain( - (0..(canonical_chain_length - common_ancestor as u32)) - .map(|_| build_block(&mut client, None, true)), - ) - .collect::>() - }; - - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - max_blocks_per_request, - None, - chain_sync_network_handle, - ) - .unwrap(); - - // Connect the node we will sync from - let peer_id = PeerId::random(); - let canonical_tip = canonical_blocks.last().unwrap().clone(); - let mut request = sync - .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) - .unwrap() - .unwrap(); - assert_eq!(FromBlock::Number(client.info().best_number), request.from); - assert_eq!(Some(1), request.max); - - // Do the ancestor search - loop { - let block = - &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; - let response = create_block_response(vec![block.clone()]); - - let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { - // We found the ancestor - break - }; - - log::trace!(target: LOG_TARGET, "Request: {request:?}"); - } - - // The response for the 64 blocks is returned in two parts: - // part 1: last 61 blocks [19..79], part 2: first 3 blocks [16-18]. - // Even though the first part extends the current chain ending at 18, - // it should not result in an import yet. - let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; - let resp_2_from = common_ancestor as u64 + 3; - - // No import expected. - let request = get_block_request( - &mut sync, - FromBlock::Number(resp_1_from), - max_blocks_per_request as u32, - &peer_id, - ); - - let from = unwrap_from_block_number(request.from.clone()); - let mut resp_blocks = canonical_blocks[18..from as usize].to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() - ),); - - // Gap filled, expect max_blocks_per_request being imported now. - let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 3, &peer_id); - let mut resp_blocks = canonical_blocks[common_ancestor as usize..18].to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - let to_import: Vec<_> = match &res { - OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { - assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); - blocks - .iter() - .map(|b| { - let num = *b.header.as_ref().unwrap().number() as usize; - canonical_blocks[num - 1].clone() - }) - .collect() - }, - _ => { - panic!("Unexpected response: {res:?}"); - }, - }; - - let _ = sync.on_blocks_processed( - max_blocks_per_request as usize, - resp_blocks.len(), - resp_blocks - .iter() - .rev() - .map(|b| { - ( - Ok(BlockImportStatus::ImportedUnknown( - *b.header().number(), - Default::default(), - Some(peer_id), - )), - b.hash(), - ) - }) - .collect(), - ); - to_import.into_iter().for_each(|b| { - assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); - block_on(client.import(BlockOrigin::Own, b)).unwrap(); - }); - let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; - assert_eq!(sync.best_queued_number as u32, expected_number); - assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); - // Sync rest of the chain. - let request = - get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); - let mut resp_blocks = canonical_blocks - [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] - .to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize - ),); - let _ = sync.on_blocks_processed( - max_blocks_per_request as usize, - resp_blocks.len(), - resp_blocks - .iter() - .rev() - .map(|b| { - ( - Ok(BlockImportStatus::ImportedUnknown( - *b.header().number(), - Default::default(), - Some(peer_id), - )), - b.hash(), - ) - }) - .collect(), - ); - resp_blocks.into_iter().rev().for_each(|b| { - assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); - block_on(client.import(BlockOrigin::Own, b)).unwrap(); - }); - let expected_number = canonical_chain_length as u32; - assert_eq!(sync.best_queued_number as u32, expected_number); - assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); - } - - #[test] - fn syncs_fork_with_partial_response_does_not_extend_tip() { - sp_tracing::try_init_simple(); - - // Set up: the two chains share the first 15 blocks before - // diverging. The other(canonical) chain fork is longer. - let max_blocks_per_request = 64; - let common_ancestor = 15; - let non_canonical_chain_length = common_ancestor + 3; - let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; - - let (_chain_sync_network_provider, chain_sync_network_handle) = - NetworkServiceProvider::new(); - let mut client = Arc::new(TestClientBuilder::new().build()); - - // Blocks on the non-canonical chain. - let non_canonical_blocks = (0..non_canonical_chain_length) - .map(|_| build_block(&mut client, None, false)) - .collect::>(); - - // Blocks on the canonical chain. - let canonical_blocks = { - let mut client = Arc::new(TestClientBuilder::new().build()); - let common_blocks = non_canonical_blocks[..common_ancestor as usize] - .into_iter() - .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) - .cloned() - .collect::>(); - - common_blocks - .into_iter() - .chain( - (0..(canonical_chain_length - common_ancestor as u32)) - .map(|_| build_block(&mut client, None, true)), - ) - .collect::>() - }; - - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - max_blocks_per_request, - None, - chain_sync_network_handle, - ) - .unwrap(); - - // Connect the node we will sync from - let peer_id = PeerId::random(); - let canonical_tip = canonical_blocks.last().unwrap().clone(); - let mut request = sync - .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) - .unwrap() - .unwrap(); - assert_eq!(FromBlock::Number(client.info().best_number), request.from); - assert_eq!(Some(1), request.max); - - // Do the ancestor search - loop { - let block = - &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; - let response = create_block_response(vec![block.clone()]); - - let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { - // We found the ancestor - break - }; - - log::trace!(target: LOG_TARGET, "Request: {request:?}"); - } - - // The response for the 64 blocks is returned in two parts: - // part 1: last 62 blocks [18..79], part 2: first 2 blocks [16-17]. - // Even though the first part extends the current chain ending at 18, - // it should not result in an import yet. - let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; - let resp_2_from = common_ancestor as u64 + 2; - - // No import expected. - let request = get_block_request( - &mut sync, - FromBlock::Number(resp_1_from), - max_blocks_per_request as u32, - &peer_id, - ); - - let from = unwrap_from_block_number(request.from.clone()); - let mut resp_blocks = canonical_blocks[17..from as usize].to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() - ),); - - // Gap filled, expect max_blocks_per_request being imported now. - let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 2, &peer_id); - let mut resp_blocks = canonical_blocks[common_ancestor as usize..17].to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - let to_import: Vec<_> = match &res { - OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { - assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); - blocks - .iter() - .map(|b| { - let num = *b.header.as_ref().unwrap().number() as usize; - canonical_blocks[num - 1].clone() - }) - .collect() - }, - _ => { - panic!("Unexpected response: {res:?}"); - }, - }; - - let _ = sync.on_blocks_processed( - max_blocks_per_request as usize, - resp_blocks.len(), - resp_blocks - .iter() - .rev() - .map(|b| { - ( - Ok(BlockImportStatus::ImportedUnknown( - *b.header().number(), - Default::default(), - Some(peer_id), - )), - b.hash(), - ) - }) - .collect(), - ); - to_import.into_iter().for_each(|b| { - assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); - block_on(client.import(BlockOrigin::Own, b)).unwrap(); - }); - let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; - assert_eq!(sync.best_queued_number as u32, expected_number); - assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); - // Sync rest of the chain. - let request = - get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); - let mut resp_blocks = canonical_blocks - [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] - .to_vec(); - resp_blocks.reverse(); - let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize - ),); - let _ = sync.on_blocks_processed( - max_blocks_per_request as usize, - resp_blocks.len(), - resp_blocks - .iter() - .rev() - .map(|b| { - ( - Ok(BlockImportStatus::ImportedUnknown( - *b.header().number(), - Default::default(), - Some(peer_id), - )), - b.hash(), - ) - }) - .collect(), - ); - resp_blocks.into_iter().rev().for_each(|b| { - assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); - block_on(client.import(BlockOrigin::Own, b)).unwrap(); - }); - let expected_number = canonical_chain_length as u32; - assert_eq!(sync.best_queued_number as u32, expected_number); - assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); - } }