
MTG-703 Adding peer to peer consistency checks #316

Open
wants to merge 6 commits into
base: main

Conversation

Contributor

@snorochevskiy snorochevskiy commented Nov 19, 2024

This pull request contains the full implementation of peer-to-peer consistency checking and fetching of missing blocks for bubblegum and account NFTs.

It includes:

  • storing change records for newly received bubblegum changes and account NFT accounts
  • calculating bubblegum epoch checksums and account bucket checksums at the end of each "epoch" (10,000 slots; a rough sketch of the slot-to-epoch arithmetic follows this list)
  • periodically requesting checksums from peers, comparing them with our own, searching for missing changes, and requesting the identified missing blocks from peers
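As a reference for the epoch terminology used above, here is a minimal sketch of the slot-to-epoch arithmetic, assuming the 10,000-slot epoch size described in this PR (the names are illustrative, not the PR's actual API):

/// Illustrative only: derive the consistency "epoch" of a slot.
const SLOTS_PER_EPOCH: u64 = 10_000;

fn epoch_of_slot(slot: u64) -> u64 {
    slot / SLOTS_PER_EPOCH
}

/// Inclusive start slot and exclusive end slot of an epoch.
fn epoch_bounds(epoch: u64) -> (u64, u64) {
    (epoch * SLOTS_PER_EPOCH, (epoch + 1) * SLOTS_PER_EPOCH)
}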

Design document:
https://github.com/metaplex-foundation/aura/wiki/Data-consistency-for-peer%E2%80%90to%E2%80%90peer-indexers

@snorochevskiy snorochevskiy force-pushed the feature/mtg-703-peer2peer_consistency branch 6 times, most recently from 9a4547b to baedb6f on November 22, 2024 14:06
@snorochevskiy snorochevskiy force-pushed the feature/mtg-703-peer2peer_consistency branch 3 times, most recently from 002990c to 46d522d on November 26, 2024 20:05
pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
Contributor

Since we are tracking not only slot+write_version for accounts but also data_hash, we need to discuss the consensus mechanism.

If I understood correctly, the current implementation would identify accounts with the same slot+write_version but different data_hashes as different updates and would try to synchronize them between nodes. However, the current account processing mechanism will not allow updating existing accounts with the same slot+write_version. So nodes will try to receive such "updates" from each other and send them to each other, but will not process them correctly.

Maybe this sounds like a separate task, but we need to discuss which approach we think is better.

Contributor Author

This is purely for account NFTs: for accounts we don't have fork cleaning, and as a result we cannot rely on the slot. That's why we ignore slots and take only pubkey+write_version+data_hash when we calculate the account changes checksum, and when we compare a peer's account change with the local one.
If the slot is different but the data_hash is the same, we assume it is the same account change.
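A minimal sketch of that comparison rule (field types simplified, names illustrative):

// Two account change records are treated as the same change when pubkey,
// write_version and data_hash match; the slot is deliberately ignored.
struct AccChange {
    account_pubkey: [u8; 32],
    slot: u64,
    write_version: u64,
    data_hash: u64,
}

fn is_same_change(a: &AccChange, b: &AccChange) -> bool {
    a.account_pubkey == b.account_pubkey
        && a.write_version == b.write_version
        && a.data_hash == b.data_hash
}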

Contributor

For the consensus part - that's for us to define. Essentially, we should expect the following (a rough merge sketch follows the list):

  • same write_version, same slot, same hash - all good
  • same write_version, same slot, different hash - we need the consensus to define what the correct version is. I'd suggest we ask an RPC node for the current state (if it has not changed yet, of course)
  • same write_version, different slot, same hash - all good, probably take the highest slot on merge
  • same write_version, different slot, different hashes - looks like a fork, take the highest slot on merge
  • different write_version, same/different slot, same/different hash - take the highest write_version
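A rough sketch of those rules (simplified record, hypothetical names; the "ask RPC" branch is left as a marker rather than implemented):

struct AccVersion {
    slot: u64,
    write_version: u64,
    data_hash: u64,
}

enum MergeDecision {
    KeepLocal,
    TakePeer,
    // Same write_version and slot but a different hash: needs an external
    // source of truth, e.g. asking an RPC node for the current state.
    AskRpc,
}

fn merge(local: &AccVersion, peer: &AccVersion) -> MergeDecision {
    if peer.write_version != local.write_version {
        // Different write_version: the highest write_version wins.
        if peer.write_version > local.write_version { MergeDecision::TakePeer } else { MergeDecision::KeepLocal }
    } else if peer.slot != local.slot {
        // Same write_version, different slot (same or different hash): take the highest slot.
        if peer.slot > local.slot { MergeDecision::TakePeer } else { MergeDecision::KeepLocal }
    } else if peer.data_hash != local.data_hash {
        MergeDecision::AskRpc
    } else {
        // Identical record: nothing to do.
        MergeDecision::KeepLocal
    }
}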

pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
Contributor

Also, we do not track data_hashes for the transaction-based protocol. Maybe we need them here as well? I think it makes sense, because it would give us additional resistance to Solana forks.

Contributor Author

IMO, we won't gain anything by adding data_hash to bubblegum, since we already have the fork cleaner and sequence consistency checks for it.

@@ -193,6 +194,7 @@ impl MessageParser {
slot_updated: account_update.slot as i64,
amount: ta.amount as i64,
write_version: account_update.write_version,
data_hash: calc_solana_account_data_hash(&account_update.data),
Contributor

Could we approximately estimate the performance impact of calculating hashes "in place" for each account? We have many account updates, a far larger volume than we have for transactions, so it would be great to understand whether this increases account processing time.

Contributor Author

According to my tests, it takes ~9 microseconds to calculate the hash for 1KB of data, which looks acceptable.
(In any case, xxHash is one of the most performant hashing algorithms.)
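For context, one plausible shape of such a hash function using the xxhash-rust crate's XXH3 implementation; the PR's actual calc_solana_account_data_hash may differ in crate or seed:

use xxhash_rust::xxh3::xxh3_64;

// Hash raw Solana account data into a u64 fingerprint.
pub fn calc_solana_account_data_hash(data: &[u8]) -> u64 {
    xxh3_64(data)
}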

metrics_state.checksum_calculation_metrics.clone(),
);

if let Some(peer_urls_file) = config.peer_urls_file.as_ref() {
Contributor

Do we need to add a mount for this file in docker compose?

Contributor Author

Probably, we do...

let mut missing_bbgm_changes: HashMap<BbgmChangeRecord, HashSet<usize>> = HashMap::new();
let trusted_peers = peers_provider.list_trusted_peers().await;

for (peer_ind, trusted_peer) in trusted_peers.iter().enumerate() {
Contributor

Maybe we should try to make this process more async? There is a lot of I/O with other peers here, so maybe it makes sense to spawn a separate task for communicating with each peer?

Contributor Author

This can be a good improvement in the future, but for "phase 1" I'd prefer to run it sequentially and collect CPU and disk metrics to understand whether we have any spare bandwidth for parallelization.
Otherwise we can easily fall into a scenario where we are stressing the disk with a bunch of concurrent peer-to-peer consistency checks, and as a result the indexing part lacks the resources to fulfill its main duty.
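If that parallelization is picked up later, a sketch of the reviewer's suggestion using a per-peer task set might look like this (check_peer is hypothetical, and a concurrency limit would likely be needed to protect the disk, as noted above):

use tokio::task::JoinSet;

async fn check_all_peers(peer_urls: Vec<String>) {
    let mut tasks = JoinSet::new();
    for url in peer_urls {
        // One task per peer; each does its own network and DB I/O.
        tasks.spawn(async move { check_peer(&url).await });
    }
    while let Some(res) = tasks.join_next().await {
        if let Err(e) = res {
            tracing::warn!("peer consistency task failed: {e}");
        }
    }
}

async fn check_peer(_url: &str) {
    // Hypothetical per-peer consistency check.
}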

.chain(ge_cmp_res.different.iter())
.map(|&a| a.tree_pubkey)
.collect::<Vec<_>>();
for tree_pk in ge_trees_to_check {
Contributor

Maybe processing each grand epoch also deserves to be spawned as a separate task, because of the heavy I/O here.

Contributor Author

And same here: a potentially good improvement in the future.

}

#[allow(clippy::while_let_on_iterator)]
async fn handle_missing_bbgm_changes(
Contributor

Point to discuss: a consensus mechanism

If we have many hosts in the network, we need to think about not just fetching any updates from each host, but finding consensus between them and rejecting updates from hosts that contradict the majority of the network.

Contributor Author

Right, this could be a good thing for interaction with non-trusted peers, but we've decided not to make it part of the initial implementation.

clients.get_mut(peer_ind).unwrap()
};
if let Ok(block) = client
.get_block(change.slot, Option::<Arc<grpc::client::Client>>::None)
Contributor

We are fetching the whole block without indicating which trees we want to receive. In the future we need to add methods for syncing concrete trees.

.db_bubblegum_get_grand_epochs_latency
.observe(start.elapsed().as_secs_f64());

let ge_cmp_res = cmp(&my_ge_chksms, &peer_ge_chksms);
Contributor

Maybe we need to add some config describing which trees we are indexing, because for now, if I understood right, we may mark trees that we do not want to index as missing.

loop {
let calc_msg = tokio::select! {
msg = rcv.recv() => msg,
_ = shutdown_signal.recv() => {
Contributor

nit: since this runs inside a plain tokio task that is not tied to any JoinSet, shutdown_signal is not really needed here, because we do not wait for this task to complete anywhere.

pub fn solana_change_info(&self) -> (Pubkey, u64, u64, u64) {
let (slot, write_version, data_hash) = match &self.account {
UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version, v.data_hash),
Contributor

Are we ok with converting i64 to u64? If it cannot be negative, why does TokenAccount store it as i64? Some kind of restriction from the DB?
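If the cast ever needs to be made defensive, a checked conversion keeps the non-negativity assumption explicit (sketch only, not the PR's code):

fn slot_as_u64(slot_updated: i64) -> u64 {
    // `slot_updated as u64` would silently wrap a negative value;
    // try_from makes the assumption visible and recoverable.
    u64::try_from(slot_updated).unwrap_or(0)
}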

rpc GetAccsInBucket(GetAccReq) returns (AccList);

rpc ProposeMissingAccChanges(AccList) returns (google.protobuf.Empty);
}
Contributor

nit: newline

@@ -21,8 +21,11 @@ pub struct Client {

impl Client {
pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result<Self, GrpcError> {
let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str())
.map_err(|e| GrpcError::UriCreate(e.to_string()))?;
Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
Contributor

Suggested change
Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
Client::connect_to_url(&peer_discovery.get_gapfiller_peer_addr()).await

Just a preference of style, feel free to ignore


/// Interface for querying bubblegum checksums from peer
/// or local storage.
#[async_trait]
Contributor

Isn't async fn in traits stabilized?

Contributor Author

Yes, but #[async_trait] is still required to build a trait object.
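A small illustration of that point: async fn in traits is stable, but such traits are not dyn-compatible yet, so building a Box<dyn ...> still relies on #[async_trait] (the trait name here is made up):

use async_trait::async_trait;

#[async_trait]
trait ChecksumSource {
    async fn get_checksum(&self, epoch: u64) -> Option<[u8; 32]>;
}

// The macro desugars the async fn into one returning a boxed future,
// which is what makes a `Box<dyn ChecksumSource>` trait object possible.
fn use_dyn(_source: Box<dyn ChecksumSource + Send + Sync>) {}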

pub found_missing_accounts: Gauge,
}

impl Default for Peer2PeerConsistencyMetricsConfig {
Contributor

It seems to me such a Default has no use, since new() can be freely called instead. It seems like only one of them should survive, imo.

From my perspective, new() with no parameters = default (as long as the function doesn't have side effects).


impl Peer2PeerConsistencyMetricsConfig {
pub fn new() -> Peer2PeerConsistencyMetricsConfig {
let mk_histogram = || Histogram::new(exponential_buckets(20.0, 1.8, 10));
Contributor

Those numbers are slightly magical. What do they mean?
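For what it's worth, assuming this is prometheus_client's exponential_buckets(start, factor, length), the bucket boundaries are start * factor^i, so these values expand to roughly the following (presumably latencies in milliseconds; a comment or named constants in the code would make that explicit):

use prometheus_client::metrics::histogram::exponential_buckets;

fn main() {
    // 20, 36, 64.8, 116.6, 210.0, 377.9, 680.2, 1224.4, 2204.0, 3967.1
    let buckets: Vec<f64> = exponential_buckets(20.0, 1.8, 10).collect();
    println!("{buckets:?}");
}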

@snorochevskiy snorochevskiy force-pushed the feature/mtg-703-peer2peer_consistency branch from 46d522d to 31e0f50 on December 2, 2024 12:31

/// Type of checksum for bubblegum epochs and account NFT buckets.
/// It is technically a SHA3 hash.
pub type Chksm = [u8; 32];
Contributor

I thought the type above was for this

Suggested change
pub type Chksm = [u8; 32];
pub checksum: Option<Chksm>,

Contributor Author
@snorochevskiy snorochevskiy Dec 3, 2024

pub type Chksm = [u8; 32];

is just an alias that should make a potential future change of the checksum type easier (which will never happen)

Contributor

I guess it might be more convenient/idiomatic to use a newtype instead of the alias?

Contributor Author

I believe it will just introduce a hell of wrap and unwrap calls 🥲

Contributor
@kstepanovdev kstepanovdev Dec 3, 2024

Deref to the rescue?

Contributor

Just thinking aloud tho, not a call to action
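For completeness, the newtype-with-Deref variant being discussed would look roughly like this (sketch only; whether it beats the plain alias is exactly the open question here):

use std::ops::Deref;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Chksm([u8; 32]);

impl Deref for Chksm {
    type Target = [u8; 32];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl From<[u8; 32]> for Chksm {
    fn from(bytes: [u8; 32]) -> Self {
        Chksm(bytes)
    }
}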

Comment on lines 247 to 252
pub db_bubblegum_get_grand_epochs_latency: Histogram,
pub db_bubblegum_get_epochs_latency: Histogram,
pub db_bubblegum_get_changes_latency: Histogram,
pub db_account_get_grand_buckets_latency: Histogram,
pub db_account_get_buckets_latency: Histogram,
pub db_account_get_latests_latency: Histogram,
Contributor

Why not a single Family<MetricLabel, Histogram>?

Contributor Author

I usually prefer to have a set of plain metrics instead of one labeled metric, because it is much easier to query them from other monitoring systems. But sure, I can turn these into a family. Should I?

Contributor

Yes, please. With our stack of Prometheus + Grafana, the primary flow is to put the metrics onto a dashboard and set up some simple alerts. With every newly added metric there is an increased chance it won't make it onto a dashboard, since it requires a dedicated query for every chart and every alert rule. These metrics are super generic; reusing even the existing RED approach is preferable. If it doesn't fit into RED, having a dedicated family is the next best choice. Please don't leave us with the need to create multiple charts to monitor every request. Metrics should be kept as simple as possible.

Comment on lines 342 to 345
pub peers_bubblegum_get_grand_epochs_for_tree_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_grand_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_changes_errors: Family<MetricLabel, Counter>,
Contributor
@StanChe StanChe Dec 2, 2024

it looks more like an added label to me. Peers_sync_latency(protocol: "bubblegum/account", method/endpoint: "get_grand_epochs/get_epochs/get_changes")
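A sketch of that labeled shape with prometheus_client (label names and values here are illustrative, not the PR's final API):

use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct SyncLabels {
    pub protocol: String, // "bubblegum" or "account"
    pub method: String,   // "get_grand_epochs", "get_epochs", "get_changes", ...
}

fn register_example(registry: &mut Registry) {
    // One family replaces the six separate histograms above.
    let db_latency = Family::<SyncLabels, Histogram, _>::new_with_constructor(|| {
        Histogram::new(exponential_buckets(20.0, 1.8, 10))
    });
    registry.register("db_sync_latency", "Local DB latency of consistency checks", db_latency.clone());

    // Observed with two labels instead of six metric names.
    db_latency
        .get_or_create(&SyncLabels { protocol: "bubblegum".into(), method: "get_grand_epochs".into() })
        .observe(12.3);
}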

// prepare
let tree1 = Pubkey::new_unique();

// This change is for epoch we won't calculate in the test,
Contributor

the comment is not valid in this context

.put(k1_2.clone(), v1_2.clone())
.unwrap();

// This will be also ignored
Contributor

this one as well

@snorochevskiy snorochevskiy force-pushed the feature/mtg-703-peer2peer_consistency branch from 31e0f50 to e0d46c1 on December 3, 2024 12:25
Contributor
@StanChe StanChe left a comment

Great work, thank you. Several open questions need clarification and the metrics should be simplified/moved to more appropriate measuring places.

// Verify account last state updated
let latest_acc1_key = AccountNftKey::new(acc1_pubkey);
let latest_acc1_val = storage
.acc_nft_last
Contributor

acc_nft_last holds the last calculated epoch value?

Contributor Author

It is the last seen change of the account with the given pubkey.


#[async_trait::async_trait]
impl AuraPeersProvides for FileSrcAuraPeersProvides {
async fn list_trusted_peers(&self) -> Vec<String> {
Contributor

Why not a list of URLs directly? Those are parsed every time anyway.

Contributor Author

That's to make it possible to change the list of peers without restarting the application.
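For illustration, the "re-read on each call" behaviour could look like this (sketch only; the trait is redeclared here just to keep the snippet self-contained, and the field name and error handling are assumptions):

use async_trait::async_trait;

#[async_trait]
pub trait AuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String>;
}

pub struct FileSrcAuraPeersProvides {
    peer_urls_file: String,
}

#[async_trait]
impl AuraPeersProvides for FileSrcAuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String> {
        // Re-reading the file on every call lets operators edit the peer list
        // without restarting the application.
        match tokio::fs::read_to_string(&self.peer_urls_file).await {
            Ok(text) => text
                .lines()
                .map(str::trim)
                .filter(|line| !line.is_empty())
                .map(ToOwned::to_owned)
                .collect(),
            Err(e) => {
                tracing::warn!("Cannot read peers file: {e}");
                Vec::new()
            }
        }
    }
}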

/// To prevent such inconsistency of a checksum, roght before the calulating,
/// we mark the epoch checksum to be calculated is "Calculating",
/// and after the checksum is calculated, we write this value only in case
/// if the previous value is still in "Calculated" state.
Contributor

Suggested change
/// if the previous value is still in "Calculated" state.
/// if the previous value is still in the same "Calculating" state.

/// if the previous value is still in "Calculated" state.
///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
Contributor

Suggested change
/// a new update with slot that epoch is from the previous epoch perioud,
/// a new update with slot that epoch is from the previous epoch period,

///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
/// it not only writed the bubblegum change, but also updated
Contributor

Suggested change
/// it not only writed the bubblegum change, but also updated
/// it not only writes the bubblegum change, but also updates


/// This flag is set to true before bubblegum epoch calculation is started,
/// and set to false after the calculation is finished.
static IS_CALCULATING_BBGM_EPOCH: AtomicI32 = AtomicI32::new(-1);
Contributor

It's not a flag any more

) {
tracing::info!("Starting bubblegum changes peer-to-peer exchange for epoch={epoch}");
while get_calculating_bbgm_epoch()
.map(|e| e == epoch)
Contributor

can we end up calculating any other epoch - a previous one, or a next one?

Contributor Author

Theoretically that should not happen, but I've changed it to compare the current epoch with the last calculated one.


metrics
.found_missing_bubblegums
.set(changes_we_miss.len() as i64);
Contributor

I'd suggest incrementing this rather than setting it. Set will be way more flickery given the periodic nature of metrics collectors.

return result;
}
};
metrics
Contributor

this metric collection should be tied closely to the actual I/O, i.e. the gRPC client in our case, not to the business-logic level

Contributor Author

This one measures how much time it takes on our side to fetch the data.
On the gRPC client side (BbgmConsistencyApiClientImpl and AccConsistencyApiClientImpl) there are separate metrics that measure how much time it takes to call the peer.

@snorochevskiy snorochevskiy force-pushed the feature/mtg-703-peer2peer_consistency branch from 955a3bb to a1cdfab on December 8, 2024 21:25