diff --git a/Cargo.lock b/Cargo.lock index c647e0e6c619b..63c9de2df3c46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2005,6 +2005,7 @@ dependencies = [ "tonic 0.11.0", "tonic-reflection", "tracing", + "transaction-filter", "uuid", ] @@ -6655,12 +6656,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", + "darling_core 0.20.9", + "darling_macro 0.20.9", ] [[package]] @@ -6693,15 +6694,15 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", - "strsim 0.10.0", + "strsim 0.11.1", "syn 2.0.48", ] @@ -6729,11 +6730,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ - "darling_core 0.20.3", + "darling_core 0.20.9", "quote", "syn 2.0.48", ] @@ -6878,6 +6879,37 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling 0.20.9", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core", + "syn 2.0.48", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -10105,9 +10137,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -14314,7 +14346,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" dependencies = [ - "darling 0.20.3", + "darling 0.20.9", "proc-macro2", "quote", "syn 2.0.48", @@ -14921,6 +14953,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "structopt" version = "0.3.26" @@ -15307,18 +15345,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", @@ -15947,6 +15985,22 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "transaction-filter" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=35c362a10224ba0a22af8d03c515652c6bc62eb4#35c362a10224ba0a22af8d03c515652c6bc62eb4" +dependencies = [ + "anyhow", + "aptos-protos 1.3.0 (git+https://github.com/aptos-labs/aptos-core.git?tag=aptos-node-v1.12.1)", + "derive_builder", + "memchr", + "prost 0.12.3", + "serde", + "serde_json", + "serde_yaml 0.9.34+deprecated", + "thiserror", +] + [[package]] name = "treediff" version = "3.0.2" diff --git a/Cargo.toml b/Cargo.toml index fdf320dbd41aa..be18d06bdca1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -473,6 +473,7 @@ ark-groth16 = "0.4.0" ark-serialize = "0.4.0" ark-std = { version = "0.4.0", features = ["getrandom"] } aptos-moving-average = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf" } +transaction-filter = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "35c362a10224ba0a22af8d03c515652c6bc62eb4" } assert_approx_eq = "1.1.0" assert_unordered = "0.3.5" async-channel = "1.7.1" diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index c378e10000297..780909520fd49 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -31,6 +31,7 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } tonic-reflection = { workspace = true } +transaction-filter = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 6824343676579..6fb764432b7f7 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -16,6 +16,7 @@ use aptos_protos::{ use serde::{Deserialize, Serialize}; use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use tonic::{codec::CompressionEncoding, transport::Server}; +use transaction_filter::BooleanTransactionFilter; pub const SERVER_NAME: &str = "idxdatasvc"; @@ -69,9 +70,17 @@ pub struct IndexerGrpcDataServiceConfig { pub enable_cache_compression: bool, #[serde(default)] pub in_memory_cache_config: InMemoryCacheConfig, - /// Sender addresses to ignore. Transactions from these addresses will not be indexed. - #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] - pub sender_addresses_to_ignore: Vec, + /// Any transaction that matches this filter will be stripped, meaning we remove + /// events and writesets from it before sending it downstream. This should only be + /// used in an emergency situation, e.g. when txns related to a certain module are + /// too large and are causing issues for the data service. Learn more here: + /// + /// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85 + /// + /// Generally you will want to start with this with an OR, and then list out + /// separate filters that describe each type of txn we want to strip. + #[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")] + pub txns_to_strip_filter: BooleanTransactionFilter, } impl IndexerGrpcDataServiceConfig { @@ -84,7 +93,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address: RedisUrl, enable_cache_compression: bool, in_memory_cache_config: InMemoryCacheConfig, - sender_addresses_to_ignore: Vec, + txns_to_strip_filter: BooleanTransactionFilter, ) -> Self { Self { data_service_grpc_tls_config, @@ -97,7 +106,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address, enable_cache_compression, in_memory_cache_config, - sender_addresses_to_ignore, + txns_to_strip_filter, } } @@ -109,8 +118,9 @@ impl IndexerGrpcDataServiceConfig { false } - pub const fn default_sender_addresses_to_ignore() -> Vec { - vec![] + pub const fn default_txns_to_strip_filter() -> BooleanTransactionFilter { + // This filter matches no txns. + BooleanTransactionFilter::new_or(vec![]) } } @@ -162,10 +172,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { self.redis_read_replica_address.clone(), self.file_store_config.clone(), self.data_service_response_channel_size, - self.sender_addresses_to_ignore - .clone() - .into_iter() - .collect::>(), + self.txns_to_strip_filter.clone(), cache_storage_format, Arc::new(in_memory_cache), )?; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 525189bd9a4f1..269f9170a4e16 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -42,6 +42,7 @@ use tokio::sync::mpsc::{channel, error::SendTimeoutError}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::{error, info, warn}; +use transaction_filter::{BooleanTransactionFilter, Filterable}; use uuid::Uuid; type ResponseStream = Pin> + Send>>; @@ -77,7 +78,7 @@ pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, pub data_service_response_channel_size: usize, - pub sender_addresses_to_ignore: HashSet, + pub txns_to_strip_filter: BooleanTransactionFilter, pub cache_storage_format: StorageFormat, in_memory_cache: Arc, } @@ -87,7 +88,7 @@ impl RawDataServerWrapper { redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, data_service_response_channel_size: usize, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, cache_storage_format: StorageFormat, in_memory_cache: Arc, ) -> anyhow::Result { @@ -99,7 +100,7 @@ impl RawDataServerWrapper { ), file_store_config, data_service_response_channel_size, - sender_addresses_to_ignore, + txns_to_strip_filter, cache_storage_format, in_memory_cache, }) @@ -175,7 +176,7 @@ impl RawData for RawDataServerWrapper { let redis_client = self.redis_client.clone(); let cache_storage_format = self.cache_storage_format; let request_metadata = Arc::new(request_metadata); - let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone(); + let txns_to_strip_filter = self.txns_to_strip_filter.clone(); let in_memory_cache = self.in_memory_cache.clone(); tokio::spawn({ let request_metadata = request_metadata.clone(); @@ -187,7 +188,7 @@ impl RawData for RawDataServerWrapper { request_metadata, transactions_count, tx, - sender_addresses_to_ignore, + txns_to_strip_filter, current_version, in_memory_cache, ) @@ -375,7 +376,7 @@ async fn data_fetcher_task( request_metadata: Arc, transactions_count: Option, tx: tokio::sync::mpsc::Sender>, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, mut current_version: u64, in_memory_cache: Arc, ) { @@ -513,7 +514,7 @@ async fn data_fetcher_task( let resp_items = get_transactions_responses_builder( transaction_data, chain_id as u32, - &sender_addresses_to_ignore, + &txns_to_strip_filter, ); let data_latency_in_secs = resp_items .last() @@ -657,11 +658,10 @@ fn ensure_sequential_transactions(mut batches: Vec>) -> Vec, chain_id: u32, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { - let filtered_transactions = - filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore); - let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT); + let stripped_transactions = strip_transactions(transactions, txns_to_strip_filter); + let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT); chunks .into_iter() .map(|chunk| TransactionsResponse { @@ -937,21 +937,29 @@ async fn channel_send_multiple_with_timeout( Ok(()) } -fn filter_transactions_for_sender_addresses( +/// This function strips transactions that match the given filter. Stripping means we +/// remove the payload, signature, events, and writesets. Note, the filter can be +/// composed of many conditions, see `BooleanTransactionFilter` for more. +fn strip_transactions( transactions: Vec, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { transactions .into_iter() .map(|mut txn| { - if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { - if let Some(utr) = user_transaction.request.as_mut() { - if sender_addresses_to_ignore.contains(&utr.sender) { + // Note: `is_allowed` means the txn is matches the filter, in which case + // we strip it. + if txns_to_strip_filter.is_allowed(&txn) { + // TODO: transaction-filter needs to be moved to aptos-core, see https://aptos-org.slack.com/archives/C0468USBLQJ/p1718368277898979?thread_ts=1718356561.304619&cid=C0468USBLQJ + if let Some(info) = txn.info.as_mut() { + info.changes = vec![]; + } + if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { + user_transaction.events = vec![]; + if let Some(utr) = user_transaction.request.as_mut() { // Wipe the payload and signature. utr.payload = None; utr.signature = None; - user_transaction.events = vec![]; - txn.info.as_mut().unwrap().changes = vec![]; } } } @@ -962,7 +970,7 @@ fn filter_transactions_for_sender_addresses( #[cfg(test)] mod tests { - use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses}; + use super::ensure_sequential_transactions; use aptos_protos::transaction::v1::{ transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload, UserTransaction, UserTransactionRequest, WriteSetChange, @@ -1014,6 +1022,7 @@ mod tests { assert_eq!(transactions1.last().unwrap().version, 11); } + // TODO: Adapt this for strip_transactions instead. #[test] fn test_transactions_are_filter_correctly() { let sender_address = "0x1234".to_string();