Skip to content

Commit

Permalink
temporary: payment processor deadlock detection
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz committed Aug 14, 2024
1 parent d9389f9 commit 6717380
Showing 1 changed file with 161 additions and 10 deletions.
171 changes: 161 additions & 10 deletions core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::processor::{
};
use crate::models::order::ReadObj as DbOrder;
use crate::payment_sync::SYNC_NOTIFS_NOTIFY;
use crate::timeout_lock::{MutexTimeoutExt, RwLockTimeoutExt};
use crate::timeout_lock::RwLockTimeoutExt;
use crate::utils::remove_allocation_ids_from_payment;
use actix_web::web::Data;
use bigdecimal::{BigDecimal, Zero};
Expand All @@ -17,11 +17,16 @@ use futures::{FutureExt, TryFutureExt};
use metrics::counter;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::Deref;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use ya_client_model::payment::allocation::Deposit;
use ya_client_model::payment::{
Account, ActivityPayment, AgreementPayment, DriverDetails, Network, Payment,
Expand Down Expand Up @@ -306,8 +311,139 @@ impl DriverRegistry {
const DB_LOCK_TIMEOUT: Duration = Duration::from_secs(30);
const REGISTRY_LOCK_TIMEOUT: Duration = Duration::from_secs(30);

struct TimedMutex {
mutex: Mutex<DbExecutor>,
sender: Option<UnboundedSender<TimedMutexTaskMessage>>,
counter_task: Option<JoinHandle<()>>,
}

use tokio::sync::MutexGuard;

enum TimedMutexTaskMessage {
Start(String),
Finish,
}

struct TimedMutexGuard<'a> {
mutex_guard: MutexGuard<'a, DbExecutor>,
sender: &'a Option<UnboundedSender<TimedMutexTaskMessage>>,
}

impl Drop for TimedMutexGuard<'_> {
fn drop(&mut self) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) {
log::error!("Cannot send fininsh to counter task {e}");
}
}
}
}

impl<'a> Deref for TimedMutexGuard<'a> {
type Target = MutexGuard<'a, DbExecutor>;

fn deref(&self) -> &Self::Target {
&self.mutex_guard
}
}

impl TimedMutex {
fn new(db: DbExecutor) -> Self {
let (sender, mut receiver) =
tokio::sync::mpsc::unbounded_channel::<TimedMutexTaskMessage>();

let counter_task = tokio::spawn(async move {
log::info!("[TimedMutex] Counter thread started");
loop {
// wait for start or close without timeout
let task_name = match receiver.recv().await {
None => break,
Some(TimedMutexTaskMessage::Start(x)) => x,
Some(TimedMutexTaskMessage::Finish) => {
panic!("[TimedMutex] Unexpected finish")
}
};

log::info!("[TimedMutex] task {task_name} started...");
let mut counter = 0;
loop {
match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
Err(_) => {
log::error!("[TimedMutex] Long running task: {task_name}!");
counter += 1;
// five minutes
if counter > 30 {
exit(41);
}
}
Ok(None) => panic!("[TimedMutex] Unexpected mpsc close."),
Ok(Some(TimedMutexTaskMessage::Finish)) => break,
Ok(Some(TimedMutexTaskMessage::Start(_))) => {
panic!("[TimedMutex] Unexpected start")
}
}
}

log::info!("[TimedMutex] Timed task {task_name} finished.");
}
log::info!("[TimedMutex] Counter thread finished");
});

Self {
mutex: Mutex::new(db),
sender: Some(sender),
counter_task: Some(counter_task),
}
}

async fn timeout_lock(
&self,
duration: Duration,
name: &str,
) -> Result<TimedMutexGuard<'_>, Elapsed> {
let result = tokio::time::timeout(duration, self.mutex.lock())
.await
.map_err(|e| {
log::info!("Failed to lock mutex in scenario {0}", name);
e
})?;

if self
.counter_task
.as_ref()
.map_or(false, |v| v.is_finished())
{
log::error!("counter task is dead! {name}");
exit(42)
}

if let Some(sender) = &self.sender {
if let Err(e) = sender.send(TimedMutexTaskMessage::Start(name.into())) {
log::error!("Cannot send start to counter task {name}: {e}");
}
}

Ok(TimedMutexGuard {
mutex_guard: result,
sender: &self.sender,
})
}
}

impl Drop for TimedMutex {
fn drop(&mut self) {
self.sender.take().unwrap();
let handle = self.counter_task.take().unwrap();
tokio::spawn(async move {
if let Err(e) = handle.await {
log::error!("Cannot join counter thread {e}");
}
});
}
}

pub struct PaymentProcessor {
db_executor: Arc<Mutex<DbExecutor>>,
db_executor: Arc<TimedMutex>,
registry: RwLock<DriverRegistry>,
in_shutdown: AtomicBool,
}
Expand All @@ -325,7 +461,7 @@ enum PaymentSendToGsbError {
impl PaymentProcessor {
pub fn new(db_executor: DbExecutor) -> Self {
Self {
db_executor: Arc::new(Mutex::new(db_executor)),
db_executor: Arc::new(TimedMutex::new(db_executor)),
registry: Default::default(),
in_shutdown: AtomicBool::new(false),
}
Expand Down Expand Up @@ -429,7 +565,10 @@ impl PaymentProcessor {
let mut payment: Payment;

{
let db_executor = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?;
let db_executor = self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT, "notify payment 1")
.await?;

let orders = db_executor
.as_dao::<OrderDao>()
Expand Down Expand Up @@ -512,7 +651,9 @@ impl PaymentProcessor {

tokio::task::spawn_local(
async move {
let db_executor = db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?;
let db_executor = db_executor
.timeout_lock(DB_LOCK_TIMEOUT, "notify payment 2")
.await?;

let payment_dao: PaymentDao = db_executor.as_dao();
let sync_dao: SyncNotifsDao = db_executor.as_dao();
Expand Down Expand Up @@ -597,7 +738,7 @@ impl PaymentProcessor {

let allocation_status = self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT)
.timeout_lock(DB_LOCK_TIMEOUT, "schedule_payment 1")
.await?
.as_dao::<AllocationDao>()
.get(msg.allocation_id.clone(), msg.payer_id)
Expand Down Expand Up @@ -626,7 +767,7 @@ impl PaymentProcessor {
.await??;

self.db_executor
.timeout_lock(DB_LOCK_TIMEOUT)
.timeout_lock(DB_LOCK_TIMEOUT, "schedule_payment 2")
.await?
.as_dao::<OrderDao>()
.create(msg, order_id, driver)
Expand Down Expand Up @@ -703,7 +844,10 @@ impl PaymentProcessor {
}

{
let db_executor = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?;
let db_executor = self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT, "verify payment 1")
.await?;

// Verify agreement payments
let agreement_dao: AgreementDao = db_executor.as_dao();
Expand Down Expand Up @@ -856,7 +1000,10 @@ impl PaymentProcessor {
}

let (active_allocations, past_allocations) = {
let db = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?;
let db = self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT, "validate_allocation 1")
.await?;
let dao = db.as_dao::<AllocationDao>();

let active = dao
Expand Down Expand Up @@ -893,7 +1040,11 @@ impl PaymentProcessor {
/// For `false` each allocation timestamp is respected.
pub async fn release_allocations(&self, force: bool) {
// keep this lock alive for the entirety of this function for now
let db_executor = match self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await {
let db_executor = match self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT, "release_allocations")
.await
{
Ok(db) => db,
Err(_) => {
log::error!("Timed out waiting for db lock");
Expand Down

0 comments on commit 6717380

Please sign in to comment.