diff --git a/docker-compose.yaml b/docker-compose.yaml index a3691eef1..e4c58540f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -38,7 +38,8 @@ services: network_mode: host volumes: - ${API_ROCKS_DB_PATH_CONTAINER}:${API_ROCKS_DB_PATH_CONTAINER}:ro - - ${API_ROCKS_DB_SECONDARY_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw + - ${API_ROCKS_DB_SECONDARY_FIRST_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_FIRST_PATH_CONTAINER}:rw + - ${API_ROCKS_DB_SECONDARY_SECOND_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_SECOND_PATH_CONTAINER}:rw - ${API_ARCHIVES_DIR}:${API_ARCHIVES_DIR}:ro - ${API_FILE_STORAGE_PATH}:${API_FILE_STORAGE_PATH_CONTAINER}:rw - ./heaps:/usr/src/app/heaps:rw diff --git a/nft_ingester/benches/integrated_benchmark.rs b/nft_ingester/benches/integrated_benchmark.rs index 547bac64b..c13aab27f 100644 --- a/nft_ingester/benches/integrated_benchmark.rs +++ b/nft_ingester/benches/integrated_benchmark.rs @@ -1,6 +1,9 @@ use criterion::{criterion_group, criterion_main, Criterion}; use entities::api_req_params::SearchAssets; -use nft_ingester::{api::middleware::JsonDownloaderMiddleware, index_syncronizer::Synchronizer}; +use nft_ingester::{ + api::middleware::JsonDownloaderMiddleware, index_syncronizer::Synchronizer, + rocks_db::RocksDbManager, +}; use rocks_db::storage_traits::AssetIndexReader; use setup::TestEnvironment; use std::sync::Arc; @@ -35,9 +38,11 @@ fn search_assets_benchmark(c: &mut Criterion) { let limit: u32 = 1000; // Number of records to fetch let rt = tokio::runtime::Runtime::new().unwrap(); let (env, _generated_assets) = rt.block_on(setup_environment(&cli)); + let rocks_db = RocksDbManager::new_primary(env.rocks_env.storage.clone()).into(); + let api = nft_ingester::api::api_impl::DasApi::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, 100, diff --git a/nft_ingester/src/api/api_impl.rs b/nft_ingester/src/api/api_impl.rs index 369c7885b..6f195da0f 100644 --- a/nft_ingester/src/api/api_impl.rs +++ b/nft_ingester/src/api/api_impl.rs @@ -17,6 +17,7 @@ use crate::api::dapi::rpc_asset_models::Asset; use crate::api::error::DasApiError; use crate::api::*; use crate::config::JsonMiddlewareConfig; +use crate::rocks_db::RocksDbManager; use dapi::get_asset_signatures::get_asset_signatures; use dapi::get_core_fees::get_core_fees; use dapi::get_token_accounts::get_token_accounts; @@ -47,7 +48,7 @@ where PPC: ProcessingPossibilityChecker + Sync + Send + 'static, { pub(crate) pg_client: Arc, - rocks_db: Arc, + rocks_db: Arc, metrics: Arc, proof_checker: Option>, tree_gaps_checker: Option>, @@ -77,7 +78,7 @@ where #[allow(clippy::too_many_arguments)] pub fn new( pg_client: Arc, - rocks_db: Arc, + rocks_db: Arc, metrics: Arc, proof_checker: Option>, tree_gaps_checker: Option>, @@ -244,7 +245,7 @@ where let id = validate_pubkey(payload.id.clone())?; let assets = get_proof_for_assets( - self.rocks_db.clone(), + self.rocks_db.acquire(), vec![id], self.proof_checker.clone(), &self.tree_gaps_checker, @@ -284,7 +285,7 @@ where .collect::, _>>()?; let res = get_proof_for_assets( - self.rocks_db.clone(), + self.rocks_db.acquire(), ids, self.proof_checker.clone(), &self.tree_gaps_checker, @@ -311,7 +312,7 @@ where let options = payload.options.unwrap_or_default(); let res = get_asset( - self.rocks_db.clone(), + self.rocks_db.acquire(), id, options, self.json_downloader.clone(), @@ -359,7 +360,7 @@ where let options = payload.options.unwrap_or_default(); let res = get_asset_batch( - self.rocks_db.clone(), + self.rocks_db.acquire(), ids, options, self.json_downloader.clone(), @@ -391,7 +392,7 @@ where let res = self .process_request( self.pg_client.clone(), - self.rocks_db.clone(), + self.rocks_db.acquire(), payload, tasks, ) @@ -415,7 +416,7 @@ where let res = self .process_request( self.pg_client.clone(), - self.rocks_db.clone(), + self.rocks_db.acquire(), payload, tasks, ) @@ -439,7 +440,7 @@ where let res = self .process_request( self.pg_client.clone(), - self.rocks_db.clone(), + self.rocks_db.acquire(), payload, tasks, ) @@ -463,7 +464,7 @@ where let res = self .process_request( self.pg_client.clone(), - self.rocks_db.clone(), + self.rocks_db.acquire(), payload, tasks, ) @@ -514,7 +515,7 @@ where } let res = get_token_accounts( - self.rocks_db.clone(), + self.rocks_db.acquire(), owner, mint, limit.unwrap_or(DEFAULT_LIMIT as u32).into(), @@ -583,7 +584,7 @@ where let res = self .process_request( self.pg_client.clone(), - self.rocks_db.clone(), + self.rocks_db.acquire(), payload, tasks, ) @@ -639,7 +640,7 @@ where Self::validate_basic_pagination(&pagination, self.max_page_limit)?; let res = get_asset_signatures( - self.rocks_db.clone(), + self.rocks_db.acquire(), id, tree, leaf_index, diff --git a/nft_ingester/src/api/service.rs b/nft_ingester/src/api/service.rs index 5f6c76f38..0e894f3fe 100644 --- a/nft_ingester/src/api/service.rs +++ b/nft_ingester/src/api/service.rs @@ -15,6 +15,7 @@ use usecase::proofs::MaybeProofChecker; use uuid::Uuid; use crate::api::backfilling_state_consistency::BackfillingStateConsistencyChecker; +use crate::rocks_db::RocksDbManager; use interface::consistency_check::ConsistencyChecker; use metrics_utils::ApiMetricsConfig; use rocks_db::Storage; @@ -50,7 +51,7 @@ pub(crate) struct MiddlewaresData { #[allow(clippy::too_many_arguments)] pub async fn start_api( pg_client: Arc, - rocks_db: Arc, + rocks_db: Arc, rx: Receiver<()>, metrics: Arc, port: u16, @@ -84,7 +85,7 @@ pub async fn start_api( tasks.clone(), rx.resubscribe(), pg_client.clone(), - rocks_db.clone(), + rocks_db.acquire(), consistence_synchronization_api_threshold, ) .await; @@ -98,7 +99,7 @@ pub async fn start_api( .run( tasks.clone(), rx.resubscribe(), - rocks_db.clone(), + rocks_db.acquire(), consistence_backfilling_slots_threshold, ) .await; diff --git a/nft_ingester/src/bin/api/main.rs b/nft_ingester/src/bin/api/main.rs index b46607cf1..f5923cba9 100644 --- a/nft_ingester/src/bin/api/main.rs +++ b/nft_ingester/src/bin/api/main.rs @@ -7,6 +7,7 @@ use nft_ingester::config::{init_logger, setup_config, ApiConfig}; use nft_ingester::error::IngesterError; use nft_ingester::init::graceful_stop; use nft_ingester::json_worker::JsonWorker; +use nft_ingester::rocks_db::RocksDbManager; use prometheus_client::registry::Registry; use tracing::{error, info}; @@ -26,7 +27,8 @@ use usecase::proofs::MaybeProofChecker; static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; -pub const DEFAULT_SECONDARY_ROCKSDB_PATH: &str = "./my_rocksdb_secondary"; +pub const DEFAULT_SECONDARY_FIRST_ROCKSDB_PATH_CONTAINER: &str = "./my_rocksdb_secondary_first"; +pub const DEFAULT_SECONDARY_SECOND_ROCKSDB_PATH_CONTAINER: &str = "./my_rocksdb_secondary_second"; #[tokio::main(flavor = "multi_thread")] pub async fn main() -> Result<(), IngesterError> { @@ -84,24 +86,39 @@ pub async fn main() -> Result<(), IngesterError> { .rocks_db_path_container .clone() .unwrap_or(DEFAULT_ROCKSDB_PATH.to_string()); - let secondary_storage_path = config - .rocks_db_secondary_path_container + + let secondary_storage_first_path = config + .rocks_db_secondary_first_path_container .clone() - .unwrap_or(DEFAULT_SECONDARY_ROCKSDB_PATH.to_string()); - let storage = Storage::open_secondary( + .unwrap_or(DEFAULT_SECONDARY_FIRST_ROCKSDB_PATH_CONTAINER.to_string()); + let secondary_storage_first = Storage::open_secondary( &primary_storage_path, - &secondary_storage_path, + &secondary_storage_first_path, mutexed_tasks.clone(), red_metrics.clone(), MigrationState::Last, ) .unwrap(); - let rocks_storage = Arc::new(storage); + let secondary_storage_second_path = config + .rocks_db_secondary_first_path_container + .clone() + .unwrap_or(DEFAULT_SECONDARY_SECOND_ROCKSDB_PATH_CONTAINER.to_string()); + let secondary_storage_second_path = Storage::open_secondary( + &primary_storage_path, + &secondary_storage_second_path, + mutexed_tasks.clone(), + red_metrics.clone(), + MigrationState::Last, + ) + .unwrap(); + let rocks_db_manager = Arc::new(RocksDbManager::new_secondary( + secondary_storage_first, + secondary_storage_second_path, + )); let rpc_client = Arc::new(RpcClient::new(config.rpc_host)); let account_balance_getter = Arc::new(AccountBalanceGetterImpl::new(rpc_client.clone())); - let cloned_rocks_storage = rocks_storage.clone(); let mut proof_checker = None; if config.check_proofs { proof_checker = Some(Arc::new(MaybeProofChecker::new( @@ -117,7 +134,7 @@ pub async fn main() -> Result<(), IngesterError> { Some(Arc::new( JsonWorker::new( pg_client.clone(), - rocks_storage.clone(), + rocks_db_manager.clone(), json_downloader_metrics.clone(), ) .await, @@ -136,17 +153,18 @@ pub async fn main() -> Result<(), IngesterError> { if config.skip_check_tree_gaps { None } else { - Some(cloned_rocks_storage.clone()) + Some(rocks_db_manager.acquire()) } }; let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1); let cloned_tasks = mutexed_tasks.clone(); let cloned_rx = shutdown_rx.resubscribe(); + let rocks_db_manager_clone = rocks_db_manager.clone(); mutexed_tasks.lock().await.spawn(async move { match start_api( pg_client.clone(), - cloned_rocks_storage.clone(), + rocks_db_manager_clone, cloned_rx, metrics.clone(), config.server_port, @@ -179,16 +197,16 @@ pub async fn main() -> Result<(), IngesterError> { // setup dependencies for grpc server let uc = usecase::asset_streamer::AssetStreamer::new( config.peer_grpc_max_gap_slots, - rocks_storage.clone(), + rocks_db_manager.acquire(), ); let bs = usecase::raw_blocks_streamer::BlocksStreamer::new( config.peer_grpc_max_gap_slots, - rocks_storage.clone(), + rocks_db_manager.acquire(), ); let serv = grpc::service::PeerGapFillerServiceImpl::new( Arc::new(uc), Arc::new(bs), - rocks_storage.clone(), + rocks_db_manager.acquire(), ); let addr = format!("0.0.0.0:{}", config.peer_grpc_port).parse()?; let mut cloned_rx = shutdown_rx.resubscribe(); @@ -206,17 +224,9 @@ pub async fn main() -> Result<(), IngesterError> { Ok(()) }); - // try synchronizing secondary rocksdb instance every config.rocks_sync_interval_seconds let cloned_rx = shutdown_rx.resubscribe(); - let cloned_rocks_storage = rocks_storage.clone(); - let dur = tokio::time::Duration::from_secs(config.rocks_sync_interval_seconds); mutexed_tasks.lock().await.spawn(async move { - while cloned_rx.is_empty() { - if let Err(e) = cloned_rocks_storage.db.try_catch_up_with_primary() { - error!("Sync rocksdb error: {}", e); - } - tokio::time::sleep(dur).await; - } + rocks_db_manager.catch_up(cloned_rx).await; Ok(()) }); diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index ae8696821..0d08ac474 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -50,7 +50,7 @@ use nft_ingester::init::{graceful_stop, init_index_storage_with_migration, init_ use nft_ingester::json_worker::JsonWorker; use nft_ingester::message_handler::MessageHandlerIngester; use nft_ingester::redis_receiver::RedisReceiver; -use nft_ingester::rocks_db::{perform_backup, receive_last_saved_slot, restore_rocksdb}; +use nft_ingester::rocks_db::{perform_backup, receive_last_saved_slot, restore_rocksdb, RocksDbManager}; use nft_ingester::tcp_receiver::{connect_to_geyser, connect_to_snapshot_receiver, TcpReceiver}; use nft_ingester::transaction_ingester::BackfillTransactionIngester; use nft_ingester::transaction_processor::run_transaction_processor; @@ -251,25 +251,20 @@ pub async fn main() -> Result<(), IngesterError> { let last_saved_slot = primary_rocks_storage.last_saved_slot()?.unwrap_or_default(); let first_processed_slot = Arc::new(AtomicU64::new(0)); let first_processed_slot_clone = first_processed_slot.clone(); - let cloned_rocks_storage = primary_rocks_storage.clone(); let cloned_rx = shutdown_rx.resubscribe(); let cloned_tx = shutdown_tx.clone(); mutexed_tasks.lock().await.spawn(receive_last_saved_slot( cloned_rx, cloned_tx, - cloned_rocks_storage, + primary_rocks_storage.clone(), first_processed_slot_clone, last_saved_slot, )); - + let rocks_db = Arc::new(RocksDbManager::new_primary(primary_rocks_storage)); let json_processor = Arc::new( - JsonWorker::new( - index_pg_storage.clone(), - primary_rocks_storage.clone(), - metrics_state.json_downloader_metrics.clone(), - ) - .await, + JsonWorker::new(index_pg_storage.clone(), rocks_db.clone(), metrics_state.json_downloader_metrics.clone()) + .await, ); let grpc_client = Client::connect(config.clone()) @@ -282,7 +277,6 @@ pub async fn main() -> Result<(), IngesterError> { tokio_sleep(Duration::from_millis(100)).await } - let cloned_rocks_storage = primary_rocks_storage.clone(); if shutdown_rx.is_empty() { let gaped_data_client_clone = gaped_data_client.clone(); @@ -290,18 +284,17 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_rx = shutdown_rx.resubscribe(); mutexed_tasks.lock().await.spawn(process_asset_details_stream_wrapper( cloned_rx, - cloned_rocks_storage, + rocks_db.acquire(), last_saved_slot, first_processed_slot_value, gaped_data_client_clone.clone(), false, )); - let cloned_rocks_storage = primary_rocks_storage.clone(); let cloned_rx = shutdown_rx.resubscribe(); mutexed_tasks.lock().await.spawn(process_asset_details_stream_wrapper( cloned_rx, - cloned_rocks_storage, + rocks_db.acquire(), last_saved_slot, first_processed_slot_value, gaped_data_client_clone, @@ -310,7 +303,6 @@ pub async fn main() -> Result<(), IngesterError> { } }; - let cloned_rocks_storage = primary_rocks_storage.clone(); let cloned_api_metrics = metrics_state.api_metrics.clone(); let account_balance_getter = Arc::new(AccountBalanceGetterImpl::new(rpc_client.clone())); let proof_checker = config.check_proofs.then_some(Arc::new(MaybeProofChecker::new( @@ -335,16 +327,17 @@ pub async fn main() -> Result<(), IngesterError> { if api_config.skip_check_tree_gaps { None } else { - Some(cloned_rocks_storage.clone()) + Some(rocks_db.acquire()) } }; let cloned_index_storage = index_pg_storage.clone(); let file_storage_path = api_config.file_storage_path_container.clone(); + let rocks_db_clone = rocks_db.clone(); mutexed_tasks.lock().await.spawn(async move { match start_api( cloned_index_storage, - cloned_rocks_storage.clone(), + rocks_db_clone, cloned_rx, cloned_api_metrics, api_config.server_port, @@ -374,7 +367,7 @@ pub async fn main() -> Result<(), IngesterError> { }); let geyser_bubblegum_updates_processor = Arc::new(BubblegumTxProcessor::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.ingester_metrics.clone(), buffer.json_tasks.clone(), )); @@ -418,7 +411,7 @@ pub async fn main() -> Result<(), IngesterError> { .spawn(json_worker::run(cloned_jp, cloned_rx).map(|_| Ok(()))); let backfill_bubblegum_updates_processor = Arc::new(BubblegumTxProcessor::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.ingester_metrics.clone(), buffer.json_tasks.clone(), )); @@ -433,13 +426,14 @@ pub async fn main() -> Result<(), IngesterError> { .await, ); let backfiller = - Arc::new(Backfiller::new(primary_rocks_storage.clone(), backfiller_source.clone(), backfiller_config.clone())); + Arc::new(Backfiller::new(rocks_db.acquire(), backfiller_source.clone(), backfiller_config.clone())); let rpc_backfiller = Arc::new(BackfillRPC::connect(config.backfill_rpc_address.clone())); if config.run_bubblegum_backfiller { if backfiller_config.should_reingest { warn!("'Reingest' flag is set, deleting last fetched slot."); - primary_rocks_storage + rocks_db + .acquire() .delete_parameter::(rocks_db::parameters::Parameter::LastFetchedSlot) .await?; } @@ -448,7 +442,7 @@ pub async fn main() -> Result<(), IngesterError> { BackfillerMode::IngestDirectly => { let consumer = Arc::new(DirectBlockParser::new( tx_ingester.clone(), - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.backfiller_metrics.clone(), )); backfiller @@ -463,7 +457,7 @@ pub async fn main() -> Result<(), IngesterError> { info!("Running Backfiller directly from bigtable to ingester."); } BackfillerMode::Persist => { - let consumer = primary_rocks_storage.clone(); + let consumer = rocks_db.acquire(); backfiller .start_backfill( mutexed_tasks.clone(), @@ -478,14 +472,14 @@ pub async fn main() -> Result<(), IngesterError> { BackfillerMode::IngestPersisted => { let consumer = Arc::new(DirectBlockParser::new( tx_ingester.clone(), - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.backfiller_metrics.clone(), )); - let producer = primary_rocks_storage.clone(); + let producer = rocks_db.acquire(); let transactions_parser = Arc::new(TransactionsParser::new( - primary_rocks_storage.clone(), - Arc::new(BubblegumSlotGetter::new(primary_rocks_storage.clone())), + rocks_db.acquire(), + Arc::new(BubblegumSlotGetter::new(rocks_db.acquire())), consumer, producer, metrics_state.backfiller_metrics.clone(), @@ -526,11 +520,11 @@ pub async fn main() -> Result<(), IngesterError> { // run perpetual slot persister let rx = shutdown_rx.resubscribe(); - let consumer = primary_rocks_storage.clone(); + let consumer = rocks_db.acquire(); let producer = backfiller_source.clone(); let metrics = Arc::new(BackfillerMetricsConfig::new()); metrics.register_with_prefix(&mut metrics_state.registry, "slot_persister_"); - let slot_getter = Arc::new(BubblegumSlotGetter::new(primary_rocks_storage.clone())); + let slot_getter = Arc::new(BubblegumSlotGetter::new(rocks_db.acquire())); let backfiller_clone = backfiller.clone(); mutexed_tasks.lock().await.spawn(run_perpetual_slot_processing( backfiller_clone, @@ -546,13 +540,13 @@ pub async fn main() -> Result<(), IngesterError> { let rx = shutdown_rx.resubscribe(); let consumer = Arc::new(DirectBlockParser::new( tx_ingester.clone(), - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.backfiller_metrics.clone(), )); - let producer = primary_rocks_storage.clone(); + let producer = rocks_db.acquire(); let metrics = Arc::new(BackfillerMetricsConfig::new()); metrics.register_with_prefix(&mut metrics_state.registry, "slot_ingester_"); - let slot_getter = Arc::new(IngestableSlotGetter::new(primary_rocks_storage.clone())); + let slot_getter = Arc::new(IngestableSlotGetter::new(rocks_db.acquire())); let backfiller_clone = backfiller.clone(); let backup = backfiller_source.clone(); mutexed_tasks.lock().await.spawn(run_perpetual_slot_processing( @@ -583,10 +577,10 @@ pub async fn main() -> Result<(), IngesterError> { }); } // setup dependencies for grpc server - let uc = AssetStreamer::new(config.peer_grpc_max_gap_slots, primary_rocks_storage.clone()); - let bs = BlocksStreamer::new(config.peer_grpc_max_gap_slots, primary_rocks_storage.clone()); - let serv = PeerGapFillerServiceImpl::new(Arc::new(uc), Arc::new(bs), primary_rocks_storage.clone()); - let asset_url_serv = AssetUrlServiceImpl::new(primary_rocks_storage.clone()); + let uc = AssetStreamer::new(config.peer_grpc_max_gap_slots, rocks_db.acquire()); + let bs = BlocksStreamer::new(config.peer_grpc_max_gap_slots, rocks_db.acquire()); + let serv = PeerGapFillerServiceImpl::new(Arc::new(uc), Arc::new(bs), rocks_db.acquire()); + let asset_url_serv = AssetUrlServiceImpl::new(rocks_db.acquire()); let addr = format!("0.0.0.0:{}", config.peer_grpc_port).parse()?; // Spawn the gRPC server task and add to JoinSet let mut rx = shutdown_rx.resubscribe(); @@ -603,11 +597,10 @@ pub async fn main() -> Result<(), IngesterError> { Ok(()) }); - Scheduler::run_in_background(Scheduler::new(primary_rocks_storage.clone())).await; + Scheduler::run_in_background(Scheduler::new(rocks_db.acquire())).await; - let rocks_clone = primary_rocks_storage.clone(); let signature_fetcher = SignatureFetcher::new( - rocks_clone, + rocks_db.acquire(), rpc_backfiller.clone(), tx_ingester.clone(), metrics_state.rpc_backfiller_metrics.clone(), @@ -642,10 +635,10 @@ pub async fn main() -> Result<(), IngesterError> { if config.run_sequence_consistent_checker { let force_reingestable_slot_processor = Arc::new(ForceReingestableSlotGetter::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), Arc::new(DirectBlockParser::new( tx_ingester.clone(), - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.backfiller_metrics.clone(), )), )); @@ -655,7 +648,7 @@ pub async fn main() -> Result<(), IngesterError> { backfiller_source.clone(), metrics_state.backfiller_metrics.clone(), ), - primary_rocks_storage.clone(), + rocks_db.acquire(), metrics_state.sequence_consistent_gapfill_metrics.clone(), shutdown_rx.resubscribe(), rpc_backfiller.clone(), @@ -671,7 +664,7 @@ pub async fn main() -> Result<(), IngesterError> { metrics.register_with_prefix(&mut metrics_state.registry, "force_slot_persister_"); if let Some(client) = grpc_client { let force_reingestable_transactions_parser = Arc::new(TransactionsParser::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), force_reingestable_slot_processor.clone(), force_reingestable_slot_processor.clone(), Arc::new(client), @@ -685,7 +678,7 @@ pub async fn main() -> Result<(), IngesterError> { .spawn(run_slot_force_persister(force_reingestable_transactions_parser, rx)); } else { let force_reingestable_transactions_parser = Arc::new(TransactionsParser::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), force_reingestable_slot_processor.clone(), force_reingestable_slot_processor.clone(), producer.clone(), @@ -699,11 +692,8 @@ pub async fn main() -> Result<(), IngesterError> { .spawn(run_slot_force_persister(force_reingestable_transactions_parser, rx)); } - let fork_cleaner = ForkCleaner::new( - primary_rocks_storage.clone(), - primary_rocks_storage.clone(), - metrics_state.fork_cleaner_metrics.clone(), - ); + let fork_cleaner = + ForkCleaner::new(rocks_db.acquire(), rocks_db.acquire(), metrics_state.fork_cleaner_metrics.clone()); let rx = shutdown_rx.resubscribe(); let metrics = metrics_state.fork_cleaner_metrics.clone(); mutexed_tasks.lock().await.spawn(run_fork_cleaner( @@ -720,7 +710,7 @@ pub async fn main() -> Result<(), IngesterError> { let arweave = Arc::new(arweave); let batch_mint_processor = Arc::new(BatchMintProcessor::new( index_pg_storage.clone(), - primary_rocks_storage.clone(), + rocks_db.acquire(), Arc::new(NoopBatchMintTxSender), arweave, file_storage_path, @@ -735,7 +725,7 @@ pub async fn main() -> Result<(), IngesterError> { } let batch_mint_persister = BatchMintPersister::new( - primary_rocks_storage.clone(), + rocks_db.acquire(), BatchMintDownloaderForPersister, metrics_state.batch_mint_persisting_metrics.clone(), ); diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 3553ffdf2..355f0802d 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -282,7 +282,8 @@ pub struct SynchronizerConfig { pub struct ApiConfig { pub database_config: DatabaseConfig, pub rocks_db_path_container: Option, - pub rocks_db_secondary_path_container: Option, + pub rocks_db_secondary_first_path_container: Option, + pub rocks_db_secondary_second_path_container: Option, pub rocks_sync_interval_seconds: u64, pub metrics_port: Option, pub server_port: u16, diff --git a/nft_ingester/src/json_worker.rs b/nft_ingester/src/json_worker.rs index d4778c093..e95b40eae 100644 --- a/nft_ingester/src/json_worker.rs +++ b/nft_ingester/src/json_worker.rs @@ -1,5 +1,6 @@ use crate::api::dapi::rpc_asset_convertors::parse_files; use crate::config::{setup_config, IngesterConfig, INGESTER_CONFIG_PREFIX}; +use crate::rocks_db::RocksDbManager; use async_trait::async_trait; use entities::enums::TaskStatus; use entities::models::{JsonDownloadTask, OffChainData}; @@ -10,7 +11,6 @@ use postgre_client::tasks::UpdatedTask; use postgre_client::PgClient; use reqwest::{Client, ClientBuilder}; use rocks_db::asset_previews::UrlToDownload; -use rocks_db::Storage; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast::Receiver; @@ -28,7 +28,7 @@ pub const CLIENT_TIMEOUT: u64 = 5; pub struct JsonWorker { pub db_client: Arc, - pub rocks_db: Arc, + pub rocks_db: Arc, pub num_of_parallel_workers: i32, pub metrics: Arc, } @@ -36,7 +36,7 @@ pub struct JsonWorker { impl JsonWorker { pub async fn new( db_client: Arc, - rocks_db: Arc, + rocks_db: Arc, metrics: Arc, ) -> Self { let config: IngesterConfig = setup_config(INGESTER_CONFIG_PREFIX); @@ -404,6 +404,7 @@ impl JsonPersister for JsonWorker { .collect::>(); self.rocks_db + .acquire() .asset_offchain_data .put_batch(rocks_updates) .await @@ -411,6 +412,7 @@ impl JsonPersister for JsonWorker { if let Err(e) = self .rocks_db + .acquire() .urls_to_download .put_batch(urls_to_download) .await diff --git a/nft_ingester/src/rocks_db.rs b/nft_ingester/src/rocks_db.rs index 3e4aae595..3cfedec5e 100644 --- a/nft_ingester/src/rocks_db.rs +++ b/nft_ingester/src/rocks_db.rs @@ -5,7 +5,7 @@ use rocks_db::errors::BackupServiceError; use rocks_db::storage_traits::AssetSlotStorage; use rocks_db::{backup_service, Storage}; use std::fs::{create_dir_all, remove_dir_all}; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::{Receiver, Sender}; @@ -81,3 +81,83 @@ pub async fn restore_rocksdb(config: &IngesterConfig) -> Result<(), BackupServic info!("restore_rocksdb fin"); Ok(()) } + +pub enum RocksDbManager { + Primary(Arc), + Secondary(RocksDbSecondaryDuplicateMode), +} + +impl RocksDbManager { + /// Instantiate a new RocksDbManager with primary storage + pub fn new_primary(primary: Arc) -> Self { + RocksDbManager::Primary(primary) + } + + /// Instantiate a new RocksDbManager with secondary storage + pub fn new_secondary(primary: Storage, secondary: Storage) -> Self { + RocksDbManager::Secondary(RocksDbSecondaryDuplicateMode { + rocks_db_instances: [primary.into(), secondary.into()], + serving_instance_toggle: AtomicBool::new(false), + }) + } + + /// Returns the current storage instance + /// + /// It's always the same storage for primary mode + /// For secondary mode, it will return a storage instance + /// based on it's sync status with the primary storage + pub fn acquire(&self) -> Arc { + match self { + RocksDbManager::Primary(storage) => storage.clone(), + RocksDbManager::Secondary(duplicate_mode) => duplicate_mode.rocks_db_instances + [duplicate_mode + .serving_instance_toggle + .load(Ordering::Relaxed) as usize] + .clone(), + } + } + + /// Syncronize secondary rocksdb with primary + /// + /// One of the DB will be blocked while the other one is processing request + pub async fn catch_up(&self, shutdown_rx: Receiver<()>) { + if !shutdown_rx.is_empty() { + return; + } + + const SLEEP_TIME_MS: Duration = Duration::from_millis(10); + + match self { + RocksDbManager::Primary(_) => {} + RocksDbManager::Secondary(duplicate_mode) => { + let free_node_idx = (duplicate_mode + .serving_instance_toggle + .load(Ordering::Relaxed) as usize + + 1) + % 2; + let free_node = &duplicate_mode.rocks_db_instances[free_node_idx]; + + while Arc::::strong_count(free_node) > 1 { + if !shutdown_rx.is_empty() { + return; + } + + tokio_sleep(SLEEP_TIME_MS).await; + } + + if let Err(e) = free_node.db.try_catch_up_with_primary() { + error!("Sync rocksdb error: {}", e); + } + + duplicate_mode + .serving_instance_toggle + .store(free_node_idx != 0, Ordering::Relaxed); + } + } + } +} + +pub struct RocksDbSecondaryDuplicateMode { + rocks_db_instances: [Arc; 2], + serving_instance_toggle: AtomicBool, +} diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index f5f409beb..4638a060e 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -7,6 +7,7 @@ mod tests { ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp, }; use blockbuster::programs::token_extensions::MintAccountExtensions; + use nft_ingester::rocks_db::RocksDbManager; use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; @@ -72,6 +73,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -81,7 +83,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -499,6 +501,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -508,7 +511,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -648,6 +651,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -657,7 +661,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -775,6 +779,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -784,7 +789,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -866,14 +871,14 @@ mod tests { metadata: "{\"msg\": \"hallo\"}".to_string(), }; - env.rocks_env - .storage + rocks_db + .acquire() .asset_offchain_data .put(offchain_data.url.clone(), offchain_data) .unwrap(); let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -915,7 +920,7 @@ mod tests { }; let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -946,6 +951,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -955,7 +961,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -1053,14 +1059,14 @@ mod tests { metadata: "{\"msg\": \"hallo\"}".to_string(), }; - env.rocks_env - .storage + rocks_db + .acquire() .asset_offchain_data .put(metadata.url.clone(), metadata) .unwrap(); let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -1118,6 +1124,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -1127,7 +1134,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -1220,7 +1227,7 @@ mod tests { ); let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -1239,8 +1246,8 @@ mod tests { }, ) .unwrap(); - env.rocks_env - .storage + rocks_db + .acquire() .asset_offchain_data .put(metadata_ofch.url.clone(), metadata_ofch) .unwrap(); @@ -1275,6 +1282,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -1284,7 +1292,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -1495,6 +1503,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -1504,7 +1513,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -1615,7 +1624,7 @@ mod tests { } let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -1718,6 +1727,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -1727,7 +1737,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -1801,7 +1811,7 @@ mod tests { } let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -1982,6 +1992,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -1991,7 +2002,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2045,6 +2056,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2054,7 +2066,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2105,6 +2117,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2114,7 +2127,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2165,6 +2178,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2174,7 +2188,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2269,6 +2283,7 @@ mod tests { .with(predicate::eq(url)) .times(1) .returning(move |_| Ok(offchain_data.to_string())); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, @@ -2279,7 +2294,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2429,6 +2444,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2438,10 +2454,10 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, - Some(env.rocks_env.storage.clone()), + Some(rocks_db.acquire()), 50, None, None, @@ -2488,6 +2504,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2497,7 +2514,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2578,6 +2595,7 @@ mod tests { let total_assets = 2000; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, total_assets, SLOT_UPDATED).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2587,7 +2605,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2657,11 +2675,9 @@ mod tests { let cnt = 0; let cli = Cli::default(); let (env, _) = setup::TestEnvironment::create(&cli, cnt, 100).await; - let solana_price_updater = SolanaPriceUpdater::new( - env.rocks_env.storage.clone(), - CoinGeckoPriceFetcher::new(), - 30, - ); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); + let solana_price_updater = + SolanaPriceUpdater::new(rocks_db.acquire(), CoinGeckoPriceFetcher::new(), 30); solana_price_updater.update_price().await.unwrap(); let mut mock_account_balance_getter = MockAccountBalanceGetter::new(); mock_account_balance_getter @@ -2676,7 +2692,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -2738,12 +2754,14 @@ mod tests { }, ); }); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); + let rock_db_instance = rocks_db.acquire(); let (d, o) = tokio::join!( env.rocks_env .storage .asset_dynamic_data .put_batch(collection_dynamic_details), - env.rocks_env.storage.asset_offchain_data.put_async( + rock_db_instance.asset_offchain_data.put_async( "http://example.com".to_string(), OffChainData { url: "http://example.com".to_string(), @@ -2788,6 +2806,7 @@ mod tests { d.unwrap(); o.unwrap(); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -2797,7 +2816,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -2930,6 +2949,7 @@ mod tests { let cnt = 10; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let asset_pk = generated_assets.static_details.first().unwrap().pubkey; env.rocks_env .storage @@ -2952,7 +2972,7 @@ mod tests { }, ) .unwrap(); - env.rocks_env.storage.inscription_data.put(inscription_data_pk, InscriptionData { + rocks_db.acquire().inscription_data.put(inscription_data_pk, InscriptionData { pubkey: inscription_data_pk, data: general_purpose::STANDARD.decode("eyJwIjoic3BsLTIwIiwib3AiOiJkZXBsb3kiLCJ0aWNrIjoiaGVsaXVzIiwibWF4IjoiMTAwMCIsImxpbSI6IjEifQ==").unwrap(), write_version: 1000, @@ -2967,7 +2987,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -3019,8 +3039,9 @@ mod tests { let cnt = 100; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), env.pg_env.client.clone(), env.pg_env.client.clone(), 200_000, @@ -3082,8 +3103,9 @@ mod tests { amount: 30000, write_version: 10, }; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -3154,7 +3176,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -3425,9 +3447,9 @@ mod tests { token_group_member: None, }), }; - + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let mut batch_storage = BatchSaveStorage::new( - env.rocks_env.storage.clone(), + rocks_db.acquire(), 10, Arc::new(IngesterMetricsConfig::new()), ); @@ -3447,7 +3469,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, diff --git a/nft_ingester/tests/batch_mint_test.rs b/nft_ingester/tests/batch_mint_test.rs index c4453438a..1f7f65d0a 100644 --- a/nft_ingester/tests/batch_mint_test.rs +++ b/nft_ingester/tests/batch_mint_test.rs @@ -38,6 +38,7 @@ use nft_ingester::config::JsonMiddlewareConfig; use nft_ingester::error::IngesterError; use nft_ingester::json_worker::JsonWorker; use nft_ingester::raydium_price_fetcher::RaydiumTokenPriceFetcher; +use nft_ingester::rocks_db::RocksDbManager; use plerkle_serialization::serializer::serialize_transaction; use postgre_client::PgClient; use rocks_db::batch_mint::FailedBatchMintKey; @@ -411,6 +412,7 @@ async fn batch_mint_with_verified_creators_test() { .persist_batch_mint(&rx, batch_mint_to_verify.unwrap(), None) .await; + let rocks_db = RocksDbManager::new_primary(env.rocks_env.storage.clone()).into(); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -420,7 +422,7 @@ async fn batch_mint_with_verified_creators_test() { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -566,6 +568,7 @@ async fn batch_mint_with_unverified_creators_test() { .persist_batch_mint(&rx, batch_mint_to_verify.unwrap(), None) .await; + let rocks_db = RocksDbManager::new_primary(env.rocks_env.storage.clone()).into(); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -575,7 +578,7 @@ async fn batch_mint_with_unverified_creators_test() { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, @@ -664,6 +667,7 @@ async fn batch_mint_persister_test() { let merkle_tree = generate_merkle_tree_from_batch_mint(&test_batch_mint); + let rocks_db = RocksDbManager::new_primary(env.rocks_env.storage.clone()).into(); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -673,7 +677,7 @@ async fn batch_mint_persister_test() { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db, Arc::new(ApiMetricsConfig::new()), None, None, diff --git a/nft_ingester/tests/bubblegum_tests.rs b/nft_ingester/tests/bubblegum_tests.rs index 2097b14e0..3bd99aad3 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -9,6 +9,7 @@ mod tests { use nft_ingester::config::JsonMiddlewareConfig; use nft_ingester::json_worker::JsonWorker; use nft_ingester::raydium_price_fetcher::RaydiumTokenPriceFetcher; + use nft_ingester::rocks_db::RocksDbManager; use nft_ingester::{ backfiller::{DirectBlockParser, TransactionsParser}, bubblegum_updates_processor::BubblegumTxProcessor, @@ -63,6 +64,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -72,7 +74,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -191,6 +193,7 @@ mod tests { let cnt = 20; let cli = Cli::default(); let (env, _generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, JsonWorker, @@ -200,7 +203,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index b492a3d1c..9225bc9ef 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -12,6 +12,7 @@ mod tests { use nft_ingester::json_worker::JsonWorker; use nft_ingester::mplx_updates_processor::MplxAccountsProcessor; use nft_ingester::raydium_price_fetcher::RaydiumTokenPriceFetcher; + use nft_ingester::rocks_db::RocksDbManager; use nft_ingester::{ backfiller::{DirectBlockParser, TransactionsParser}, bubblegum_updates_processor::BubblegumTxProcessor, @@ -222,6 +223,7 @@ mod tests { .asset_offchain_data .put(metadata.url.clone(), metadata) .unwrap(); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, @@ -232,7 +234,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -312,6 +314,7 @@ mod tests { .asset_offchain_data .put(metadata.url.clone(), metadata) .unwrap(); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, @@ -322,7 +325,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -402,6 +405,7 @@ mod tests { .asset_offchain_data .put(metadata.url.clone(), metadata) .unwrap(); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, @@ -412,7 +416,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, @@ -492,6 +496,7 @@ mod tests { .asset_offchain_data .put(metadata.url.clone(), metadata) .unwrap(); + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); let api = nft_ingester::api::api_impl::DasApi::< MaybeProofChecker, @@ -502,7 +507,7 @@ mod tests { Storage, >::new( env.pg_env.client.clone(), - env.rocks_env.storage.clone(), + rocks_db.clone(), Arc::new(ApiMetricsConfig::new()), None, None, diff --git a/nft_ingester/tests/dump_tests.rs b/nft_ingester/tests/dump_tests.rs index 6bbb834fd..d557b7d56 100644 --- a/nft_ingester/tests/dump_tests.rs +++ b/nft_ingester/tests/dump_tests.rs @@ -118,230 +118,230 @@ mod tests { pg_env.teardown().await; temp_dir.close().unwrap(); } -} -#[cfg(test)] -#[cfg(feature = "integration_tests")] -mod mtg_441_tests { - use entities::api_req_params::{GetAsset, Options}; - use interface::account_balance::MockAccountBalanceGetter; - use metrics_utils::ApiMetricsConfig; - use nft_ingester::api::dapi::rpc_asset_models::Asset; - use nft_ingester::api::DasApi; - use nft_ingester::config::JsonMiddlewareConfig; - use nft_ingester::json_worker::JsonWorker; - use nft_ingester::raydium_price_fetcher::RaydiumTokenPriceFetcher; - use rocks_db::Storage; - use serde_json::Value; - use setup::rocks::RocksTestEnvironmentSetup; - use setup::TestEnvironment; - use std::sync::Arc; - use testcontainers::clients::Cli; - use tokio::sync::Mutex; - use tokio::task::JoinSet; - use usecase::proofs::MaybeProofChecker; + mod mtg_441_tests { + use entities::api_req_params::{GetAsset, Options}; + use interface::account_balance::MockAccountBalanceGetter; + use metrics_utils::ApiMetricsConfig; + use nft_ingester::api::dapi::rpc_asset_models::Asset; + use nft_ingester::api::DasApi; + use nft_ingester::config::JsonMiddlewareConfig; + use nft_ingester::json_worker::JsonWorker; + use nft_ingester::raydium_price_fetcher::RaydiumTokenPriceFetcher; + use nft_ingester::rocks_db::RocksDbManager; + use rocks_db::Storage; + use serde_json::Value; + use setup::rocks::RocksTestEnvironmentSetup; + use setup::TestEnvironment; + use std::sync::Arc; + use testcontainers::clients::Cli; + use tokio::sync::Mutex; + use tokio::task::JoinSet; + use usecase::proofs::MaybeProofChecker; - const SLOT_UPDATED: u64 = 100; + const SLOT_UPDATED: u64 = 100; - fn get_das_api( - env: &TestEnvironment, - ) -> DasApi< - MaybeProofChecker, - JsonWorker, - JsonWorker, - MockAccountBalanceGetter, - RaydiumTokenPriceFetcher, - Storage, - > { - DasApi::< + fn get_das_api( + env: &TestEnvironment, + ) -> DasApi< MaybeProofChecker, JsonWorker, JsonWorker, MockAccountBalanceGetter, RaydiumTokenPriceFetcher, Storage, - >::new( - env.pg_env.client.clone(), - env.rocks_env.storage.clone(), - Arc::new(ApiMetricsConfig::new()), - None, - None, - 50, - None, - None, - JsonMiddlewareConfig::default(), - Arc::new(MockAccountBalanceGetter::new()), - None, - Arc::new(RaydiumTokenPriceFetcher::default()), - ) - } - - fn parse_asset(json: Value) -> Asset { - serde_json::from_value::(json).expect("Cannot parse 'Asset'.") - } - - #[tokio::test] - #[tracing_test::traced_test] - async fn authority_none_collection_authority_some() { - let cli = Cli::default(); - let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( - &cli, - 20, - SLOT_UPDATED, - RocksTestEnvironmentSetup::static_data_for_mpl, - RocksTestEnvironmentSetup::without_authority, - RocksTestEnvironmentSetup::test_owner, - RocksTestEnvironmentSetup::dynamic_data, - RocksTestEnvironmentSetup::collection_with_authority, - ) - .await; + > { + let rocks_db = Arc::new(RocksDbManager::new_primary(env.rocks_env.storage.clone())); + DasApi::< + MaybeProofChecker, + JsonWorker, + JsonWorker, + MockAccountBalanceGetter, + RaydiumTokenPriceFetcher, + Storage, + >::new( + env.pg_env.client.clone(), + rocks_db.clone(), + Arc::new(ApiMetricsConfig::new()), + None, + None, + 50, + None, + None, + JsonMiddlewareConfig::default(), + Arc::new(MockAccountBalanceGetter::new()), + None, + Arc::new(RaydiumTokenPriceFetcher::default()), + ) + } - let first_pubkey = generated_assets - .static_details - .first() - .expect("Cannot get first pubkey.") - .pubkey; + fn parse_asset(json: Value) -> Asset { + serde_json::from_value::(json).expect("Cannot parse 'Asset'.") + } - let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); - let api_res = get_das_api(&env) - .get_asset( - GetAsset { - id: first_pubkey.to_string(), - options: Some(Options { - show_unverified_collections: true, - ..Default::default() - }), - }, - mutexed_tasks, + #[tokio::test] + #[tracing_test::traced_test] + async fn authority_none_collection_authority_some() { + let cli = Cli::default(); + let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( + &cli, + 20, + SLOT_UPDATED, + RocksTestEnvironmentSetup::static_data_for_mpl, + RocksTestEnvironmentSetup::without_authority, + RocksTestEnvironmentSetup::test_owner, + RocksTestEnvironmentSetup::dynamic_data, + RocksTestEnvironmentSetup::collection_with_authority, ) .await; - assert!(api_res.is_ok()); - let api_res = api_res.expect("Cannot run api call."); - let res = parse_asset(api_res); - assert!(res.id.eq(&first_pubkey.to_string())); - } + let first_pubkey = generated_assets + .static_details + .first() + .expect("Cannot get first pubkey.") + .pubkey; - #[tokio::test] - #[tracing_test::traced_test] - async fn authority_some_collection_authority_none() { - let cli = Cli::default(); - let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( - &cli, - 20, - SLOT_UPDATED, - RocksTestEnvironmentSetup::static_data_for_mpl, - RocksTestEnvironmentSetup::with_authority, - RocksTestEnvironmentSetup::test_owner, - RocksTestEnvironmentSetup::dynamic_data, - RocksTestEnvironmentSetup::collection_without_authority, - ) - .await; + let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); + let api_res = get_das_api(&env) + .get_asset( + GetAsset { + id: first_pubkey.to_string(), + options: Some(Options { + show_unverified_collections: true, + ..Default::default() + }), + }, + mutexed_tasks, + ) + .await; - let first_pubkey = generated_assets - .static_details - .first() - .expect("Cannot get first pubkey.") - .pubkey; + assert!(api_res.is_ok()); + let api_res = api_res.expect("Cannot run api call."); + let res = parse_asset(api_res); + assert!(res.id.eq(&first_pubkey.to_string())); + } - let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); - let api_res = get_das_api(&env) - .get_asset( - GetAsset { - id: first_pubkey.to_string(), - options: Some(Options { - show_unverified_collections: true, - ..Default::default() - }), - }, - mutexed_tasks, + #[tokio::test] + #[tracing_test::traced_test] + async fn authority_some_collection_authority_none() { + let cli = Cli::default(); + let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( + &cli, + 20, + SLOT_UPDATED, + RocksTestEnvironmentSetup::static_data_for_mpl, + RocksTestEnvironmentSetup::with_authority, + RocksTestEnvironmentSetup::test_owner, + RocksTestEnvironmentSetup::dynamic_data, + RocksTestEnvironmentSetup::collection_without_authority, ) .await; - assert!(api_res.is_ok()); - let api_res = api_res.expect("Cannot run api call."); - let res = parse_asset(api_res); - assert!(res.id.eq(&first_pubkey.to_string())); - } - #[tokio::test] - #[tracing_test::traced_test] - async fn authority_some_collection_authority_some() { - let cli = Cli::default(); - let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( - &cli, - 20, - SLOT_UPDATED, - RocksTestEnvironmentSetup::static_data_for_mpl, - RocksTestEnvironmentSetup::with_authority, - RocksTestEnvironmentSetup::test_owner, - RocksTestEnvironmentSetup::dynamic_data, - RocksTestEnvironmentSetup::collection_with_authority, - ) - .await; + let first_pubkey = generated_assets + .static_details + .first() + .expect("Cannot get first pubkey.") + .pubkey; - let first_pubkey = generated_assets - .static_details - .first() - .expect("Cannot get first pubkey.") - .pubkey; + let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); + let api_res = get_das_api(&env) + .get_asset( + GetAsset { + id: first_pubkey.to_string(), + options: Some(Options { + show_unverified_collections: true, + ..Default::default() + }), + }, + mutexed_tasks, + ) + .await; + assert!(api_res.is_ok()); + let api_res = api_res.expect("Cannot run api call."); + let res = parse_asset(api_res); + assert!(res.id.eq(&first_pubkey.to_string())); + } - let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); - let api_res = get_das_api(&env) - .get_asset( - GetAsset { - id: first_pubkey.to_string(), - options: Some(Options { - show_unverified_collections: true, - ..Default::default() - }), - }, - mutexed_tasks, + #[tokio::test] + #[tracing_test::traced_test] + async fn authority_some_collection_authority_some() { + let cli = Cli::default(); + let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( + &cli, + 20, + SLOT_UPDATED, + RocksTestEnvironmentSetup::static_data_for_mpl, + RocksTestEnvironmentSetup::with_authority, + RocksTestEnvironmentSetup::test_owner, + RocksTestEnvironmentSetup::dynamic_data, + RocksTestEnvironmentSetup::collection_with_authority, ) .await; - assert!(api_res.is_ok()); - let api_res = api_res.expect("Cannot run api call."); - let res = parse_asset(api_res); - assert!(res.id.eq(&first_pubkey.to_string())); - } - #[tokio::test] - #[tracing_test::traced_test] - async fn authority_none_collection_authority_none() { - let cli = Cli::default(); - let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( - &cli, - 20, - SLOT_UPDATED, - RocksTestEnvironmentSetup::static_data_for_mpl, - RocksTestEnvironmentSetup::without_authority, - RocksTestEnvironmentSetup::test_owner, - RocksTestEnvironmentSetup::dynamic_data, - RocksTestEnvironmentSetup::collection_without_authority, - ) - .await; + let first_pubkey = generated_assets + .static_details + .first() + .expect("Cannot get first pubkey.") + .pubkey; - let first_pubkey = generated_assets - .static_details - .first() - .expect("Cannot get first pubkey.") - .pubkey; + let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); + let api_res = get_das_api(&env) + .get_asset( + GetAsset { + id: first_pubkey.to_string(), + options: Some(Options { + show_unverified_collections: true, + ..Default::default() + }), + }, + mutexed_tasks, + ) + .await; + assert!(api_res.is_ok()); + let api_res = api_res.expect("Cannot run api call."); + let res = parse_asset(api_res); + assert!(res.id.eq(&first_pubkey.to_string())); + } - let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); - let api_res = get_das_api(&env) - .get_asset( - GetAsset { - id: first_pubkey.to_string(), - options: Some(Options { - show_unverified_collections: true, - ..Default::default() - }), - }, - mutexed_tasks, + #[tokio::test] + #[tracing_test::traced_test] + async fn authority_none_collection_authority_none() { + let cli = Cli::default(); + let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( + &cli, + 20, + SLOT_UPDATED, + RocksTestEnvironmentSetup::static_data_for_mpl, + RocksTestEnvironmentSetup::without_authority, + RocksTestEnvironmentSetup::test_owner, + RocksTestEnvironmentSetup::dynamic_data, + RocksTestEnvironmentSetup::collection_without_authority, ) .await; - assert!(api_res.is_ok()); - let api_res = api_res.expect("Cannot run api call."); - let res = parse_asset(api_res); - assert!(res.id.eq(&first_pubkey.to_string())); + + let first_pubkey = generated_assets + .static_details + .first() + .expect("Cannot get first pubkey.") + .pubkey; + + let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new())); + let api_res = get_das_api(&env) + .get_asset( + GetAsset { + id: first_pubkey.to_string(), + options: Some(Options { + show_unverified_collections: true, + ..Default::default() + }), + }, + mutexed_tasks, + ) + .await; + assert!(api_res.is_ok()); + let api_res = api_res.expect("Cannot run api call."); + let res = parse_asset(api_res); + assert!(res.id.eq(&first_pubkey.to_string())); + } } }