Skip to content

Commit

Permalink
Make update fallible again
Browse files Browse the repository at this point in the history
A recent commit made `update` infallible, reasoning that if we fail
to store a new leaf, it is better to at least try storing the next
one, rather than failing completely. However, this is not accurate:
the client uses the success/failure of `udpate` to determine when
it is safe to garbage collect data from temporary storage. Thus,
returning success when we didn't actually store all the leaves can
lead to data loss.

This change fixes the situation by again returning an error whenever
we fail to insert a leaf. It also tries to make the return value more
useful, by returning on error the height of the first leaf we failed
to insert (which is useful for garbage collection) instead of just an
opaque error message which the caller can't do much with.
  • Loading branch information
jbearer committed Oct 31, 2024
1 parent 7e12d6c commit 1a7a840
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/data_source/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ mod impl_testable_data_source {
}

async fn handle_event(&self, event: &Event<MockTypes>) {
self.update(event).await;
self.update(event).await.unwrap();
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/data_source/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ pub use super::storage::fs::Transaction;
/// let mut events = hotshot.event_stream();
/// while let Some(event) = events.next().await {
/// let mut state = state.write().await;
/// state.hotshot_qs.update(&event).await;
/// if state.hotshot_qs.update(&event).await.is_err() {
/// continue;
/// }
///
/// // Update other modules' states based on `event`.
/// let mut tx = state.hotshot_qs.write().await.unwrap();
Expand Down Expand Up @@ -269,7 +271,7 @@ mod impl_testable_data_source {
}

async fn handle_event(&self, event: &Event<MockTypes>) {
self.update(event).await;
self.update(event).await.unwrap();
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/data_source/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ impl Config {
/// spawn(async move {
/// let mut events = hotshot.event_stream();
/// while let Some(event) = events.next().await {
/// state.hotshot_qs.update(&event).await;
/// if state.hotshot_qs.update(&event).await.is_err() {
/// continue;
/// }
///
/// let mut tx = state.hotshot_qs.write().await.unwrap();
/// // Update other modules' states based on `event`, using `tx` to access the database.
Expand Down Expand Up @@ -334,7 +336,7 @@ pub mod testing {
}

async fn handle_event(&self, event: &Event<MockTypes>) {
self.update(event).await;
self.update(event).await.unwrap();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/data_source/storage/no_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ pub mod testing {
}

async fn handle_event(&self, event: &Event<MockTypes>) {
self.update(event).await;
self.update(event).await.unwrap();
}
}

Expand Down
16 changes: 12 additions & 4 deletions src/data_source/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
///
/// If you want to update the data source with an untrusted event, for example one received from
/// a peer over the network, you must authenticate it first.
async fn update(&self, event: &Event<Types>);
///
/// # Returns
///
/// If all provided data is successfully inserted into the database, returns `Ok(())`. If any
/// error occurred, the error is logged, and the return value is the height of the first leaf
/// which failed to be inserted.
async fn update(&self, event: &Event<Types>) -> Result<(), u64>;
}

#[async_trait]
Expand All @@ -66,7 +72,7 @@ where
Payload<Types>: QueryablePayload<Types>,
<Types as NodeType>::InstanceState: Default,
{
async fn update(&self, event: &Event<Types>) {
async fn update(&self, event: &Event<Types>) -> Result<(), u64> {
if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
// `qc` justifies the first (most recent) leaf...
let qcs = once((**qc).clone())
Expand Down Expand Up @@ -95,7 +101,7 @@ where
?qc,
"inconsistent leaf; cannot append leaf information: {err:#}"
);
continue;
return Err(leaf.block_header().block_number());
}
};

Expand Down Expand Up @@ -137,10 +143,12 @@ where
.append(BlockInfo::new(leaf_data, block_data, vid_common, vid_share))
.await
{
tracing::warn!(height, "failed to append leaf information: {err:#}");
tracing::error!(height, "failed to append leaf information: {err:#}");
return Err(leaf.block_header().block_number());
}
}
}
Ok(())
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
//! let mut events = hotshot.event_stream();
//! while let Some(event) = events.next().await {
//! // Update the query data based on this event.
//! data_source.update(&event).await;
//! data_source.update(&event).await.ok();
//! }
//! # Ok(())
//! # }
Expand Down Expand Up @@ -553,8 +553,10 @@ where

// Update query data using HotShot events.
while let Some(event) = events.next().await {
// Update the query data based on this event.
data_source.update(&event).await;
// Update the query data based on this event. It is safe to ignore errors here; the error
// just returns the failed block height for use in garbage collection, but this simple
// implementation isn't doing any kind of garbage collection.
data_source.update(&event).await.ok();
}

Ok(())
Expand Down

0 comments on commit 1a7a840

Please sign in to comment.