MTG-703 Adding peer-to-peer consistency checks #316
base: main
Conversation
pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
Since we now track not only slot+write_version for accounts but also data_hash, we need to discuss the consensus mechanism.
If I understood correctly, the current implementation would treat accounts with the same slot+write_version but different data_hashes as different updates and try to synchronize them between nodes. However, the current account processing mechanism does not allow updating an existing account with the same slot+write_version. So nodes would keep requesting and sending these "updates" to each other without ever processing them correctly.
This may warrant a separate task, but we need to discuss which approach we think is better.
This is purely for account NFTs: for accounts we don't have fork cleaning, so we cannot rely on the slot. That's why we ignore slots and use only pubkey+write_version+data_hash, both when we calculate the account changes checksum and when we compare a peer's account change with the local one.
If the slot is different but the data_hash is the same, we assume it is the same account change.
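For illustration, a minimal sketch of that comparison rule; the record type and function name here are hypothetical, not the PR's exact API:

```rust
use solana_sdk::pubkey::Pubkey;

#[allow(dead_code)]
pub struct AccChange {
    pub account_pubkey: Pubkey,
    pub slot: u64,
    pub write_version: u64,
    pub data_hash: u64,
}

// Two account changes are treated as the same update when pubkey,
// write_version and data_hash match; the slot is intentionally ignored,
// because account forks are not cleaned up.
pub fn is_same_account_change(a: &AccChange, b: &AccChange) -> bool {
    a.account_pubkey == b.account_pubkey
        && a.write_version == b.write_version
        && a.data_hash == b.data_hash
}
```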
For the consensus part - that's for us to define. Essentially, we should expect the following (see the sketch after this list):
- same write_version, same slot, same hash - all good
- same write_version, same slot, different hash - we need the consensus to define what the correct version is. I'd suggest we ask the RPC for the current state (if it has not changed yet, of course)
- same write_version, different slot, same hash - all good, take the highest slot on merge probably
- same write_version, different slot, different hashes - looks like a fork, take the highest slot on merge
- different write_version, same/different slot, same/different hash - take the highest write_version
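A sketch of how those rules could translate into a merge decision (hypothetical types, not the implemented consensus):

```rust
use std::cmp::Ordering;

// Hypothetical, simplified view of one tracked account change.
#[derive(Clone, Copy)]
pub struct Change {
    pub write_version: u64,
    pub slot: u64,
    pub data_hash: u64,
}

pub enum MergeDecision {
    Keep(Change),          // resolved by the rules below
    AskRpcForCurrentState, // same write_version and slot, diverging hashes
}

pub fn merge(local: Change, peer: Change) -> MergeDecision {
    match local.write_version.cmp(&peer.write_version) {
        // Different write_version: the highest write_version wins.
        Ordering::Less => MergeDecision::Keep(peer),
        Ordering::Greater => MergeDecision::Keep(local),
        Ordering::Equal if local.slot == peer.slot => {
            if local.data_hash == peer.data_hash {
                MergeDecision::Keep(local) // all good
            } else {
                MergeDecision::AskRpcForCurrentState // consensus needed
            }
        }
        // Same write_version, different slot: same hash means the same change,
        // different hashes look like a fork; either way take the highest slot.
        Ordering::Equal => {
            MergeDecision::Keep(if local.slot >= peer.slot { local } else { peer })
        }
    }
}
```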
pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
Also, we do not track data_hashes for the transaction-based protocol. Maybe we need them here as well? I think it makes sense, because it would give us additional resistance to Solana forks.
IMO, we won't gain anything by adding data_hash to bubblegum, since we already have the fork cleaner and sequence consistency checks for it.
@@ -193,6 +194,7 @@ impl MessageParser {
slot_updated: account_update.slot as i64,
amount: ta.amount as i64,
write_version: account_update.write_version,
data_hash: calc_solana_account_data_hash(&account_update.data),
Could we roughly estimate the performance impact of calculating hashes "in place" for each account? We have many account updates, a far larger volume than for transactions, so it would be good to understand whether this increases account processing time.
According to my tests, it takes ~9 microseconds to calculate the hash of 1KB of data, which looks acceptable.
(In any case, xxhash is one of the most performant hashing algorithms.)
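For reference, a rough micro-benchmark sketch of that number; it assumes the xxhash-rust crate (xxh3), while the PR's calc_solana_account_data_hash may use a different xxhash variant:

```rust
use std::time::Instant;
use xxhash_rust::xxh3::xxh3_64;

fn main() {
    let data = vec![0u8; 1024]; // 1 KiB of fake account data
    let iterations: u32 = 100_000;

    let start = Instant::now();
    let mut acc = 0u64;
    for _ in 0..iterations {
        // xor-fold the results so the loop is not optimized away
        acc ^= xxh3_64(&data);
    }
    let per_hash = start.elapsed() / iterations;
    println!("~{per_hash:?} per 1 KiB hash (checksum {acc:x})");
}
```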
metrics_state.checksum_calculation_metrics.clone(),
);

if let Some(peer_urls_file) = config.peer_urls_file.as_ref() {
Do we need to add a mount for this file in docker compose?
Probably, we do...
let mut missing_bbgm_changes: HashMap<BbgmChangeRecord, HashSet<usize>> = HashMap::new();
let trusted_peers = peers_provider.list_trusted_peers().await;

for (peer_ind, trusted_peer) in trusted_peers.iter().enumerate() {
Maybe we should try to make this process more asynchronous? There is a lot of I/O with other peers here, so it might make sense to spawn a separate task for communicating with each peer.
This could be a good improvement in the future, but for "phase 1" I'd prefer to run it sequentially and collect CPU and disk metrics to understand whether we have spare bandwidth for parallelization.
Otherwise we can easily end up in a scenario where we stress the disk with a bunch of concurrent peer-to-peer consistency checks, and as a result the indexing part lacks the resources to fulfill its main duty.
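For reference, a sketch of what the suggested per-peer parallelization could look like later, using a tokio JoinSet; check_peer and the concurrency handling here are hypothetical stand-ins, not the PR's code:

```rust
use tokio::task::JoinSet;

// Hypothetical stand-in for the I/O-heavy comparison with a single peer.
async fn check_peer(peer_ind: usize, peer_url: String) {
    let _ = (peer_ind, peer_url);
}

async fn check_all_peers(trusted_peers: Vec<String>) {
    let mut tasks = JoinSet::new();
    for (peer_ind, peer_url) in trusted_peers.into_iter().enumerate() {
        // One task per peer; a Semaphore could cap concurrency so the disk
        // is not stressed by too many simultaneous consistency checks.
        tasks.spawn(check_peer(peer_ind, peer_url));
    }
    // Collect results as peers finish.
    while let Some(res) = tasks.join_next().await {
        if let Err(e) = res {
            tracing::warn!("peer consistency task failed: {e}");
        }
    }
}
```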
.chain(ge_cmp_res.different.iter())
.map(|&a| a.tree_pubkey)
.collect::<Vec<_>>();
for tree_pk in ge_trees_to_check {
Maybe processing each grand_epoch also deserves to be spawned as a separate task, given the amount of I/O here.
And same here: a potentially good improvement in the future.
}

#[allow(clippy::while_let_on_iterator)]
async fn handle_missing_bbgm_changes(
Point to discuss: the consensus mechanism.
If we have many hosts in the network, we need to think about not just fetching any updates from each host, but finding consensus between them and rejecting updates from hosts that contradict the majority of the network.
Right, this could be a good thing for interaction with non-trusted peers, but we've decided not to make it part of the initial implementation.
clients.get_mut(peer_ind).unwrap()
};
if let Ok(block) = client
.get_block(change.slot, Option::<Arc<grpc::client::Client>>::None)
We are fetching the whole block without indicating which trees we want to receive. In the future we need to add methods for syncing specific trees.
.db_bubblegum_get_grand_epochs_latency
.observe(start.elapsed().as_secs_f64());

let ge_cmp_res = cmp(&my_ge_chksms, &peer_ge_chksms);
Maybe we need to add some config describing which trees we are indexing. As it stands, if I understood correctly, we may mark trees that we don't want to index as missing.
loop {
let calc_msg = tokio::select! {
msg = rcv.recv() => msg,
_ = shutdown_signal.recv() => {
nit: since this runs inside a plain tokio task that is not attached to any JoinSet, the shutdown_signal is not really needed here, because we don't wait for this task to complete anywhere.
pub fn solana_change_info(&self) -> (Pubkey, u64, u64, u64) {
let (slot, write_version, data_hash) = match &self.account {
UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version, v.data_hash),
Are we OK with converting i64 as u64? If it cannot be negative, why does TokenAccount store it as i64? Some kind of restriction from the DB?
rpc GetAccsInBucket(GetAccReq) returns (AccList);

rpc ProposeMissingAccChanges(AccList) returns (google.protobuf.Empty);
}
nit: newline
@@ -21,8 +21,11 @@ pub struct Client {

impl Client {
pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result<Self, GrpcError> {
let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str())
.map_err(|e| GrpcError::UriCreate(e.to_string()))?;
Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
Suggested change:
- Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
+ Client::connect_to_url(&peer_discovery.get_gapfiller_peer_addr()).await

Just a preference of style, feel free to ignore.
/// Interface for querying bubblegum checksums from peer
/// or local storage.
#[async_trait]
Isn't async trait stabilized?
Yes, but #[async_trait] is still required to build a trait object.
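To illustrate (trait and type names here are made up, not the PR's API): async fn in traits is stable since Rust 1.75, but such traits are still not dyn-compatible, so building a Box<dyn ...> requires the macro, which desugars the method into one returning a boxed future.

```rust
use async_trait::async_trait;

// Without #[async_trait], a trait with `async fn` cannot be used as a trait
// object, so `Box<dyn ChecksumSource>` would not compile.
#[async_trait]
pub trait ChecksumSource {
    async fn get_checksum(&self, key: String) -> Option<[u8; 32]>;
}

pub struct LocalStorage;

#[async_trait]
impl ChecksumSource for LocalStorage {
    async fn get_checksum(&self, _key: String) -> Option<[u8; 32]> {
        None
    }
}

// The whole point: hiding different sources behind a trait object.
pub fn boxed_source() -> Box<dyn ChecksumSource> {
    Box::new(LocalStorage)
}
```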
pub found_missing_accounts: Gauge,
}

impl Default for Peer2PeerConsistencyMetricsConfig {
It seems to me this Default has no use, since new() can be called instead. Only one of them should survive, IMO.
From my perspective, new() with no parameters = default (as long as the function doesn't cause side effects).
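A sketch of the usual compromise when both stay (illustrative struct, assuming new() is side-effect free): Default simply delegates to new(), so there is a single construction path.

```rust
pub struct ConsistencyMetrics {
    // histograms, counters, gauges ...
}

impl ConsistencyMetrics {
    pub fn new() -> Self {
        Self {}
    }
}

// Default delegates to new(), so both APIs exist but only one is implemented.
impl Default for ConsistencyMetrics {
    fn default() -> Self {
        Self::new()
    }
}
```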
metrics_utils/src/lib.rs (outdated)

impl Peer2PeerConsistencyMetricsConfig {
pub fn new() -> Peer2PeerConsistencyMetricsConfig {
let mk_histogram = || Histogram::new(exponential_buckets(20.0, 1.8, 10));
Those numbers are slightly magical. What do they mean?
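For context, a small sketch of what those three numbers expand to with prometheus-client's exponential_buckets(start, factor, length); whether the unit is milliseconds or seconds depends on what the caller observes:

```rust
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

fn main() {
    // 10 bucket upper bounds: start at 20.0 and multiply by 1.8 each step.
    let bounds: Vec<f64> = exponential_buckets(20.0, 1.8, 10).collect();
    // ≈ [20.0, 36.0, 64.8, 116.6, 210.0, 377.9, 680.2, 1224.4, 2204.0, 3967.2]
    println!("{bounds:?}");

    let _histogram = Histogram::new(bounds.into_iter());
}
```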
/// Type of checksum for bubblegum epochs and account NFT buckets.
/// It is technically a SHA3 hash.
pub type Chksm = [u8; 32];
I thought the type above was for this:
- pub type Chksm = [u8; 32];
+ pub checksum: Option<Chksm>,
pub type Chksm = [u8; 32]; is just an alias that should make a potential change of the checksum type easier in the future (which will never happen).
I guess it might be more convenient/idiomatic to use a newtype instead of the alias?
I believe it would just introduce a hell of wrap and unwrap calls 🥲
Deref to the rescue?
Just thinking aloud tho, not a call to action
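For completeness, a sketch of the newtype + Deref variant being discussed (also just thinking aloud, not a call to action):

```rust
use std::ops::Deref;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct Chksm([u8; 32]);

impl Deref for Chksm {
    type Target = [u8; 32];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl From<[u8; 32]> for Chksm {
    fn from(bytes: [u8; 32]) -> Self {
        Chksm(bytes)
    }
}

fn main() {
    let checksum: Chksm = [0u8; 32].into();
    // Deref lets the wrapped array's methods be called directly,
    // which removes most of the wrap/unwrap noise.
    assert_eq!(checksum.len(), 32);
    assert_eq!(checksum.iter().copied().sum::<u8>(), 0);
}
```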
metrics_utils/src/lib.rs (outdated)
pub db_bubblegum_get_grand_epochs_latency: Histogram,
pub db_bubblegum_get_epochs_latency: Histogram,
pub db_bubblegum_get_changes_latency: Histogram,
pub db_account_get_grand_buckets_latency: Histogram,
pub db_account_get_buckets_latency: Histogram,
pub db_account_get_latests_latency: Histogram,
Why not a single Family<MetricLabel, Histogram>?
I usually prefer a set of plain metrics instead of one labeled metric, because it is much easier to query them from other monitoring systems. But sure, I can turn these into a family. Should I?
Yes, please. With our Prometheus + Grafana stack the primary flow is to put the metrics onto a dashboard and set up some simple alerts. With every new metric added there is an increased chance it won't make it there, because it requires a dedicated query for every chart and every alert rule. These metrics are super generic, so reusing the existing RED approach is even more preferable; if it doesn't fit into RED, a dedicated family is the next best choice. Please don't leave us needing multiple charts to monitor every request. Metrics should be kept as simple as possible.
metrics_utils/src/lib.rs (outdated)
pub peers_bubblegum_get_grand_epochs_for_tree_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_grand_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_changes_errors: Family<MetricLabel, Counter>,
It looks more like an added label to me: peers_sync_latency(protocol: "bubblegum"/"account", method/endpoint: "get_grand_epochs"/"get_epochs"/"get_changes").
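A sketch of that labeled shape using prometheus-client; the metric and label names here are illustrative, not the final ones:

```rust
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Protocol {
    Bubblegum,
    Account,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Endpoint {
    GetGrandEpochs,
    GetEpochs,
    GetChanges,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct PeersSyncLabels {
    pub protocol: Protocol,
    pub endpoint: Endpoint,
}

fn main() {
    // One labeled histogram instead of a separate metric per protocol/endpoint.
    let peers_sync_latency: Family<PeersSyncLabels, Histogram> =
        Family::new_with_constructor(|| Histogram::new(exponential_buckets(20.0, 1.8, 10)));

    peers_sync_latency
        .get_or_create(&PeersSyncLabels {
            protocol: Protocol::Bubblegum,
            endpoint: Endpoint::GetEpochs,
        })
        .observe(42.0);
}
```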
// prepare
let tree1 = Pubkey::new_unique();

// This change is for epoch we won't calculate in the test,
the comment is not valid in this context
.put(k1_2.clone(), v1_2.clone())
.unwrap();

// This will be also ignored
this one as well
Great work, thank you. Several open questions need clarification and the metrics should be simplified/moved to more appropriate measuring places.
// Verify account last state updated
let latest_acc1_key = AccountNftKey::new(acc1_pubkey);
let latest_acc1_val = storage
.acc_nft_last
acc_nft_last holds the last calculated epoch value?
It is the last seen change of the account with the given pubkey.
#[async_trait::async_trait]
impl AuraPeersProvides for FileSrcAuraPeersProvides {
async fn list_trusted_peers(&self) -> Vec<String> {
Why not a list of URLs directly? Those are parsed every time anyway.
That's to make it possible to change the list of peers without restarting the application.
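A minimal sketch of such a file-backed provider, re-reading the file on every call so the peer list can change without a restart; the struct name and parsing details are illustrative, not the PR's FileSrcAuraPeersProvides:

```rust
use async_trait::async_trait;

#[async_trait]
trait AuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String>;
}

struct FilePeersProvider {
    path: String,
}

#[async_trait]
impl AuraPeersProvides for FilePeersProvider {
    async fn list_trusted_peers(&self) -> Vec<String> {
        // The file is read on every call, so edits take effect immediately.
        match tokio::fs::read_to_string(&self.path).await {
            Ok(text) => text
                .lines()
                .map(str::trim)
                .filter(|l| !l.is_empty() && !l.starts_with('#'))
                .map(str::to_owned)
                .collect(),
            Err(e) => {
                tracing::warn!("Cannot read peers file {}: {e}", self.path);
                Vec::new()
            }
        }
    }
}
```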
rocks-db/src/storage_consistency.rs (outdated)
/// To prevent such inconsistency of a checksum, roght before the calulating,
/// we mark the epoch checksum to be calculated is "Calculating",
/// and after the checksum is calculated, we write this value only in case
/// if the previous value is still in "Calculated" state.
Suggested change:
- /// if the previous value is still in "Calculated" state.
+ /// if the previous value is still in the same "Calculating" state.
rocks-db/src/storage_consistency.rs (outdated)
/// if the previous value is still in "Calculated" state.
///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
Suggested change:
- /// a new update with slot that epoch is from the previous epoch perioud,
+ /// a new update with slot that epoch is from the previous epoch period,
rocks-db/src/storage_consistency.rs (outdated)
///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
/// it not only writed the bubblegum change, but also updated
Suggested change:
- /// it not only writed the bubblegum change, but also updated
+ /// it not only writes the bubblegum change, but also updates
/// This flag is set to true before bubblegum epoch calculation is started,
/// and set to false after the calculation is finished.
static IS_CALCULATING_BBGM_EPOCH: AtomicI32 = AtomicI32::new(-1);
It's not a flag any more
) {
tracing::info!("Starting bubblegum changes peer-to-peer exchange for epoch={epoch}");
while get_calculating_bbgm_epoch()
.map(|e| e == epoch)
Can we end up calculating some other epoch here - a previous one, or the next one?
Theoretically that should not happen, but I've changed it to compare the current epoch with the last calculated one.
metrics
.found_missing_bubblegums
.set(changes_we_miss.len() as i64);
I'd suggest incrementing this instead of setting it. Set will be much more flickery given the periodic nature of metrics collectors.
return result;
}
};
metrics
This metric collection should be tied closely to the actual I/O - the gRPC client in our case - not to the business logic level.
This one measures how much time it takes on our side to fetch the data.
On the gRPC client side (BbgmConsistencyApiClientImpl and AccConsistencyApiClientImpl) there are separate metrics that measure how much time it takes to call the peer.
This pull request contains the full implementation of peer-to-peer consistency checking and missing-block fetching for bubblegum and account NFTs.
It includes:
Design document:
https://github.com/metaplex-foundation/aura/wiki/Data-consistency-for-peer%E2%80%90to%E2%80%90peer-indexers