Skip to content

Commit

Permalink
Remove a stream host from the pool when it starts erroring
Browse files Browse the repository at this point in the history
  • Loading branch information
tazz4843 committed Dec 18, 2023
1 parent ae51206 commit a2c57eb
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions scripty_stt/src/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub struct LoadBalancedStream {
is_overloaded: Arc<AtomicBool>,
can_overload: bool,
waiting_for_new_stream: Arc<AtomicBool>,
is_errored: Arc<AtomicBool>,

msg_tx: Sender<ClientToServerMessage>,
msg_rx_transmit_handle: Sender<ServerToClientMessage>,
Expand All @@ -264,20 +265,23 @@ impl LoadBalancedStream {
#[inline]
pub fn is_in_error(&self) -> bool {
self.waiting_for_new_stream.load(Ordering::Relaxed)
|| self.is_errored.load(Ordering::Relaxed)
}

pub async fn open_connection(&self) -> Result<Stream, ModelError> {
if !self.can_overload && self.is_overloaded() {
return Err(ModelError::OverloadedRemote);
}

Stream::new(
let res = Stream::new(
self.peer_address,
self.msg_tx.clone(),
self.msg_rx_transmit_handle.subscribe(),
self.purge_tx.clone(),
)
.await
.await;
self.is_errored.store(res.is_err(), Ordering::Relaxed);
res
}

pub async fn new(
Expand Down Expand Up @@ -548,6 +552,33 @@ impl LoadBalancedStream {
}
});

let is_errored = Arc::new(AtomicBool::new(false));
let ie2 = Arc::clone(&is_errored);
let cts2 = client_to_server_tx.clone();
let stc2 = server_to_client_tx.clone();
let ptx2 = purge_tx.clone();
// If in error state, clear out the queue
// and also try creating a new worker every few seconds.
// When one does succeed, unset the flag
tokio::spawn(async move {
loop {
if ie2.load(Ordering::Relaxed) {
// try fetching a new worker
match Stream::new(peer_address, cts2.clone(), stc2.subscribe(), ptx2.clone())
.await
{
Ok(_) => {
ie2.store(false, Ordering::Relaxed);
}
Err(e) => {
error!("error fetching new worker: {}", e);
}
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});

Ok(Self {
peer_address,
is_overloaded,
Expand All @@ -557,6 +588,7 @@ impl LoadBalancedStream {
msg_rx_transmit_handle: server_to_client_tx,
_msg_rx: server_to_client_rx,
purge_tx,
is_errored,
})
}
}
Expand Down

0 comments on commit a2c57eb

Please sign in to comment.