Skip to content

Commit

Permalink
Add more fields to tracing (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
danimhr authored Nov 27, 2024
1 parent e6e3774 commit c9c4105
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 50 deletions.
2 changes: 1 addition & 1 deletion auction-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ docker run --name jaeger \
-e COLLECTOR_OTLP_ENABLED=true \
-p 16686:16686 \
-p 4317:4317 \
jaegertracing/all-in-one
jaegertracing/all-in-one:1.63.0
```

And access the jaeger UI at `http://127.0.0.1:16686`.
Expand Down
2 changes: 1 addition & 1 deletion auction-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn root() -> String {
pub mod profile;
pub(crate) mod ws;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RestError {
/// The request contained invalid parameters.
BadParameters(String),
Expand Down
22 changes: 20 additions & 2 deletions auction-server/src/auction/entities/bid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use {
transaction::VersionedTransaction,
},
std::{
fmt::Debug,
fmt::{
Debug,
Display,
Formatter,
},
hash::Hash,
},
time::OffsetDateTime,
Expand Down Expand Up @@ -170,7 +174,7 @@ pub type PermissionKey<T> = <<T as ChainTrait>::BidChainDataType as BidChainData
pub type TxHash<T> = <<T as ChainTrait>::BidStatusType as BidStatus>::TxHash;

pub trait BidChainData: Send + Sync + Clone + Debug + PartialEq {
type PermissionKey: Send + Sync + Debug + Hash + Eq + Clone;
type PermissionKey: Send + Sync + Debug + Hash + Eq + Clone + Debug;

fn get_permission_key(&self) -> Self::PermissionKey;
}
Expand Down Expand Up @@ -269,3 +273,17 @@ impl From<(Bid<Evm>, bool)> for MulticallData {
}
}
}

pub struct BidContainerTracing<'a, T: ChainTrait>(pub &'a [Bid<T>]);
impl<T: ChainTrait> Display for BidContainerTracing<'_, T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:?}",
self.0
.iter()
.map(|x| x.id.to_string())
.collect::<Vec<String>>()
)
}
}
2 changes: 2 additions & 0 deletions auction-server/src/auction/repository/add_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use {
};

impl<T: ChainTrait> Repository<T> {
#[tracing::instrument(skip_all, name = "add_auction_repo", fields(auction_id))]
pub async fn add_auction(
&self,
auction: entities::Auction<T>,
) -> anyhow::Result<entities::Auction<T>> {
tracing::Span::current().record("auction_id", auction.id.to_string());
sqlx::query!(
"INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time) VALUES ($1, $2, $3, $4, $5, $6)",
auction.id,
Expand Down
3 changes: 2 additions & 1 deletion auction-server/src/auction/repository/conclude_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use {
};

impl<T: ChainTrait> Repository<T> {
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, name = "conclude_auction_repo", fields(auction_id))]
pub async fn conclude_auction(
&self,
auction: entities::Auction<T>,
) -> anyhow::Result<entities::Auction<T>> {
tracing::Span::current().record("auction_id", auction.id.to_string());
let mut auction = auction.clone();
let now = OffsetDateTime::now_utc();
auction.conclusion_time = Some(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use {
};

impl<T: ChainTrait> Repository<T> {
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(auction_id))]
pub async fn remove_in_memory_submitted_auction(&self, auction: entities::Auction<T>) {
tracing::Span::current().record("auction_id", auction.id.to_string());
let mut write_guard = self.in_memory_store.submitted_auctions.write().await;
write_guard.retain(|a| a.id != auction.id);
}
Expand Down
5 changes: 4 additions & 1 deletion auction-server/src/auction/repository/submit_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ use {
};

impl<T: ChainTrait> Repository<T> {
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, name = "submit_auction_repo", fields(auction_id, tx_hash))]
pub async fn submit_auction(
&self,
auction: entities::Auction<T>,
transaction_hash: entities::TxHash<T>,
) -> anyhow::Result<entities::Auction<T>> {
tracing::Span::current().record("auction_id", auction.id.to_string());
tracing::Span::current().record("tx_hash", format!("{:?}", transaction_hash));

let mut auction = auction.clone();
let now = OffsetDateTime::now_utc();
auction.tx_hash = Some(transaction_hash.clone());
Expand Down
134 changes: 95 additions & 39 deletions auction-server/src/auction/service/auction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,12 @@ use {
rpc_config::RpcSendTransactionConfig,
},
solana_sdk::{
bs58,
commitment_config::CommitmentConfig,
signature::{
Signature,
Signer,
},
transaction::{
TransactionError,
VersionedTransaction,
},
transaction::TransactionError,
},
std::{
fmt::Debug,
Expand Down Expand Up @@ -156,17 +152,23 @@ impl AuctionManager<Evm> for Service<Evm> {
true
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))]
async fn get_winner_bids(
&self,
auction: &entities::Auction<Evm>,
) -> Result<Vec<entities::Bid<Evm>>> {
tracing::Span::current().record("auction_id", auction.id.to_string());

// TODO How we want to perform simulation, pruning, and determination
if auction.bids.is_empty() {
return Ok(vec![]);
}

let mut bids = auction.bids.clone();
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&bids)),
);
bids.sort_by(|a, b| b.amount.cmp(&a.amount));
let bids: Vec<entities::Bid<Evm>> =
bids.into_iter().take(TOTAL_BIDS_PER_AUCTION_EVM).collect();
Expand All @@ -180,6 +182,8 @@ impl AuctionManager<Evm> for Service<Evm> {
)
.await?;

tracing::Span::current().record("simulation_result", format!("{:?}", simulation_result));

match simulation_result
.iter()
.position(|status| status.external_success)
Expand All @@ -189,7 +193,7 @@ impl AuctionManager<Evm> for Service<Evm> {
}
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(tx_hash))]
async fn submit_bids(
&self,
permission_key: entities::PermissionKey<Evm>,
Expand All @@ -211,24 +215,35 @@ impl AuctionManager<Evm> for Service<Evm> {
.send()
.await?
.tx_hash();
tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash));
Ok(tx_hash)
}

#[tracing::instrument(skip_all, fields(bid_ids, tx_hash, auction_id, result))]
async fn get_bid_results(
&self,
bids: Vec<entities::Bid<Evm>>,
bid_status_auction: entities::BidStatusAuction<entities::BidStatusEvm>,
) -> Result<Option<Vec<entities::BidStatusEvm>>> {
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&bids)),
);
tracing::Span::current().record("tx_hash", format!("{:?}", bid_status_auction.tx_hash));
tracing::Span::current().record("auction_id", bid_status_auction.id.to_string());

let receipt = self
.config
.chain_config
.provider
.get_transaction_receipt(bid_status_auction.tx_hash)
.await
.map_err(|e| anyhow::anyhow!("Failed to get transaction receipt: {:?}", e))?;

match receipt {
Some(receipt) => {
let decoded_logs = Self::decode_logs_for_receipt(&receipt);
tracing::Span::current().record("result", format!("{:?}", decoded_logs));
Ok(Some(
bids.iter()
.map(|b| {
Expand Down Expand Up @@ -354,14 +369,21 @@ impl AuctionManager<Svm> for Service<Svm> {
trigger % CONCLUSION_TRIGGER_INTERVAL_SVM == 1
}

#[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))]
async fn get_winner_bids(
&self,
auction: &entities::Auction<Svm>,
) -> Result<Vec<entities::Bid<Svm>>> {
tracing::Span::current().record("auction_id", auction.id.to_string());
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&auction.bids)),
);
let mut bids = auction.bids.clone();
bids.sort_by(|a, b| b.amount.cmp(&a.amount));
let mut results = vec![];
for bid in bids.iter() {
match self
let result = self
.simulate_bid(&entities::BidCreate {
chain_id: bid.chain_id.clone(),
initiation_time: bid.initiation_time,
Expand All @@ -370,20 +392,26 @@ impl AuctionManager<Svm> for Service<Svm> {
transaction: bid.chain_data.transaction.clone(),
},
})
.await
{
.await;
results.push(result.clone());
match result {
Err(RestError::SimulationError {
result: _,
reason: _,
}) => {}
// Either simulation was successful or we can't simulate at this moment
_ => return Ok(vec![bid.clone()]),
_ => {
tracing::Span::current().record("simulation_result", format!("{:?}", results));
return Ok(vec![bid.clone()]);
}
}
}

tracing::Span::current().record("simulation_result", format!("{:?}", results));
Ok(vec![])
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(tx_hash))]
async fn submit_bids(
&self,
_permission_key: entities::PermissionKey<Svm>,
Expand All @@ -395,20 +423,30 @@ impl AuctionManager<Svm> for Service<Svm> {

let mut bid = bids[0].clone();
self.add_relayer_signature(&mut bid);
match self.send_transaction(&bid.chain_data.transaction).await {
Ok(response) => Ok(response),
match self.send_transaction(&bid).await {
Ok(response) => {
tracing::Span::current().record("tx_hash", response.to_string());
Ok(response)
}
Err(e) => {
tracing::error!(error = ?e, "Error while submitting bid");
Err(anyhow::anyhow!(e))
}
}
}

#[tracing::instrument(skip_all, fields(bid_ids, tx_hash, auction_id, result))]
async fn get_bid_results(
&self,
bids: Vec<entities::Bid<Svm>>,
bid_status_auction: entities::BidStatusAuction<entities::BidStatusSvm>,
) -> Result<Option<Vec<entities::BidStatusSvm>>> {
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&bids)),
);
tracing::Span::current().record("tx_hash", bid_status_auction.tx_hash.to_string());
tracing::Span::current().record("auction_id", bid_status_auction.id.to_string());
if bids.is_empty() {
return Ok(Some(vec![]));
}
Expand All @@ -428,6 +466,8 @@ impl AuctionManager<Svm> for Service<Svm> {
)
.await?;

tracing::Span::current().record("status", format!("{:?}", status));

let status = match status {
Some(res) => match res {
Ok(()) => entities::BidStatusSvm::Won {
Expand Down Expand Up @@ -522,10 +562,48 @@ impl Service<Svm> {
relayer.sign_message(&serialized_message);
}

#[tracing::instrument(skip_all, fields(bid_id, total_tries, tx_hash))]
async fn blocking_send_transaction(&self, bid: entities::Bid<Svm>, signature: Signature) {
tracing::Span::current().record("bid_id", bid.id.to_string());
tracing::Span::current().record("tx_hash", signature.to_string());
let config = RpcSendTransactionConfig {
skip_preflight: true,
max_retries: Some(0),
..RpcSendTransactionConfig::default()
};
let mut receiver = self.config.chain_config.log_sender.subscribe();
for retry_count in 0..SEND_TRANSACTION_RETRY_COUNT_SVM {
tokio::time::sleep(Duration::from_secs(2)).await;

// Do not wait for the logs to be received
// just check if the transaction is in the logs already
while let Ok(log) = receiver.try_recv() {
if log.value.signature.eq(&signature.to_string()) {
tracing::Span::current().record("total_tries", retry_count + 1);
return;
}
}
if let Err(e) = self
.config
.chain_config
.client
.send_transaction_with_config(&bid.chain_data.transaction, config)
.await
{
tracing::error!(error = ?e, "Failed to resend transaction");
}
}

tracing::Span::current().record("total_tries", SEND_TRANSACTION_RETRY_COUNT_SVM + 1);
}

#[tracing::instrument(skip_all, fields(bid_id))]
async fn send_transaction(
&self,
tx: &VersionedTransaction,
bid: &entities::Bid<Svm>,
) -> solana_client::client_error::Result<Signature> {
tracing::Span::current().record("bid_id", bid.id.to_string());
let tx = &bid.chain_data.transaction;
let config = RpcSendTransactionConfig {
skip_preflight: true,
max_retries: Some(0),
Expand All @@ -537,32 +615,10 @@ impl Service<Svm> {
.tx_broadcaster_client
.send_transaction_with_config(tx, config)
.await?;
let tx_cloned = tx.clone();
let mut receiver = self.config.chain_config.log_sender.subscribe();
let signature_bs58 = bs58::encode(res).into_string();
self.task_tracker.spawn({
let service = self.clone();
let (service, bid) = (self.clone(), bid.clone());
async move {
for _ in 0..SEND_TRANSACTION_RETRY_COUNT_SVM {
tokio::time::sleep(Duration::from_secs(2)).await;

// Do not wait for the logs to be received
// just check if the transaction is in the logs already
while let Ok(log) = receiver.try_recv() {
if log.value.signature.eq(&signature_bs58) {
return;
}
}
if let Err(e) = service
.config
.chain_config
.client
.send_transaction_with_config(&tx_cloned, config)
.await
{
tracing::error!(error = ?e, "Failed to resend transaction");
}
}
service.blocking_send_transaction(bid, res).await;
}
});
Ok(res)
Expand Down
Loading

0 comments on commit c9c4105

Please sign in to comment.