Skip to content

Commit

Permalink
nuke VecEventQUeue
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecsl committed Jan 17, 2025
1 parent bde8dc4 commit 84a59ff
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 367 deletions.
4 changes: 0 additions & 4 deletions eppo_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,8 @@ serde-bool = "0.1.3"
serde_json = "1.0.116"
serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] }
thiserror = "2.0.3"
<<<<<<< HEAD
linked_hash_set = { version = "0.1.5", optional = true }
tokio = { version = "1.34.0", default-features = false, features = ["macros", "sync", "rt", "time", "test-util"] }
=======
tokio = { version = "1.34.0", default-features = false, features = ["macros", "sync", "rt", "time"] }
>>>>>>> main
url = "2.5.0"
uuid = { version = "1.11.0", features = ["v4", "serde"], optional = true }

Expand Down
1 change: 1 addition & 0 deletions eppo_core/src/event_ingestion/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl DeliveryStatus {
pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<QueuedEvent>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
max_retries: u8,
event_delivery: EventDelivery,
) -> Option<()> {
loop {
Expand Down
42 changes: 12 additions & 30 deletions eppo_core/src/event_ingestion/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::event_ingestion::delivery::DeliveryStatus;
use crate::event_ingestion::event::Event;
use crate::event_ingestion::event_delivery::EventDelivery;
use crate::event_ingestion::queued_event::QueuedEvent;
use crate::event_ingestion::vec_event_queue::{EventQueue, QueueError};
use crate::event_ingestion::{auto_flusher, batcher, delivery};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -21,40 +20,27 @@ pub struct EventDispatcherConfig {
pub delivery_interval: Duration,
pub retry_interval: Duration,
pub max_retry_delay: Duration,
pub max_retries: u8,
pub batch_size: usize,
pub max_queue_size: usize,
}

#[derive(thiserror::Error, Debug)]
pub enum DispatcherError {
#[error("Queue error: {0}")]
QueueError(QueueError),
#[error("Receiver should not be closed before all senders are closed")]
EventDeliveryError,
}

/// EventDispatcher is responsible for batching events and delivering them to the ingestion service
/// via [`EventDelivery`].
pub struct EventDispatcher<T> {
pub struct EventDispatcher {
config: EventDispatcherConfig,
event_queue: T,
}

impl<T: EventQueue + Clone + Send + 'static> EventDispatcher<T> {
pub fn new(config: EventDispatcherConfig, event_queue: T) -> Self {
impl EventDispatcher {
pub fn new(config: EventDispatcherConfig) -> Self {
EventDispatcher {
config,
event_queue,
}
}

/// Enqueues an event in the batch event processor and starts delivery if needed.
pub fn dispatch(&self, event: Event) -> Option<DispatcherError> {
self.event_queue
.push(QueuedEvent::new(event))
.map_err(DispatcherError::QueueError)
.err()
}

/// Starts the event dispatcher related tasks and returns a sender and receiver pair.
/// Use the sender to dispatch events and the receiver to receive delivery statuses.
fn spawn_event_dispatcher(&self) -> (Sender<BatchedMessage<QueuedEvent>>, Receiver<DeliveryStatus>) {
let config = self.config.clone();
let ingestion_url = Url::parse(config.ingestion_url.as_str())
Expand Down Expand Up @@ -82,6 +68,7 @@ impl<T: EventQueue + Clone + Send + 'static> EventDispatcher<T> {
tokio::spawn(delivery::delivery(
batcher_downlink_rx,
delivery_downlink_tx,
config.max_retries,
event_delivery,
));

Expand All @@ -92,7 +79,6 @@ impl<T: EventQueue + Clone + Send + 'static> EventDispatcher<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::event_ingestion::vec_event_queue::{VecEventQueue, VecEventQueueConfig};
use crate::timestamp::now;
use serde::Serialize;
use serde_json::json;
Expand Down Expand Up @@ -191,23 +177,19 @@ mod tests {
EventDispatcherConfig {
sdk_key: "test-sdk-key".to_string(),
ingestion_url,
batch_size,
delivery_interval: Duration::from_millis(100),
retry_interval: Duration::from_millis(1000),
max_retry_delay: Duration::from_millis(5000),
batch_size,
max_retries: 3,
max_queue_size: 10,
}
}

async fn run_dispatcher_task(event: Event, mock_server_uri: String) -> Receiver<DeliveryStatus> {
let batch_size = 1;
let config = new_test_event_config(mock_server_uri, batch_size);
let vec_event_queue_config = VecEventQueueConfig {
max_retries: 3,
max_queue_size: 10,
batch_size,
};
let dispatcher = EventDispatcher::new(config, VecEventQueue::new(vec_event_queue_config));
dispatcher.dispatch(event.clone());
let dispatcher = EventDispatcher::new(config);
let (tx, rx) = dispatcher.spawn_event_dispatcher();
tx.send(BatchedMessage::new(vec![QueuedEvent::new(event)], None)).await.unwrap();
// wait some time for the async task to finish
Expand Down
1 change: 0 additions & 1 deletion eppo_core/src/event_ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ mod event;
mod event_delivery;
mod event_dispatcher;
mod queued_event;
mod vec_event_queue;

use batched_message::BatchedMessage;
Loading

0 comments on commit 84a59ff

Please sign in to comment.