Skip to content

Commit

Permalink
DRY up
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecsl committed Jan 17, 2025
1 parent 235e096 commit 4557cf4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
2 changes: 2 additions & 0 deletions eppo_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ categories = ["config"]
rust-version = "1.75.0"

[features]
# TODO: Remove this line before merging
default = ["event_ingestion"]
# Unstable feature flag for an upcoming feature.
event_ingestion = ["dep:uuid", "dep:linked_hash_set"]
# Add implementation of `FromPyObject`/`ToPyObject` for some types.
Expand Down
24 changes: 12 additions & 12 deletions eppo_core/src/event_ingestion/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,16 @@ pub(super) async fn delivery(
success.push(queued_event);
}
}
delivery_status
.send(DeliveryStatus { success, failure })
.await
.ok()?;
deliver_status(&delivery_status, DeliveryStatus { success, failure }).await;
} else {
delivery_status
.send(DeliveryStatus::with_success(batch))
.await
.ok()?;
deliver_status(&delivery_status, DeliveryStatus::with_success(batch)).await;
}
}
Err(err) => {
match err {
EventDeliveryError::RetriableError(_) => {
// Retry later
delivery_status
.send(DeliveryStatus::with_failure(batch))
.await
.ok()?;
deliver_status(&delivery_status, DeliveryStatus::with_failure(batch)).await;
}
EventDeliveryError::NonRetriableError(_) => {
warn!("Failed to deliver events: {}", err);
Expand All @@ -95,3 +86,12 @@ pub(super) async fn delivery(
}
}
}

async fn deliver_status(receiver: &mpsc::Sender<DeliveryStatus>, status: DeliveryStatus) {
receiver
.send(status)
.await
.unwrap_or_else(|err| {
warn!("Failed to send delivery status: {}", err);
});
}

0 comments on commit 4557cf4

Please sign in to comment.