Skip to content

Commit

Permalink
fix: disable batch flushing by default
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-bahjati committed Aug 22, 2024
1 parent e4683c7 commit 0b49d15
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.3"
version = "2.10.4"
edition = "2021"

[[bin]]
Expand Down
9 changes: 8 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910"
# received from the Price state.
# notify_price_sched_tx_buffer = 10000

# Whether flush messages and responses to the client immediately. Once disabled the
# messages will be flushed every `flush_interval_duration`. Disabling it is useful if
# there are many messages to be sent between the client and the server to avoid overloading
# the connection.
# instant_flush = true

# Flush interval for responses and notifications. This is the maximum time the
# server will wait before flushing the messages to the client.
# server will wait before flushing the messages to the client. It will have no
# effect if `flush_immediately` is set to true.
# flush_interval_duration = "50ms"

# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
Expand Down
40 changes: 35 additions & 5 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
anyhow,
Result,
},
futures::future::OptionFuture,
futures_util::{
stream::{
SplitSink,
Expand Down Expand Up @@ -50,7 +51,10 @@ use {
sync::Arc,
time::Duration,
},
tokio::sync::mpsc,
tokio::{
sync::mpsc,
time::Interval,
},
tracing::instrument,
warp::{
ws::{
Expand Down Expand Up @@ -111,11 +115,18 @@ enum ConnectionError {
WebsocketConnectionClosed,
}

#[derive(Debug)]
enum FlushStrategy {
Instant,
Interval(Interval),
}

async fn handle_connection<S>(
ws_conn: WebSocket,
state: Arc<S>,
notify_price_tx_buffer: usize,
notify_price_sched_tx_buffer: usize,
instant_flush: bool,
flush_interval_duration: Duration,
) where
S: state::Prices,
Expand All @@ -129,7 +140,10 @@ async fn handle_connection<S>(
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
mpsc::channel(notify_price_sched_tx_buffer);

let mut flush_interval = tokio::time::interval(flush_interval_duration);
let mut flush_strategy = match instant_flush {
true => FlushStrategy::Instant,
false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)),
};

loop {
if let Err(err) = handle_next(
Expand All @@ -140,7 +154,7 @@ async fn handle_connection<S>(
&mut notify_price_rx,
&mut notify_price_sched_tx,
&mut notify_price_sched_rx,
&mut flush_interval,
&mut flush_strategy,
)
.await
{
Expand All @@ -156,6 +170,7 @@ async fn handle_connection<S>(
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_next<S>(
state: &S,
ws_tx: &mut SplitSink<WebSocket, Message>,
Expand All @@ -164,11 +179,17 @@ async fn handle_next<S>(
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
flush_interval: &mut tokio::time::Interval,
flush_strategy: &mut FlushStrategy,
) -> Result<()>
where
S: state::Prices,
{
let optional_flush_tick: OptionFuture<_> = match flush_strategy {
FlushStrategy::Instant => None,
FlushStrategy::Interval(interval) => Some(interval.tick()),
}
.into();

tokio::select! {
msg = ws_rx.next() => {
match msg {
Expand Down Expand Up @@ -196,9 +217,14 @@ where
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
.await
}
_ = flush_interval.tick() => {
Some(_) = optional_flush_tick => {
flush(ws_tx).await
}
}?;

match flush_strategy {
FlushStrategy::Interval(_) => Ok(()),
FlushStrategy::Instant => flush(ws_tx).await,
}
}

Expand Down Expand Up @@ -413,6 +439,8 @@ pub struct Config {
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
/// received from the Price state.
pub notify_price_sched_tx_buffer: usize,
/// Whether to flush immediately after sending a message or notification.
pub instant_flush: bool,
/// Flush interval duration for the notifications.
#[serde(with = "humantime_serde")]
pub flush_interval_duration: Duration,
Expand All @@ -424,6 +452,7 @@ impl Default for Config {
listen_address: "127.0.0.1:8910".to_string(),
notify_price_tx_buffer: 10000,
notify_price_sched_tx_buffer: 10000,
instant_flush: true,
flush_interval_duration: Duration::from_millis(50),
}
}
Expand Down Expand Up @@ -465,6 +494,7 @@ where
state,
config.notify_price_tx_buffer,
config.notify_price_sched_tx_buffer,
config.instant_flush,
config.flush_interval_duration,
)
.await
Expand Down

0 comments on commit 0b49d15

Please sign in to comment.