Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MSP responding/accepting file key already accepted previously #322

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 136 additions & 154 deletions node/src/tasks/msp_upload_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,157 +672,82 @@ where
.await;
let read_fs = fs.read().await;

// Reject the storage request if file key already exists in the forest storage.
if read_fs.contains_file_key(&file_key.into())? {
let err_msg = format!("File key {:?} already exists in forest storage.", file_key);
debug!(target: LOG_TARGET, "{}", err_msg);

// Reject the storage request.
let call = storage_hub_runtime::RuntimeCall::FileSystem(
pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets {
storage_request_msp_response: vec![StorageRequestMspBucketResponse {
bucket_id: event.bucket_id,
accept: None,
reject: vec![RejectedStorageRequest {
file_key: H256(file_key.into()),
reason: RejectedStorageRequestReason::FileKeyAlreadyStored,
}],
}],
},
);

self.storage_hub_handler
.blockchain
.send_extrinsic(call, Tip::from(0))
.await?
.with_timeout(Duration::from_secs(60))
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

return Ok(());
}

let available_capacity = self
.storage_hub_handler
.blockchain
.query_available_storage_capacity(own_msp_id)
.await
.map_err(|e| {
let err_msg = format!("Failed to query available storage capacity: {:?}", e);
error!(
target: LOG_TARGET,
err_msg
);
anyhow::anyhow!(err_msg)
})?;

// Increase storage capacity if the available capacity is less than the file size.
if available_capacity < event.size {
warn!(
target: LOG_TARGET,
"Insufficient storage capacity to accept file: {:?}",
event.file_key
);

let current_capacity = self
// If we do not have the file already in forest storage, we must take into account the
// available storage capacity.
if !read_fs.contains_file_key(&file_key.into())? {
let available_capacity = self
.storage_hub_handler
.blockchain
.query_storage_provider_capacity(own_msp_id)
.query_available_storage_capacity(own_msp_id)
.await
.map_err(|e| {
let err_msg = format!("Failed to query storage provider capacity: {:?}", e);
let err_msg = format!("Failed to query available storage capacity: {:?}", e);
error!(
target: LOG_TARGET,
err_msg
);
anyhow::anyhow!(err_msg)
})?;

let max_storage_capacity = self
.storage_hub_handler
.provider_config
.max_storage_capacity;

if max_storage_capacity == current_capacity {
let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity.";
// Increase storage capacity if the available capacity is less than the file size.
if available_capacity < event.size {
warn!(
target: LOG_TARGET, err_msg
target: LOG_TARGET,
"Insufficient storage capacity to accept file: {:?}",
event.file_key
);
return Err(anyhow::anyhow!(err_msg));
}

let new_capacity = self.calculate_capacity(&event, current_capacity)?;

let call = storage_hub_runtime::RuntimeCall::Providers(
pallet_storage_providers::Call::change_capacity { new_capacity },
);
let current_capacity = self
.storage_hub_handler
.blockchain
.query_storage_provider_capacity(own_msp_id)
.await
.map_err(|e| {
let err_msg = format!("Failed to query storage provider capacity: {:?}", e);
error!(
target: LOG_TARGET,
err_msg
);
anyhow::anyhow!(err_msg)
})?;

let max_storage_capacity = self
.storage_hub_handler
.provider_config
.max_storage_capacity;

let earliest_change_capacity_block = self
.storage_hub_handler
.blockchain
.query_earliest_change_capacity_block(own_msp_id)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to query storage provider capacity: {:?}", e
if max_storage_capacity == current_capacity {
let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity.";
warn!(
target: LOG_TARGET, err_msg
);
anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e)
})?;
return Err(anyhow::anyhow!(err_msg));
}

// Wait for the earliest block where the capacity can be changed.
self.storage_hub_handler
.blockchain
.wait_for_block(earliest_change_capacity_block)
.await?;
let new_capacity = self.calculate_capacity(&event, current_capacity)?;

self.storage_hub_handler
.blockchain
.send_extrinsic(call, Tip::from(0))
.await?
.with_timeout(Duration::from_secs(60))
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

info!(
target: LOG_TARGET,
"Increased storage capacity to {:?} bytes",
new_capacity
);

let available_capacity = self
.storage_hub_handler
.blockchain
.query_available_storage_capacity(own_msp_id)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to query available storage capacity: {:?}", e
);
anyhow::anyhow!("Failed to query available storage capacity: {:?}", e)
})?;

// Reject storage request if the new available capacity is still less than the file size.
if available_capacity < event.size {
let err_msg = "Increased storage capacity is still insufficient to volunteer for file. Rejecting storage request.";
warn!(
target: LOG_TARGET, "{}", err_msg
let call = storage_hub_runtime::RuntimeCall::Providers(
pallet_storage_providers::Call::change_capacity { new_capacity },
);

// Build extrinsic.
let call = storage_hub_runtime::RuntimeCall::FileSystem(
pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets {
storage_request_msp_response: vec![StorageRequestMspBucketResponse {
bucket_id: event.bucket_id,
accept: None,
reject: vec![RejectedStorageRequest {
file_key: H256(event.file_key.into()),
reason: RejectedStorageRequestReason::ReachedMaximumCapacity,
}],
}],
},
);
let earliest_change_capacity_block = self
.storage_hub_handler
.blockchain
.query_earliest_change_capacity_block(own_msp_id)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to query storage provider capacity: {:?}", e
);
anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e)
})?;

// Wait for the earliest block where the capacity can be changed.
self.storage_hub_handler
.blockchain
.wait_for_block(earliest_change_capacity_block)
.await?;

self.storage_hub_handler
.blockchain
Expand All @@ -832,36 +757,93 @@ where
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

return Err(anyhow::anyhow!(err_msg));
info!(
target: LOG_TARGET,
"Increased storage capacity to {:?} bytes",
new_capacity
);

let available_capacity = self
.storage_hub_handler
.blockchain
.query_available_storage_capacity(own_msp_id)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to query available storage capacity: {:?}", e
);
anyhow::anyhow!("Failed to query available storage capacity: {:?}", e)
})?;

// Reject storage request if the new available capacity is still less than the file size.
if available_capacity < event.size {
let err_msg = "Increased storage capacity is still insufficient to volunteer for file. Rejecting storage request.";
warn!(
target: LOG_TARGET, "{}", err_msg
);

// Build extrinsic.
let call = storage_hub_runtime::RuntimeCall::FileSystem(
pallet_file_system::Call::msp_respond_storage_requests_multiple_buckets {
storage_request_msp_response: vec![StorageRequestMspBucketResponse {
bucket_id: event.bucket_id,
accept: None,
reject: vec![RejectedStorageRequest {
file_key: H256(event.file_key.into()),
reason: RejectedStorageRequestReason::ReachedMaximumCapacity,
}],
}],
},
);

self.storage_hub_handler
.blockchain
.send_extrinsic(call, Tip::from(0))
.await?
.with_timeout(Duration::from_secs(60))
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

return Err(anyhow::anyhow!(err_msg));
}
}
}

self.file_key_cleanup = Some(file_key.into());

// Register the file for upload in the file transfer service.
for peer_id in event.user_peer_ids.iter() {
let peer_id = match std::str::from_utf8(&peer_id.as_slice()) {
Ok(str_slice) => PeerId::from_str(str_slice).map_err(|e| {
error!(target: LOG_TARGET, "Failed to convert peer ID to PeerId: {}", e);
e
})?,
Err(e) => return Err(anyhow!("Failed to convert peer ID to a string: {}", e)),
};
self.storage_hub_handler
.file_transfer
.register_new_file_peer(peer_id, file_key)
.await
.map_err(|e| anyhow!("Failed to register new file peer: {:?}", e))?;
let mut write_file_storage = self.storage_hub_handler.file_storage.write().await;

// Create file in file storage if it is not present so we can write uploaded chunks as soon as possible.
if write_file_storage
.get_metadata(&file_key.into())
.map_err(|e| anyhow!("Failed to get metadata from file storage: {:?}", e))?
.is_none()
{
write_file_storage
.insert_file(
metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>(),
metadata,
)
.map_err(|e| anyhow!("Failed to insert file in file storage: {:?}", e))?;

// Register the file for upload in the file transfer service.
for peer_id in event.user_peer_ids.iter() {
let peer_id = match std::str::from_utf8(&peer_id.as_slice()) {
Ok(str_slice) => PeerId::from_str(str_slice).map_err(|e| {
error!(target: LOG_TARGET, "Failed to convert peer ID to PeerId: {}", e);
e
})?,
Err(e) => return Err(anyhow!("Failed to convert peer ID to a string: {}", e)),
};
self.storage_hub_handler
.file_transfer
.register_new_file_peer(peer_id, file_key)
.await
.map_err(|e| anyhow!("Failed to register new file peer: {:?}", e))?;
}
}

// Create file in file storage so we can write uploaded chunks as soon as possible.
let mut write_file_storage = self.storage_hub_handler.file_storage.write().await;
write_file_storage
.insert_file(
metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>(),
metadata,
)
.map_err(|e| anyhow!("Failed to insert file in file storage: {:?}", e))?;
drop(write_file_storage);

Ok(())
Expand Down
Loading
Loading