fix failing tests
felipecsl committed Jan 17, 2025
1 parent 0286935 commit 235e096
Showing 7 changed files with 92 additions and 72 deletions.
2 changes: 1 addition & 1 deletion eppo_core/src/event_ingestion/auto_flusher.rs
@@ -49,7 +49,7 @@ mod tests {
use crate::event_ingestion::auto_flusher;
use crate::event_ingestion::batched_message::BatchedMessage;
use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use tokio::time::{Duration};

#[tokio::test(start_paused = true)]
async fn test_auto_flusher() {
8 changes: 8 additions & 0 deletions eppo_core/src/event_ingestion/batched_message.rs
@@ -16,6 +16,14 @@ impl<T> BatchedMessage<T> {
}
}

/// Create a new message with a batch of data and an optional flush marker.
pub fn new(batch: Vec<T>, flush: Option<()>) -> BatchedMessage<T> {
BatchedMessage {
batch,
flush,
}
}

pub fn requires_flush(&self) -> bool {
self.flush.is_some()
}
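The batched_message.rs change adds a `new` constructor so callers (such as the dispatcher test further down) no longer build the struct literally. A minimal, self-contained sketch of how it might be used, assuming `BatchedMessage<T>` carries the `batch: Vec<T>` and `flush: Option<()>` fields implied by the constructor body; the stand-in type below is illustrative, not the crate's actual definition:

```rust
// Illustrative stand-in mirroring the fields implied by the constructor above;
// the real BatchedMessage lives in eppo_core/src/event_ingestion/batched_message.rs.
struct BatchedMessage<T> {
    batch: Vec<T>,
    flush: Option<()>,
}

impl<T> BatchedMessage<T> {
    fn new(batch: Vec<T>, flush: Option<()>) -> BatchedMessage<T> {
        BatchedMessage { batch, flush }
    }

    fn requires_flush(&self) -> bool {
        self.flush.is_some()
    }
}

fn main() {
    let flushing = BatchedMessage::new(vec![1, 2, 3], Some(()));
    assert!(flushing.requires_flush()); // flush marker present

    let plain = BatchedMessage::new(Vec::<i32>::new(), None);
    assert!(!plain.requires_flush()); // plain batch, no flush requested
    assert!(plain.batch.is_empty());
}
```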
64 changes: 50 additions & 14 deletions eppo_core/src/event_ingestion/delivery.rs
@@ -1,51 +1,87 @@
use super::{BatchedMessage, Event};
use super::BatchedMessage;
use crate::event_ingestion::event_delivery::{EventDelivery, EventDeliveryError};
use crate::event_ingestion::queued_event::QueuedEvent;
use log::warn;
use tokio::sync::mpsc;

pub(super) struct DeliveryStatus {
success: Vec<Event>,
failure: Vec<Event>,
pub success: Vec<QueuedEvent>,
pub failure: Vec<QueuedEvent>,
}

impl DeliveryStatus {
fn with_success(success: Vec<QueuedEvent>) -> Self {
DeliveryStatus {
success,
failure: Vec::new(),
}
}

fn with_failure(failure: Vec<QueuedEvent>) -> Self {
DeliveryStatus {
success: Vec::new(),
failure,
}
}
}

pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<Event>>,
mut uplink: mpsc::Receiver<BatchedMessage<QueuedEvent>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
event_delivery: EventDelivery,
) -> Option<()> {
loop {
let event_delivery = event_delivery.clone();
let BatchedMessage { batch, flush } = uplink.recv().await?;
let result = event_delivery.deliver(batch.clone()).await;
let BatchedMessage {
batch,
flush: _flush,
} = uplink.recv().await?;
if batch.is_empty() {
continue;
}
let result = event_delivery
.deliver(
batch
.clone()
.into_iter()
.map(|queued_event| queued_event.event)
.collect(),
)
.await;
match result {
Ok(response) => {
let failed_event_uuids = response.failed_events;
if !failed_event_uuids.is_empty() {
warn!("Failed to deliver {} events", failed_event_uuids.len());
let mut success = Vec::new();
let mut failure = Vec::new();
for event in batch {
if failed_event_uuids.contains(&event.uuid) {
failure.push(event);
for queued_event in batch {
if failed_event_uuids.contains(&queued_event.event.uuid) {
failure.push(QueuedEvent {
event: queued_event.event,
attempts: queued_event.attempts + 1,
});
} else {
success.push(event);
success.push(queued_event);
}
}
delivery_status
.send(DeliveryStatus { success, failure })
.await
.ok()?;
} else {
delivery_status
.send(DeliveryStatus::with_success(batch))
.await
.ok()?;
}
}
Err(err) => {
match err {
EventDeliveryError::RetriableError(_) => {
// Retry later
delivery_status
.send(DeliveryStatus {
failure: batch,
success: Vec::new(),
})
.send(DeliveryStatus::with_failure(batch))
.await
.ok()?;
}
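To summarize the delivery.rs change: `delivery` now consumes `BatchedMessage<QueuedEvent>`, unwraps the inner `Event`s for the HTTP call, and re-wraps anything the ingestion endpoint lists in `failed_events` with `attempts + 1` before reporting a `DeliveryStatus`. A simplified, self-contained sketch of that partition step, using local stand-in types (the real `QueuedEvent`, UUID type, and `DeliveryStatus` live in the surrounding modules):

```rust
use std::collections::HashSet;

// Stand-in types for illustration; the real ones are defined in this crate.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    uuid: String,
}

#[derive(Debug, Clone, PartialEq)]
struct QueuedEvent {
    event: Event,
    attempts: u8,
}

/// Split a delivered batch into successes and failures, incrementing the
/// attempt counter on each failed event, mirroring the loop in delivery().
fn partition(
    batch: Vec<QueuedEvent>,
    failed_uuids: &HashSet<String>,
) -> (Vec<QueuedEvent>, Vec<QueuedEvent>) {
    let mut success = Vec::new();
    let mut failure = Vec::new();
    for queued_event in batch {
        if failed_uuids.contains(&queued_event.event.uuid) {
            failure.push(QueuedEvent {
                event: queued_event.event,
                attempts: queued_event.attempts + 1,
            });
        } else {
            success.push(queued_event);
        }
    }
    (success, failure)
}

fn main() {
    let batch = vec![
        QueuedEvent { event: Event { uuid: "a".into() }, attempts: 0 },
        QueuedEvent { event: Event { uuid: "b".into() }, attempts: 0 },
    ];
    let failed: HashSet<String> = ["b".to_string()].into_iter().collect();
    let (success, failure) = partition(batch, &failed);
    assert_eq!(success.len(), 1);
    assert_eq!(failure[0].attempts, 1); // failed event keeps its payload, attempts bumped
}
```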
60 changes: 29 additions & 31 deletions eppo_core/src/event_ingestion/event_dispatcher.rs
@@ -2,19 +2,14 @@ use crate::event_ingestion::batched_message::BatchedMessage;
use crate::event_ingestion::delivery::DeliveryStatus;
use crate::event_ingestion::event::Event;
use crate::event_ingestion::event_delivery::EventDelivery;
use crate::event_ingestion::queued_event::{QueuedEvent, QueuedEventStatus};
use crate::event_ingestion::queued_event::QueuedEvent;
use crate::event_ingestion::vec_event_queue::{EventQueue, QueueError};
use crate::event_ingestion::{auto_flusher, batcher, delivery};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::Duration;
use url::Url;

#[derive(Debug)]
pub enum EventDispatcherCommand {
Event,
}

// batch size of one means each event will be delivered individually, thus effectively disabling batching.
const MIN_BATCH_SIZE: usize = 1;
const MAX_BATCH_SIZE: usize = 10_000;
@@ -60,7 +55,7 @@ impl<T: EventQueue + Clone + Send + 'static> EventDispatcher<T> {
.err()
}

fn spawn_event_dispatcher(&self) -> (Sender<BatchedMessage<Event>>, Receiver<DeliveryStatus>) {
fn spawn_event_dispatcher(&self) -> (Sender<BatchedMessage<QueuedEvent>>, Receiver<DeliveryStatus>) {
let config = self.config.clone();
let ingestion_url = Url::parse(config.ingestion_url.as_str())
.expect("Failed to create EventDelivery. invalid ingestion URL");
@@ -113,20 +108,31 @@ mod tests {
pub session_id: String,
}

fn init() {
let _ = env_logger::try_init();
}

#[tokio::test]
async fn test_dispatch_starts_delivery() {
init();
let event = new_test_event();
let mock_server = MockServer::start().await;
let mut eppo_events = Vec::new();
eppo_events.push(serde_json::to_value(event.clone()).unwrap());
let expected_body = json!({"eppo_events": eppo_events });
let response_body =
ResponseTemplate::new(200).set_body_json(json!({"failed_events": []}));
Mock::given(method("POST"))
.and(path("/"))
.and(body_json(&expected_body))
.respond_with(ResponseTemplate::new(200))
.respond_with(response_body)
.mount(&mock_server)
.await;
run_dispatcher_task(event, mock_server.uri()).await;
let mut rx = run_dispatcher_task(event.clone(), mock_server.uri()).await;
let delivery_status = rx.recv().await.unwrap();
let successful_events = delivery_status.success.clone();
let failed_events = delivery_status.failure.clone();
drop(delivery_status);
let received_requests = mock_server.received_requests().await.unwrap();
assert_eq!(received_requests.len(), 1);
let received_request = &received_requests[0];
@@ -135,10 +141,13 @@ mod tests {
let received_body: serde_json::Value =
serde_json::from_slice(&received_request.body).expect("Failed to parse request body");
assert_eq!(received_body, expected_body);
assert_eq!(successful_events, vec![QueuedEvent { event, attempts: 0 }]);
assert_eq!(failed_events.len(), 0);
}

#[tokio::test]
async fn test_dispatch_failed() {
init();
let event = new_test_event();
let mock_server = MockServer::start().await;
let mut eppo_events = Vec::new();
@@ -152,22 +161,16 @@ mod tests {
.respond_with(response_body)
.mount(&mock_server)
.await;
let queue = run_dispatcher_task(event.clone(), mock_server.uri()).await;
let mut rx = run_dispatcher_task(event.clone(), mock_server.uri()).await;
let delivery_status = rx.recv().await.unwrap();
let successful_events = delivery_status.success.clone();
let failed_events = delivery_status.failure.clone();
drop(delivery_status);
let received_requests = mock_server.received_requests().await.unwrap();
assert_eq!(received_requests.len(), 1);
let failed_events = queue.next_batch(QueuedEventStatus::Failed);
// assert failed event was moved to failed queue
assert_eq!(
failed_events,
vec![QueuedEvent {
event,
attempts: 1,
status: QueuedEventStatus::Failed
}]
);
let pending_events = queue.next_batch(QueuedEventStatus::Pending);
// assert no more pending events
assert_eq!(pending_events, vec![]);
assert_eq!(failed_events, vec![QueuedEvent { event, attempts: 1 }]);
assert_eq!(successful_events.len(), 0);
}

fn new_test_event() -> Event {
@@ -195,7 +198,7 @@ mod tests {
}
}

async fn run_dispatcher_task(event: Event, mock_server_uri: String) -> VecEventQueue {
async fn run_dispatcher_task(event: Event, mock_server_uri: String) -> Receiver<DeliveryStatus> {
let batch_size = 1;
let config = new_test_event_config(mock_server_uri, batch_size);
let vec_event_queue_config = VecEventQueueConfig {
Expand All @@ -204,16 +207,11 @@ mod tests {
batch_size,
};
let dispatcher = EventDispatcher::new(config, VecEventQueue::new(vec_event_queue_config));
let queue = dispatcher.event_queue.clone();
dispatcher.dispatch(event.clone());
let (tx, rx) = dispatcher.spawn_event_dispatcher();
tx.send(BatchedMessage {
batch: vec![event],
flush: None,
}).await.unwrap();
drop(rx);
tx.send(BatchedMessage::new(vec![QueuedEvent::new(event)], None)).await.unwrap();
// wait some time for the async task to finish
tokio::time::sleep(Duration::from_millis(100)).await;
queue
rx
}
}
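The test changes follow the same shift: rather than inspecting the queue afterwards, the tests read the `DeliveryStatus` sent back over the channel returned by `run_dispatcher_task`. Below is a sketch of that consumption pattern; it uses `tokio::time::timeout` as one way to bound the wait (the tests above use a fixed 100 ms sleep instead), and the `DeliveryStatus` here is a stand-in for illustration:

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

// Stand-in for the crate's DeliveryStatus, for illustration only.
#[derive(Debug)]
struct DeliveryStatus {
    success: Vec<String>,
    failure: Vec<String>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<DeliveryStatus>(1);

    // Pretend the delivery task reported one successful event.
    tx.send(DeliveryStatus {
        success: vec!["event-1".into()],
        failure: vec![],
    })
    .await
    .unwrap();

    // Bound the wait on the channel instead of sleeping for a fixed interval.
    let status = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("timed out waiting for delivery status")
        .expect("delivery channel closed");

    assert_eq!(status.success.len(), 1);
    assert!(status.failure.is_empty());
}
```

Waiting on the receiver rather than sleeping keeps a test like this less timing-sensitive, at the cost of slightly more plumbing.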
1 change: 0 additions & 1 deletion eppo_core/src/event_ingestion/mod.rs
@@ -9,4 +9,3 @@ mod queued_event;
mod vec_event_queue;

use batched_message::BatchedMessage;
use event::Event;
5 changes: 1 addition & 4 deletions eppo_core/src/event_ingestion/queued_event.rs
@@ -11,23 +11,21 @@ pub(super) enum QueuedEventStatus {
pub(super) struct QueuedEvent {
pub event: Event,
pub attempts: u8,
pub status: QueuedEventStatus,
}

impl QueuedEvent {
pub fn new(event: Event) -> Self {
QueuedEvent {
event,
attempts: 0,
status: QueuedEventStatus::Pending,
}
}
}

#[cfg(test)]
mod tests {
use crate::event_ingestion::event::Event;
use crate::event_ingestion::queued_event::{QueuedEvent, QueuedEventStatus};
use crate::event_ingestion::queued_event::QueuedEvent;
use crate::timestamp::now;

#[test]
@@ -42,6 +40,5 @@ mod tests {
assert_eq!(queued_event.event, event);
assert_eq!(queued_event.attempts, 0);
assert_eq!(queued_event.event.event_type, "test");
assert_eq!(queued_event.status, QueuedEventStatus::Pending);
}
}
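With the `status` field removed, a `QueuedEvent`'s state is implied by which queue it sits in and by its `attempts` counter. A small illustrative sketch of the slimmed-down shape (stand-in definitions, not the crate's actual types):

```rust
// Stand-ins for illustration; the real Event and QueuedEvent live in this crate.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    uuid: String,
    event_type: String,
}

#[derive(Debug, Clone, PartialEq)]
struct QueuedEvent {
    event: Event,
    attempts: u8,
}

impl QueuedEvent {
    fn new(event: Event) -> Self {
        QueuedEvent { event, attempts: 0 }
    }
}

fn main() {
    let queued = QueuedEvent::new(Event {
        uuid: "u-1".into(),
        event_type: "test".into(),
    });
    assert_eq!(queued.attempts, 0); // fresh events start with zero attempts

    // A failed delivery is recorded by bumping the counter, not by a status flag.
    let retried = QueuedEvent { attempts: queued.attempts + 1, ..queued };
    assert_eq!(retried.attempts, 1);
}
```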
24 changes: 3 additions & 21 deletions eppo_core/src/event_ingestion/vec_event_queue.rs
@@ -110,7 +110,6 @@ impl EventQueue for VecEventQueue {
fn enqueue_failed_events_for_retry(&self, failed_event_uuids: Vec<QueuedEvent>) {
let mut failed_event_queue = self.failed_event_queue.lock().unwrap();
for mut failed_event in failed_event_uuids {
failed_event.status = QueuedEventStatus::Failed;
if failed_event.attempts >= self.config.max_retries {
// do not re-add to the queue if max retries is reached and simply drop the event
warn!(
@@ -201,7 +200,6 @@ mod tests {
let failed_events = queue.next_batch(QueuedEventStatus::Failed);
assert_eq!(failed_events.len(), 1);
assert_eq!(failed_events[0].event.uuid, event.event.uuid);
assert_eq!(failed_events[0].status, QueuedEventStatus::Failed);
assert_eq!(failed_events[0].attempts, 2);
// failing a third time should not requeue since that exceeds max_retries == 2
queue.enqueue_failed_events_for_retry(failed_events);
@@ -230,21 +228,9 @@ mod tests {
event_type: "C".to_string(),
payload: serde_json::json!({"key": "value"}),
};
let queued_event_a = QueuedEvent {
event: event_a.clone(),
attempts: 0,
status: QueuedEventStatus::Pending,
};
let queued_event_b = QueuedEvent {
event: event_b.clone(),
attempts: 1,
status: QueuedEventStatus::Failed,
};
let queued_event_c = QueuedEvent {
event: event_c.clone(),
attempts: 0,
status: QueuedEventStatus::Pending,
};
let queued_event_a = QueuedEvent::new(event_a.clone());
let queued_event_b = QueuedEvent::new(event_b.clone());
let queued_event_c = QueuedEvent::new(event_c.clone());
let queue = VecEventQueue {
config: VecEventQueueConfig {
batch_size: 10,
@@ -339,10 +325,6 @@ mod tests {
failed_events.front().unwrap().event.uuid,
queued_event_a.event.uuid
);
assert_eq!(
failed_events.front().unwrap().status,
QueuedEventStatus::Failed
);
assert_eq!(failed_events.front().unwrap().attempts, 1);
let pending_events = queue.pending_event_queue.lock().unwrap();
assert_eq!(pending_events.len(), 1);
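The retry path in vec_event_queue.rs now decides purely on the attempt count: events at or above `max_retries` are dropped with a warning, everything else goes back onto the failed queue. A simplified sketch of that decision under stated assumptions (stand-in `QueuedEvent`, a plain `VecDeque` instead of the crate's `Mutex`-guarded queue, and `eprintln!` standing in for `log::warn!`):

```rust
use std::collections::VecDeque;

// Stand-in for the crate's QueuedEvent, for illustration only.
#[derive(Debug, Clone)]
struct QueuedEvent {
    uuid: String,
    attempts: u8,
}

/// Requeue failed events for another delivery attempt, dropping any that have
/// already exhausted max_retries, mirroring enqueue_failed_events_for_retry.
fn requeue_failed(
    failed: Vec<QueuedEvent>,
    failed_queue: &mut VecDeque<QueuedEvent>,
    max_retries: u8,
) {
    for event in failed {
        if event.attempts >= max_retries {
            // Give up on this event rather than retrying forever.
            eprintln!("dropping event {} after {} attempts", event.uuid, event.attempts);
        } else {
            failed_queue.push_back(event);
        }
    }
}

fn main() {
    let mut queue = VecDeque::new();
    let events = vec![
        QueuedEvent { uuid: "a".into(), attempts: 1 },
        QueuedEvent { uuid: "b".into(), attempts: 2 },
    ];
    requeue_failed(events, &mut queue, 2);
    assert_eq!(queue.len(), 1); // "b" hit max_retries and was dropped
    assert_eq!(queue.front().unwrap().uuid, "a");
}
```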
