Skip to content

Commit

Permalink
fix: applying account and transction filters to grpc subscription req…
Browse files Browse the repository at this point in the history
…uest
  • Loading branch information
kespinola committed Apr 15, 2024
1 parent a00ec75 commit b5d93b9
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
run: |
cargo tree
git checkout Cargo.lock
cargo tree --frozen
cargo tree
# fmt
- name: Check fmt
Expand Down
2 changes: 1 addition & 1 deletion grpc-ingest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ psql:

```
PGPASSWORD=solana psql -h localhost -U solana -d solana
```
```
18 changes: 11 additions & 7 deletions grpc-ingest/config-grpc2redis.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
prometheus: 127.0.0.1:8873
endpoint: http://127.0.0.1:10000
x_token: null
commitment: processed
commitment: finalized
accounts:
stream: ACCOUNTS
stream_maxlen: 100_000_000
stream_data_key: data
filters:
- owner:
- TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
- metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s
filter:
owner:
- "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s"
- "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb"
- "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
- "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"
- "BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY"
- "CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d"
transactions:
stream: TRANSACTIONS
stream_maxlen: 10_000_000
stream_data_key: data
filters:
- account_include:
filter:
account_include:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
redis:
url: redis://localhost:6379
Expand Down
4 changes: 2 additions & 2 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct ConfigGrpcAccounts {
#[serde(default = "ConfigGrpcAccounts::default_stream_data_key")]
pub stream_data_key: String,

pub filters: Vec<ConfigGrpcRequestAccounts>,
pub filter: ConfigGrpcRequestAccounts,
}

impl ConfigGrpcAccounts {
Expand Down Expand Up @@ -90,7 +90,7 @@ pub struct ConfigGrpcTransactions {
#[serde(default = "ConfigGrpcTransactions::default_stream_data_key")]
pub stream_data_key: String,

pub filters: Vec<ConfigGrpcRequestTransactions>,
pub filter: ConfigGrpcRequestTransactions,
}

impl ConfigGrpcTransactions {
Expand Down
25 changes: 22 additions & 3 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use {
anyhow::Context,
futures::stream::StreamExt,
redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue},
std::collections::HashMap,
std::{sync::Arc, time::Duration},
tokio::{
task::JoinSet,
time::{sleep, Instant},
},
tracing::warn,
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{prelude::subscribe_update::UpdateOneof, prost::Message},
yellowstone_grpc_proto::{
geyser::SubscribeRequest, prelude::subscribe_update::UpdateOneof, prost::Message,
},
yellowstone_grpc_tools::config::GrpcRequestToProto,
};

pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
Expand Down Expand Up @@ -41,7 +45,22 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
.connect()
.await
.context("failed to connect go gRPC")?;
let (mut _subscribe_tx, mut stream) = client.subscribe().await?;

let mut accounts = HashMap::with_capacity(1);
let mut transactions = HashMap::with_capacity(1);

accounts.insert("das".to_string(), config.accounts.filter.clone().to_proto());
transactions.insert(
"das".to_string(),
config.transactions.filter.clone().to_proto(),
);

let request = SubscribeRequest {
accounts,
transactions,
..Default::default()
};
let (mut _subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;

// recv-send loop
let mut shutdown = create_shutdown()?;
Expand Down Expand Up @@ -117,7 +136,7 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
let result: RedisResult<RedisValue> =
pipe.atomic().query_async(&mut connection).await;

let status = if result.is_ok() { Ok(()) } else { Err(()) };
let status = result.map(|_| ()).map_err(|_| ());
redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts);
redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions);

Expand Down
4 changes: 2 additions & 2 deletions nft_ingester/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ impl TaskManager {
tokio::task::spawn(async move {
while let Some(task) = receiver.recv().await {
if let Some(task_created_time) = task.created_at {
let bus_time =
Utc::now().timestamp_millis() - task_created_time.timestamp_millis();
let bus_time = Utc::now().timestamp_millis()
- task_created_time.and_utc().timestamp_millis();
metric! {
statsd_histogram!("ingester.bgtask.bus_time", bus_time as u64, "type" => task.name);
}
Expand Down
2 changes: 1 addition & 1 deletion program_transformers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct TransactionInfo {
pub meta_inner_instructions: Vec<InnerInstructions>,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DownloadMetadataInfo {
asset_data_id: Vec<u8>,
uri: String,
Expand Down

0 comments on commit b5d93b9

Please sign in to comment.