From 6f68603bee948cd0bdc5397ef2bcb6be7ea2e55b Mon Sep 17 00:00:00 2001 From: armyhaylenko Date: Thu, 16 Jan 2025 12:55:10 +0200 Subject: [PATCH] chore(ingester): remove panic from account processor job start --- .../src/processors/accounts_processor.rs | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/nft_ingester/src/processors/accounts_processor.rs b/nft_ingester/src/processors/accounts_processor.rs index c831a80b..0a3f04bd 100644 --- a/nft_ingester/src/processors/accounts_processor.rs +++ b/nft_ingester/src/processors/accounts_processor.rs @@ -33,6 +33,8 @@ use crate::{error::IngesterError, redis_receiver::get_timestamp_from_id}; const WORKER_IDLE_TIMEOUT: Duration = Duration::from_millis(100); // interval after which buffer is flushed const FLUSH_INTERVAL: Duration = Duration::from_millis(500); +// interval to try & build account processor if the previous one fails +const ACCOUNT_PROCESSOR_RESTART_INTERVAL: Duration = Duration::from_secs(5); // EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v pub const USDC_MINT_BYTES: [u8; 32] = [ @@ -69,18 +71,25 @@ pub async fn run_accounts_processor>>>, ) { mutexed_tasks.lock().await.spawn(async move { - let account_processor = AccountsProcessor::build( + let account_processor = loop { + match AccountsProcessor::build( rx.resubscribe(), fees_buffer_size, - unprocessed_transactions_getter, - metrics, - message_process_metrics, - postgre_client, - rpc_client, - join_set, + unprocessed_transactions_getter.clone(), + metrics.clone(), + message_process_metrics.clone(), + postgre_client.clone(), + rpc_client.clone(), + join_set.clone(), ) - .await - .expect("Failed to build 'AccountsProcessor'!"); + .await { + Ok(processor) => break processor, + Err(e) => { + tracing::error!(%e, "Failed to build accounts processor, retrying in {}...", ACCOUNT_PROCESSOR_RESTART_INTERVAL.as_secs()); + tokio::time::sleep(ACCOUNT_PROCESSOR_RESTART_INTERVAL).await; + } + } + }; account_processor.process_accounts(rx, rocks_storage, account_buffer_size).await;