Skip to content

Commit

Permalink
WIP retry failures
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecsl committed Jan 18, 2025
1 parent a3516d5 commit 3dfbe9d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 60 deletions.
1 change: 1 addition & 0 deletions eppo_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ categories = ["config"]
rust-version = "1.75.0"

[features]
default = ["event_ingestion"]
# Unstable feature flag for an upcoming feature.
event_ingestion = ["dep:uuid"]
# Add implementation of `FromPyObject`/`ToPyObject` for some types.
Expand Down
1 change: 0 additions & 1 deletion eppo_core/src/event_ingestion/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub(super) async fn batcher<T>(
let mut uplink_alive = true;
while uplink_alive {
let mut batch = BatchedMessage::empty();

while uplink_alive && batch.batch.len() < min_batch_size && batch.flush.is_none() {
match uplink.recv().await {
None => {
Expand Down
96 changes: 53 additions & 43 deletions eppo_core/src/event_ingestion/delivery.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,65 @@
use super::BatchedMessage;
use crate::event_ingestion::event_delivery::{EventDelivery, EventDeliveryError};
use crate::event_ingestion::event_delivery::{
EventDelivery, EventDeliveryError, EventDeliveryResponse,
};
use crate::event_ingestion::queued_event::QueuedEvent;
use log::warn;
use tokio::sync::mpsc;

pub(super) struct DeliveryStatus {
pub(super) struct QueuedBatch {
pub success: Vec<QueuedEvent>,
pub failure: Vec<QueuedEvent>,
pub retry: Vec<QueuedEvent>,
}

impl DeliveryStatus {
impl QueuedBatch {
fn with_success(success: Vec<QueuedEvent>) -> Self {
DeliveryStatus {
QueuedBatch {
success,
failure: Vec::new(),
retry: Vec::new(),
}
}

fn with_failure(failure: Vec<QueuedEvent>) -> Self {
DeliveryStatus {
QueuedBatch {
success: Vec::new(),
retry: Vec::new(),
failure,
}
}
}

pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<QueuedEvent>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
delivery_status: mpsc::Sender<QueuedBatch>,
max_retries: u8,
event_delivery: EventDelivery,
) -> Option<()> {
loop {
let event_delivery = event_delivery.clone();
let BatchedMessage {
batch,
flush: _flush,
} = uplink.recv().await?;
if batch.is_empty() {
continue;
}
let result = event_delivery
.deliver(
batch
.clone()
.into_iter()
.map(|queued_event| queued_event.event)
.collect(),
)
.await;
let events_to_deliver = batch
.clone()
.into_iter()
.map(|queued_event| queued_event.event)
.collect();
let result = event_delivery.deliver(events_to_deliver).await;
match result {
Ok(response) => {
let failed_event_uuids = response.failed_events;
if !failed_event_uuids.is_empty() {
warn!("Failed to deliver {} events", failed_event_uuids.len());
let mut success = Vec::new();
let mut failure = Vec::new();
for queued_event in batch {
if failed_event_uuids.contains(&queued_event.event.uuid) {
failure.push(QueuedEvent {
event: queued_event.event,
attempts: queued_event.attempts + 1,
});
} else {
success.push(queued_event);
}
}
deliver_status(&delivery_status, DeliveryStatus { success, failure }).await;
} else {
deliver_status(&delivery_status, DeliveryStatus::with_success(batch)).await;
}
let delivery_status_data = collect_delivery_response(batch, response, max_retries);
deliver_status(&delivery_status, delivery_status_data).await;
}
Err(err) => {
match err {
EventDeliveryError::RetriableError(_) => {
// Retry later
deliver_status(&delivery_status, DeliveryStatus::with_failure(batch)).await;
deliver_status(&delivery_status, QueuedBatch::with_failure(batch)).await;
}
EventDeliveryError::NonRetriableError(_) => {
warn!("Failed to deliver events: {}", err);
Expand All @@ -88,11 +72,37 @@ pub(super) async fn delivery(
}
}

async fn deliver_status(receiver: &mpsc::Sender<DeliveryStatus>, status: DeliveryStatus) {
receiver
.send(status)
.await
.unwrap_or_else(|err| {
warn!("Failed to send delivery status: {}", err);
});
/// Sorts the events of a delivered `batch` into succeeded, retriable and permanently failed
/// groups, based on the UUIDs listed in `response.failed_events`.
///
/// * Events whose UUID is not listed as failed are returned unchanged in `success`.
/// * Failed events with `attempts < max_retries` go to `retry`.
/// * All other failed events go to `failure`.
///
/// Every failed event is passed through `QueuedEvent::new_from_failed` before being stored,
/// regardless of which of the two groups it ends up in.
fn collect_delivery_response(
    batch: Vec<QueuedEvent>,
    response: EventDeliveryResponse,
    max_retries: u8,
) -> QueuedBatch {
    let rejected_uuids = response.failed_events;
    // Fast path: nothing was rejected, so the whole batch succeeded.
    if rejected_uuids.is_empty() {
        return QueuedBatch::with_success(batch);
    }
    warn!("Failed to deliver {} events", rejected_uuids.len());

    let mut outcome = QueuedBatch {
        success: Vec::new(),
        failure: Vec::new(),
        retry: Vec::new(),
    };
    for event in batch {
        if !rejected_uuids.contains(&event.event.uuid) {
            outcome.success.push(event);
            continue;
        }
        // Decide on the attempts count *before* it is bumped by `new_from_failed`.
        let retriable = event.attempts < max_retries;
        let bumped = QueuedEvent::new_from_failed(event);
        if retriable {
            outcome.retry.push(bumped);
        } else {
            // max retries reached, mark as failed
            outcome.failure.push(bumped);
        }
    }
    outcome
}

/// Sends `status` on the delivery-status channel.
///
/// Delivery is best-effort: if the send fails, the error is logged as a warning and otherwise
/// ignored.
async fn deliver_status(receiver: &mpsc::Sender<QueuedBatch>, status: QueuedBatch) {
    if let Err(err) = receiver.send(status).await {
        warn!("Failed to send delivery status: {}", err);
    }
}
4 changes: 2 additions & 2 deletions eppo_core/src/event_ingestion/event_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ impl EventDelivery {

/// Delivers the provided event batch and returns a Vec with the events that failed to be delivered.
pub(super) async fn deliver(
self,
&self,
events: Vec<Event>,
) -> Result<EventDeliveryResponse, EventDeliveryError> {
let ingestion_url = self.ingestion_url;
let ingestion_url = self.ingestion_url.clone();
let sdk_key = &self.sdk_key;
debug!("Delivering {} events to {}", events.len(), ingestion_url);
let body = IngestionRequestBody {
Expand Down
20 changes: 13 additions & 7 deletions eppo_core/src/event_ingestion/event_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::event_ingestion::batched_message::BatchedMessage;
use crate::event_ingestion::delivery::DeliveryStatus;
use crate::event_ingestion::event::Event;
use crate::event_ingestion::event_delivery::EventDelivery;
use crate::event_ingestion::queued_event::QueuedEvent;
use crate::event_ingestion::{auto_flusher, batcher, delivery};
use crate::event_ingestion::{auto_flusher, batcher, delivery, retry};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::Duration;
use url::Url;
use crate::event_ingestion::retry::FinishedBatch;

// batch size of one means each event will be delivered individually, thus effectively disabling batching.
const MIN_BATCH_SIZE: usize = 1;
Expand Down Expand Up @@ -41,7 +41,7 @@ impl EventDispatcher {

/// Starts the event dispatcher related tasks and returns a sender and receiver pair.
/// Use the sender to dispatch events and the receiver to receive delivery statuses.
fn spawn_event_dispatcher(&self) -> (Sender<BatchedMessage<QueuedEvent>>, Receiver<DeliveryStatus>) {
fn spawn_event_dispatcher(&self) -> (Sender<BatchedMessage<QueuedEvent>>, Receiver<FinishedBatch>) {
let config = self.config.clone();
let ingestion_url = Url::parse(config.ingestion_url.as_str())
.expect("Failed to create EventDelivery. invalid ingestion URL");
Expand All @@ -52,7 +52,8 @@ impl EventDispatcher {
let (sender, flusher_uplink_rx) = mpsc::channel(channel_size);
let (flusher_downlink_tx, flusher_downlink_rx) = mpsc::channel(channel_size);
let (batcher_downlink_tx, batcher_downlink_rx) = mpsc::channel(channel_size);
let (delivery_downlink_tx, receiver) = mpsc::channel(channel_size);
let (delivery_downlink_tx, delivery_downlink_rx) = mpsc::channel(channel_size);
let (retry_downlink_tx, receiver) = mpsc::channel(channel_size);

// Spawn the auto_flusher, batcher and delivery
tokio::spawn(auto_flusher::auto_flusher(
Expand All @@ -62,7 +63,7 @@ impl EventDispatcher {
));
tokio::spawn(batcher::batcher(
flusher_downlink_rx,
batcher_downlink_tx,
batcher_downlink_tx.clone(),
config.batch_size,
));
tokio::spawn(delivery::delivery(
Expand All @@ -71,6 +72,11 @@ impl EventDispatcher {
config.max_retries,
event_delivery,
));
tokio::spawn(retry::retry(
delivery_downlink_rx,
batcher_downlink_tx,
retry_downlink_tx,
));

(sender, receiver)
}
Expand Down Expand Up @@ -181,12 +187,12 @@ mod tests {
delivery_interval: Duration::from_millis(100),
retry_interval: Duration::from_millis(1000),
max_retry_delay: Duration::from_millis(5000),
max_retries: 3,
max_retries: 2,
max_queue_size: 10,
}
}

async fn run_dispatcher_task(event: Event, mock_server_uri: String) -> Receiver<DeliveryStatus> {
async fn run_dispatcher_task(event: Event, mock_server_uri: String) -> Receiver<FinishedBatch> {
let batch_size = 1;
let config = new_test_event_config(mock_server_uri, batch_size);
let dispatcher = EventDispatcher::new(config);
Expand Down
1 change: 1 addition & 0 deletions eppo_core/src/event_ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ mod event;
mod event_delivery;
mod event_dispatcher;
mod queued_event;
mod retry;

use batched_message::BatchedMessage;
15 changes: 8 additions & 7 deletions eppo_core/src/event_ingestion/queued_event.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
use crate::event_ingestion::event::Event;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(super) enum QueuedEventStatus {
Pending,
Retry,
Failed,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(super) struct QueuedEvent {
pub event: Event,
Expand All @@ -20,6 +13,14 @@ impl QueuedEvent {
attempts: 0,
}
}

/// Creates a new QueuedEvent from a failed QueuedEvent, incrementing the attempts counter.
///
/// The increment saturates at the counter's maximum value instead of overflowing, so an event
/// that has already reached the maximum number of attempts neither panics (debug builds) nor
/// wraps back to zero (release builds).
pub fn new_from_failed(queued_event: QueuedEvent) -> Self {
    QueuedEvent {
        event: queued_event.event,
        attempts: queued_event.attempts.saturating_add(1),
    }
}
}

#[cfg(test)]
Expand Down
26 changes: 26 additions & 0 deletions eppo_core/src/event_ingestion/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use tokio::sync::mpsc;
use crate::event_ingestion::batched_message::BatchedMessage;
use crate::event_ingestion::delivery::QueuedBatch;
use crate::event_ingestion::queued_event::QueuedEvent;

/// Final outcome of a batch once the retry stage has taken out the events it resubmits.
///
/// Built by `retry` from the `success` and `failure` parts of a `QueuedBatch`.
pub(super) struct FinishedBatch {
/// Events taken from `QueuedBatch::success`.
pub success: Vec<QueuedEvent>,
/// Events taken from `QueuedBatch::failure`; `retry` forwards these without resubmitting them.
pub failure: Vec<QueuedEvent>,
}

/// Routes the outcome of each delivered batch received on `uplink`.
///
/// For every `QueuedBatch`:
/// * a non-empty `retry` list is wrapped in a `BatchedMessage` (with no flush marker) and sent
///   through `retry_downlink` so it can be delivered again;
/// * the `success` and `failure` lists are then sent to `delivery_status` as a `FinishedBatch`,
///   even when both are empty.
///
/// Returns `None` once `uplink` is closed or as soon as either send fails; it never returns
/// `Some`.
pub(super) async fn retry(
    mut uplink: mpsc::Receiver<QueuedBatch>,
    retry_downlink: mpsc::Sender<BatchedMessage<QueuedEvent>>,
    delivery_status: mpsc::Sender<FinishedBatch>,
) -> Option<()> {
    while let Some(queued_batch) = uplink.recv().await {
        let QueuedBatch {
            success,
            failure,
            retry: pending,
        } = queued_batch;

        if !pending.is_empty() {
            let resubmission = BatchedMessage::new(pending, None);
            if retry_downlink.send(resubmission).await.is_err() {
                return None;
            }
        }

        // Forward the remaining (non-retried) events to the delivery-status channel.
        let finished = FinishedBatch { success, failure };
        if delivery_status.send(finished).await.is_err() {
            return None;
        }
    }
    None
}

0 comments on commit 3dfbe9d

Please sign in to comment.