diff --git a/Cargo.lock b/Cargo.lock index a689a53e..c170aceb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4777,6 +4777,7 @@ dependencies = [ "clap 4.5.4", "coingecko", "criterion", + "csv", "entities", "figment", "flatbuffers 23.5.26", diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 76eff1ce..a4f6f55b 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -90,18 +90,20 @@ zip-extract = { workspace = true } assertables = "7" base64 = { workspace = true } spl-pod = { workspace = true } +# TODO: remove +csv = { workspace = true } [[bench]] name = "integrated_benchmark" harness = false -[[bench]] -name = "ingester_benchmark" -harness = false - -[[bench]] -name = "synchronizer_benchmark" -harness = false +# [[bench]] +# name = "ingester_benchmark" +# harness = false +# +# [[bench]] +# name = "synchronizer_benchmark" +# harness = false [features] rpc_tests = [] diff --git a/nft_ingester/benches/ingester_benchmark.rs b/nft_ingester/benches/ingester_benchmark.rs index fbe704f9..907167ba 100644 --- a/nft_ingester/benches/ingester_benchmark.rs +++ b/nft_ingester/benches/ingester_benchmark.rs @@ -1,151 +1,151 @@ -use std::{fs::File, sync::Arc}; - -use criterion::{criterion_group, criterion_main, Criterion}; -use metrics_utils::{ - red::RequestErrorDurationMetrics, BackfillerMetricsConfig, IngesterMetricsConfig, -}; -use nft_ingester::{ - backfiller::{DirectBlockParser, TransactionsParser}, - buffer::Buffer, - processors::transaction_based::bubblegum_updates_processor::BubblegumTxProcessor, - transaction_ingester, -}; -use rocks_db::{bubblegum_slots::BubblegumSlotGetter, migrator::MigrationState, Storage}; -use setup::TestEnvironment; -use testcontainers::clients::Cli; -use tokio::{sync::Mutex, task::JoinSet}; - -async fn setup_environment<'a>( - cli: &'a Cli, -) -> (TestEnvironment<'a>, setup::rocks::GeneratedAssets) { - setup::TestEnvironment::create(cli, 0, 100).await -} - -async fn bench_ingest( - rocks_client_raw: Arc, - rocks_dest: Arc, - workers_count: usize, - chunk_size: usize, - permits: usize, -) { - let buffer = Arc::new(Buffer::new()); - - let bubblegum_updates_processor = Arc::new(BubblegumTxProcessor::new( - rocks_dest.clone(), - Arc::new(IngesterMetricsConfig::new()), - buffer.json_tasks.clone(), - )); - - let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( - bubblegum_updates_processor.clone(), - )); - - let consumer = Arc::new(DirectBlockParser::new( - tx_ingester.clone(), - rocks_dest.clone(), - Arc::new(BackfillerMetricsConfig::new()), - )); - - let transactions_parser = Arc::new(TransactionsParser::new( - rocks_client_raw.clone(), - Arc::new(BubblegumSlotGetter::new(rocks_client_raw.clone())), - consumer, - rocks_client_raw, - Arc::new(BackfillerMetricsConfig::new()), - workers_count, - chunk_size, - )); - let (_, rx) = tokio::sync::broadcast::channel::<()>(1); - transactions_parser.parse_raw_transactions(rx, permits, None).await; -} - -fn ingest_benchmark(c: &mut Criterion) { - let tx_storage_dir = tempfile::TempDir::new().unwrap(); - - let storage_archieve = File::open("./tests/artifacts/test_rocks.zip").unwrap(); - - zip_extract::extract(storage_archieve, tx_storage_dir.path(), false).unwrap(); - let tasks = JoinSet::new(); - let mutexed_tasks = Arc::new(Mutex::new(tasks)); - let red_metrics = Arc::new(RequestErrorDurationMetrics::new()); - let transactions_storage = Storage::open( - &format!("{}{}", tx_storage_dir.path().to_str().unwrap(), "/test_rocks"), - mutexed_tasks.clone(), - red_metrics, - MigrationState::Last, - ) - .unwrap(); - - let rocks_storage = Arc::new(transactions_storage); - - let cli: Cli = Cli::default(); - let rt = tokio::runtime::Runtime::new().unwrap(); - let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); - let mut group = c.benchmark_group("Ingestion Group"); - group.sample_size(10); - group.measurement_time(std::time::Duration::from_secs(60)); - group.bench_function("10 worker mode, 1 in chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest( - rocks_storage.clone(), - env.rocks_env.storage.clone(), - 10, - 1, - 1, - )) - }) - }); - group.bench_function("20 worker mode, 1 in chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest( - rocks_storage.clone(), - env.rocks_env.storage.clone(), - 20, - 1, - 1, - )) - }) - }); - group.bench_function("50 worker mode, 1 in a chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest( - rocks_storage.clone(), - env.rocks_env.storage.clone(), - 50, - 1, - 1, - )) - }) - }); - group.bench_function("100 worker mode, 1 in a chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest( - rocks_storage.clone(), - env.rocks_env.storage.clone(), - 100, - 1, - 1, - )) - }) - }); - group.bench_function("5 workers mode, 1 in a chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest(rocks_storage.clone(), env.rocks_env.storage.clone(), 5, 1, 1)) - }) - }); - group.bench_function("10 workers mode, 10 in a chunk", |b| { - b.iter(|| { - rt.block_on(bench_ingest( - rocks_storage.clone(), - env.rocks_env.storage.clone(), - 10, - 10, - 1, - )) - }) - }); - rt.block_on(env.teardown()); -} - -criterion_group!(benches, ingest_benchmark); -criterion_main!(benches); +// use std::{fs::File, sync::Arc}; +// +// use criterion::{criterion_group, criterion_main, Criterion}; +// use metrics_utils::{ +// red::RequestErrorDurationMetrics, BackfillerMetricsConfig, IngesterMetricsConfig, +// }; +// use nft_ingester::{ +// backfiller::{DirectBlockParser, TransactionsParser}, +// buffer::Buffer, +// processors::transaction_based::bubblegum_updates_processor::BubblegumTxProcessor, +// transaction_ingester, +// }; +// use rocks_db::{bubblegum_slots::BubblegumSlotGetter, migrator::MigrationState, Storage}; +// use setup::TestEnvironment; +// use testcontainers::clients::Cli; +// use tokio::{sync::Mutex, task::JoinSet}; +// +// async fn setup_environment<'a>( +// cli: &'a Cli, +// ) -> (TestEnvironment<'a>, setup::rocks::GeneratedAssets) { +// setup::TestEnvironment::create(cli, 0, 100).await +// } +// +// async fn bench_ingest( +// rocks_client_raw: Arc, +// rocks_dest: Arc, +// workers_count: usize, +// chunk_size: usize, +// permits: usize, +// ) { +// let buffer = Arc::new(Buffer::new()); +// +// let bubblegum_updates_processor = Arc::new(BubblegumTxProcessor::new( +// rocks_dest.clone(), +// Arc::new(IngesterMetricsConfig::new()), +// buffer.json_tasks.clone(), +// )); +// +// let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( +// bubblegum_updates_processor.clone(), +// )); +// +// let consumer = Arc::new(DirectBlockParser::new( +// tx_ingester.clone(), +// rocks_dest.clone(), +// Arc::new(BackfillerMetricsConfig::new()), +// )); +// +// let transactions_parser = Arc::new(TransactionsParser::new( +// rocks_client_raw.clone(), +// Arc::new(BubblegumSlotGetter::new(rocks_client_raw.clone())), +// consumer, +// rocks_client_raw, +// Arc::new(BackfillerMetricsConfig::new()), +// workers_count, +// chunk_size, +// )); +// let (_, rx) = tokio::sync::broadcast::channel::<()>(1); +// transactions_parser.parse_raw_transactions(rx, permits, None).await; +// } +// +// fn ingest_benchmark(c: &mut Criterion) { +// let tx_storage_dir = tempfile::TempDir::new().unwrap(); +// +// let storage_archieve = File::open("./tests/artifacts/test_rocks.zip").unwrap(); +// +// zip_extract::extract(storage_archieve, tx_storage_dir.path(), false).unwrap(); +// let tasks = JoinSet::new(); +// let mutexed_tasks = Arc::new(Mutex::new(tasks)); +// let red_metrics = Arc::new(RequestErrorDurationMetrics::new()); +// let transactions_storage = Storage::open( +// &format!("{}{}", tx_storage_dir.path().to_str().unwrap(), "/test_rocks"), +// mutexed_tasks.clone(), +// red_metrics, +// MigrationState::Last, +// ) +// .unwrap(); +// +// let rocks_storage = Arc::new(transactions_storage); +// +// let cli: Cli = Cli::default(); +// let rt = tokio::runtime::Runtime::new().unwrap(); +// let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); +// let mut group = c.benchmark_group("Ingestion Group"); +// group.sample_size(10); +// group.measurement_time(std::time::Duration::from_secs(60)); +// group.bench_function("10 worker mode, 1 in chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest( +// rocks_storage.clone(), +// env.rocks_env.storage.clone(), +// 10, +// 1, +// 1, +// )) +// }) +// }); +// group.bench_function("20 worker mode, 1 in chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest( +// rocks_storage.clone(), +// env.rocks_env.storage.clone(), +// 20, +// 1, +// 1, +// )) +// }) +// }); +// group.bench_function("50 worker mode, 1 in a chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest( +// rocks_storage.clone(), +// env.rocks_env.storage.clone(), +// 50, +// 1, +// 1, +// )) +// }) +// }); +// group.bench_function("100 worker mode, 1 in a chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest( +// rocks_storage.clone(), +// env.rocks_env.storage.clone(), +// 100, +// 1, +// 1, +// )) +// }) +// }); +// group.bench_function("5 workers mode, 1 in a chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest(rocks_storage.clone(), env.rocks_env.storage.clone(), 5, 1, 1)) +// }) +// }); +// group.bench_function("10 workers mode, 10 in a chunk", |b| { +// b.iter(|| { +// rt.block_on(bench_ingest( +// rocks_storage.clone(), +// env.rocks_env.storage.clone(), +// 10, +// 10, +// 1, +// )) +// }) +// }); +// rt.block_on(env.teardown()); +// } +// +// criterion_group!(benches, ingest_benchmark); +// criterion_main!(benches); diff --git a/nft_ingester/benches/integrated_benchmark.rs b/nft_ingester/benches/integrated_benchmark.rs index 0f196308..0c239b1f 100644 --- a/nft_ingester/benches/integrated_benchmark.rs +++ b/nft_ingester/benches/integrated_benchmark.rs @@ -1,22 +1,33 @@ -use std::sync::Arc; +use std::{path::PathBuf, str::FromStr, sync::Arc, time::Duration}; use criterion::{criterion_group, criterion_main, Criterion}; -use entities::api_req_params::SearchAssets; -use metrics_utils::ApiMetricsConfig; -use nft_ingester::{api::middleware::JsonDownloaderMiddleware, index_syncronizer::Synchronizer}; -use rocks_db::storage_traits::AssetIndexReader; +use entities::api_req_params::{Options, SearchAssets}; +use interface::price_fetcher::TokenPriceFetcher; +use metrics_utils::{red::RequestErrorDurationMetrics, ApiMetricsConfig}; +use nft_ingester::api::account_balance::AccountBalanceGetterImpl; +use rocks_db::{storage_traits::AssetIndexReader, Storage}; use setup::TestEnvironment; +use solana_sdk::pubkey::Pubkey; use testcontainers::clients::Cli; -use usecase::proofs::MaybeProofChecker; +use tokio::task::JoinError; +use tokio::task::JoinSet; const SLOT_UPDATED: u64 = 100; -async fn benchmark_search_assets( - api: Arc>, - limit: u32, -) { - let payload = SearchAssets { limit: Some(limit), ..Default::default() }; - let _res = api.search_assets(payload).await.unwrap(); +async fn benchmark_search_assets(storage: Arc, asset_ids: &[Pubkey], owner_address: Pubkey) { + let _res = storage + .get_asset_selected_maps_async( + asset_ids.to_vec(), + &Some(owner_address), + &Options { + show_unverified_collections: true, + show_collection_metadata: true, + show_inscription: true, + show_fungible: true, + }, + ) + .await + .unwrap(); // You can add more assertions or processing here as needed } @@ -28,41 +39,77 @@ async fn setup_environment<'a>( } fn search_assets_benchmark(c: &mut Criterion) { - let cli: Cli = Cli::default(); - let limit: u32 = 1000; // Number of records to fetch + let mut group = c.benchmark_group("Search Assets"); + group.warm_up_time(Duration::from_secs(60)).sample_size(100); let rt = tokio::runtime::Runtime::new().unwrap(); - let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); - let api = nft_ingester::api::api_impl::DasApi::new( - env.pg_env.client.clone(), - env.rocks_env.storage.clone(), - Arc::new(ApiMetricsConfig::new()), - None, - 100, - None, - ); + // let rpc_url = std::env::var("BENCHMARK_RPC_URL").unwrap(); + // let client = Arc::new(RpcClient::new(rpc_url.to_string())); + // let api = nft_ingester::api::api_impl::DasApi::new( + // env.pg_env.client.clone(), + // env.rocks_env.storage.clone(), + // Arc::new(ApiMetricsConfig::new()), + // None, + // None, + // 1000, + // None, + // None, + // JsonMiddlewareConfig::default(), + // Arc::new(AccountBalanceGetterImpl::new(client.clone())), + // None, + // Arc::new(RaydiumTokenPriceFetcher::new( + // "".to_string(), // API url, is not used in tests + // raydium_price_fetcher::CACHE_TTL, + // None, + // )), + // "11111111111111111111111111111111".to_string(), + // ); + let red_metrics = RequestErrorDurationMetrics::new(); + let primary_path = std::env::var("BENCHMARK_PRIMARY_PATH") + .expect("primary path to be provided via BENCHMARK_PRIMARY_PATH"); + let secondary_path = std::env::var("BENCHMARK_SECONDARY_PATH") + .expect("secondary path to be provided via BENCHMARK_SECONDARY_PATH"); + let owner_address = Pubkey::from_str( + &std::env::var("BENCHMARK_OWNER_ADDRESS") + .expect("owner address to be provided via BENCHMARK_OWNER_ADDRESS"), + ) + .unwrap(); + let asset_ids_csv_path = std::env::var("BENCHMARK_ASSET_IDS_PATH") + .expect("asset ids csv path to be provided via BENCHMARK_ASSET_IDS_PATH"); + let reader = + csv::ReaderBuilder::new().has_headers(false).from_path(asset_ids_csv_path).unwrap(); + let asset_ids = reader + .into_records() + .map(|r| Pubkey::from_str(&r.unwrap().into_iter().next().unwrap()).unwrap()) + .collect::>(); + let storage = Arc::new(Storage::open_secondary( + PathBuf::from_str(&primary_path).expect("primary path to be valid"), + PathBuf::from_str(&secondary_path).expect("secondary path to be valid"), + Arc::new(tokio::sync::Mutex::new(JoinSet::>::new())), + Arc::new(red_metrics), + rocks_db::migrator::MigrationState::Last, + ).expect("open secondary storage")); - let api = Arc::new(api); - c.bench_function("search_assets", |b| { - b.iter(|| rt.block_on(benchmark_search_assets(api.clone(), limit))) + group.bench_function("search_assets", |b| { + b.iter(|| rt.block_on(benchmark_search_assets(storage.clone(), &asset_ids, owner_address))) }); - rt.block_on(env.teardown()); + group.finish(); } -async fn bench_delete_op( - pg_client: Arc, - rocks_db: Arc, - assets: setup::rocks::GeneratedAssets, -) { - let pubkeys = assets.pubkeys[50000..51000].to_vec(); - Synchronizer::syncronize_batch( - rocks_db.clone(), - pg_client.clone(), - &pubkeys, - Default::default(), - ) - .await - .unwrap(); -} +// async fn bench_delete_op( +// pg_client: Arc, +// rocks_db: Arc, +// assets: setup::rocks::GeneratedAssets, +// ) { +// let pubkeys = assets.pubkeys[50000..51000].to_vec(); +// Synchronizer::syncronize_batch( +// rocks_db.clone(), +// pg_client.clone(), +// &pubkeys, +// Default::default(), +// ) +// .await +// .unwrap(); +// } async fn bench_get_asset_indexes( rocks_db: Arc, @@ -70,7 +117,7 @@ async fn bench_get_asset_indexes( ) { let pubkeys = assets.pubkeys[50000..51000].to_vec(); - rocks_db.get_asset_indexes(&pubkeys).await.unwrap(); + rocks_db.get_nft_asset_indexes(&pubkeys).await.unwrap(); } async fn bench_get_dynamic_data_batch( @@ -87,15 +134,15 @@ fn pg_delete_benchmark(c: &mut Criterion) { let (env, generated_assets) = rt.block_on(setup_environment(&cli)); let mut group = c.benchmark_group("My Group"); - group.bench_function("delete_creators_with_select", |b| { - b.iter(|| { - rt.block_on(bench_delete_op( - env.pg_env.client.clone(), - env.rocks_env.storage.clone(), - generated_assets.clone(), - )) - }) - }); + // group.bench_function("delete_creators_with_select", |b| { + // b.iter(|| { + // rt.block_on(bench_delete_op( + // env.pg_env.client.clone(), + // env.rocks_env.storage.clone(), + // generated_assets.clone(), + // )) + // }) + // }); group.bench_function("get asset indexes", |b| { b.iter(|| { @@ -118,5 +165,5 @@ fn pg_delete_benchmark(c: &mut Criterion) { rt.block_on(env.teardown()); } -criterion_group!(benches, search_assets_benchmark, pg_delete_benchmark,); +criterion_group!(benches, search_assets_benchmark); criterion_main!(benches); diff --git a/nft_ingester/benches/synchronizer_benchmark.rs b/nft_ingester/benches/synchronizer_benchmark.rs index 6bdef564..0ec86742 100644 --- a/nft_ingester/benches/synchronizer_benchmark.rs +++ b/nft_ingester/benches/synchronizer_benchmark.rs @@ -1,67 +1,67 @@ -use std::sync::{atomic::AtomicBool, Arc}; - -use criterion::{criterion_group, criterion_main, Criterion}; -use metrics_utils::SynchronizerMetricsConfig; -use setup::TestEnvironment; -use sqlx::Executor; -use testcontainers::clients::Cli; - -async fn setup_environment<'a>( - cli: &'a Cli, -) -> (TestEnvironment<'a>, setup::rocks::GeneratedAssets) { - let (env, _) = setup::TestEnvironment::create(cli, 0, 100).await; - let cnt = 1_000_000; // Number of records for the setup - let assets = env.rocks_env.generate_assets(cnt, 100).await; - (env, assets) -} - -async fn bench_synchronize(env: Arc>, batch_size: usize) { - sqlx::query("update last_synced_key set last_synced_asset_update_key = null where id = 1;") - .execute(&env.pg_env.pool) - .await - .unwrap(); - let metrics = Arc::new(SynchronizerMetricsConfig::new()); - let syncronizer = nft_ingester::index_syncronizer::Synchronizer::new( - env.rocks_env.storage.clone(), - env.pg_env.client.clone(), - env.pg_env.client.clone(), - batch_size, - "".to_string(), - metrics.clone(), - 1, - false, - ); - - let (_, rx) = tokio::sync::broadcast::channel::<()>(1); - syncronizer.synchronize_asset_indexes(&rx, 0).await.unwrap(); -} - -fn sync_benchmark(c: &mut Criterion) { - let cli: Cli = Cli::default(); - let rt = tokio::runtime::Runtime::new().unwrap(); - let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); - let mut group = c.benchmark_group("Syncronizer Group"); - group.sample_size(10); - group.measurement_time(std::time::Duration::from_secs(60)); - let env = Arc::new(env); - group.bench_function("200k batch size", |b| { - b.iter(|| rt.block_on(bench_synchronize(env.clone(), 200_000))) - }); - group.bench_function("1M batch size", |b| { - b.iter(|| rt.block_on(bench_synchronize(env.clone(), 1_000_000))) - }); - group.bench_function("10k batch size", |b| { - b.iter(|| rt.block_on(bench_synchronize(env.clone(), 10_000))) - }); - group.bench_function("small batches of 1000 records, as before", |b| { - b.iter(|| rt.block_on(bench_synchronize(env.clone(), 1000))) - }); - group.bench_function("100k batch size", |b| { - b.iter(|| rt.block_on(bench_synchronize(env.clone(), 100_000))) - }); - - rt.block_on(env.teardown()); -} - -criterion_group!(benches, sync_benchmark); -criterion_main!(benches); +// use std::sync::{atomic::AtomicBool, Arc}; +// +// use criterion::{criterion_group, criterion_main, Criterion}; +// use metrics_utils::SynchronizerMetricsConfig; +// use setup::TestEnvironment; +// use sqlx::Executor; +// use testcontainers::clients::Cli; +// +// async fn setup_environment<'a>( +// cli: &'a Cli, +// ) -> (TestEnvironment<'a>, setup::rocks::GeneratedAssets) { +// let (env, _) = setup::TestEnvironment::create(cli, 0, 100).await; +// let cnt = 1_000_000; // Number of records for the setup +// let assets = env.rocks_env.generate_assets(cnt, 100).await; +// (env, assets) +// } +// +// async fn bench_synchronize(env: Arc>, batch_size: usize) { +// sqlx::query("update last_synced_key set last_synced_asset_update_key = null where id = 1;") +// .execute(&env.pg_env.pool) +// .await +// .unwrap(); +// let metrics = Arc::new(SynchronizerMetricsConfig::new()); +// let syncronizer = nft_ingester::index_syncronizer::Synchronizer::new( +// env.rocks_env.storage.clone(), +// env.pg_env.client.clone(), +// env.pg_env.client.clone(), +// batch_size, +// "".to_string(), +// metrics.clone(), +// 1, +// false, +// ); +// +// let (_, rx) = tokio::sync::broadcast::channel::<()>(1); +// syncronizer.synchronize_asset_indexes(&rx, 0).await.unwrap(); +// } +// +// fn sync_benchmark(c: &mut Criterion) { +// let cli: Cli = Cli::default(); +// let rt = tokio::runtime::Runtime::new().unwrap(); +// let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); +// let mut group = c.benchmark_group("Syncronizer Group"); +// group.sample_size(10); +// group.measurement_time(std::time::Duration::from_secs(60)); +// let env = Arc::new(env); +// group.bench_function("200k batch size", |b| { +// b.iter(|| rt.block_on(bench_synchronize(env.clone(), 200_000))) +// }); +// group.bench_function("1M batch size", |b| { +// b.iter(|| rt.block_on(bench_synchronize(env.clone(), 1_000_000))) +// }); +// group.bench_function("10k batch size", |b| { +// b.iter(|| rt.block_on(bench_synchronize(env.clone(), 10_000))) +// }); +// group.bench_function("small batches of 1000 records, as before", |b| { +// b.iter(|| rt.block_on(bench_synchronize(env.clone(), 1000))) +// }); +// group.bench_function("100k batch size", |b| { +// b.iter(|| rt.block_on(bench_synchronize(env.clone(), 100_000))) +// }); +// +// rt.block_on(env.teardown()); +// } +// +// criterion_group!(benches, sync_benchmark); +// criterion_main!(benches);