Skip to content

Commit

Permalink
Merge pull request #351 from metaplex-foundation/chore/mtg-1041-drop-…
Browse files Browse the repository at this point in the history
…legacy-types

MTG-1041: drop legacy types
  • Loading branch information
armyhaylenko authored Jan 16, 2025
2 parents de63b9c + 8868956 commit 11ca9d8
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 297 deletions.
44 changes: 13 additions & 31 deletions consistency_check/src/bin/compressed_assets/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ pub async fn main() {
let file = File::open(config.trees_file_path).unwrap();
let mut rdr = csv::Reader::from_reader(file);

let keys: Vec<String> = rdr
.records()
.filter_map(Result::ok)
.map(|r| r.as_slice().to_string())
.collect();
let keys: Vec<String> =
rdr.records().filter_map(Result::ok).map(|r| r.as_slice().to_string()).collect();

let rpc_client = Arc::new(RpcClient::new(config.rpc_endpoint));

Expand Down Expand Up @@ -138,11 +135,7 @@ pub async fn main() {
let shutdown_token_clone = shutdown_token.clone();

// update rate on the background
tokio::spawn(update_rate(
shutdown_token_clone,
assets_processed_clone,
rate_clone,
));
tokio::spawn(update_rate(shutdown_token_clone, assets_processed_clone, rate_clone));

// write found problematic assets to the files
writers.spawn(async move {
Expand Down Expand Up @@ -208,7 +201,7 @@ async fn verify_tree_batch(
let mut f_ch = failed_check.lock().await;
f_ch.insert(tree.clone());
continue;
}
},
};

let tree_config_key = Pubkey::find_program_address(&[t_key.as_ref()], &mpl_bubblegum::ID).0;
Expand Down Expand Up @@ -337,7 +330,7 @@ async fn process_assets_batch(
)
.await;
}
}
},
Err(e) => {
save_asset_w_inv_proofs(
assets_with_failed_proofs.clone(),
Expand All @@ -347,18 +340,15 @@ async fn process_assets_batch(
Some(e),
)
.await;
}
},
}
} else {
save_asset_w_inv_proofs(
assets_with_missed_proofs.clone(),
failed_proofs.clone(),
tree.clone(),
asset.clone(),
Some(format!(
"API did not return any proofs for asset: {:?}",
asset
)),
Some(format!("API did not return any proofs for asset: {:?}", asset)),
)
.await;
}
Expand Down Expand Up @@ -407,11 +397,8 @@ async fn check_if_asset_proofs_valid(asset_proofs_response: AssetProof) -> Resul
.map_err(|e| e.to_string())?
.to_bytes();

let recomputed_root = recompute(
leaf_key,
asset_proofs.as_ref(),
asset_proofs_response.node_index as u32,
);
let recomputed_root =
recompute(leaf_key, asset_proofs.as_ref(), asset_proofs_response.node_index as u32);

Ok(recomputed_root == root_key)
}
Expand Down Expand Up @@ -508,9 +495,8 @@ mod tests {
tree_id: "AxM84SgtLjS51ffA9DucZpGZc3xKDF7H4zU7U6hJQYbR".to_string(),
};

let proofs_valid = check_if_asset_proofs_valid(correct_asset_proofs_response.clone())
.await
.unwrap();
let proofs_valid =
check_if_asset_proofs_valid(correct_asset_proofs_response.clone()).await.unwrap();

assert!(proofs_valid);

Expand All @@ -519,19 +505,15 @@ mod tests {
invalid_first_proof_hash.proof[0] =
"GuR1VgjoFvHU1vkh81LK1znDyWGjf1B2e4rQ4zQivAvT".to_string();

let proofs_valid = check_if_asset_proofs_valid(invalid_first_proof_hash)
.await
.unwrap();
let proofs_valid = check_if_asset_proofs_valid(invalid_first_proof_hash).await.unwrap();

assert_eq!(proofs_valid, false);

let mut invalid_leaf_hash = correct_asset_proofs_response.clone();
// change leaf hash to incorrect one
invalid_leaf_hash.leaf = "GuR1VgjoFvHU1vkh81LK1znDyWGjf1B2e4rQ4zQivAvT".to_string();

let proofs_valid = check_if_asset_proofs_valid(invalid_leaf_hash)
.await
.unwrap();
let proofs_valid = check_if_asset_proofs_valid(invalid_leaf_hash).await.unwrap();

assert_eq!(proofs_valid, false);
}
Expand Down
57 changes: 15 additions & 42 deletions consistency_check/src/bin/jsons/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,12 @@ pub async fn main() {
let args = Args::parse();

match args.cmd {
Commands::CheckConsistency {
rocks_path,
postgre_creds,
batch,
} => {
Commands::CheckConsistency { rocks_path, postgre_creds, batch } => {
check_jsons_consistency(rocks_path, postgre_creds, batch).await;
}
Commands::ChangeStatus {
postgre_creds,
file_path,
} => {
},
Commands::ChangeStatus { postgre_creds, file_path } => {
change_jsons_status(postgre_creds, file_path).await;
}
},
}
}

Expand All @@ -101,11 +94,8 @@ async fn change_jsons_status(postgre_creds: String, file_path: String) {
let spinner_style =
ProgressStyle::with_template("{prefix:>10.bold.dim} {spinner} total={human_pos} {msg}")
.unwrap();
let links_spinner = Arc::new(
ProgressBar::new_spinner()
.with_style(spinner_style)
.with_prefix("links"),
);
let links_spinner =
Arc::new(ProgressBar::new_spinner().with_style(spinner_style).with_prefix("links"));
let mut links_processed = 0;

let mut missed_jsons = csv::Reader::from_path(file_path).unwrap();
Expand Down Expand Up @@ -223,20 +213,13 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
let assets_processed_clone = assets_processed.clone();
let shutdown_token_clone = shutdown_token.clone();
let rate_clone = rate.clone();
tokio::spawn(update_rate(
shutdown_token_clone,
assets_processed_clone,
rate_clone,
));
tokio::spawn(update_rate(shutdown_token_clone, assets_processed_clone, rate_clone));

let mut last_key_in_batch = None;

info!("Launching main loop...");
loop {
match index_pg_storage
.get_tasks(batch as i64, last_key_in_batch.clone())
.await
{
match index_pg_storage.get_tasks(batch as i64, last_key_in_batch.clone()).await {
Ok(tasks) => {
if tasks.is_empty() {
info!(
Expand All @@ -256,11 +239,7 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
.map(|t| t.metadata_url.clone())
.collect();

match db_client
.asset_offchain_data
.batch_get(keys_to_check.clone())
.await
{
match db_client.asset_offchain_data.batch_get(keys_to_check.clone()).await {
Ok(jsons) => {
let mut ms_jn = missed_jsons.lock().await;

Expand All @@ -280,17 +259,14 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
count_of_missed_jsons.fetch_add(1, Ordering::Relaxed);
}
}
}
},
Err(e) => {
error!(
"Error during selecting data from the Rocks: {}",
e.to_string()
);
error!("Error during selecting data from the Rocks: {}", e.to_string());
count_of_missed_jsons
.fetch_add(keys_to_check.len() as u64, Ordering::Relaxed);
let mut ms_jn = missed_jsons.lock().await;
ms_jn.extend(keys_to_check);
}
},
}

assets_processed.fetch_add(tasks.len() as u64, Ordering::Relaxed);
Expand All @@ -300,14 +276,11 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
info!("Selected from the Postgre less jSONs that expected - meaning it's finished");
break;
}
}
},
Err(e) => {
error!(
"Error during selecting data from the Postgre: {}",
e.to_string()
);
error!("Error during selecting data from the Postgre: {}", e.to_string());
tokio::time::sleep(Duration::from_secs(5)).await;
}
},
}

let current_missed_jsons = count_of_missed_jsons.load(Ordering::Relaxed);
Expand Down
47 changes: 19 additions & 28 deletions consistency_check/src/bin/regular_assets/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,8 @@ pub async fn main() {
let spinner_style =
ProgressStyle::with_template("{prefix:>10.bold.dim} {spinner} total={human_pos} {msg}")
.unwrap();
let accounts_spinner = Arc::new(
ProgressBar::new_spinner()
.with_style(spinner_style)
.with_prefix("accs"),
);
let accounts_spinner =
Arc::new(ProgressBar::new_spinner().with_style(spinner_style).with_prefix("accs"));
let assets_processed = Arc::new(AtomicU64::new(0));
let rate = Arc::new(Mutex::new(0.0));

Expand Down Expand Up @@ -156,11 +153,7 @@ pub async fn main() {
let assets_processed_clone = assets_processed.clone();
let shutdown_token_clone = shutdown_token.clone();
let rate_clone = rate.clone();
tokio::spawn(update_rate(
shutdown_token_clone,
assets_processed_clone,
rate_clone,
));
tokio::spawn(update_rate(shutdown_token_clone, assets_processed_clone, rate_clone));

'outer: for append_vec in snapshot_loader.iter() {
match append_vec {
Expand All @@ -173,9 +166,8 @@ pub async fn main() {
let account = account.access().unwrap();

if account.account_meta.owner == *CORE_KEY {
if let Err(e) = nfts_channel_tx
.send((AccountType::Core, account.meta.pubkey))
.await
if let Err(e) =
nfts_channel_tx.send((AccountType::Core, account.meta.pubkey)).await
{
error!("Could not send core key to the channel: {}", e.to_string());
}
Expand All @@ -184,9 +176,8 @@ pub async fn main() {
{
// there only 2 types of accounts for that programs, so if it's not mint it's token account
if account.data.len() == MINT_ACC_DATA_SIZE {
if let Err(e) = nfts_channel_tx
.send((AccountType::Mint, account.meta.pubkey))
.await
if let Err(e) =
nfts_channel_tx.send((AccountType::Mint, account.meta.pubkey)).await
{
error!("Could not send mint key to the channel: {}", e.to_string());
}
Expand All @@ -199,7 +190,7 @@ pub async fn main() {
}
}
}
}
},
Err(error) => error!("append_vec: {:?}", error),
};
}
Expand Down Expand Up @@ -270,16 +261,16 @@ async fn process_nfts(
m_d.insert(key.to_string());
drop(m_d);
}
}
},
Err(e) => {
error!(
"Error during checking asset data key existence: {}",
e.to_string()
);
}
},
}
match acc_type {
AccountType::Core => {} // already checked above
AccountType::Core => {}, // already checked above
// if we've got mint account we also should check spl_mints column
AccountType::Mint => match rocks_db_cloned.spl_mints.has_key(key).await {
Ok(exist) => {
Expand All @@ -290,13 +281,13 @@ async fn process_nfts(
m_d.insert(key.to_string());
drop(m_d);
}
}
},
Err(e) => {
error!(
"Error during checking mint key existence: {}",
e.to_string()
);
}
},
},
}

Expand All @@ -322,11 +313,11 @@ async fn process_nfts(

drop(permit);
});
}
},
None => {
// if None is received - channel was closed
break;
}
},
}
}
Ok(())
Expand Down Expand Up @@ -377,13 +368,13 @@ async fn process_fungibles(
m_d.insert(key.to_string());
drop(m_d);
}
}
},
Err(e) => {
error!(
"Error during checking token accounts key existence: {}",
e.to_string()
);
}
},
}

let current_assets_processed =
Expand All @@ -408,11 +399,11 @@ async fn process_fungibles(

drop(permit);
});
}
},
None => {
// if None is received - channel was closed
break;
}
},
}
}
Ok(())
Expand Down
Loading

0 comments on commit 11ca9d8

Please sign in to comment.