Skip to content

Commit

Permalink
feat(events): Scaffold EventDispatcher (#132)
Browse files Browse the repository at this point in the history
* feat(events): [WIP] Scaffold `EventDispatcher`

* fix test

* fix tokio features for tokio::test

* use timestamp type

* use proper uuid type

* code review: use duration

* code review rename

* replace hashmap with generic type

* update async code according to code review

* update test for accuracy

* Update eppo_core/src/events/event_dispatcher.rs

Co-authored-by: Oleksii Shmalko <oleksii@geteppo.com>

* Update eppo_core/src/events/event_dispatcher.rs

Co-authored-by: Oleksii Shmalko <oleksii@geteppo.com>

* Update eppo_core/src/events/event_dispatcher.rs

Co-authored-by: Oleksii Shmalko <oleksii@geteppo.com>

* code review updates

* revert this

* revert sdk-test-data changes

* set min batch size to zero

* get rid of batch_event_queue

* minor nit

* pas in channel

* improve test

* remove log init

* Update eppo_core/src/events/event_dispatcher.rs

Co-authored-by: Oleksii Shmalko <oleksii@geteppo.com>

* code review comments

* set min batch size to one

* more code review comments

* more comments

* remove env logger init

---------

Co-authored-by: Oleksii Shmalko <oleksii@geteppo.com>
  • Loading branch information
felipecsl and rasendubi authored Jan 7, 2025
1 parent cfb2f0d commit 6f360ed
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 10 deletions.
3 changes: 2 additions & 1 deletion eppo_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ serde-bool = "0.1.3"
serde_json = "1.0.116"
serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] }
thiserror = "2.0.3"
tokio = { version = "1.34.0", features = ["rt", "time"] }
tokio = { version = "1.34.0", features = ["rt", "time", "macros"] }
url = "2.5.0"
uuid = { version = "1.11.0", features = ["v4", "serde"] }

# pyo3 dependencies
pyo3 = { version = "0.22.0", optional = true, default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion eppo_core/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Configuration {
/// Return bandit configuration for the given key.
///
/// Returns `None` if bandits are missing for bandit does not exist.
pub(crate) fn get_bandit<'a>(&'a self, bandit_key: &str) -> Option<&'a BanditConfiguration> {
pub(crate) fn get_bandit(&self, bandit_key: &str) -> Option<&BanditConfiguration> {
self.bandits.as_ref()?.bandits.get(bandit_key)
}

Expand Down
2 changes: 1 addition & 1 deletion eppo_core/src/configuration_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! A thread-safe in-memory storage for currently active configuration. [`ConfigurationStore`]
//! provides a concurrent access for readers (e.g., flag evaluation) and writers (e.g., periodic
//! provides concurrent access for readers (e.g., flag evaluation) and writers (e.g., periodic
//! configuration fetcher).
use std::sync::{Arc, RwLock};

Expand Down
4 changes: 2 additions & 2 deletions eppo_core/src/eval/eval_assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl CompiledFlagsConfig {
flag.eval(visitor, subject_key, subject_attributes, now)
}

fn get_flag<'a>(&'a self, flag_key: &str) -> Result<&'a Flag, EvaluationFailure> {
fn get_flag(&self, flag_key: &str) -> Result<&Flag, EvaluationFailure> {
let flag = self
.flags
.get(flag_key)
Expand Down Expand Up @@ -484,7 +484,7 @@ mod tests {
name: "test",
version: "0.1.0",
},
std::fs::read("../sdk-test-data/ufc/flags-v1.json").unwrap(),
fs::read("../sdk-test-data/ufc/flags-v1.json").unwrap(),
)
.unwrap();
let config = Configuration::from_server_response(config, None);
Expand Down
3 changes: 3 additions & 0 deletions eppo_core/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
mod event_dispatcher;
mod event;

use std::{collections::HashMap, sync::Arc};

use serde::Serialize;
Expand Down
10 changes: 10 additions & 0 deletions eppo_core/src/events/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use crate::timestamp::Timestamp;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
pub uuid: uuid::Uuid,
pub timestamp: Timestamp,
pub event_type: String,
pub payload: serde_json::Value,
}
182 changes: 182 additions & 0 deletions eppo_core/src/events/event_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use crate::events::event::Event;
use log::info;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::time::{Duration, Instant};

#[derive(Debug)]
pub enum EventDispatcherCommand {
Event(Event),
Flush,
}

// batch size of one means each event will be delivered individually, thus effectively disabling batching.
const MIN_BATCH_SIZE: usize = 1;
const MAX_BATCH_SIZE: usize = 10_000;

#[derive(Debug, Clone)]
pub struct EventDispatcherConfig {
pub sdk_key: String,
pub ingestion_url: String,
pub delivery_interval: Duration,
pub retry_interval: Duration,
pub max_retry_delay: Duration,
pub max_retries: Option<u32>,
pub batch_size: usize,
}

pub struct EventDispatcher {
config: EventDispatcherConfig,
tx: UnboundedSender<EventDispatcherCommand>,
}

impl<'a> EventDispatcher {
pub fn new(config: EventDispatcherConfig, tx: UnboundedSender<EventDispatcherCommand>) -> Self {
EventDispatcher { config, tx }
}

/// Enqueues an event in the batch event processor and starts delivery if needed.
pub fn dispatch(&self, event: Event) -> Result<(), &str> {
self.send(EventDispatcherCommand::Event(event))
}

pub fn send(&self, command: EventDispatcherCommand) -> Result<(), &str> {
match self.tx.send(command) {
Ok(_) => Ok(()),
Err(_) => Err("receiver should not be closed before all senders are closed"),
}
}

async fn event_dispatcher(&self, rx: &mut UnboundedReceiver<EventDispatcherCommand>) {
let config = self.config.clone();
let batch_size = config.batch_size;
loop {
let mut batch_queue: Vec<Event> = Vec::with_capacity(batch_size);
let ingestion_url = config.ingestion_url.clone();

// Wait for the first event in the batch.
//
// Optimization: Moved outside the loop below, so we're not woken up on regular intervals
// unless we have something to send. (This achieves a similar effect as starting/stopping
// delivery loop.)
match rx.recv().await {
None => {
// Channel closed, no more messages. Exit the main loop.
return;
}
Some(EventDispatcherCommand::Event(event)) => batch_queue.push(event),
Some(EventDispatcherCommand::Flush) => {
// No buffered events yet, nothing to flush.
continue;
}
}

// short-circuit for batch size of 1
if batch_queue.len() < batch_size {
let deadline = Instant::now() + config.delivery_interval;
// Loop until we have enough events to send or reached deadline.
loop {
tokio::select! {
_ = tokio::time::sleep_until(deadline) => {
// reached deadline -> send everything we have
break;
},
command = rx.recv() => {
match command {
None => {
// channel closed
break;
},
Some(EventDispatcherCommand::Event(event)) => {
batch_queue.push(event);
if batch_queue.len() >= batch_size {
// Reached max batch size -> send events immediately
break;
} // else loop to get more events
},
Some(EventDispatcherCommand::Flush) => {
break;
}
}
}
}
}
}

// Send `batch` events.
tokio::spawn(async move {
// Spawning a new task, so the main task can continue batching events and respond to
// commands. At this point, batch_queue is guaranteed to have at least one event.
let events_to_deliver = batch_queue.as_slice();
EventDispatcher::deliver(&ingestion_url, &events_to_deliver).await;
});
}
}

async fn deliver(ingestion_url: &str, events: &[Event]) {
// Simulated HTTP request or delivery logic
info!(
"Pretending to deliver {} events to {}",
events.len(),
ingestion_url
);
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::timestamp::now;
use serde::Serialize;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;
use tokio::time::Duration;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize)]
struct LoginPayload {
pub user_id: String,
pub session_id: String,
}

#[tokio::test]
async fn test_dispatch_starts_delivery() {
let config = EventDispatcherConfig {
sdk_key: "test-sdk-key".to_string(),
ingestion_url: "http://example.com".to_string(),
delivery_interval: Duration::from_millis(100),
retry_interval: Duration::from_millis(1000),
max_retry_delay: Duration::from_millis(5000),
max_retries: Some(3),
batch_size: 1,
};
let (tx, rx) = unbounded_channel();
let rx = Arc::new(Mutex::new(rx));
let dispatcher = EventDispatcher::new(config, tx);
let payload = LoginPayload {
user_id: "user123".to_string(),
session_id: "session456".to_string(),
};
let serialized_payload = serde_json::to_value(payload).expect("Serialization failed");
let event = Event {
uuid: Uuid::new_v4(),
timestamp: now(),
event_type: "test".to_string(),
payload: serialized_payload,
};
dispatcher.dispatch(event).unwrap();
dispatcher
.send(EventDispatcherCommand::Flush)
.expect("send should not fail");
let rx_clone = Arc::clone(&rx);
tokio::spawn(async move {
let mut rx = rx_clone.lock().await;
dispatcher.event_dispatcher(&mut rx).await;
});
{
let mut rx = rx.lock().await; // Acquire the lock for rx
rx.close();
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
8 changes: 4 additions & 4 deletions eppo_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! `eppo_core` is a common library to build Eppo SDKs for different languages. If you're an Eppo
//! user, you probably want to take a look at one of existing SDKs.
//! user, you probably want to take a look at one of the existing SDKs.
//!
//! # Overview
//!
Expand All @@ -9,7 +9,7 @@
//!
//! [`Configuration`] is the heart of an SDK. It is an immutable structure that encapsulates all
//! server-provided configuration ([flag configurations](ufc::UniversalFlagConfig) and [bandit
//! models](bandits::BanditResponse)) that describes how SDK should evaluate user requests.
//! models](bandits::BanditResponse)) that describes how the SDK should evaluate user requests.
//!
//! [`ConfigurationStore`](configuration_store::ConfigurationStore) is a thread-safe multi-reader
//! multi-writer in-memory manager for [`Configuration`]. The job of configuration store is to be a
Expand All @@ -24,7 +24,7 @@
//!
//! [`PollerThread`](poller_thread::PollerThread) launches a background thread that periodically
//! fetches a new `Configuration` (using `ConfigurationFetcher`) and updates
//! `ConfigurationStore`. This is the simplest way to keep SDK configuration up-to-date.
//! `ConfigurationStore`. This is the simplest way to keep the SDK configuration up-to-date.
//!
//! [`eval`] module contains functions for flag and bandit evaluation. It also supports evaluation
//! with [details](eval::eval_details::EvaluationDetails). These functions return evaluation results
Expand All @@ -36,7 +36,7 @@
//! callback handling is currently too different between languages (e.g., in Ruby, it's too tedious
//! to call from Rust into Ruby, so we return events into Ruby land where they get logged).
//!
//! Because evaluation functions are pure functions (they don't have side-effects and don't use any
//! Because evaluation functions are pure functions (they don't have side effects and don't use any
//! global state), they are a bit tedious to call directly. [`Evaluator`](eval::Evaluator) is a
//! helper to simplify SDK code and pass repeated parameters automatically.
//!
Expand Down
2 changes: 1 addition & 1 deletion ruby-sdk/ext/eppo_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Client {
.map_err(|err| {
Error::new(
exception::runtime_error(),
format!("enexpected value for subject_attributes: {err}"),
format!("Unexpected value for subject_attributes: {err}"),
)
})?;
let actions = serde_magnus::deserialize(actions)?;
Expand Down

0 comments on commit 6f360ed

Please sign in to comment.