Skip to content

Commit

Permalink
Merge pull request #311 from metaplex-foundation/replace-seqcst-ordering
Browse files Browse the repository at this point in the history
[mtg-885] Replaced SeqCst ordering with Relaxed
  • Loading branch information
kstepanovdev authored Nov 14, 2024
2 parents 880d025 + 01ca6e9 commit 5142f69
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
4 changes: 2 additions & 2 deletions nft_ingester/src/api/backfilling_state_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl BackfillingStateConsistencyChecker {
overwhelm_backfill_gap_clone.store(
rocks_db.bubblegum_slots.iter_start().count().saturating_add(rocks_db.ingestable_slots.iter_start().count())
>= consistence_backfilling_slots_threshold as usize,
Ordering::SeqCst,
Ordering::Relaxed,
);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC)) => {},
Expand All @@ -50,6 +50,6 @@ impl BackfillingStateConsistencyChecker {

impl ConsistencyChecker for BackfillingStateConsistencyChecker {
fn should_cancel_request(&self, _call: &Call) -> bool {
self.overwhelm_backfill_gap.load(Ordering::SeqCst)
self.overwhelm_backfill_gap.load(Ordering::Relaxed)
}
}
4 changes: 2 additions & 2 deletions nft_ingester/src/api/synchronization_state_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl SynchronizationStateConsistencyChecker {
.seq
.saturating_sub(decoded_index_update_key.seq)
>= synchronization_api_threshold,
Ordering::SeqCst,
Ordering::Relaxed,
);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC))=> {},
Expand All @@ -82,7 +82,7 @@ impl SynchronizationStateConsistencyChecker {

impl ConsistencyChecker for SynchronizationStateConsistencyChecker {
fn should_cancel_request(&self, call: &Call) -> bool {
if !self.overwhelm_seq_gap.load(Ordering::SeqCst) {
if !self.overwhelm_seq_gap.load(Ordering::Relaxed) {
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,15 @@ pub async fn main() -> Result<(), IngesterError> {
.ok();

if let Some(gaped_data_client) = grpc_client.clone() {
while first_processed_slot.load(Ordering::SeqCst) == 0 && shutdown_rx.is_empty() {
while first_processed_slot.load(Ordering::Relaxed) == 0 && shutdown_rx.is_empty() {
tokio_sleep(Duration::from_millis(100)).await
}

let cloned_rocks_storage = primary_rocks_storage.clone();
if shutdown_rx.is_empty() {
let gaped_data_client_clone = gaped_data_client.clone();

let first_processed_slot_value = first_processed_slot.load(Ordering::SeqCst);
let first_processed_slot_value = first_processed_slot.load(Ordering::Relaxed);
let cloned_rx = shutdown_rx.resubscribe();
mutexed_tasks.lock().await.spawn(process_asset_details_stream_wrapper(
cloned_rx,
Expand Down
2 changes: 1 addition & 1 deletion nft_ingester/src/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn receive_last_saved_slot(
while cloned_rx.is_empty() {
match cloned_rocks_storage.last_saved_slot() {
Ok(Some(slot)) if slot != last_saved_slot => {
first_processed_slot_clone.store(slot, Ordering::SeqCst);
first_processed_slot_clone.store(slot, Ordering::Relaxed);
break;
}
Err(e) => {
Expand Down
6 changes: 3 additions & 3 deletions rocks-db/src/asset_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::HashMap;

impl Storage {
fn get_next_asset_update_seq(&self) -> Result<u64> {
if self.assets_update_last_seq.load(Ordering::SeqCst) == 0 {
if self.assets_update_last_seq.load(Ordering::Relaxed) == 0 {
// If assets_update_next_seq is zero, fetch the last key from assets_update_idx
let mut iter = self.assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item

Expand All @@ -24,11 +24,11 @@ impl Storage {
// Assuming the key is structured as (u64, ...)

let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?);
self.assets_update_last_seq.store(seq, Ordering::SeqCst);
self.assets_update_last_seq.store(seq, Ordering::Relaxed);
}
}
// Increment and return the sequence number
let seq = self.assets_update_last_seq.fetch_add(1, Ordering::SeqCst) + 1;
let seq = self.assets_update_last_seq.fetch_add(1, Ordering::Relaxed) + 1;
Ok(seq)
}

Expand Down

0 comments on commit 5142f69

Please sign in to comment.