diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 337f1d18b..9ccee2ff9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -147,7 +147,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "aptos-indexer-processor-sdk" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", "aptos-indexer-transaction-stream", @@ -179,7 +179,7 @@ dependencies = [ [[package]] name = "aptos-indexer-processor-sdk-server-framework" version = "1.0.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", "aptos-indexer-processor-sdk", @@ -212,10 +212,10 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", - "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d)", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba)", "aptos-protos 1.3.1 (git+https://github.com/aptos-labs/aptos-core.git?rev=5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb)", "chrono", "futures-util", @@ -240,7 +240,7 @@ dependencies = [ [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "chrono", ] @@ -2203,7 +2203,7 @@ dependencies = [ [[package]] name = "instrumented-channel" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "delegate", "derive_builder", @@ -4066,7 +4066,10 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "sample" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" +dependencies = [ + "tracing", +] [[package]] name = "schannel" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index bf84ea6ce..4f497b010 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" } -aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" } +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" } +aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs index 41f9ffb18..3fef4fbcc 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -29,7 +29,10 @@ use bigdecimal::{BigDecimal, Zero}; use field_count::FieldCount; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; -use std::borrow::Borrow; +use std::{ + borrow::Borrow, + hash::{Hash, Hasher}, +}; // Storage id pub type CurrentFungibleAssetBalancePK = String; @@ -154,6 +157,12 @@ pub struct CurrentFungibleAssetBalance { pub token_standard: String, } +// impl Hash for CurrentFungibleAssetBalance { +// fn hash(&self, state: &mut H) { +// self.storage_id.hash(state); +// } +// } + /// Note that this used to be called current_unified_fungible_asset_balances_to_be_renamed /// and was renamed to current_fungible_asset_balances to facilitate migration #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)] diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 902aa6841..972a135e6 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query( ) } -fn insert_current_fungible_asset_balances_query( +pub fn insert_current_fungible_asset_balances_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor { } /// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor -async fn parse_v2_coin( +pub async fn parse_v2_coin( transactions: &[Transaction], ) -> ( Vec, diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 361ec56b3..f176b0a9d 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use super::{db_config::DbConfig, processor_config::ProcessorConfig}; -use crate::processors::events_processor::EventsProcessor; +use crate::processors::{ + events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, +}; +use ahash::HashSet; use anyhow::Result; use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; use aptos_indexer_processor_sdk_server_framework::RunnableConfig; @@ -14,6 +17,10 @@ pub struct IndexerProcessorConfig { pub processor_config: ProcessorConfig, pub transaction_stream_config: TransactionStreamConfig, pub db_config: DbConfig, + + // String vector for deprecated tables to skip db writes + #[serde(default)] + pub deprecated_tables: HashSet, } #[async_trait::async_trait] @@ -24,6 +31,10 @@ impl RunnableConfig for IndexerProcessorConfig { let events_processor = EventsProcessor::new(self.clone()).await?; events_processor.run_processor().await }, + ProcessorConfig::FungibleAssetProcessor(_) => { + let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?; + fungible_asset_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 541458a1a..434311597 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -1,4 +1,6 @@ -use crate::processors::events_processor::EventsProcessorConfig; +use crate::processors::{ + events_processor::EventsProcessorConfig, fungible_asset_processor::FungibleAssetProcessorConfig, +}; use serde::{Deserialize, Serialize}; /// This enum captures the configs for all the different processors that are defined. @@ -35,6 +37,7 @@ use serde::{Deserialize, Serialize}; )] pub enum ProcessorConfig { EventsProcessor(EventsProcessorConfig), + FungibleAssetProcessor(FungibleAssetProcessorConfig), } impl ProcessorConfig { diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 379b01358..e84303b00 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -94,8 +94,15 @@ impl EventsProcessor { .await?; check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; - let ProcessorConfig::EventsProcessor(events_processor_config) = - self.config.processor_config; + let events_processor_config = match self.config.processor_config { + ProcessorConfig::EventsProcessor(events_processor_config) => events_processor_config, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor config for EventsProcessor: {:?}", + self.config.processor_config + )) + }, + }; let channel_size = events_processor_config.channel_size; // Define processor steps @@ -129,8 +136,13 @@ impl EventsProcessor { continue; } debug!( - "Finished processing events from versions [{:?}, {:?}]", - txn_context.start_version, txn_context.end_version, + "Finished processing events from versions {}", + txn_context + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), ); }, Err(e) => { diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs new file mode 100644 index 000000000..c484d8fa4 --- /dev/null +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -0,0 +1,158 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + steps::{ + common::latest_processed_version_tracker::LatestVersionProcessedTracker, + events_processor::{EventsExtractor, EventsStorer}, + fungible_asset_processor::{ + fungible_asset_deduper::FungibleAssetDeduper, + fungible_asset_extractor::FungibleAssetExtractor, + fungible_asset_storer::FungibleAssetStorer, + }, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::get_starting_version, + }, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::TransactionStreamStep, + traits::IntoRunnableStep, +}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use tracing::{debug, info}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct FungibleAssetProcessorConfig { + // Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2) + #[serde(default = "AHashMap::new")] + pub per_table_chunk_sizes: AHashMap, + // Size of channel between steps + #[serde(default = "FungibleAssetProcessorConfig::default_channel_size")] + pub channel_size: usize, +} + +impl FungibleAssetProcessorConfig { + pub const fn default_channel_size() -> usize { + 10 + } +} + +pub struct FungibleAssetProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl FungibleAssetProcessor { + pub async fn new(config: IndexerProcessorConfig) -> Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } + + pub async fn run_processor(self) -> Result<()> { + let processor_name = self.config.processor_config.name(); + + // (Optional) Run migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // (Optional) Merge the starting version from config and the latest processed version from the DB + let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; + + // (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let fa_config = match self.config.processor_config { + ProcessorConfig::FungibleAssetProcessor(fa_config) => fa_config, + _ => return Err(anyhow::anyhow!("Processor config is wrong type")), + }; + let channel_size = fa_config.channel_size; + + // Define processor steps + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config + }) + .await?; + let fa_extractor = FungibleAssetExtractor {}; + let fa_deduper = FungibleAssetDeduper::new(Duration::from_secs(1)); + let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config); + let version_tracker = LatestVersionProcessedTracker::new( + self.db_pool.clone(), + starting_version, + processor_name.to_string(), + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(fa_extractor.into_runnable_step(), channel_size) + .connect_to(fa_deduper.into_runnable_step(), channel_size) + .connect_to(fa_storer.into_runnable_step(), channel_size) + .connect_to(version_tracker.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + // (Optional) Parse the results + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + if txn_context.data.is_empty() { + continue; + } + debug!( + "Finished processing versions {}", + txn_context + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs index 110ce858e..29d570b27 100644 --- a/rust/sdk-processor/src/processors/mod.rs +++ b/rust/sdk-processor/src/processors/mod.rs @@ -1 +1,2 @@ pub mod events_processor; +pub mod fungible_asset_processor; diff --git a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs index 54b7e7b78..66df23377 100644 --- a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs +++ b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs @@ -2,6 +2,7 @@ use crate::utils::database::{execute_with_better_error, ArcDbPool}; use ahash::AHashMap; use anyhow::Result; use aptos_indexer_processor_sdk::{ + aptos_protos, traits::{ pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable, }, @@ -11,10 +12,18 @@ use aptos_indexer_processor_sdk::{ use async_trait::async_trait; use diesel::{upsert::excluded, ExpressionMethods}; use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status}; +use std::marker::PhantomData; use tracing::info; const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; +pub struct ProcessedBatch { + pub start_version: u64, + pub end_version: u64, + pub start_transaction_timestamp: Option, + pub end_transaction_timestamp: Option, +} + pub struct LatestVersionProcessedTracker where Self: Sized + Send + 'static, @@ -25,9 +34,10 @@ where // Next version to process that we expect. next_version: u64, // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. - last_success_batch: Option>, + last_success_batch: Option, // Tracks all the versions that have been processed out of order. - seen_versions: AHashMap>, + seen_versions: AHashMap, + _marker: PhantomData, } impl LatestVersionProcessedTracker @@ -42,10 +52,11 @@ where next_version: starting_version, last_success_batch: None, seen_versions: AHashMap::new(), + _marker: PhantomData, } } - fn update_last_success_batch(&mut self, current_batch: TransactionContext) { + fn update_last_success_batch(&mut self, current_batch: ProcessedBatch) { let mut new_prev_batch = current_batch; // While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch. while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1)) @@ -107,41 +118,33 @@ where &mut self, current_batch: TransactionContext, ) -> Result>, ProcessorError> { - // info!( - // start_version = current_batch.start_version, - // end_version = current_batch.end_version, - // step_name = self.name(), - // "Processing versions" - // ); - // If there's a gap in the next_version and current_version, save the current_version to seen_versions for - // later processing. - if self.next_version != current_batch.start_version { - info!( - expected_next_version = self.next_version, - step = self.name(), - batch_version = current_batch.start_version, - "Gap detected", - ); - self.seen_versions - .insert(current_batch.start_version, TransactionContext { - data: vec![], // No data is needed for tracking. This is to avoid clone. - start_version: current_batch.start_version, - end_version: current_batch.end_version, - start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), - end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), - total_size_in_bytes: current_batch.total_size_in_bytes, + for context in current_batch.context.iter() { + // If there's a gap in the next_version and current_version, save the current_version to seen_versions for + // later processing. + if self.next_version != context.start_version { + tracing::debug!( + next_version = self.next_version, + step = self.name(), + "Gap detected starting from version: {}", + context.start_version + ); + self.seen_versions + .insert(context.start_version, ProcessedBatch { + start_version: context.start_version, + end_version: context.end_version, + start_transaction_timestamp: context.start_transaction_timestamp.clone(), + end_transaction_timestamp: context.end_transaction_timestamp.clone(), + }); + } else { + tracing::debug!("No gap detected"); + // If the current_batch is the next expected version, update the last success batch + self.update_last_success_batch(ProcessedBatch { + start_version: context.start_version, + end_version: context.end_version, + start_transaction_timestamp: context.start_transaction_timestamp.clone(), + end_transaction_timestamp: context.end_transaction_timestamp.clone(), }); - } else { - // info!("No gap detected"); - // If the current_batch is the next expected version, update the last success batch - self.update_last_success_batch(TransactionContext { - data: vec![], // No data is needed for tracking. This is to avoid clone. - start_version: current_batch.start_version, - end_version: current_batch.end_version, - start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), - end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), - total_size_in_bytes: current_batch.total_size_in_bytes, - }); + } } // Pass through Ok(Some(current_batch)) diff --git a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs index 7aec68b29..bb8317b51 100644 --- a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs +++ b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs @@ -68,11 +68,7 @@ impl Processable for EventsExtractor { .collect::>(); Ok(Some(TransactionContext { data: events, - start_version: item.start_version, - end_version: item.end_version, - start_transaction_timestamp: item.start_transaction_timestamp, - end_transaction_timestamp: item.end_transaction_timestamp, - total_size_in_bytes: item.total_size_in_bytes, + context: item.context, })) } } diff --git a/rust/sdk-processor/src/steps/events_processor/events_storer.rs b/rust/sdk-processor/src/steps/events_processor/events_storer.rs index c5560155e..1cfa970ac 100644 --- a/rust/sdk-processor/src/steps/events_processor/events_storer.rs +++ b/rust/sdk-processor/src/steps/events_processor/events_storer.rs @@ -84,15 +84,26 @@ impl Processable for EventsStorer { match execute_res { Ok(_) => { debug!( - "Events version [{}, {}] stored successfully", - events.start_version, events.end_version + "Events version {} stored successfully", + events + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), ); Ok(Some(events)) }, Err(e) => Err(ProcessorError::DBStoreError { message: format!( - "Failed to store events versions {} to {}: {:?}", - events.start_version, events.end_version, e, + "Failed to store events versions {:?}: {:?}", + events + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), + e, ), // TODO: fix it with a debug_query. query: None, diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs new file mode 100644 index 000000000..8b9bed061 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs @@ -0,0 +1,121 @@ +use ahash::AHashMap; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{ + async_step::AsyncRunType, AsyncStep, NamedStep, PollableAsyncRunType, PollableAsyncStep, + Processable, + }, + types::transaction_context::{Context, TransactionContext}, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use diesel::associations::Identifiable; +use processor::db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentFungibleAssetBalancePK, + CurrentUnifiedFungibleAssetBalance, FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, +}; +use std::{collections::BTreeSet, time::Duration}; + +pub struct FungibleAssetDeduper +where + Self: Sized + Send + 'static, +{ + // The duration to collect and dedup data before releasing it + poll_interval: Duration, + merged_cfab: AHashMap, + contexts: BTreeSet, +} + +impl FungibleAssetDeduper +where + Self: Sized + Send + 'static, +{ + pub fn new(poll_interval: Duration) -> Self { + Self { + poll_interval, + merged_cfab: AHashMap::new(), + contexts: BTreeSet::new(), + } + } +} + +#[async_trait] +impl Processable for FungibleAssetDeduper { + type Input = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type Output = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + items: TransactionContext, + ) -> Result>, ProcessorError> { + let (_, _, _, cfab, _, _) = &items.data[0]; + + // Update transaction contexts + for context in items.context.iter() { + self.contexts.insert(context.clone()); + } + + // Dedup + for balance in cfab.iter() { + self.merged_cfab + .insert(balance.id().to_string(), balance.clone()); + } + + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for FungibleAssetDeduper { + fn poll_interval(&self) -> Duration { + self.poll_interval + } + + async fn poll( + &mut self, + ) -> Result>>, ProcessorError> { + let transaction_context = TransactionContext::new_with_contexts( + vec![( + vec![], + vec![], + vec![], + std::mem::take(&mut self.merged_cfab) + .values() + .cloned() + .collect::>(), + vec![], + vec![], + )], + self.contexts.clone(), + ); + Ok(Some(vec![transaction_context])) + } +} + +impl NamedStep for FungibleAssetDeduper { + fn name(&self) -> String { + "FungibleAssetDeduper".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs new file mode 100644 index 000000000..03db6cac2 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs @@ -0,0 +1,85 @@ +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, + FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, + }, + processors::fungible_asset_processor::parse_v2_coin, +}; + +pub struct FungibleAssetExtractor +where + Self: Sized + Send + 'static, {} + +#[async_trait] +impl Processable for FungibleAssetExtractor { + type Input = Transaction; + type Output = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> Result< + Option< + TransactionContext<( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + )>, + >, + ProcessorError, + > { + let ( + fungible_asset_activities, + fungible_asset_metadata, + fungible_asset_balances, + current_fungible_asset_balances, + current_unified_fungible_asset_balances, + coin_supply, + ) = parse_v2_coin(&transactions.data).await; + + Ok(Some(TransactionContext { + data: vec![( + fungible_asset_activities, + fungible_asset_metadata, + fungible_asset_balances, + current_fungible_asset_balances, + current_unified_fungible_asset_balances, + coin_supply, + )], + context: transactions.context, + })) + } +} + +impl AsyncStep for FungibleAssetExtractor {} + +impl NamedStep for FungibleAssetExtractor { + fn name(&self) -> String { + "FungibleAssetExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs new file mode 100644 index 000000000..74e13cfc9 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs @@ -0,0 +1,129 @@ +use crate::{ + db::common::models::events_models::events::EventModel, + processors::{ + events_processor::EventsProcessorConfig, + fungible_asset_processor::FungibleAssetProcessorConfig, + }, + utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use diesel::{ + pg::{upsert::excluded, Pg}, + query_builder::QueryFragment, + ExpressionMethods, +}; +use processor::{ + db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, + FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, + }, + processors::fungible_asset_processor::insert_current_fungible_asset_balances_query, + schema, +}; +use tracing::debug; + +pub struct FungibleAssetStorer +where + Self: Sized + Send + 'static, +{ + conn_pool: ArcDbPool, + processor_config: FungibleAssetProcessorConfig, +} + +impl FungibleAssetStorer { + pub fn new(conn_pool: ArcDbPool, processor_config: FungibleAssetProcessorConfig) -> Self { + Self { + conn_pool, + processor_config, + } + } +} + +#[async_trait] +impl Processable for FungibleAssetStorer { + type Input = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type Output = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type RunType = AsyncRunType; + + async fn process( + &mut self, + items: TransactionContext, + ) -> Result>, ProcessorError> { + let (_, _, _, current_fungible_asset_balances, _, _) = &items.data[0]; + + let execute_res = execute_in_chunks( + self.conn_pool.clone(), + insert_current_fungible_asset_balances_query, + current_fungible_asset_balances, + get_config_table_chunk_size::( + "current_fungible_asset_balances", + &self.processor_config.per_table_chunk_sizes, + ), + ) + .await; + match execute_res { + Ok(_) => { + debug!( + "FA version {} stored successfully", + items + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), + ); + Ok(Some(items)) + }, + Err(e) => Err(ProcessorError::DBStoreError { + message: format!( + "Failed to store events versions {}: {:?}", + items + .get_versions() + .iter() + .map(|(x, y)| format!("[{}, {}]", x, y)) + .collect::>() + .join(", "), + e, + ), + // TODO: fix it with a debug_query. + query: None, + }), + } + } +} + +impl AsyncStep for FungibleAssetStorer {} + +impl NamedStep for FungibleAssetStorer { + fn name(&self) -> String { + "FungibleAssetStorer".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs new file mode 100644 index 000000000..cd2a70c15 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs @@ -0,0 +1,3 @@ +pub mod fungible_asset_deduper; +pub mod fungible_asset_extractor; +pub mod fungible_asset_storer; diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 0d9f7cf7a..3c039e318 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -1,2 +1,3 @@ pub mod common; pub mod events_processor; +pub mod fungible_asset_processor;