From bb5a4e798f667cf034c9b51c0ff57763a661bfb1 Mon Sep 17 00:00:00 2001 From: Daniel Porteous Date: Fri, 14 Jun 2024 14:34:36 +0200 Subject: [PATCH] [Data Service] Implement simple upstream transaction filtering There are two types of transaction filtering we will support in the future: 1. Per stream configuration: The downstream declares what txns they want to receive. 2. Global configuration: At the data service level we refuse to include full txns for all streams. This PR implements the second of these, using @CapCap's work here: https://github.com/aptos-labs/aptos-indexer-processors/pull/398. Rather than not sending txns at all if they match the blocklist filters, we just omit the writesets and events. Not sending the txns entirely would cause issues with processors, which today assume that they will receive all txns. --- Cargo.lock | 88 +++++++++++++++---- Cargo.toml | 1 + .../indexer-grpc-data-service/Cargo.toml | 1 + .../indexer-grpc-data-service/src/config.rs | 29 +++--- .../indexer-grpc-data-service/src/service.rs | 47 ++++++---- 5 files changed, 119 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c647e0e6c619b..63c9de2df3c46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2005,6 +2005,7 @@ dependencies = [ "tonic 0.11.0", "tonic-reflection", "tracing", + "transaction-filter", "uuid", ] @@ -6655,12 +6656,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", + "darling_core 0.20.9", + "darling_macro 0.20.9", ] [[package]] @@ -6693,15 +6694,15 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", - "strsim 0.10.0", + "strsim 0.11.1", "syn 2.0.48", ] @@ -6729,11 +6730,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ - "darling_core 0.20.3", + "darling_core 0.20.9", "quote", "syn 2.0.48", ] @@ -6878,6 +6879,37 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling 0.20.9", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core", + "syn 2.0.48", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -10105,9 +10137,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -14314,7 +14346,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" dependencies = [ - "darling 0.20.3", + "darling 0.20.9", "proc-macro2", "quote", "syn 2.0.48", @@ -14921,6 +14953,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "structopt" version = "0.3.26" @@ -15307,18 +15345,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", @@ -15947,6 +15985,22 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "transaction-filter" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=35c362a10224ba0a22af8d03c515652c6bc62eb4#35c362a10224ba0a22af8d03c515652c6bc62eb4" +dependencies = [ + "anyhow", + "aptos-protos 1.3.0 (git+https://github.com/aptos-labs/aptos-core.git?tag=aptos-node-v1.12.1)", + "derive_builder", + "memchr", + "prost 0.12.3", + "serde", + "serde_json", + "serde_yaml 0.9.34+deprecated", + "thiserror", +] + [[package]] name = "treediff" version = "3.0.2" diff --git a/Cargo.toml b/Cargo.toml index fdf320dbd41aa..be18d06bdca1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -473,6 +473,7 @@ ark-groth16 = "0.4.0" ark-serialize = "0.4.0" ark-std = { version = "0.4.0", features = ["getrandom"] } aptos-moving-average = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf" } +transaction-filter = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "35c362a10224ba0a22af8d03c515652c6bc62eb4" } assert_approx_eq = "1.1.0" assert_unordered = "0.3.5" async-channel = "1.7.1" diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index c378e10000297..780909520fd49 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -31,6 +31,7 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } tonic-reflection = { workspace = true } +transaction-filter = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 6824343676579..6fb764432b7f7 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -16,6 +16,7 @@ use aptos_protos::{ use serde::{Deserialize, Serialize}; use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use tonic::{codec::CompressionEncoding, transport::Server}; +use transaction_filter::BooleanTransactionFilter; pub const SERVER_NAME: &str = "idxdatasvc"; @@ -69,9 +70,17 @@ pub struct IndexerGrpcDataServiceConfig { pub enable_cache_compression: bool, #[serde(default)] pub in_memory_cache_config: InMemoryCacheConfig, - /// Sender addresses to ignore. Transactions from these addresses will not be indexed. - #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] - pub sender_addresses_to_ignore: Vec, + /// Any transaction that matches this filter will be stripped, meaning we remove + /// events and writesets from it before sending it downstream. This should only be + /// used in an emergency situation, e.g. when txns related to a certain module are + /// too large and are causing issues for the data service. Learn more here: + /// + /// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85 + /// + /// Generally you will want to start with this with an OR, and then list out + /// separate filters that describe each type of txn we want to strip. + #[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")] + pub txns_to_strip_filter: BooleanTransactionFilter, } impl IndexerGrpcDataServiceConfig { @@ -84,7 +93,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address: RedisUrl, enable_cache_compression: bool, in_memory_cache_config: InMemoryCacheConfig, - sender_addresses_to_ignore: Vec, + txns_to_strip_filter: BooleanTransactionFilter, ) -> Self { Self { data_service_grpc_tls_config, @@ -97,7 +106,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address, enable_cache_compression, in_memory_cache_config, - sender_addresses_to_ignore, + txns_to_strip_filter, } } @@ -109,8 +118,9 @@ impl IndexerGrpcDataServiceConfig { false } - pub const fn default_sender_addresses_to_ignore() -> Vec { - vec![] + pub const fn default_txns_to_strip_filter() -> BooleanTransactionFilter { + // This filter matches no txns. + BooleanTransactionFilter::new_or(vec![]) } } @@ -162,10 +172,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { self.redis_read_replica_address.clone(), self.file_store_config.clone(), self.data_service_response_channel_size, - self.sender_addresses_to_ignore - .clone() - .into_iter() - .collect::>(), + self.txns_to_strip_filter.clone(), cache_storage_format, Arc::new(in_memory_cache), )?; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 525189bd9a4f1..269f9170a4e16 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -42,6 +42,7 @@ use tokio::sync::mpsc::{channel, error::SendTimeoutError}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::{error, info, warn}; +use transaction_filter::{BooleanTransactionFilter, Filterable}; use uuid::Uuid; type ResponseStream = Pin> + Send>>; @@ -77,7 +78,7 @@ pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, pub data_service_response_channel_size: usize, - pub sender_addresses_to_ignore: HashSet, + pub txns_to_strip_filter: BooleanTransactionFilter, pub cache_storage_format: StorageFormat, in_memory_cache: Arc, } @@ -87,7 +88,7 @@ impl RawDataServerWrapper { redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, data_service_response_channel_size: usize, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, cache_storage_format: StorageFormat, in_memory_cache: Arc, ) -> anyhow::Result { @@ -99,7 +100,7 @@ impl RawDataServerWrapper { ), file_store_config, data_service_response_channel_size, - sender_addresses_to_ignore, + txns_to_strip_filter, cache_storage_format, in_memory_cache, }) @@ -175,7 +176,7 @@ impl RawData for RawDataServerWrapper { let redis_client = self.redis_client.clone(); let cache_storage_format = self.cache_storage_format; let request_metadata = Arc::new(request_metadata); - let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone(); + let txns_to_strip_filter = self.txns_to_strip_filter.clone(); let in_memory_cache = self.in_memory_cache.clone(); tokio::spawn({ let request_metadata = request_metadata.clone(); @@ -187,7 +188,7 @@ impl RawData for RawDataServerWrapper { request_metadata, transactions_count, tx, - sender_addresses_to_ignore, + txns_to_strip_filter, current_version, in_memory_cache, ) @@ -375,7 +376,7 @@ async fn data_fetcher_task( request_metadata: Arc, transactions_count: Option, tx: tokio::sync::mpsc::Sender>, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, mut current_version: u64, in_memory_cache: Arc, ) { @@ -513,7 +514,7 @@ async fn data_fetcher_task( let resp_items = get_transactions_responses_builder( transaction_data, chain_id as u32, - &sender_addresses_to_ignore, + &txns_to_strip_filter, ); let data_latency_in_secs = resp_items .last() @@ -657,11 +658,10 @@ fn ensure_sequential_transactions(mut batches: Vec>) -> Vec, chain_id: u32, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { - let filtered_transactions = - filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore); - let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT); + let stripped_transactions = strip_transactions(transactions, txns_to_strip_filter); + let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT); chunks .into_iter() .map(|chunk| TransactionsResponse { @@ -937,21 +937,29 @@ async fn channel_send_multiple_with_timeout( Ok(()) } -fn filter_transactions_for_sender_addresses( +/// This function strips transactions that match the given filter. Stripping means we +/// remove the payload, signature, events, and writesets. Note, the filter can be +/// composed of many conditions, see `BooleanTransactionFilter` for more. +fn strip_transactions( transactions: Vec, - sender_addresses_to_ignore: &HashSet, + txns_to_strip_filter: &BooleanTransactionFilter, ) -> Vec { transactions .into_iter() .map(|mut txn| { - if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { - if let Some(utr) = user_transaction.request.as_mut() { - if sender_addresses_to_ignore.contains(&utr.sender) { + // Note: `is_allowed` means the txn is matches the filter, in which case + // we strip it. + if txns_to_strip_filter.is_allowed(&txn) { + // TODO: transaction-filter needs to be moved to aptos-core, see https://aptos-org.slack.com/archives/C0468USBLQJ/p1718368277898979?thread_ts=1718356561.304619&cid=C0468USBLQJ + if let Some(info) = txn.info.as_mut() { + info.changes = vec![]; + } + if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { + user_transaction.events = vec![]; + if let Some(utr) = user_transaction.request.as_mut() { // Wipe the payload and signature. utr.payload = None; utr.signature = None; - user_transaction.events = vec![]; - txn.info.as_mut().unwrap().changes = vec![]; } } } @@ -962,7 +970,7 @@ fn filter_transactions_for_sender_addresses( #[cfg(test)] mod tests { - use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses}; + use super::ensure_sequential_transactions; use aptos_protos::transaction::v1::{ transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload, UserTransaction, UserTransactionRequest, WriteSetChange, @@ -1014,6 +1022,7 @@ mod tests { assert_eq!(transactions1.last().unwrap().version, 11); } + // TODO: Adapt this for strip_transactions instead. #[test] fn test_transactions_are_filter_correctly() { let sender_address = "0x1234".to_string();