diff --git a/rust/Cargo.lock b/rust/Cargo.lock index f0b5a9412..e2a1d4346 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -138,6 +138,14 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "aptos-in-memory-cache" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-core.git?rev=07952ba261dd8301581e449e26ce17bbbc1adc46#07952ba261dd8301581e449e26ce17bbbc1adc46" +dependencies = [ + "parking_lot", +] + [[package]] name = "aptos-moving-average" version = "0.1.0" @@ -249,6 +257,34 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "attribute-derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c124f12ade4e670107b132722d0ad1a5c9790bcbc1b265336369ea05626b4498" +dependencies = [ + "attribute-derive-macro", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "attribute-derive-macro" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b217a07446e0fb086f83401a98297e2d81492122f5874db5391bd270a185f88" +dependencies = [ + "collection_literals", + "interpolator", + "proc-macro-error", + "proc-macro-utils", + "proc-macro2", + "quote", + "quote-use", + "syn 2.0.48", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -530,6 +566,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +[[package]] +name = "collection_literals" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186dce98367766de751c42c4f03970fc60fc012296e706ccbb9d5df9b6c1e271" + [[package]] name = "colorchoice" version = "1.0.0" @@ -684,6 +726,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.0", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "debugid" version = "0.8.0" @@ -714,6 +769,17 @@ dependencies = [ "serde", ] +[[package]] +name = "derive-where" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "diesel" version = "2.1.4" @@ -1098,6 +1164,26 @@ dependencies = [ "version_check", ] +[[package]] +name = "get-size" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b61e2dab7eedce93a83ab3468b919873ff16bac5a3e704011ff836d22b2120" +dependencies = [ + "get-size-derive", +] + +[[package]] +name = "get-size-derive" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13a1bcfb855c1f340d5913ab542e36f25a1c56f57de79022928297632435dec2" +dependencies = [ + "attribute-derive", + "quote", + "syn 2.0.48", +] + [[package]] name = "getrandom" version = "0.2.10" @@ -1545,6 +1631,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "interpolator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"71dd52191aae121e8611f1e8dc3e324dd0dd1dee1e6dd91d10ee07a3cfb4d9d8" + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -2350,6 +2442,41 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro-utils" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f59e109e2f795a5070e69578c4dc101068139f74616778025ae1011d4cd41a8" +dependencies = [ + "proc-macro2", + "quote", + "smallvec", +] + [[package]] name = "proc-macro2" version = "1.0.76" @@ -2367,6 +2494,7 @@ dependencies = [ "allocative", "allocative_derive", "anyhow", + "aptos-in-memory-cache", "aptos-moving-average", "aptos-protos", "async-trait", @@ -2376,6 +2504,7 @@ dependencies = [ "canonical_json", "chrono", "clap", + "dashmap", "diesel", "diesel-async", "diesel_migrations", @@ -2383,6 +2512,7 @@ dependencies = [ "field_count", "futures", "futures-util", + "get-size", "google-cloud-googleapis", "google-cloud-pubsub", "google-cloud-storage", @@ -2411,11 +2541,13 @@ dependencies = [ "tiny-keccak", "tokio", "tokio-postgres", + "tokio-util", "tonic 0.11.0", "tracing", "unescape", "url", "uuid", + "warp", ] [[package]] @@ -2546,6 +2678,29 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "quote-use" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7b5abe3fe82fdeeb93f44d66a7b444dedf2e4827defb0a8e69c437b2de2ef94" +dependencies = [ + "quote", + "quote-use-macros", + "syn 2.0.48", +] + +[[package]] +name = "quote-use-macros" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ea44c7e20f16017a76a245bb42188517e13d16dcb1aa18044bc406cdc3f4af" +dependencies = [ + "derive-where", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "rand" version = "0.8.5" @@ -3579,6 +3734,7 @@ checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 6443c7a8c..6dc4ce07e 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -10,7 +10,7 @@ homepage = "https://aptoslabs.com" license = "Apache-2.0" publish = false repository = "https://github.com/aptos-labs/aptos-indexer-processors" -rust-version = "1.75" +rust-version = "1.78" [workspace.dependencies] processor = { path = "processor" } @@ -20,6 +20,7 @@ aptos-moving-average = { path = "moving-average" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.62" aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "d76b5bb423b78b2b9affc72d3853f0d973d3f11f" } +aptos-in-memory-cache = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "07952ba261dd8301581e449e26ce17bbbc1adc46" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" } async-trait = "0.1.53" backtrace = 
"0.3.58" @@ -30,6 +31,7 @@ bigdecimal = { version = "0.4.0", features = ["serde"] } bitflags = "2.5.0" chrono = { version = "0.4.19", features = ["clock", "serde"] } clap = { version = "4.3.5", features = ["derive", "unstable-styles"] } +dashmap = "5.2.0" # Do NOT enable the postgres feature here, it is conditionally enabled in a feature # block in the Cargo.toml file for the processor crate. # https://github.com/aptos-labs/aptos-indexer-processors/pull/325 @@ -54,6 +56,7 @@ field_count = "0.1.1" futures = "0.3.30" futures-core = "0.3.25" futures-util = "0.3.21" +get-size = { version = "0.1.4", features = ["derive"] } gcloud-sdk = { version = "0.20.4", features = [ "google-cloud-bigquery-storage-v1", ] } @@ -73,6 +76,7 @@ pbjson = "0.5.1" prometheus = { version = "0.13.0", default-features = false } prost = { version = "0.12.3", features = ["no-recursion-limit"] } prost-types = "0.12.3" +quick_cache = "0.4.1" regex = "1.5.5" reqwest = { version = "0.11.20", features = [ "blocking", @@ -91,6 +95,7 @@ toml = "0.7.4" tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] } tokio = { version = "1.35.1", features = ["full"] } +tokio-util = { version = "0.7.2", features = ["compat", "codec"] } tonic = { version = "0.11.0", features = [ "tls", "tls-roots", diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 92a967057..f40725823 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -15,6 +15,7 @@ rust-version = { workspace = true } [dependencies] ahash = { workspace = true } anyhow = { workspace = true } +aptos-in-memory-cache = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } async-trait = { workspace = true } @@ -23,6 +24,7 @@ bigdecimal = { workspace = true } bitflags = { workspace = true } chrono = { workspace = true } clap = { workspace = true } +dashmap = { workspace = true } diesel = { workspace = true } diesel-async = { workspace = true } diesel_migrations = { workspace = true } @@ -30,6 +32,7 @@ enum_dispatch = { workspace = true } field_count = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +get-size = { workspace = true } google-cloud-googleapis = { workspace = true } google-cloud-pubsub = { workspace = true } hex = { workspace = true } @@ -48,10 +51,12 @@ sha2 = { workspace = true } sha3 = { workspace = true } strum = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } unescape = { workspace = true } url = { workspace = true } +warp = { workspace = true } # Postgres SSL support native-tls = { workspace = true } diff --git a/rust/processor/src/db/common/models/events_models/events.rs b/rust/processor/src/db/common/models/events_models/events.rs index 6747636ab..15fc2f7bd 100644 --- a/rust/processor/src/db/common/models/events_models/events.rs +++ b/rust/processor/src/db/common/models/events_models/events.rs @@ -9,12 +9,16 @@ use crate::{ }; use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; +use get_size::GetSize; use serde::{Deserialize, Serialize}; +use std::sync::Arc; // p99 currently is 303 so using 300 as a safe max length const EVENT_TYPE_MAX_LENGTH: usize = 300; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive( + Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Eq, 
PartialEq, +)] #[diesel(primary_key(transaction_version, event_index))] #[diesel(table_name = events)] pub struct Event { @@ -74,3 +78,86 @@ impl Event { // Prevent conflicts with other things named `Event` pub type EventModel = Event; + +#[derive(Clone, Debug, GetSize, Deserialize, Serialize, Eq, PartialEq)] +pub struct EventContext { + pub coin_type: String, +} + +#[derive(Clone, Debug, GetSize, Serialize, Deserialize, Eq, PartialEq)] +pub struct EventStreamMessage { + pub sequence_number: i64, + pub creation_number: i64, + pub account_address: String, + pub transaction_version: i64, + pub transaction_block_height: i64, + pub type_: String, + #[get_size(size_fn = get_serde_json_size_estimate)] + pub data: serde_json::Value, + pub event_index: i64, + pub indexed_type: String, + #[get_size(size = 12)] + pub transaction_timestamp: chrono::NaiveDateTime, + pub context: Option, +} + +fn get_serde_json_size_estimate(value: &serde_json::Value) -> usize { + match value { + serde_json::Value::Null => 0, + serde_json::Value::Bool(_) => 1, + serde_json::Value::Number(_) => 8, + serde_json::Value::String(s) => s.len(), + serde_json::Value::Array(arr) => arr.iter().map(get_serde_json_size_estimate).sum(), + serde_json::Value::Object(obj) => obj + .iter() + .map(|(k, v)| k.len() + get_serde_json_size_estimate(v)) + .sum(), + } +} + +impl EventStreamMessage { + pub fn from_event( + event: &Event, + context: Option, + transaction_timestamp: chrono::NaiveDateTime, + ) -> Self { + EventStreamMessage { + account_address: event.account_address.clone(), + creation_number: event.creation_number, + sequence_number: event.sequence_number, + transaction_version: event.transaction_version, + transaction_block_height: event.transaction_block_height, + type_: event.type_.clone(), + data: event.data.clone(), + event_index: event.event_index, + indexed_type: event.indexed_type.clone(), + transaction_timestamp, + context: context.clone(), + } + } +} + +#[derive(Clone, Debug, GetSize, Serialize, Deserialize, Eq, PartialEq)] +pub struct CachedEvents { + pub transaction_version: i64, + pub events: Vec>, +} + +impl CachedEvents { + pub fn from_event_stream_message( + transaction_version: i64, + event_stream_message: Vec>, + ) -> Self { + CachedEvents { + transaction_version, + events: event_stream_message.clone(), + } + } + + pub fn empty(transaction_version: i64) -> Self { + CachedEvents { + transaction_version, + events: Vec::with_capacity(0), + } + } +} diff --git a/rust/processor/src/processors/event_stream_processor.rs b/rust/processor/src/processors/event_stream_processor.rs new file mode 100644 index 000000000..0004149cf --- /dev/null +++ b/rust/processor/src/processors/event_stream_processor.rs @@ -0,0 +1,170 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + db::common::models::{ + events_models::events::{CachedEvents, EventContext, EventModel, EventStreamMessage}, + fungible_asset_models::{ + v2_fungible_asset_activities::{EventToCoinType, FungibleAssetActivity}, + v2_fungible_asset_balances::FungibleAssetBalance, + }, + }, + processors::{DefaultProcessingResult, ProcessingResult, ProcessorName, ProcessorTrait}, + utils::{ + database::ArcDbPool, + in_memory_cache::InMemoryCache, + util::{get_entry_function_from_user_request, parse_timestamp}, + }, +}; +use ahash::AHashMap; +use aptos_in_memory_cache::Cache; +use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; +use async_trait::async_trait; +use std::{fmt::Debug, sync::Arc}; 
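+// Converts each transaction's events into `EventStreamMessage`s (attaching coin-type
+// context for v1 coin deposit/withdraw events) and inserts them into the shared
+// in-memory cache keyed by transaction version; nothing is written to Postgres.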
+ +pub struct EventStreamProcessor { + connection_pool: ArcDbPool, + cache: Arc, +} + +impl EventStreamProcessor { + pub fn new(connection_pool: ArcDbPool, cache: Arc) -> Self { + Self { + connection_pool, + cache, + } + } +} + +impl Debug for EventStreamProcessor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = &self.connection_pool.state(); + write!( + f, + "EventStreamProcessor {{ connections: {:?} idle_connections: {:?} }}", + state.connections, state.idle_connections + ) + } +} + +#[async_trait] +impl ProcessorTrait for EventStreamProcessor { + fn name(&self) -> &'static str { + ProcessorName::EventStreamProcessor.into() + } + + async fn process_transactions( + &self, + transactions: Vec, + start_version: u64, + end_version: u64, + _: Option, + ) -> anyhow::Result { + let processing_start = std::time::Instant::now(); + let mut batch = vec![]; + for txn in &transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let txn_data = txn.txn_data.as_ref().expect("Txn Data doesn't exit!"); + let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); + let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let default = vec![]; + let (raw_events, _user_request, entry_function_id_str) = match txn_data { + TxnData::BlockMetadata(tx_inner) => (&tx_inner.events, None, None), + TxnData::Genesis(tx_inner) => (&tx_inner.events, None, None), + TxnData::User(tx_inner) => { + let user_request = tx_inner + .request + .as_ref() + .expect("Sends is not present in user txn"); + let entry_function_id_str = get_entry_function_from_user_request(user_request); + (&tx_inner.events, Some(user_request), entry_function_id_str) + }, + _ => (&default, None, None), + }; + + // This is because v1 events (deposit/withdraw) don't have coin type so the only way is to match + // the event to the resource using the event guid + let mut event_to_v1_coin_type: EventToCoinType = AHashMap::new(); + + for (index, wsc) in transaction_info.changes.iter().enumerate() { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + if let Some((_balance, _current_balance, event_to_coin)) = + FungibleAssetBalance::get_v1_from_write_resource( + write_resource, + index as i64, + txn_version, + txn_timestamp, + ) + .unwrap() + { + event_to_v1_coin_type.extend(event_to_coin); + } + } + } + + let mut event_context = AHashMap::new(); + for (index, event) in raw_events.iter().enumerate() { + // Only support v1 for now + if let Some(v1_activity) = FungibleAssetActivity::get_v1_from_event( + event, + txn_version, + block_height, + txn_timestamp, + &entry_function_id_str, + &event_to_v1_coin_type, + index as i64, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible asset activity v1"); + panic!("[Parser] error parsing fungible asset activity v1"); + }) { + event_context.insert((txn_version, index as i64), EventContext { + coin_type: v1_activity.asset_type.clone(), + }); + } + } + + batch.push(CachedEvents { + transaction_version: txn_version, + events: EventModel::from_events(raw_events, txn_version, block_height) + .iter() + .map(|event| { + let context = event_context + .get(&(txn_version, event.event_index)) + .cloned(); + Arc::new(EventStreamMessage::from_event( + event, + context, + txn_timestamp.clone(), + )) + }) + .collect(), + }); + } + + for events in batch { + self.cache + 
.insert(events.transaction_version, events.clone()); + } + + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); + Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), + processing_duration_in_secs, + db_insertion_duration_in_secs: 0.0, + }, + )) + } + + fn connection_pool(&self) -> &ArcDbPool { + &self.connection_pool + } +} diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index b3d33de5d..908a1a43d 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -8,6 +8,7 @@ pub mod account_transactions_processor; pub mod ans_processor; pub mod coin_processor; pub mod default_processor; +pub mod event_stream_processor; pub mod events_processor; pub mod fungible_asset_processor; pub mod monitoring_processor; @@ -25,6 +26,7 @@ use self::{ ans_processor::{AnsProcessor, AnsProcessorConfig}, coin_processor::CoinProcessor, default_processor::DefaultProcessor, + event_stream_processor::EventStreamProcessor, events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, monitoring_processor::MonitoringProcessor, @@ -184,6 +186,7 @@ pub enum ProcessorConfig { AnsProcessor(AnsProcessorConfig), CoinProcessor, DefaultProcessor, + EventStreamProcessor, EventsProcessor, FungibleAssetProcessor, MonitoringProcessor, @@ -231,6 +234,7 @@ pub enum Processor { AnsProcessor, CoinProcessor, DefaultProcessor, + EventStreamProcessor, EventsProcessor, FungibleAssetProcessor, MonitoringProcessor, diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index f4a6a57e1..5f83e183e 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -3,8 +3,9 @@ use once_cell::sync::Lazy; use prometheus::{ - register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec, - GaugeVec, IntCounter, IntCounterVec, IntGaugeVec, + register_gauge, register_gauge_vec, register_int_counter, register_int_counter_vec, + register_int_gauge, register_int_gauge_vec, Gauge, GaugeVec, IntCounter, IntCounterVec, + IntGauge, IntGaugeVec, }; pub enum ProcessorStep { @@ -282,3 +283,35 @@ pub static PARQUET_HANDLER_BUFFER_SIZE: Lazy = Lazy::new(|| { ) .unwrap() }); + +/// Indexer gRPC to Processor 1 serve latency +pub static GRPC_TO_PROCESSOR_1_SERVE_LATENCY_IN_SECS: Lazy = Lazy::new(|| { + register_gauge!( + "indexer_grpc_to_processor_1_serve_latency_in_secs", + "Indexer gRPC to Processor 1 serve latency" + ) + .unwrap() +}); + +/// First value in cache +pub static FIRST_TRANSACTION_VERSION_IN_CACHE: Lazy = Lazy::new(|| { + register_int_gauge!( + "indexer_first_transaction_version_in_cache", + "First value in cache" + ) + .unwrap() +}); + +/// Last value in cache +pub static LAST_TRANSACTION_VERSION_IN_CACHE: Lazy = Lazy::new(|| { + register_int_gauge!( + "indexer_last_transaction_version_in_cache", + "Last value in cache" + ) + .unwrap() +}); + +/// Size of cache in bytes +pub static CACHE_SIZE_IN_BYTES: Lazy = Lazy::new(|| { + register_int_gauge!("indexer_cache_size_in_bytes", "Size of cache in bytes").unwrap() +}); diff --git a/rust/processor/src/utils/event_ordering.rs b/rust/processor/src/utils/event_ordering.rs new file mode 100644 index 000000000..ff6fa76a9 --- /dev/null +++ b/rust/processor/src/utils/event_ordering.rs @@ -0,0 +1,88 @@ +use super::{counters::CACHE_SIZE_IN_BYTES, 
stream::EventCacheKey}; +use crate::{ + models::events_models::events::{CachedEvent, EventOrder, EventStreamMessage}, + utils::counters::LAST_TRANSACTION_VERSION_IN_CACHE, +}; +use ahash::AHashMap; +use aptos_in_memory_cache::StreamableOrderedCache; +use kanal::AsyncReceiver; +use std::sync::Arc; +use tracing::error; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TransactionEvents { + pub transaction_version: i64, + pub transaction_timestamp: chrono::NaiveDateTime, + pub events: Vec, +} + +impl Ord for TransactionEvents { + // Comparison must be reversed because BinaryHeap is a max-heap + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.transaction_version.cmp(&self.transaction_version) + } +} + +impl PartialOrd for TransactionEvents { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +pub struct EventOrdering + 'static> { + rx: AsyncReceiver>, + cache: Arc, +} + +impl + 'static> EventOrdering { + pub fn new(rx: AsyncReceiver>, cache: Arc) -> Self { + Self { rx, cache } + } + + pub async fn run(&self, starting_version: i64) { + let mut map = AHashMap::new(); + let rx = self.rx.clone(); + let mut next_transaction_version = starting_version; + + loop { + let batch_events = rx.recv().await.unwrap_or_else(|e| { + error!( + error = ?e, + "[Event Stream] Failed to receive message from channel" + ); + panic!(); + }); + + for events in batch_events { + map.insert(events.transaction_version, events); + } + + while let Some(transaction_events) = map.remove(&next_transaction_version) { + let transaction_timestamp = transaction_events.transaction_timestamp; + let num_events = transaction_events.events.len(); + if num_events == 0 { + // Add empty event if transaction doesn't have any events + self.cache.insert( + EventCacheKey::new(transaction_events.transaction_version, 0), + CachedEvent::empty(transaction_events.transaction_version), + ); + } else { + // Add all events to cache + for event in transaction_events.events { + self.cache.insert( + EventCacheKey::new(event.transaction_version, event.event_index), + CachedEvent::from_event_stream_message( + &EventStreamMessage::from_event_order(&event, transaction_timestamp), + num_events, + ), + ); + } + } + LAST_TRANSACTION_VERSION_IN_CACHE + .set(self.cache.last_key().unwrap().transaction_version); + CACHE_SIZE_IN_BYTES.set(self.cache.total_size() as i64); + next_transaction_version += 1; + } + } + } +} diff --git a/rust/processor/src/utils/filter.rs b/rust/processor/src/utils/filter.rs new file mode 100644 index 000000000..c20ebe07b --- /dev/null +++ b/rust/processor/src/utils/filter.rs @@ -0,0 +1,20 @@ +use dashmap::DashSet; + +#[derive(Clone, Debug, Default)] +pub struct EventFilter { + pub accounts: DashSet, + pub types: DashSet, +} + +impl EventFilter { + pub fn new() -> Self { + Self { + accounts: DashSet::new(), + types: DashSet::new(), + } + } + + pub fn is_empty(&self) -> bool { + self.accounts.is_empty() && self.types.is_empty() + } +} diff --git a/rust/processor/src/utils/filter_editor.rs b/rust/processor/src/utils/filter_editor.rs new file mode 100644 index 000000000..f08b12f0f --- /dev/null +++ b/rust/processor/src/utils/filter_editor.rs @@ -0,0 +1,75 @@ +// Copyright © Aptos Foundation + +use crate::utils::filter::EventFilter; +use futures::{stream::SplitStream, StreamExt}; +use std::sync::Arc; +use tokio::sync::Notify; +use tracing::{error, info}; +use warp::filters::ws::WebSocket; + +pub struct FilterEditor { + rx: SplitStream, + filter: Arc, + filter_edit_notify: Arc, +} + +impl 
FilterEditor {
+    pub fn new(
+        rx: SplitStream<WebSocket>,
+        filter: Arc<EventFilter>,
+        filter_edit_notify: Arc<Notify>,
+    ) -> Self {
+        info!("Received WebSocket connection");
+        Self {
+            rx,
+            filter,
+            filter_edit_notify,
+        }
+    }
+
+    /// Maintains websocket connection and sends messages from channel
+    pub async fn run(&mut self) {
+        while let Some(Ok(msg)) = self.rx.next().await {
+            if let Ok(policy) = msg.to_str() {
+                let policy = policy.split(',').collect::<Vec<&str>>();
+                match policy[0] {
+                    "account" => match policy[1] {
+                        "add" => {
+                            self.filter.accounts.insert(policy[2].to_string());
+                        },
+                        "remove" => {
+                            self.filter.accounts.remove(policy[2]);
+                        },
+                        _ => {
+                            error!("[Event Stream] Invalid filter command: {}", policy[1]);
+                        },
+                    },
+                    "type" => match policy[1] {
+                        "add" => {
+                            self.filter.types.insert(policy[2].to_string());
+                        },
+                        "remove" => {
+                            self.filter.types.remove(policy[2]);
+                        },
+                        _ => {
+                            error!("[Event Stream] Invalid filter command: {}", policy[1]);
+                        },
+                    },
+                    _ => {
+                        error!("[Event Stream] Invalid filter type: {}", policy[0]);
+                    },
+                }
+                self.filter_edit_notify.notify_waiters();
+            }
+        }
+    }
+}
+
+pub async fn spawn_filter_editor(
+    rx: SplitStream<WebSocket>,
+    filter: Arc<EventFilter>,
+    filter_edit_notify: Arc<Notify>,
+) {
+    let mut filter = FilterEditor::new(rx, filter, filter_edit_notify);
+    filter.run().await;
+}
diff --git a/rust/processor/src/utils/in_memory_cache.rs b/rust/processor/src/utils/in_memory_cache.rs
new file mode 100644
index 000000000..18dcd7cc3
--- /dev/null
+++ b/rust/processor/src/utils/in_memory_cache.rs
@@ -0,0 +1,194 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::db::common::models::events_models::events::CachedEvents;
+use aptos_in_memory_cache::{caches::sync_mutex::SyncMutexCache, Cache, SizedCache};
+use futures::{stream, Stream};
+use get_size::GetSize;
+use std::sync::{
+    atomic::{AtomicI64, Ordering},
+    Arc,
+};
+use tokio::{sync::Notify, task::JoinHandle};
+
+#[derive(Debug, Clone)]
+pub struct InMemoryCacheMetadata {
+    pub eviction_trigger_size_in_bytes: usize,
+    pub target_size_in_bytes: usize,
+    pub capacity: usize,
+}
+
+// #[derive(Debug)]
+pub struct InMemoryCache {
+    pub metadata: Arc<InMemoryCacheMetadata>,
+    pub cache: Arc<SyncMutexCache<CachedEvents>>,
+    pub eviction_notify: Arc<Notify>,
+    // Metadata for stream
+    pub head: Arc<AtomicI64>,
+    pub tail: Arc<AtomicI64>,
+    pub watermark: Arc<AtomicI64>,
+    pub stream_notify: Arc<Notify>,
+}
+
+impl Cache<i64, CachedEvents> for InMemoryCache {
+    fn get(&self, key: &i64) -> Option<Arc<CachedEvents>> {
+        let key = &(*key as usize);
+        self.cache.get(key).and_then(|entry| {
+            if entry.key == *key {
+                return Some(entry.value.clone());
+            }
+            None
+        })
+    }
+
+    fn insert(&self, key: i64, value: CachedEvents) {
+        let size_in_bytes = value.get_size();
+        self.cache
+            .insert_with_size(key as usize, Arc::new(value), size_in_bytes);
+
+        // Fill pointers if cache was empty
+        if self.head.load(Ordering::Relaxed) == -1 {
+            self.head.store(key, Ordering::Relaxed);
+        }
+
+        if self.tail.load(Ordering::Relaxed) == -1 {
+            self.tail.store(key, Ordering::Relaxed);
+        }
+
+        if self.watermark.load(Ordering::Relaxed) == -1 {
+            self.watermark.store(key, Ordering::Relaxed);
+        }
+
+        // Since pointers have been filled, the unwraps below are safe
+        // Update watermark to highest seen transaction version
+        if key > self.watermark.load(Ordering::Relaxed) {
+            self.watermark.store(key, Ordering::Relaxed);
+        }
+
+        // Update tail to the latest consecutive transaction version
+        loop {
+            let tail = self.tail.load(Ordering::Relaxed);
+            let next_tail = self.get(&(tail + 1));
+
+            // If the next transaction does not exist or is not consecutive, break
+            // Unwrap ok because next_tail
is not None + if next_tail.is_none() || next_tail.unwrap().transaction_version != tail + 1 { + break; + } + + // Update tail and notify stream + self.tail.store(tail + 1, Ordering::Relaxed); + self.stream_notify.notify_waiters(); + } + + // Notify eviction task if cache size exceeds trigger size + if self.cache.total_size() >= self.metadata.eviction_trigger_size_in_bytes { + self.eviction_notify.notify_one(); + } + } + + fn total_size(&self) -> usize { + self.cache.total_size() as usize + } +} + +impl InMemoryCache { + pub fn with_capacity( + eviction_trigger_size_in_bytes: usize, + target_size_in_bytes: usize, + capacity: usize, + ) -> Arc { + let c = SyncMutexCache::with_capacity(capacity); + let metadata = Arc::new(InMemoryCacheMetadata { + eviction_trigger_size_in_bytes, + target_size_in_bytes, + capacity: c.capacity(), + }); + let cache = Arc::new(c); + let eviction_notify = Arc::new(Notify::new()); + + let out = Arc::new(Self { + metadata, + cache, + eviction_notify, + stream_notify: Arc::new(Notify::new()), + head: Arc::new(AtomicI64::new(-1)), + tail: Arc::new(AtomicI64::new(-1)), + watermark: Arc::new(AtomicI64::new(-1)), + }); + + spawn_eviction_task(out.clone()); + + out + } + + /// Returns a stream of values in the cache starting from the given (transaction_version, event_index). + /// If the stream falls behind, the stream will return None for the next value (indicating that it should be reset). + pub fn get_stream( + &self, + starting_key: Option, + ) -> impl Stream> + '_ { + // Start from the starting key if provided, otherwise start from the last key + let initial_state = starting_key.unwrap_or(self.tail.load(Ordering::Relaxed)); + + Box::pin(stream::unfold(initial_state, move |state| { + async move { + // If the current key is None, the cache is empty + // Wait until a new value is inserted before assigning it + let mut current_transaction_version = state; + if current_transaction_version == -1 { + self.eviction_notify.notified().await; + // Ok to unwrap because the last_transaction_version should be populated after the first insert + current_transaction_version = self.tail.load(Ordering::Relaxed); + } + + let last_transaction_version = self.tail.load(Ordering::Relaxed); + + // Stream is ahead of cache + // If the last value in the cache has already been streamed, wait until the next value is inserted and return it + if current_transaction_version == last_transaction_version + 1 { + // Wait until the next value is inserted + self.stream_notify.notified().await; + let cached_events = self.get(¤t_transaction_version).unwrap(); + return Some((cached_events, current_transaction_version + 1)); + } + // Stream is in cache bounds + // If the next value to stream is in the cache, return it + else if let Some(cached_events) = self.get(¤t_transaction_version) { + return Some((cached_events, current_transaction_version + 1)); + } + + // If we get here, the stream is behind the cache + // Stop the stream + None + } + })) + } +} + +/// Perform cache eviction on a separate task. 
+fn spawn_eviction_task(cache: Arc<InMemoryCache>) -> JoinHandle<()> {
+    tokio::spawn(async move {
+        loop {
+            cache.eviction_notify.notified().await;
+            // Evict entries until the cache size is below the target size
+            while cache.total_size() > cache.metadata.target_size_in_bytes {
+                // Unwrap ok because eviction_notify is only notified after head is populated
+                let eviction_key = cache.head.load(Ordering::Relaxed) as usize;
+                if let Some(value) = cache.cache.evict(&eviction_key) {
+                    if value.key > eviction_key {
+                        cache.cache.insert_with_size(
+                            value.key,
+                            value.value.clone(),
+                            value.size_in_bytes,
+                        );
+                        break;
+                    }
+                }
+
+                // Update head
+                cache.head.store(eviction_key as i64, Ordering::Relaxed);
+            }
+        }
+    })
+}
diff --git a/rust/processor/src/utils/mod.rs b/rust/processor/src/utils/mod.rs
index 4f13167fe..aa0c5044f 100644
--- a/rust/processor/src/utils/mod.rs
+++ b/rust/processor/src/utils/mod.rs
@@ -3,4 +3,8 @@
 pub mod counters;
 pub mod database;
+pub mod filter;
+pub mod filter_editor;
+pub mod in_memory_cache;
+pub mod stream;
 pub mod util;
diff --git a/rust/processor/src/utils/stream.rs b/rust/processor/src/utils/stream.rs
new file mode 100644
index 000000000..a291b3a16
--- /dev/null
+++ b/rust/processor/src/utils/stream.rs
@@ -0,0 +1,128 @@
+// Copyright © Aptos Foundation
+
+use super::in_memory_cache::InMemoryCache;
+use crate::{
+    db::common::models::events_models::events::CachedEvents,
+    utils::{counters::GRPC_TO_PROCESSOR_1_SERVE_LATENCY_IN_SECS, filter::EventFilter},
+};
+use futures::{stream::SplitSink, SinkExt, StreamExt};
+use std::{fmt::Debug, sync::Arc};
+use tokio::sync::Notify;
+use tracing::{info, warn};
+use warp::filters::ws::{Message, WebSocket};
+
+pub struct Stream {
+    tx: SplitSink<WebSocket, Message>,
+    filter: Arc<EventFilter>,
+    cache: Arc<InMemoryCache>,
+    filter_edit_notify: Arc<Notify>,
+}
+
+impl Stream {
+    pub fn new(
+        tx: SplitSink<WebSocket, Message>,
+        filter: Arc<EventFilter>,
+        cache: Arc<InMemoryCache>,
+        filter_edit_notify: Arc<Notify>,
+    ) -> Self {
+        info!("Received WebSocket connection");
+        Self {
+            tx,
+            filter,
+            cache,
+            filter_edit_notify,
+        }
+    }
+
+    /// Maintains websocket connection and sends messages from channel
+    pub async fn run(&mut self, starting_event: Option<i64>) {
+        let cache = self.cache.clone();
+        let mut stream = Box::pin(cache.get_stream(starting_event));
+        while let Some(cached_events) = stream.next().await {
+            if self.filter.is_empty() {
+                self.filter_edit_notify.notified().await;
+            }
+
+            if let Err(e) = self.send_events(cached_events).await {
+                warn!(
+                    error = ?e,
+                    "Error sending events to WebSocket"
+                );
+                break;
+            }
+        }
+
+        if let Err(e) = self.tx.send(Message::text("Stream ended")).await {
+            warn!("Error sending error message: {:?}", e);
+        }
+
+        if let Err(e) = self.tx.send(Message::close()).await {
+            warn!("Error sending close message: {:?}", e);
+        }
+    }
+
+    async fn send_events(&mut self, cached_events: Arc<CachedEvents>) -> anyhow::Result<()> {
+        for event in cached_events.events.clone() {
+            if self.filter.accounts.contains(&event.account_address)
+                || self.filter.types.contains(&event.type_)
+            {
+                GRPC_TO_PROCESSOR_1_SERVE_LATENCY_IN_SECS.set({
+                    use chrono::TimeZone;
+                    let transaction_timestamp =
+                        chrono::Utc.from_utc_datetime(&event.transaction_timestamp);
+                    let transaction_timestamp = std::time::SystemTime::from(transaction_timestamp);
+                    std::time::SystemTime::now()
+                        .duration_since(transaction_timestamp)
+                        .unwrap_or_default()
+                        .as_secs_f64()
+                });
+                let msg = serde_json::to_string(&event).unwrap_or_default();
+                info!(
+                    account_address = event.account_address,
+                    transaction_version = event.transaction_version,
+                    event_index = event.event_index,
+ event = msg, + "Sending event through WebSocket" + ); + + if let Err(e) = self.tx.send(Message::text(msg)).await { + warn!( + error = ?e, + "[Event Stream] Failed to send message to WebSocket" + ); + return Err(anyhow::anyhow!( + "Failed to send message to WebSocket: {}", + e + )); + } + } + } + Ok(()) + } +} + +pub async fn spawn_stream( + tx: SplitSink, + filter: Arc, + cache: Arc, + starting_event: Option, + filter_edit_notify: Arc, +) { + let mut stream = Stream::new(tx, filter, cache, filter_edit_notify); + stream.run(starting_event).await; +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct EventCacheKey { + pub transaction_version: i64, + pub event_index: i64, +} + +impl EventCacheKey { + pub fn new(transaction_version: i64, event_index: i64) -> Self { + Self { + transaction_version, + event_index, + } + } +} diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index bf5de00f7..7841c0d87 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -9,7 +9,8 @@ use crate::{ processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, coin_processor::CoinProcessor, default_processor::DefaultProcessor, - events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, + event_stream_processor::EventStreamProcessor, events_processor::EventsProcessor, + fungible_asset_processor::FungibleAssetProcessor, monitoring_processor::MonitoringProcessor, nft_metadata_processor::NftMetadataProcessor, objects_processor::ObjectsProcessor, parquet_default_processor::DefaultParquetProcessor, stake_processor::StakeProcessor, token_processor::TokenProcessor, @@ -33,6 +34,10 @@ use crate::{ database::{ execute_with_better_error_conn, new_db_pool, run_pending_migrations, ArcDbPool, }, + filter::EventFilter, + filter_editor::spawn_filter_editor, + in_memory_cache::InMemoryCache, + stream::spawn_stream, util::{time_diff_since_pb_timestamp_in_secs, timestamp_to_iso, timestamp_to_unixtime}, }, }; @@ -40,11 +45,13 @@ use ahash::AHashMap; use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; use bitflags::bitflags; +use futures::StreamExt; use kanal::AsyncSender; -use std::collections::HashSet; -use tokio::task::JoinHandle; +use std::{collections::HashSet, sync::Arc}; +use tokio::{sync::Notify, task::JoinHandle}; use tracing::{debug, error, info}; use url::Url; +use warp::Filter; // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision @@ -64,6 +71,28 @@ bitflags! 
{
     }
 }
+/// Handles WebSocket connection from the /stream endpoint
+async fn handle_websocket(
+    websocket: warp::ws::WebSocket,
+    query_params: AHashMap<String, String>,
+    cache: Arc<InMemoryCache>,
+) {
+    let (tx, rx) = websocket.split();
+    let filter = Arc::new(EventFilter::new());
+
+    let start: Option<i64> = query_params.get("start").map(|s| s.parse::<i64>().unwrap());
+
+    let filter_edit = filter.clone();
+    let filter_edit_notify = Arc::new(Notify::new());
+
+    let filter_edit_notify_clone = filter_edit_notify.clone();
+    tokio::spawn(
+        async move { spawn_filter_editor(rx, filter_edit, filter_edit_notify_clone).await },
+    );
+
+    spawn_stream(tx, filter.clone(), cache.clone(), start, filter_edit_notify).await;
+}
+
 pub struct Worker {
     pub db_pool: ArcDbPool,
     pub processor_config: ProcessorConfig,
@@ -201,6 +230,8 @@ impl Worker {
             "[Parser] Building processor",
         );
 
+        let cache = InMemoryCache::with_capacity(3_300_000_000, 3_000_000_000, 1_000_000);
+
         let concurrent_tasks = self.number_concurrent_processing_tasks;
 
         // get the chain id
@@ -264,6 +295,8 @@
                 .await
         });
 
+        println!("1");
+
         // Create a gap detector task that will panic if there is a gap in the processing
         let (gap_detector_sender, gap_detector_receiver) =
             kanal::bounded_async::<ProcessingResult>(BUFFER_SIZE);
@@ -275,6 +308,7 @@
                 self.deprecated_tables,
                 self.db_pool.clone(),
                 Some(gap_detector_sender.clone()),
+                cache.clone(),
             );
             let gap_detection_batch_size: u64 = self.parquet_gap_detection_batch_size;
@@ -290,6 +324,7 @@
                 self.deprecated_tables,
                 self.db_pool.clone(),
                 None,
+                cache.clone(),
             );
             let gap_detection_batch_size = self.gap_detection_batch_size;
@@ -299,6 +334,7 @@
                 Some(gap_detector_sender),
             )
         };
+        println!("2");
 
         tokio::spawn(async move {
             create_gap_detector_status_tracker_loop(
@@ -310,6 +346,25 @@
             .await;
         });
 
+        println!("3");
+        if self.processor_config.name() == "event_stream_processor" {
+            println!("4");
+            // Create web server
+            let cache_arc = cache.clone();
+            let cache_ws = warp::any().map(move || cache_arc.clone());
+            tokio::spawn(async move {
+                let ws_route = warp::path("stream")
+                    .and(warp::ws())
+                    .and(warp::query::<AHashMap<String, String>>())
+                    .and(cache_ws)
+                    .map(|ws: warp::ws::Ws, query_params, cache| {
+                        ws.on_upgrade(move |socket| handle_websocket(socket, query_params, cache))
+                    });
+
+                warp::serve(ws_route).run(([0, 0, 0, 0], 12345)).await;
+            });
+        }
+
         // This is the consumer side of the channel. These are the major states:
         // 1. We're backfilling so we should expect many concurrent threads to process transactions
         // 2. We're caught up so we should expect a single thread to process transactions
@@ -327,8 +382,13 @@ impl Worker {
         let mut processor_tasks = vec![fetcher_task];
         for task_index in 0..concurrent_tasks {
-            let join_handle: JoinHandle<()> = self
-                .launch_processor_task(task_index, receiver.clone(), gap_detector_sender.clone())
+            let join_handle = self
+                .launch_processor_task(
+                    task_index,
+                    receiver.clone(),
+                    gap_detector_sender.clone(),
+                    cache.clone(),
+                )
                 .await;
             processor_tasks.push(join_handle);
         }
@@ -352,6 +412,7 @@ impl Worker {
         task_index: usize,
         receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
         gap_detector_sender: Option<AsyncSender<ProcessingResult>>,
+        cache: Arc<InMemoryCache>,
     ) -> JoinHandle<()> {
         let processor_name = self.processor_config.name();
         let stream_address = self.indexer_grpc_data_service_address.to_string();
@@ -365,6 +426,7 @@ impl Worker {
                 self.deprecated_tables,
                 self.db_pool.clone(),
                 gap_detector_sender.clone(),
+                cache.clone(),
             );
             let concurrent_tasks = self.number_concurrent_processing_tasks;
@@ -814,6 +876,7 @@ pub fn build_processor(
     deprecated_tables: TableFlags,
     db_pool: ArcDbPool,
     gap_detector_sender: Option<AsyncSender<ProcessingResult>>, // Parquet only
+    cache: Arc<InMemoryCache>,
) -> Processor {
     match config {
         ProcessorConfig::AccountTransactionsProcessor => Processor::from(
@@ -835,6 +898,9 @@
         ProcessorConfig::EventsProcessor => {
             Processor::from(EventsProcessor::new(db_pool, per_table_chunk_sizes))
         },
+        ProcessorConfig::EventStreamProcessor => {
+            Processor::from(EventStreamProcessor::new(db_pool, cache))
+        },
         ProcessorConfig::FungibleAssetProcessor => {
             Processor::from(FungibleAssetProcessor::new(db_pool, per_table_chunk_sizes))
         },
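For reference, a minimal client sketch against the new `/stream` WebSocket endpoint served on port 12345 when the `event_stream_processor` is selected. The `tokio-tungstenite` and `futures-util` crates, the localhost URL, and the example filter values are assumptions for illustration and are not part of this change; the comma-separated filter command format and the optional `start` query parameter come from `filter_editor.rs` and `handle_websocket` above.

```rust
// Hypothetical client for the /stream endpoint; assumes the `tokio-tungstenite`
// and `futures-util` crates, which are not dependencies added by this PR.
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Optional `start` query parameter: transaction version to begin streaming from.
    let (mut ws, _) = connect_async("ws://127.0.0.1:12345/stream?start=0").await?;

    // Filter commands are comma-separated: "<account|type>,<add|remove>,<value>".
    ws.send(Message::text("account,add,0x1")).await?;
    ws.send(Message::text("type,add,0x1::coin::DepositEvent")).await?;

    // Each text frame is a JSON-serialized `EventStreamMessage`.
    while let Some(frame) = ws.next().await {
        match frame? {
            Message::Text(json) => println!("{json}"),
            Message::Close(_) => break,
            _ => {},
        }
    }
    Ok(())
}
```

Note that events are only pushed once at least one account or type filter has been added, since `Stream::run` waits on `filter_edit_notify` while the filter is empty.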