From b0f136ed9ce8c2bd5b3e763e1f37d10fc5396cf9 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Fri, 10 Jan 2025 14:42:39 -0500 Subject: [PATCH] have msp send inclusion forest proof for msp respond of pre existing file key --- node/src/tasks/msp_upload_file.rs | 290 ++++++++++++++---------------- pallets/file-system/src/tests.rs | 72 +++++++- pallets/file-system/src/types.rs | 2 + pallets/file-system/src/utils.rs | 102 +++++------ 4 files changed, 253 insertions(+), 213 deletions(-) diff --git a/node/src/tasks/msp_upload_file.rs b/node/src/tasks/msp_upload_file.rs index 93f7ac1a1..f21856602 100644 --- a/node/src/tasks/msp_upload_file.rs +++ b/node/src/tasks/msp_upload_file.rs @@ -672,65 +672,16 @@ where .await; let read_fs = fs.read().await; - // Reject the storage request if file key already exists in the forest storage. - if read_fs.contains_file_key(&file_key.into())? { - let err_msg = format!("File key {:?} already exists in forest storage.", file_key); - debug!(target: LOG_TARGET, "{}", err_msg); - - // Reject the storage request. - let call = storage_hub_runtime::RuntimeCall::FileSystem( - pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets { - storage_request_msp_response: vec![StorageRequestMspBucketResponse { - bucket_id: event.bucket_id, - accept: None, - reject: vec![RejectedStorageRequest { - file_key: H256(file_key.into()), - reason: RejectedStorageRequestReason::FileKeyAlreadyStored, - }], - }], - }, - ); - - self.storage_hub_handler - .blockchain - .send_extrinsic(call, Tip::from(0)) - .await? - .with_timeout(Duration::from_secs(60)) - .watch_for_success(&self.storage_hub_handler.blockchain) - .await?; - - return Ok(()); - } - - let available_capacity = self - .storage_hub_handler - .blockchain - .query_available_storage_capacity(own_msp_id) - .await - .map_err(|e| { - let err_msg = format!("Failed to query available storage capacity: {:?}", e); - error!( - target: LOG_TARGET, - err_msg - ); - anyhow::anyhow!(err_msg) - })?; - - // Increase storage capacity if the available capacity is less than the file size. - if available_capacity < event.size { - warn!( - target: LOG_TARGET, - "Insufficient storage capacity to accept file: {:?}", - event.file_key - ); - - let current_capacity = self + // If we do not have the file already in forest storage, we must take into account the + // available storage capacity. + if !read_fs.contains_file_key(&file_key.into())? { + let available_capacity = self .storage_hub_handler .blockchain - .query_storage_provider_capacity(own_msp_id) + .query_available_storage_capacity(own_msp_id) .await .map_err(|e| { - let err_msg = format!("Failed to query storage provider capacity: {:?}", e); + let err_msg = format!("Failed to query available storage capacity: {:?}", e); error!( target: LOG_TARGET, err_msg @@ -738,91 +689,65 @@ where anyhow::anyhow!(err_msg) })?; - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .max_storage_capacity; - - if max_storage_capacity == current_capacity { - let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + // Increase storage capacity if the available capacity is less than the file size. + if available_capacity < event.size { warn!( - target: LOG_TARGET, err_msg + target: LOG_TARGET, + "Insufficient storage capacity to accept file: {:?}", + event.file_key ); - return Err(anyhow::anyhow!(err_msg)); - } - let new_capacity = self.calculate_capacity(&event, current_capacity)?; - - let call = storage_hub_runtime::RuntimeCall::Providers( - pallet_storage_providers::Call::change_capacity { new_capacity }, - ); + let current_capacity = self + .storage_hub_handler + .blockchain + .query_storage_provider_capacity(own_msp_id) + .await + .map_err(|e| { + let err_msg = format!("Failed to query storage provider capacity: {:?}", e); + error!( + target: LOG_TARGET, + err_msg + ); + anyhow::anyhow!(err_msg) + })?; + + let max_storage_capacity = self + .storage_hub_handler + .provider_config + .max_storage_capacity; - let earliest_change_capacity_block = self - .storage_hub_handler - .blockchain - .query_earliest_change_capacity_block(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query storage provider capacity: {:?}", e + if max_storage_capacity == current_capacity { + let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + warn!( + target: LOG_TARGET, err_msg ); - anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) - })?; + return Err(anyhow::anyhow!(err_msg)); + } - // Wait for the earliest block where the capacity can be changed. - self.storage_hub_handler - .blockchain - .wait_for_block(earliest_change_capacity_block) - .await?; + let new_capacity = self.calculate_capacity(&event, current_capacity)?; - self.storage_hub_handler - .blockchain - .send_extrinsic(call, Tip::from(0)) - .await? - .with_timeout(Duration::from_secs(60)) - .watch_for_success(&self.storage_hub_handler.blockchain) - .await?; - - info!( - target: LOG_TARGET, - "Increased storage capacity to {:?} bytes", - new_capacity - ); - - let available_capacity = self - .storage_hub_handler - .blockchain - .query_available_storage_capacity(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query available storage capacity: {:?}", e - ); - anyhow::anyhow!("Failed to query available storage capacity: {:?}", e) - })?; - - // Reject storage request if the new available capacity is still less than the file size. - if available_capacity < event.size { - let err_msg = "Increased storage capacity is still insufficient to volunteer for file. Rejecting storage request."; - warn!( - target: LOG_TARGET, "{}", err_msg + let call = storage_hub_runtime::RuntimeCall::Providers( + pallet_storage_providers::Call::change_capacity { new_capacity }, ); - // Build extrinsic. - let call = storage_hub_runtime::RuntimeCall::FileSystem( - pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets { - storage_request_msp_response: vec![StorageRequestMspBucketResponse { - bucket_id: event.bucket_id, - accept: None, - reject: vec![RejectedStorageRequest { - file_key: H256(event.file_key.into()), - reason: RejectedStorageRequestReason::ReachedMaximumCapacity, - }], - }], - }, - ); + let earliest_change_capacity_block = self + .storage_hub_handler + .blockchain + .query_earliest_change_capacity_block(own_msp_id) + .await + .map_err(|e| { + error!( + target: LOG_TARGET, + "Failed to query storage provider capacity: {:?}", e + ); + anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) + })?; + + // Wait for the earliest block where the capacity can be changed. + self.storage_hub_handler + .blockchain + .wait_for_block(earliest_change_capacity_block) + .await?; self.storage_hub_handler .blockchain @@ -832,36 +757,93 @@ where .watch_for_success(&self.storage_hub_handler.blockchain) .await?; - return Err(anyhow::anyhow!(err_msg)); + info!( + target: LOG_TARGET, + "Increased storage capacity to {:?} bytes", + new_capacity + ); + + let available_capacity = self + .storage_hub_handler + .blockchain + .query_available_storage_capacity(own_msp_id) + .await + .map_err(|e| { + error!( + target: LOG_TARGET, + "Failed to query available storage capacity: {:?}", e + ); + anyhow::anyhow!("Failed to query available storage capacity: {:?}", e) + })?; + + // Reject storage request if the new available capacity is still less than the file size. + if available_capacity < event.size { + let err_msg = "Increased storage capacity is still insufficient to volunteer for file. Rejecting storage request."; + warn!( + target: LOG_TARGET, "{}", err_msg + ); + + // Build extrinsic. + let call = storage_hub_runtime::RuntimeCall::FileSystem( + pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets { + storage_request_msp_response: vec![StorageRequestMspBucketResponse { + bucket_id: event.bucket_id, + accept: None, + reject: vec![RejectedStorageRequest { + file_key: H256(event.file_key.into()), + reason: RejectedStorageRequestReason::ReachedMaximumCapacity, + }], + }], + }, + ); + + self.storage_hub_handler + .blockchain + .send_extrinsic(call, Tip::from(0)) + .await? + .with_timeout(Duration::from_secs(60)) + .watch_for_success(&self.storage_hub_handler.blockchain) + .await?; + + return Err(anyhow::anyhow!(err_msg)); + } } } self.file_key_cleanup = Some(file_key.into()); - // Register the file for upload in the file transfer service. - for peer_id in event.user_peer_ids.iter() { - let peer_id = match std::str::from_utf8(&peer_id.as_slice()) { - Ok(str_slice) => PeerId::from_str(str_slice).map_err(|e| { - error!(target: LOG_TARGET, "Failed to convert peer ID to PeerId: {}", e); - e - })?, - Err(e) => return Err(anyhow!("Failed to convert peer ID to a string: {}", e)), - }; - self.storage_hub_handler - .file_transfer - .register_new_file_peer(peer_id, file_key) - .await - .map_err(|e| anyhow!("Failed to register new file peer: {:?}", e))?; + let mut write_file_storage = self.storage_hub_handler.file_storage.write().await; + + // Create file in file storage if it is not present so we can write uploaded chunks as soon as possible. + if write_file_storage + .get_metadata(&file_key.into()) + .map_err(|e| anyhow!("Failed to get metadata from file storage: {:?}", e))? + .is_none() + { + write_file_storage + .insert_file( + metadata.file_key::>(), + metadata, + ) + .map_err(|e| anyhow!("Failed to insert file in file storage: {:?}", e))?; + + // Register the file for upload in the file transfer service. + for peer_id in event.user_peer_ids.iter() { + let peer_id = match std::str::from_utf8(&peer_id.as_slice()) { + Ok(str_slice) => PeerId::from_str(str_slice).map_err(|e| { + error!(target: LOG_TARGET, "Failed to convert peer ID to PeerId: {}", e); + e + })?, + Err(e) => return Err(anyhow!("Failed to convert peer ID to a string: {}", e)), + }; + self.storage_hub_handler + .file_transfer + .register_new_file_peer(peer_id, file_key) + .await + .map_err(|e| anyhow!("Failed to register new file peer: {:?}", e))?; + } } - // Create file in file storage so we can write uploaded chunks as soon as possible. - let mut write_file_storage = self.storage_hub_handler.file_storage.write().await; - write_file_storage - .insert_file( - metadata.file_key::>(), - metadata, - ) - .map_err(|e| anyhow!("Failed to insert file in file storage: {:?}", e))?; drop(write_file_storage); Ok(()) diff --git a/pallets/file-system/src/tests.rs b/pallets/file-system/src/tests.rs index cb07e05a9..677c63a24 100644 --- a/pallets/file-system/src/tests.rs +++ b/pallets/file-system/src/tests.rs @@ -2871,10 +2871,9 @@ mod msp_respond_storage_request { size, msp_id, peer_ids.clone(), - None + Some(1) )); - // Compute the file key. // Compute the file key. let file_key = FileSystem::compute_file_key( owner_account_id.clone(), @@ -2884,6 +2883,11 @@ mod msp_respond_storage_request { fingerprint, ); + // Simulate a BSP already confirming the storage request. + StorageRequests::::mutate(file_key, |storage_request| { + storage_request.as_mut().unwrap().bsps_confirmed = 1; + }); + // Dispatch the MSP accept request. assert_ok!(FileSystem::msp_respond_storage_requests_multiple_buckets( RuntimeOrigin::signed(msp.clone()), @@ -2904,13 +2908,65 @@ mod msp_respond_storage_request { }], )); - // Assert that the storage was updated - assert_eq!( - file_system::StorageRequests::::get(file_key) - .unwrap() - .msp, - Some((msp_id, true)) + // Check that the storage request has been deleted since it is fulfilled. + assert_eq!(StorageRequests::::get(file_key), None); + + // Get bucket root + let bucket_root = <::Providers as shp_traits::ReadBucketsInterface>::get_root_bucket(&bucket_id).unwrap(); + + // Check that the bucket root is not default + assert_ne!(bucket_root, <::Providers as shp_traits::ReadProvidersInterface>::get_default_root()); + + // Dispatch a storage request. + assert_ok!(FileSystem::issue_storage_request( + owner_signed.clone(), + bucket_id, + location.clone(), + fingerprint, + size, + msp_id, + peer_ids.clone(), + Some(1) + )); + + // Compute the file key. + let file_key = FileSystem::compute_file_key( + owner_account_id.clone(), + bucket_id, + location.clone(), + size, + fingerprint, ); + + // Simulate a BSP already confirming the storage request. + StorageRequests::::mutate(file_key, |storage_request| { + storage_request.as_mut().unwrap().bsps_confirmed = 1; + }); + + // Dispatch the MSP accept request with the file key in the forest proof. + assert_ok!(FileSystem::msp_respond_storage_requests_multiple_buckets( + RuntimeOrigin::signed(msp.clone()), + vec![StorageRequestMspBucketResponse { + bucket_id, + accept: Some(StorageRequestMspAcceptedFileKeys { + file_keys_and_proofs: vec![FileKeyWithProof { + file_key, + proof: CompactProof { + encoded_nodes: vec![H256::default().as_ref().to_vec()], + } + }], + non_inclusion_forest_proof: CompactProof { + encoded_nodes: vec![H256::default().as_ref().to_vec(), file_key.as_ref().to_vec()], + }, + }), + reject: vec![], + }], + )); + + let post_state_bucket_root = <::Providers as shp_traits::ReadBucketsInterface>::get_root_bucket(&bucket_id).unwrap(); + + // Bucket root should not have changed + assert_eq!(bucket_root, post_state_bucket_root); }); } diff --git a/pallets/file-system/src/types.rs b/pallets/file-system/src/types.rs index 7c1376610..0e7c2a980 100644 --- a/pallets/file-system/src/types.rs +++ b/pallets/file-system/src/types.rs @@ -121,6 +121,8 @@ impl Debug for FileKeyWithProof { #[scale_info(skip_type_params(T))] pub struct StorageRequestMspAcceptedFileKeys { pub file_keys_and_proofs: Vec>, + /// File keys which have already been accepted by the MSP in a previous storage request should be included + /// in the proof. pub non_inclusion_forest_proof: ForestProof, } diff --git a/pallets/file-system/src/utils.rs b/pallets/file-system/src/utils.rs index 89df47467..a595750ef 100644 --- a/pallets/file-system/src/utils.rs +++ b/pallets/file-system/src/utils.rs @@ -854,82 +854,82 @@ where >::get(&file_key_with_proof.file_key) .ok_or(Error::::StorageRequestNotFound)?; - // Ensure that the file key IS NOT part of the bucket's forest. - if proven_keys.contains(&file_key_with_proof.file_key) { - return Err(Error::::ExpectedNonInclusionProof.into()); - } - // Check that the storage request bucket ID matches the provided bucket ID. - if storage_request_metadata.bucket_id != bucket_id { - return Err(Error::::InvalidBucketIdFileKeyPair.into()); - } + ensure!( + storage_request_metadata.bucket_id == bucket_id, + Error::::InvalidBucketIdFileKeyPair + ); // Check that the MSP is the one storing the bucket. - if !::is_bucket_stored_by_msp( - &msp_id, - &storage_request_metadata.bucket_id, - ) { - return Err(Error::::MspNotStoringBucket.into()); - } + ensure!( + ::is_bucket_stored_by_msp( + &msp_id, + &storage_request_metadata.bucket_id + ), + Error::::MspNotStoringBucket + ); // Check that the storage request has a MSP. - if storage_request_metadata.msp.is_none() { - return Err(Error::::RequestWithoutMsp.into()); - } + ensure!( + storage_request_metadata.msp.is_some(), + Error::::RequestWithoutMsp + ); let (request_msp_id, confirm_status) = storage_request_metadata.msp.unwrap(); // Check that the sender corresponds to the MSP in the storage request and that it hasn't yet confirmed storing the file. - if request_msp_id != msp_id { - return Err(Error::::NotSelectedMsp.into()); - } + ensure!(request_msp_id == msp_id, Error::::NotSelectedMsp); - if confirm_status { - return Err(Error::::MspAlreadyConfirmed.into()); - } + // Check that the MSP hasn't already confirmed storing the file. + ensure!(!confirm_status, Error::::MspAlreadyConfirmed); // Check that the MSP still has enough available capacity to store the file. - if ::available_capacity(&msp_id) - < storage_request_metadata.size - { - return Err(Error::::InsufficientAvailableCapacity.into()); - } + ensure!( + ::available_capacity(&msp_id) + >= storage_request_metadata.size, + Error::::InsufficientAvailableCapacity + ); // Get the file metadata to insert into the bucket under the file key. let file_metadata = storage_request_metadata.clone().to_file_metadata(); - accepted_files_metadata.push(file_metadata); - let chunk_challenges = Self::generate_chunk_challenges_on_sp_confirm( msp_id, file_key_with_proof.file_key, &storage_request_metadata, ); - // Check that the key proof is valid. - ::verify_key_proof( - &file_key_with_proof.file_key, - &chunk_challenges, - &file_key_with_proof.proof, - )?; + // Only check the key proof, increase the bucket size and capacity used if the file key is not in the forest proof, and + // add the file metadata to the `accepted_files_metadata` since all keys in this array will be added to the bucket forest via an apply delta. + // This can happen if the storage request was issued again by the user and the MSP has already stored the file. + if !proven_keys.contains(&file_key_with_proof.file_key) { + accepted_files_metadata.push(file_metadata); - // Increase size of the bucket. - ::increase_bucket_size( - &storage_request_metadata.bucket_id, - storage_request_metadata.size, - )?; + // Check that the key proof is valid. + ::verify_key_proof( + &file_key_with_proof.file_key, + &chunk_challenges, + &file_key_with_proof.proof, + )?; - // Increase the used capacity of the MSP - // This should not fail since we checked that the MSP has enough available capacity to store the file. - expect_or_err!( - ::increase_capacity_used( - &msp_id, + // Increase size of the bucket. + ::increase_bucket_size( + &storage_request_metadata.bucket_id, storage_request_metadata.size, - ), - "Failed to increase capacity used for MSP", - Error::::TooManyStorageRequestResponses, - result - ); + )?; + + // Increase the used capacity of the MSP + // This should not fail since we checked that the MSP has enough available capacity to store the file. + expect_or_err!( + ::increase_capacity_used( + &msp_id, + storage_request_metadata.size, + ), + "Failed to increase capacity used for MSP", + Error::::TooManyStorageRequestResponses, + result + ); + } // Notify that the storage request has been accepted by an MSP. Self::deposit_event(Event::MspAcceptedStorageRequest {