diff --git a/auction-server/README.md b/auction-server/README.md index c860dfd3..08792dcc 100644 --- a/auction-server/README.md +++ b/auction-server/README.md @@ -66,7 +66,7 @@ docker run --name jaeger \ -e COLLECTOR_OTLP_ENABLED=true \ -p 16686:16686 \ -p 4317:4317 \ - jaegertracing/all-in-one + jaegertracing/all-in-one:1.63.0 ``` And access the jaeger UI at `http://127.0.0.1:16686`. diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index ddcb1bf8..5680d1ff 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -89,7 +89,7 @@ async fn root() -> String { pub mod profile; pub(crate) mod ws; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RestError { /// The request contained invalid parameters. BadParameters(String), diff --git a/auction-server/src/auction/entities/bid.rs b/auction-server/src/auction/entities/bid.rs index 8f91f8c5..23e81334 100644 --- a/auction-server/src/auction/entities/bid.rs +++ b/auction-server/src/auction/entities/bid.rs @@ -32,7 +32,11 @@ use { transaction::VersionedTransaction, }, std::{ - fmt::Debug, + fmt::{ + Debug, + Display, + Formatter, + }, hash::Hash, }, time::OffsetDateTime, @@ -170,7 +174,7 @@ pub type PermissionKey = <::BidChainDataType as BidChainData pub type TxHash = <::BidStatusType as BidStatus>::TxHash; pub trait BidChainData: Send + Sync + Clone + Debug + PartialEq { - type PermissionKey: Send + Sync + Debug + Hash + Eq + Clone; + type PermissionKey: Send + Sync + Debug + Hash + Eq + Clone + Debug; fn get_permission_key(&self) -> Self::PermissionKey; } @@ -269,3 +273,17 @@ impl From<(Bid, bool)> for MulticallData { } } } + +pub struct BidContainerTracing<'a, T: ChainTrait>(pub &'a [Bid]); +impl Display for BidContainerTracing<'_, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{:?}", + self.0 + .iter() + .map(|x| x.id.to_string()) + .collect::>() + ) + } +} diff --git a/auction-server/src/auction/repository/add_auction.rs b/auction-server/src/auction/repository/add_auction.rs index 77c01738..52f7738f 100644 --- a/auction-server/src/auction/repository/add_auction.rs +++ b/auction-server/src/auction/repository/add_auction.rs @@ -8,10 +8,12 @@ use { }; impl Repository { + #[tracing::instrument(skip_all, name = "add_auction_repo", fields(auction_id))] pub async fn add_auction( &self, auction: entities::Auction, ) -> anyhow::Result> { + tracing::Span::current().record("auction_id", auction.id.to_string()); sqlx::query!( "INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time) VALUES ($1, $2, $3, $4, $5, $6)", auction.id, diff --git a/auction-server/src/auction/repository/conclude_auction.rs b/auction-server/src/auction/repository/conclude_auction.rs index f263c64a..4c495774 100644 --- a/auction-server/src/auction/repository/conclude_auction.rs +++ b/auction-server/src/auction/repository/conclude_auction.rs @@ -13,11 +13,12 @@ use { }; impl Repository { - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, name = "conclude_auction_repo", fields(auction_id))] pub async fn conclude_auction( &self, auction: entities::Auction, ) -> anyhow::Result> { + tracing::Span::current().record("auction_id", auction.id.to_string()); let mut auction = auction.clone(); let now = OffsetDateTime::now_utc(); auction.conclusion_time = Some(now); diff --git a/auction-server/src/auction/repository/remove_in_memory_submitted_auction.rs b/auction-server/src/auction/repository/remove_in_memory_submitted_auction.rs index d9683033..fcc240c1 100644 --- a/auction-server/src/auction/repository/remove_in_memory_submitted_auction.rs +++ b/auction-server/src/auction/repository/remove_in_memory_submitted_auction.rs @@ -7,8 +7,9 @@ use { }; impl Repository { - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(auction_id))] pub async fn remove_in_memory_submitted_auction(&self, auction: entities::Auction) { + tracing::Span::current().record("auction_id", auction.id.to_string()); let mut write_guard = self.in_memory_store.submitted_auctions.write().await; write_guard.retain(|a| a.id != auction.id); } diff --git a/auction-server/src/auction/repository/submit_auction.rs b/auction-server/src/auction/repository/submit_auction.rs index cf5a511c..0ecca68b 100644 --- a/auction-server/src/auction/repository/submit_auction.rs +++ b/auction-server/src/auction/repository/submit_auction.rs @@ -14,12 +14,15 @@ use { }; impl Repository { - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, name = "submit_auction_repo", fields(auction_id, tx_hash))] pub async fn submit_auction( &self, auction: entities::Auction, transaction_hash: entities::TxHash, ) -> anyhow::Result> { + tracing::Span::current().record("auction_id", auction.id.to_string()); + tracing::Span::current().record("tx_hash", format!("{:?}", transaction_hash)); + let mut auction = auction.clone(); let now = OffsetDateTime::now_utc(); auction.tx_hash = Some(transaction_hash.clone()); diff --git a/auction-server/src/auction/service/auction_manager.rs b/auction-server/src/auction/service/auction_manager.rs index 7291ded5..96585984 100644 --- a/auction-server/src/auction/service/auction_manager.rs +++ b/auction-server/src/auction/service/auction_manager.rs @@ -42,16 +42,12 @@ use { rpc_config::RpcSendTransactionConfig, }, solana_sdk::{ - bs58, commitment_config::CommitmentConfig, signature::{ Signature, Signer, }, - transaction::{ - TransactionError, - VersionedTransaction, - }, + transaction::TransactionError, }, std::{ fmt::Debug, @@ -156,17 +152,23 @@ impl AuctionManager for Service { true } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))] async fn get_winner_bids( &self, auction: &entities::Auction, ) -> Result>> { + tracing::Span::current().record("auction_id", auction.id.to_string()); + // TODO How we want to perform simulation, pruning, and determination if auction.bids.is_empty() { return Ok(vec![]); } let mut bids = auction.bids.clone(); + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&bids)), + ); bids.sort_by(|a, b| b.amount.cmp(&a.amount)); let bids: Vec> = bids.into_iter().take(TOTAL_BIDS_PER_AUCTION_EVM).collect(); @@ -180,6 +182,8 @@ impl AuctionManager for Service { ) .await?; + tracing::Span::current().record("simulation_result", format!("{:?}", simulation_result)); + match simulation_result .iter() .position(|status| status.external_success) @@ -189,7 +193,7 @@ impl AuctionManager for Service { } } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(tx_hash))] async fn submit_bids( &self, permission_key: entities::PermissionKey, @@ -211,14 +215,23 @@ impl AuctionManager for Service { .send() .await? .tx_hash(); + tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash)); Ok(tx_hash) } + #[tracing::instrument(skip_all, fields(bid_ids, tx_hash, auction_id, result))] async fn get_bid_results( &self, bids: Vec>, bid_status_auction: entities::BidStatusAuction, ) -> Result>> { + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&bids)), + ); + tracing::Span::current().record("tx_hash", format!("{:?}", bid_status_auction.tx_hash)); + tracing::Span::current().record("auction_id", bid_status_auction.id.to_string()); + let receipt = self .config .chain_config @@ -226,9 +239,11 @@ impl AuctionManager for Service { .get_transaction_receipt(bid_status_auction.tx_hash) .await .map_err(|e| anyhow::anyhow!("Failed to get transaction receipt: {:?}", e))?; + match receipt { Some(receipt) => { let decoded_logs = Self::decode_logs_for_receipt(&receipt); + tracing::Span::current().record("result", format!("{:?}", decoded_logs)); Ok(Some( bids.iter() .map(|b| { @@ -354,14 +369,21 @@ impl AuctionManager for Service { trigger % CONCLUSION_TRIGGER_INTERVAL_SVM == 1 } + #[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))] async fn get_winner_bids( &self, auction: &entities::Auction, ) -> Result>> { + tracing::Span::current().record("auction_id", auction.id.to_string()); + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&auction.bids)), + ); let mut bids = auction.bids.clone(); bids.sort_by(|a, b| b.amount.cmp(&a.amount)); + let mut results = vec![]; for bid in bids.iter() { - match self + let result = self .simulate_bid(&entities::BidCreate { chain_id: bid.chain_id.clone(), initiation_time: bid.initiation_time, @@ -370,20 +392,26 @@ impl AuctionManager for Service { transaction: bid.chain_data.transaction.clone(), }, }) - .await - { + .await; + results.push(result.clone()); + match result { Err(RestError::SimulationError { result: _, reason: _, }) => {} // Either simulation was successful or we can't simulate at this moment - _ => return Ok(vec![bid.clone()]), + _ => { + tracing::Span::current().record("simulation_result", format!("{:?}", results)); + return Ok(vec![bid.clone()]); + } } } + + tracing::Span::current().record("simulation_result", format!("{:?}", results)); Ok(vec![]) } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(tx_hash))] async fn submit_bids( &self, _permission_key: entities::PermissionKey, @@ -395,8 +423,11 @@ impl AuctionManager for Service { let mut bid = bids[0].clone(); self.add_relayer_signature(&mut bid); - match self.send_transaction(&bid.chain_data.transaction).await { - Ok(response) => Ok(response), + match self.send_transaction(&bid).await { + Ok(response) => { + tracing::Span::current().record("tx_hash", response.to_string()); + Ok(response) + } Err(e) => { tracing::error!(error = ?e, "Error while submitting bid"); Err(anyhow::anyhow!(e)) @@ -404,11 +435,18 @@ impl AuctionManager for Service { } } + #[tracing::instrument(skip_all, fields(bid_ids, tx_hash, auction_id, result))] async fn get_bid_results( &self, bids: Vec>, bid_status_auction: entities::BidStatusAuction, ) -> Result>> { + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&bids)), + ); + tracing::Span::current().record("tx_hash", bid_status_auction.tx_hash.to_string()); + tracing::Span::current().record("auction_id", bid_status_auction.id.to_string()); if bids.is_empty() { return Ok(Some(vec![])); } @@ -428,6 +466,8 @@ impl AuctionManager for Service { ) .await?; + tracing::Span::current().record("status", format!("{:?}", status)); + let status = match status { Some(res) => match res { Ok(()) => entities::BidStatusSvm::Won { @@ -522,10 +562,48 @@ impl Service { relayer.sign_message(&serialized_message); } + #[tracing::instrument(skip_all, fields(bid_id, total_tries, tx_hash))] + async fn blocking_send_transaction(&self, bid: entities::Bid, signature: Signature) { + tracing::Span::current().record("bid_id", bid.id.to_string()); + tracing::Span::current().record("tx_hash", signature.to_string()); + let config = RpcSendTransactionConfig { + skip_preflight: true, + max_retries: Some(0), + ..RpcSendTransactionConfig::default() + }; + let mut receiver = self.config.chain_config.log_sender.subscribe(); + for retry_count in 0..SEND_TRANSACTION_RETRY_COUNT_SVM { + tokio::time::sleep(Duration::from_secs(2)).await; + + // Do not wait for the logs to be received + // just check if the transaction is in the logs already + while let Ok(log) = receiver.try_recv() { + if log.value.signature.eq(&signature.to_string()) { + tracing::Span::current().record("total_tries", retry_count + 1); + return; + } + } + if let Err(e) = self + .config + .chain_config + .client + .send_transaction_with_config(&bid.chain_data.transaction, config) + .await + { + tracing::error!(error = ?e, "Failed to resend transaction"); + } + } + + tracing::Span::current().record("total_tries", SEND_TRANSACTION_RETRY_COUNT_SVM + 1); + } + + #[tracing::instrument(skip_all, fields(bid_id))] async fn send_transaction( &self, - tx: &VersionedTransaction, + bid: &entities::Bid, ) -> solana_client::client_error::Result { + tracing::Span::current().record("bid_id", bid.id.to_string()); + let tx = &bid.chain_data.transaction; let config = RpcSendTransactionConfig { skip_preflight: true, max_retries: Some(0), @@ -537,32 +615,10 @@ impl Service { .tx_broadcaster_client .send_transaction_with_config(tx, config) .await?; - let tx_cloned = tx.clone(); - let mut receiver = self.config.chain_config.log_sender.subscribe(); - let signature_bs58 = bs58::encode(res).into_string(); self.task_tracker.spawn({ - let service = self.clone(); + let (service, bid) = (self.clone(), bid.clone()); async move { - for _ in 0..SEND_TRANSACTION_RETRY_COUNT_SVM { - tokio::time::sleep(Duration::from_secs(2)).await; - - // Do not wait for the logs to be received - // just check if the transaction is in the logs already - while let Ok(log) = receiver.try_recv() { - if log.value.signature.eq(&signature_bs58) { - return; - } - } - if let Err(e) = service - .config - .chain_config - .client - .send_transaction_with_config(&tx_cloned, config) - .await - { - tracing::error!(error = ?e, "Failed to resend transaction"); - } - } + service.blocking_send_transaction(bid, res).await; } }); Ok(res) diff --git a/auction-server/src/auction/service/conclude_auction.rs b/auction-server/src/auction/service/conclude_auction.rs index 3c42b52a..5aabcf10 100644 --- a/auction-server/src/auction/service/conclude_auction.rs +++ b/auction-server/src/auction/service/conclude_auction.rs @@ -19,14 +19,22 @@ impl Service where Service: AuctionManager, { + #[tracing::instrument(skip_all, fields(auction_id, tx_hash, bid_ids, bid_statuses))] pub async fn conclude_auction(&self, input: ConcludeAuctionInput) -> anyhow::Result<()> { let auction = input.auction; + tracing::Span::current().record("auction_id", auction.id.to_string()); if let Some(tx_hash) = auction.tx_hash.clone() { + tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash)); let bids = self .repo .get_in_memory_submitted_bids_for_auction(auction.clone()) .await; + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&bids)), + ); + if let Some(bid_statuses) = self .get_bid_results( bids.clone(), @@ -37,6 +45,8 @@ where ) .await? { + tracing::Span::current().record("bid_statuses", format!("{:?}", bid_statuses)); + let auction = self .repo .conclude_auction(auction) diff --git a/auction-server/src/auction/service/handle_auction.rs b/auction-server/src/auction/service/handle_auction.rs index 9c5d552c..2fc260b9 100644 --- a/auction-server/src/auction/service/handle_auction.rs +++ b/auction-server/src/auction/service/handle_auction.rs @@ -24,11 +24,18 @@ impl Service where Service: AuctionManager, { + #[tracing::instrument(skip_all, fields(auction_id, bid_ids, winner_bid_ids))] async fn submit_auction<'a>( &self, auction: entities::Auction, _auction_mutex_gaurd: MutexGuard<'a, ()>, ) -> anyhow::Result<()> { + tracing::Span::current().record("auction_id", auction.id.to_string()); + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&auction.bids)), + ); + let permission_key = auction.permission_key.clone(); if !auction.is_ready(Service::AUCTION_MINIMUM_LIFETIME) { tracing::info!(permission_key = ?permission_key, "Auction is not ready yet"); @@ -36,6 +43,10 @@ where } let winner_bids = self.get_winner_bids(&auction).await?; + tracing::Span::current().record( + "winner_bid_ids", + tracing::field::display(entities::BidContainerTracing(&winner_bids)), + ); if winner_bids.is_empty() { join_all(auction.bids.into_iter().map(|bid| { self.update_bid_status(UpdateBidStatusInput { @@ -83,6 +94,7 @@ where Ok(()) } + #[tracing::instrument(skip_all, fields(bid_ids, auction_id))] async fn submit_auction_for_lock( &self, permission_key: &entities::PermissionKey, @@ -96,8 +108,16 @@ where .get_in_memory_bids_by_permission_key(permission_key) .await; + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&bids)), + ); + match entities::Auction::try_new(bids, bid_collection_time) { - Some(auction) => self.submit_auction(auction, acquired_lock).await, + Some(auction) => { + tracing::Span::current().record("auction_id", auction.id.to_string()); + self.submit_auction(auction, acquired_lock).await + } None => Ok(()), } } diff --git a/auction-server/src/auction/service/handle_bid.rs b/auction-server/src/auction/service/handle_bid.rs index 89d46f47..8c88860c 100644 --- a/auction-server/src/auction/service/handle_bid.rs +++ b/auction-server/src/auction/service/handle_bid.rs @@ -21,7 +21,7 @@ impl Service where Service: Verification, { - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(bid_id))] pub async fn handle_bid( &self, input: HandleBidInput, @@ -31,8 +31,11 @@ where bid_create: input.bid_create.clone(), }) .await?; - self.repo + let bid = self + .repo .add_bid(input.bid_create, &chain_data, &amount) - .await + .await?; + tracing::Span::current().record("bid_id", bid.id.to_string()); + Ok(bid) } } diff --git a/auction-server/src/auction/service/update_bid_status.rs b/auction-server/src/auction/service/update_bid_status.rs index c5cc4eb2..cf473e26 100644 --- a/auction-server/src/auction/service/update_bid_status.rs +++ b/auction-server/src/auction/service/update_bid_status.rs @@ -21,7 +21,11 @@ pub struct UpdateBidStatusInput { } impl Service { + #[tracing::instrument(skip_all, fields(bid_id, status))] pub async fn update_bid_status(&self, input: UpdateBidStatusInput) -> Result<(), RestError> { + tracing::Span::current().record("bid_id", input.bid.id.to_string()); + tracing::Span::current().record("status", format!("{:?}", input.new_status)); + let is_updated = self .repo .update_bid_status(input.bid.clone(), input.new_status.clone())