Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rtso/dedup #527

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use bigdecimal::{BigDecimal, Zero};
use field_count::FieldCount;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::{
borrow::Borrow,
hash::{Hash, Hasher},
};

// Storage id
pub type CurrentFungibleAssetBalancePK = String;
Expand Down Expand Up @@ -154,6 +157,12 @@ pub struct CurrentFungibleAssetBalance {
pub token_standard: String,
}

// impl Hash for CurrentFungibleAssetBalance {
// fn hash<H: Hasher>(&self, state: &mut H) {
// self.storage_id.hash(state);
// }
// }

/// Note that this used to be called current_unified_fungible_asset_balances_to_be_renamed
/// and was renamed to current_fungible_asset_balances to facilitate migration
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)]
Expand Down
4 changes: 2 additions & 2 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query(
)
}

fn insert_current_fungible_asset_balances_query(
pub fn insert_current_fungible_asset_balances_query(
items_to_insert: Vec<CurrentFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
}

/// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor
async fn parse_v2_coin(
pub async fn parse_v2_coin(
transactions: &[Transaction],
) -> (
Vec<FungibleAssetActivity>,
Expand Down
13 changes: 12 additions & 1 deletion rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::events_processor::EventsProcessor;
use crate::processors::{
events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
};
use ahash::HashSet;
use anyhow::Result;
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
Expand All @@ -14,6 +17,10 @@ pub struct IndexerProcessorConfig {
pub processor_config: ProcessorConfig,
pub transaction_stream_config: TransactionStreamConfig,
pub db_config: DbConfig,

// Set of deprecated table names whose db writes should be skipped
#[serde(default)]
pub deprecated_tables: HashSet<String>,
}

#[async_trait::async_trait]
Expand All @@ -24,6 +31,10 @@ impl RunnableConfig for IndexerProcessorConfig {
let events_processor = EventsProcessor::new(self.clone()).await?;
events_processor.run_processor().await
},
ProcessorConfig::FungibleAssetProcessor(_) => {
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
fungible_asset_processor.run_processor().await
},
}
}

Expand Down
5 changes: 4 additions & 1 deletion rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::processors::events_processor::EventsProcessorConfig;
use crate::processors::{
events_processor::EventsProcessorConfig, fungible_asset_processor::FungibleAssetProcessorConfig,
};
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different processors that are defined.
Expand Down Expand Up @@ -35,6 +37,7 @@ use serde::{Deserialize, Serialize};
)]
pub enum ProcessorConfig {
EventsProcessor(EventsProcessorConfig),
FungibleAssetProcessor(FungibleAssetProcessorConfig),
}

impl ProcessorConfig {
Expand Down
20 changes: 16 additions & 4 deletions rust/sdk-processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,15 @@ impl EventsProcessor {
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let ProcessorConfig::EventsProcessor(events_processor_config) =
self.config.processor_config;
let events_processor_config = match self.config.processor_config {
ProcessorConfig::EventsProcessor(events_processor_config) => events_processor_config,
_ => {
return Err(anyhow::anyhow!(
"Invalid processor config for EventsProcessor: {:?}",
self.config.processor_config
))
},
};
let channel_size = events_processor_config.channel_size;

// Define processor steps
Expand Down Expand Up @@ -129,8 +136,13 @@ impl EventsProcessor {
continue;
}
debug!(
"Finished processing events from versions [{:?}, {:?}]",
txn_context.start_version, txn_context.end_version,
"Finished processing events from versions {}",
txn_context
.get_versions()
.iter()
.map(|(x, y)| format!("[{}, {}]", x, y))
.collect::<Vec<_>>()
.join(", "),
);
},
Err(e) => {
Expand Down
158 changes: 158 additions & 0 deletions rust/sdk-processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
steps::{
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
events_processor::{EventsExtractor, EventsStorer},
fungible_asset_processor::{
fungible_asset_deduper::FungibleAssetDeduper,
fungible_asset_extractor::FungibleAssetExtractor,
fungible_asset_storer::FungibleAssetStorer,
},
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::get_starting_version,
},
};
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
traits::IntoRunnableStep,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info};

/// Configuration for the fungible asset processor.
///
/// `deny_unknown_fields` makes deserialization reject any key that is not
/// declared on this struct.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct FungibleAssetProcessorConfig {
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
// Keyed by a string (presumably the table name; confirm against the storer step).
// Deserializes to an empty map when the field is omitted.
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
// Size of channel between steps
// Falls back to `FungibleAssetProcessorConfig::default_channel_size()` when omitted.
#[serde(default = "FungibleAssetProcessorConfig::default_channel_size")]
pub channel_size: usize,
}

impl FungibleAssetProcessorConfig {
    /// Channel size used between processor steps when the config does not
    /// specify `channel_size` (referenced by the serde `default` attribute).
    pub const fn default_channel_size() -> usize {
        const DEFAULT_CHANNEL_SIZE: usize = 10;
        DEFAULT_CHANNEL_SIZE
    }
}

/// Fungible asset processor: holds the full indexer configuration together
/// with the database connection pool the processor works against.
pub struct FungibleAssetProcessor {
// Full indexer configuration (processor, transaction stream and db settings).
pub config: IndexerProcessorConfig,
// Database connection pool; `new` fills this with a pool built from `config.db_config`.
pub db_pool: ArcDbPool,
}

impl FungibleAssetProcessor {
    /// Builds a processor from `config`, opening the database connection pool
    /// described by `config.db_config`.
    ///
    /// # Errors
    /// Returns an error if the connection pool cannot be created.
    pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
        match config.db_config {
            DbConfig::PostgresConfig(ref postgres_config) => {
                let conn_pool = new_db_pool(
                    &postgres_config.connection_string,
                    Some(postgres_config.db_pool_size),
                )
                .await
                .map_err(|e| {
                    anyhow::anyhow!(
                        "Failed to create connection pool for PostgresConfig: {:?}",
                        e
                    )
                })?;

                Ok(Self {
                    config,
                    db_pool: conn_pool,
                })
            },
        }
    }

    /// Runs the processor until the output channel of the last step closes.
    ///
    /// Steps, in order: run migrations, resolve the starting version, check the
    /// chain id reported by the transaction stream, then wire up the pipeline
    /// `transaction stream -> extractor -> deduper -> storer -> version tracker`
    /// and drain its output.
    ///
    /// # Errors
    /// Returns an error if resolving the starting version fails, if the
    /// transaction stream cannot be created or queried for its chain id, if the
    /// chain id check fails, or if `config.processor_config` is not the
    /// `FungibleAssetProcessor` variant.
    pub async fn run_processor(self) -> Result<()> {
        let processor_name = self.config.processor_config.name();

        // (Optional) Run migrations
        match self.config.db_config {
            DbConfig::PostgresConfig(ref postgres_config) => {
                run_migrations(
                    postgres_config.connection_string.clone(),
                    self.db_pool.clone(),
                )
                .await;
            },
        }

        // (Optional) Merge the starting version from config and the latest processed version from the DB
        let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

        // (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain
        let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
            .await?
            .get_chain_id()
            .await?;
        check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

        // Report the offending config on a variant mismatch, mirroring the
        // error raised by `EventsProcessor` for the same situation.
        let fa_config = match self.config.processor_config {
            ProcessorConfig::FungibleAssetProcessor(fa_config) => fa_config,
            _ => {
                return Err(anyhow::anyhow!(
                    "Invalid processor config for FungibleAssetProcessor: {:?}",
                    self.config.processor_config
                ))
            },
        };
        let channel_size = fa_config.channel_size;

        // Define processor steps
        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
            // Override only the starting version; every other stream setting
            // comes from the user-supplied config.
            starting_version: Some(starting_version),
            ..self.config.transaction_stream_config
        })
        .await?;
        let fa_extractor = FungibleAssetExtractor {};
        // NOTE(review): the one-second duration is presumably the deduper's
        // poll/flush interval; confirm against `FungibleAssetDeduper::new`.
        let fa_deduper = FungibleAssetDeduper::new(Duration::from_secs(1));
        let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config);
        let version_tracker = LatestVersionProcessedTracker::new(
            self.db_pool.clone(),
            starting_version,
            processor_name.to_string(),
        );

        // Connect processor steps together
        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
            transaction_stream.into_runnable_step(),
        )
        .connect_to(fa_extractor.into_runnable_step(), channel_size)
        .connect_to(fa_deduper.into_runnable_step(), channel_size)
        .connect_to(fa_storer.into_runnable_step(), channel_size)
        .connect_to(version_tracker.into_runnable_step(), channel_size)
        .end_and_return_output_receiver(channel_size);

        // (Optional) Parse the results
        loop {
            match buffer_receiver.recv().await {
                Ok(txn_context) => {
                    // Nothing to report for empty batches.
                    if txn_context.data.is_empty() {
                        continue;
                    }
                    debug!(
                        "Finished processing versions {}",
                        txn_context
                            .get_versions()
                            .iter()
                            .map(|(x, y)| format!("[{}, {}]", x, y))
                            .collect::<Vec<_>>()
                            .join(", "),
                    );
                },
                Err(e) => {
                    // A receive error ends the run successfully.
                    info!("No more transactions in channel: {:?}", e);
                    break Ok(());
                },
            }
        }
    }
}
1 change: 1 addition & 0 deletions rust/sdk-processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod events_processor;
pub mod fungible_asset_processor;
Loading
Loading