Skip to content

Commit

Permalink
Also trigger cooldowns on requests to the same penumbra address (#74)
Browse files Browse the repository at this point in the history
* Also trigger cooldowns on requests to the same penumbra address

* Fix function name
  • Loading branch information
zbuc authored Nov 14, 2023
1 parent 4edf046 commit 1f644c9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 33 deletions.
144 changes: 112 additions & 32 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use serenity::{
use tokio::time::{Duration, Instant};
use tracing::instrument;

use crate::responder::AddressOrAlmost;

use super::responder::{Request, RequestQueue};

pub struct Handler {
Expand All @@ -26,15 +28,117 @@ pub struct Handler {
/// History of requests we answered for token dispersal, with a timestamp and the number of
/// times we've told the user about the rate limit (so that eventually we can stop replying if
/// they keep asking).
send_history: Arc<Mutex<VecDeque<(UserId, Instant, usize)>>>,
send_history: Arc<Mutex<SendHistory>>,
}

struct SendHistory {
pub discord_users: VecDeque<(UserId, Instant, usize)>,
pub penumbra_addresses: VecDeque<(AddressOrAlmost, Instant, usize)>,
}

impl SendHistory {
pub fn new() -> Self {
SendHistory {
discord_users: VecDeque::new(),
penumbra_addresses: VecDeque::new(),
}
}

/// Returns whether the given user is rate limited, and if so, when the rate limit will expire along with
/// the number of times the user has been notified of the rate limit.
pub fn is_rate_limited(
&mut self,
user_id: UserId,
addresses: &[AddressOrAlmost],
) -> std::option::Option<(tokio::time::Instant, usize)> {
let discord_limited = self
.discord_users
.iter_mut()
.find(|(user, _, _)| *user == user_id)
.map(|(_, last_fulfilled, notified)| {
// Increase the notification count by one and return the previous count:
let old_notified = *notified;
*notified += 1;
(*last_fulfilled, old_notified)
});

if discord_limited.is_some() {
return discord_limited;
}

self.penumbra_addresses
.iter_mut()
.find(|(address_or_almost, _, _)| addresses.contains(address_or_almost))
.map(|(_, last_fulfilled, notified)| {
// Increase the notification count by one and return the previous count:
let old_notified = *notified;
*notified += 1;
(*last_fulfilled, old_notified)
})
}

pub fn record_request(&mut self, user_id: UserId, addresses: &[AddressOrAlmost]) {
self.discord_users.push_back((user_id, Instant::now(), 1));
self.penumbra_addresses
.extend(addresses.iter().map(|address_or_almost| {
(
address_or_almost.clone(),
Instant::now(),
1, // notification count
)
}));
}

pub fn prune(&mut self, rate_limit: Duration) {
tracing::trace!("pruning discord user send history");
while let Some((user, last_fulfilled, _)) = self.discord_users.front() {
if last_fulfilled.elapsed() >= rate_limit {
tracing::debug!(?user, ?last_fulfilled, "rate limit expired");
self.discord_users.pop_front();
} else {
break;
}
}
tracing::trace!("finished pruning discord user send history");

tracing::trace!("pruning penumbra address send history");
while let Some((address_or_almost, last_fulfilled, _)) = self.penumbra_addresses.front() {
if last_fulfilled.elapsed() >= rate_limit {
tracing::debug!(?address_or_almost, ?last_fulfilled, "rate limit expired");
self.penumbra_addresses.pop_front();
} else {
break;
}
}
tracing::trace!("finished pruning penumbra address send history");
}

pub fn record_failure(&mut self, user_id: UserId, addresses: &[AddressOrAlmost]) {
// If the request failed, we set the notification count to zero, so that the rate
// limit will not apply to future requests
if let Some((_, _, notified)) = self
.discord_users
.iter_mut()
.find(|(user, _, _)| *user == user_id)
{
*notified = notified.saturating_sub(1);
}
if let Some((_, _, notified)) = self
.penumbra_addresses
.iter_mut()
.find(|(address_or_almost, _, _)| addresses.contains(address_or_almost))
{
*notified = notified.saturating_sub(1);
}
}
}

impl Handler {
pub fn new(rate_limit: Duration, reply_limit: usize) -> Self {
Handler {
rate_limit,
reply_limit,
send_history: Arc::new(Mutex::new(VecDeque::new())),
send_history: Arc::new(Mutex::new(SendHistory::new())),
}
}
}
Expand Down Expand Up @@ -84,20 +188,7 @@ impl EventHandler for Handler {
}

// Prune the send history of all expired rate limit timeouts
{
tracing::trace!("pruning send history");
// scoped to prevent deadlock on send_history
let mut send_history = self.send_history.lock().unwrap();
while let Some((user, last_fulfilled, _)) = send_history.front() {
if last_fulfilled.elapsed() >= self.rate_limit {
tracing::debug!(?user, ?last_fulfilled, "rate limit expired");
send_history.pop_front();
} else {
break;
}
}
tracing::trace!("finished pruning send history");
}
self.send_history.lock().unwrap().prune(self.rate_limit);

// Check if the message contains a penumbra address and create a request for it if so
let (response, request) = if let Some(parsed) = { Request::try_new(&message) } {
Expand All @@ -107,19 +198,14 @@ impl EventHandler for Handler {
return;
};

let penumbra_addresses = request.addresses().to_owned();

// If the message author was in the send history, don't send them tokens
let rate_limited = self
.send_history
.lock()
.unwrap()
.iter_mut()
.find(|(user, _, _)| *user == user_id)
.map(|(_, last_fulfilled, notified)| {
// Increase the notification count by one and return the previous count:
let old_notified = *notified;
*notified += 1;
(*last_fulfilled, old_notified)
});
.is_rate_limited(user_id, &penumbra_addresses);

if let Some((last_fulfilled, notified)) = rate_limited {
tracing::info!(
Expand Down Expand Up @@ -155,7 +241,7 @@ impl EventHandler for Handler {
self.send_history
.lock()
.unwrap()
.push_back((user_id, Instant::now(), 1));
.record_request(user_id, &penumbra_addresses);

// Send the message to the queue, to be processed asynchronously
tracing::trace!("sending message to worker queue");
Expand All @@ -180,13 +266,7 @@ impl EventHandler for Handler {
self.send_history
.lock()
.unwrap()
.iter_mut()
.find(|(user, _, _)| *user == user_id)
.map(|(_, _, notified)| {
// If the request failed, we set the notification count to zero, so that the rate
// limit will not apply to future requests
*notified = notified.saturating_sub(1);
});
.record_failure(user_id, &penumbra_addresses);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/responder/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Request {
}

/// Either a correctly parsed address, or something that looks almost like it.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AddressOrAlmost {
Address(Box<Address>),
Almost(String),
Expand Down

0 comments on commit 1f644c9

Please sign in to comment.