From 0b49d15e4eb11afaeb6c04c919f966c3e478eee8 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 22 Aug 2024 11:26:13 +0200 Subject: [PATCH] fix: disable batch flushing by default --- Cargo.lock | 2 +- Cargo.toml | 2 +- config/config.toml | 9 ++++++++- src/agent/pyth/rpc.rs | 40 +++++++++++++++++++++++++++++++++++----- 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4de4fd9..5919b0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.3" +version = "2.10.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 672bbd2..cb8cb1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.3" +version = "2.10.4" edition = "2021" [[bin]] diff --git a/config/config.toml b/config/config.toml index d29904e..01d0556 100644 --- a/config/config.toml +++ b/config/config.toml @@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910" # received from the Price state. # notify_price_sched_tx_buffer = 10000 +# Whether flush messages and responses to the client immediately. Once disabled the +# messages will be flushed every `flush_interval_duration`. Disabling it is useful if +# there are many messages to be sent between the client and the server to avoid overloading +# the connection. +# instant_flush = true + # Flush interval for responses and notifications. This is the maximum time the -# server will wait before flushing the messages to the client. +# server will wait before flushing the messages to the client. It will have no +# effect if `flush_immediately` is set to true. # flush_interval_duration = "50ms" # Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint. diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index ae52f0e..df16169 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -18,6 +18,7 @@ use { anyhow, Result, }, + futures::future::OptionFuture, futures_util::{ stream::{ SplitSink, @@ -50,7 +51,10 @@ use { sync::Arc, time::Duration, }, - tokio::sync::mpsc, + tokio::{ + sync::mpsc, + time::Interval, + }, tracing::instrument, warp::{ ws::{ @@ -111,11 +115,18 @@ enum ConnectionError { WebsocketConnectionClosed, } +#[derive(Debug)] +enum FlushStrategy { + Instant, + Interval(Interval), +} + async fn handle_connection( ws_conn: WebSocket, state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, + instant_flush: bool, flush_interval_duration: Duration, ) where S: state::Prices, @@ -129,7 +140,10 @@ async fn handle_connection( let (mut notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(notify_price_sched_tx_buffer); - let mut flush_interval = tokio::time::interval(flush_interval_duration); + let mut flush_strategy = match instant_flush { + true => FlushStrategy::Instant, + false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)), + }; loop { if let Err(err) = handle_next( @@ -140,7 +154,7 @@ async fn handle_connection( &mut notify_price_rx, &mut notify_price_sched_tx, &mut notify_price_sched_rx, - &mut flush_interval, + &mut flush_strategy, ) .await { @@ -156,6 +170,7 @@ async fn handle_connection( } } +#[allow(clippy::too_many_arguments)] async fn handle_next( state: &S, ws_tx: &mut SplitSink, @@ -164,11 +179,17 @@ async fn handle_next( notify_price_rx: &mut mpsc::Receiver, notify_price_sched_tx: &mut mpsc::Sender, notify_price_sched_rx: &mut mpsc::Receiver, - flush_interval: &mut tokio::time::Interval, + flush_strategy: &mut FlushStrategy, ) -> Result<()> where S: state::Prices, { + let optional_flush_tick: OptionFuture<_> = match flush_strategy { + FlushStrategy::Instant => None, + FlushStrategy::Interval(interval) => Some(interval.tick()), + } + .into(); + tokio::select! { msg = ws_rx.next() => { match msg { @@ -196,9 +217,14 @@ where feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) .await } - _ = flush_interval.tick() => { + Some(_) = optional_flush_tick => { flush(ws_tx).await } + }?; + + match flush_strategy { + FlushStrategy::Interval(_) => Ok(()), + FlushStrategy::Instant => flush(ws_tx).await, } } @@ -413,6 +439,8 @@ pub struct Config { /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Price state. pub notify_price_sched_tx_buffer: usize, + /// Whether to flush immediately after sending a message or notification. + pub instant_flush: bool, /// Flush interval duration for the notifications. #[serde(with = "humantime_serde")] pub flush_interval_duration: Duration, @@ -424,6 +452,7 @@ impl Default for Config { listen_address: "127.0.0.1:8910".to_string(), notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, + instant_flush: true, flush_interval_duration: Duration::from_millis(50), } } @@ -465,6 +494,7 @@ where state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, + config.instant_flush, config.flush_interval_duration, ) .await