Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch mint indexing #198

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft

feat: batch mint indexing #198

wants to merge 34 commits into from

Conversation

RequescoS
Copy link

@RequescoS RequescoS commented Jun 11, 2024

This PR aims to add indexing for the FinalizeTreeWithRoot instruction in DAS-API.

Notice: As we need to integrate this instruction into other packages (blockbuster, spl-compression, etc.), we are currently using local forks of these packages. In the future, these will be replaced by standard imports.

The FinalizeTreeWithRoot instruction has a unique characteristic that prevents it from being processed in the same way as other Bubblegum instructions. It represents a batch mint action, and the potential size of this batch can reach many millions of assets. Processing this instruction inline with others could block them in the queue until the FinalizeTreeWithRoot processing is complete, which can take a considerable amount of time.

To address this issue, we decided to create a separate queue for processing rollups, similar to our existing task processing system. When we receive a FinalizeTreeWithRoot instruction update, we add a new rollup to the queue, which is then processed in a separate process.

To represent the rollups queue, we created the rollup_to_verify table in Postgres, while downloaded rollups are stored in the rollup table. We store downloaded rollups for several reasons:

  1. To avoid downloading the same rollup multiple times if it is received from several instructions, which is possible.
  2. In the future, DAS-API providers will be able to create rollups instead of users, and all providers will send some FinalizeTreeWithRoot transactions to the Solana network. At this step, they will store the rollup in the database and use it instead of downloading it from an external link.

For rollup processing, we implemented a finite state machine (FSM) with the following states: ReceivedTransaction, SuccessfullyDownload, SuccessfullyValidate, StoredUpdate, and FailedToPersist. StoredUpdate and FailedToPersist are the final states, representing successful and unsuccessful processing cases, respectively. If we encounter a FailedToPersist state, the rollup_fail_status column in the rollup_to_verify table will store an enum representing the reason for the failure. Possible values of this enum include: ChecksumVerifyFailed (hash calculated during processing and hash received from the transaction are different), DownloadFailed, FileSerialization (invalid JSON), and RollupVerifyFailed (invalid tree root, leaf pubkey, etc.).

The initial state for a rollup is ReceivedTransaction. If the rollup is already stored in the database, we retrieve it, cast it to the Rollup structure in the code, and move to the next state. Otherwise, we need to download it using the URL received from the transaction.

The next state is SuccessfullyDownload. Here, we need to validate that the rollup contains a valid tree. For this purpose, we use the ITree trait, which abstracts ConcurrentMerkleTree<DEPTH, BUF_SIZE> regardless of the const generic parameters. This abstraction is necessary for working comfortably with ConcurrentMerkleTree without causing stack overflow (detailed reasons are described in the code comments in the merkle_tree_wrapper.rs file). If validation completes successfully, the rollup transitions to the SuccessfullyValidate state.

If the rollup is SuccessfullyValidate, we can process all the assets inside it by iterating over them and calling the mint_v1 instruction handler. Once all assets are processed, the rollup transitions to the final StoredUpdate state. If a failure occurs at any step, the rollup will enter the FailedToPersist state. Any step may be retried.

The persist_rollups process runs in a single worker in the nft_ingester/src/main.rs file. While we can run it with multiple workers, it can be very RAM-intensive (a single rollup may be many GB in size).

@RequescoS RequescoS marked this pull request as draft June 11, 2024 15:41
@RequescoS RequescoS requested a review from danenbm June 12, 2024 06:24
@RequescoS RequescoS marked this pull request as ready for review June 12, 2024 06:45
@RequescoS RequescoS marked this pull request as draft June 12, 2024 06:49
blockbuster/.gitmodules Outdated Show resolved Hide resolved
blockbuster/blockbuster/src/error.rs Outdated Show resolved Hide resolved
program_transformers/src/rollups/rollup_persister.rs Outdated Show resolved Hide resolved

pub async fn persist_rollups(&self) {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered doing a a trigger/notification when a row is added to the rollup_to_verify table? That would probably work better than a hardcoded delay for the times when there are no rollups and the times there are spikes of multiple new entries.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I added listener for inserts into rollup_to_verify table. Primary i was thinking about share single listener between multiple instances of RollupPersister but as far as most listener methods use mutable self reference (&mut self) i decided to create new listener for each RollupPersister. i hope that together with StartProcessing state it will not cause any race conditions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also thinking about using single listener and sending notifications inside code using channels, but mpsc is very uncomfortable for such purpose because we need to send notifications for multiple consumers/workers. There also one community crate that implements Golang channels for Rust with multiple consumers but i think that it is bad practice to use additional import in our case, wdyt?

Copy link
Contributor

@danenbm danenbm Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there could be a race condition within get_rollup_to_verify():

  1. Both processes start a transaction, and both can use a SELECT query to find the same row in rollups_to_verify, which is a row that is not in the StartProcessing state.

  2. Both processes attempt to update the selected row in rollups_to_verify, which does implicitly lock the row. The second process waits for the lock on the row to be released.

  3. The first process commits the transaction and releases the lock. The second process successfully updates the row again.

  4. Now both processes move to persist_rollup() and can race each other in their respective state machines.

I think to solve this issue, there's a few options.

  1. You could figure out the single listener thing and then each process would receive a notification. However, if there's multiple rows getting added at nearly the same time, then there will be multiple triggers at nearly the same time. From what I can tell there's no specific rollup_to_verify row associated with a trigger, so then the multiple processes could still find the same row, causing the same concurrency issue.

  2. You could use a multi-consumer/multi-producer channel. I agree that mpsc is incorrect for this purpose. I agree the community crate is not right either, as the README says it is end-of-life and deprecated. The Tokio tutorial at https://tokio.rs/tokio/tutorial/channels suggests to use async-channel crate for multi-consumer/multi-producer, so you could look into that.

  3. My recommended option is this third one, which is modify get_rollup_to_verify() to do a SELECT ... FOR UPDATE. I believe you can accomplish this by simply adding lock_for_update() to the SELECT:

let rollup_to_verify = rollup_to_verify::Entity::find()
    .filter(condition)
    .order_by_asc(rollup_to_verify::Column::CreatedAtSlot)
+   .lock_for_update() // ************** Lock the row for the current transaction
    .one(&multi_txn)
    .await
    .map_err(|e| ProgramTransformerError::DatabaseError(e.to_string()))?;

Then I think persist_rollup also needs to be modified to NOT service rollups in the ReceivedTransaction state. Otherwise I think it will still grab anything that was just ingested by the program transformer.

  1. I think there could be multiple other database strategies, such as "serializable transactions" (that one probably causes too much performance hit). I think what I suggest in 3 will work. Here's some other Postgres resources if you want to deep-dive this topic:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! LGTM with the lock (option 3) in latest rev.

Comment on lines 202 to 208
let Ok(rollup) = rollup
.map(|r| bincode::deserialize::<Rollup>(r.rollup_binary_bincode.as_slice()))
.transpose()
.map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))
else {
continue;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of error will it keep trying to deserialize this rollup in subsequent iterations of the loop, because it is stuck in RollupPersistingState::ReceivedTransaction and never deleted from the table?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, thank you!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the sequence would be: if this deserialization failed, it would default to None, which it would pass to persist_rollup, which would re-download and serialize it, and update the row in the rollup table with new FileHash and RollupBinaryBincode, which we would assume would fix the issue. Is this correct understanding?

But now looking at it more, when is this code even expected to run? In the outer persist_rollups process, it should get a rollup to verify from the rollup_to_verify table and at the same time attempt to get the downloaded rollup from the rollup table. But it doesn't seem like the Rollup should yet be poplulated in the rollup table until persist_rollup function is called which actually does the download.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't fully understand when this code would be run to deserialize a rollup. It seems like it would only be on an error case because it wouldn't be populated until download_rollup occurs?

program_transformers/src/rollups/rollup_persister.rs Outdated Show resolved Hide resolved
program_transformers/src/rollups/rollup_persister.rs Outdated Show resolved Hide resolved
Cargo.toml Show resolved Hide resolved
program_transformers/src/rollups/rollup_persister.rs Outdated Show resolved Hide resolved
n00m4d and others added 4 commits July 10, 2024 20:21
* feat: make rollup processing feature optional

* feat: inverted parameter
.get(1)
.ok_or(BlockbusterError::InstructionParsingError)?;
let staker = *keys
.get(2)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here should be 4 according to latest Bubblegum program version, I mean version with rollup changes.

@@ -0,0 +1,299 @@
use crate::error::RollupValidationError;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code in this file looks like code in SDK. Can we import it from there?

pub leaf_update: LeafSchema,
pub mint_args: MetadataArgs,
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
pub authority: Pubkey,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also should be creator_signature. But I gues we can add it as a separate ticket/PR. Task added to the backlog

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, i think we can remove all code from sdk and just use import but in the scope of separate task. This will also add creator_signature

Comment on lines 162 to 198
#[derive(Iden)]
enum RollupToVerify {
Table,
Url,
FileHash,
CreatedAtSlot,
Signature,
DownloadAttempts,
RollupPersistingState,
RollupFailStatus,
Staker,
}

#[derive(Iden, Debug, PartialEq, Sequence)]
enum PersistingRollupState {
ReceivedTransaction,
FailedToPersist,
StartProcessing,
SuccessfullyDownload,
SuccessfullyValidate,
StoredUpdate,
}

#[derive(Iden, Debug, PartialEq, Sequence)]
enum FailedRollupState {
DownloadFailed,
ChecksumVerifyFailed,
RollupVerifyFailed,
FileSerialization,
}

#[derive(Iden)]
enum Rollup {
Table,
FileHash,
RollupBinaryBincode,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it matters but should these be under the model directory and then imported?

Comment on lines 32 to 47
.on_conflict(
OnConflict::columns([digital_asset_types::dao::rollup_to_verify::Column::FileHash])
.update_columns([digital_asset_types::dao::rollup_to_verify::Column::Url])
.update_columns([digital_asset_types::dao::rollup_to_verify::Column::Signature])
.update_columns([
digital_asset_types::dao::rollup_to_verify::Column::DownloadAttempts,
])
.update_columns([
digital_asset_types::dao::rollup_to_verify::Column::RollupFailStatus,
])
.update_columns([
digital_asset_types::dao::rollup_to_verify::Column::RollupPersistingState,
])
.update_columns([digital_asset_types::dao::rollup_to_verify::Column::CreatedAtSlot])
.to_owned(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should staker now be included in the ON CONFLICT section?

Comment on lines 19 to 27
if let Err(e) = listener.recv().await {
error!("Recv rollup notification: {}", e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
if let Err(e) = s.send(()).await {
error!("Send rollup notification: {}", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of waiting 5 seconds if there is a send or receive error? Doesn't it drop the problematic message and move on? Why wait 5 seconds before doing that?

Comment on lines 288 to 300
let condition = Condition::all()
.add(
rollup_to_verify::Column::RollupPersistingState
.ne(RollupPersistingState::FailedToPersist),
)
.add(
rollup_to_verify::Column::RollupPersistingState
.ne(RollupPersistingState::StoredUpdate),
)
.add(
rollup_to_verify::Column::RollupPersistingState
.ne(RollupPersistingState::StartProcessing),
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the state of the rollup is SuccessfullyDownload or SuccessfullyValidate? It could get picked up here by another processor and put back into StartProcessing?

Instead shouldn't this condition just be .eq(RollupPersistingState::ReceivedTransaction?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the state of the rollup is SuccessfullyDownload or SuccessfullyValidate then it will be picked up here by processor and continue processing from this state

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah I see. Somehow thought it would get reset.

Comment on lines 228 to 231
&RollupPersistingState::ReceivedTransaction => {
// We get ReceivedTransaction state on the start of processing
rollup_to_verify.rollup_persisting_state =
RollupPersistingState::StartProcessing;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case should not be possible right because anything gotten by get_rollup_to_verify() should be moved out of this state before being sent as a parameter to persist_rollup?

);

pub fn make_concurrent_merkle_tree(
max_dapth: u32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: typo

pub fn validate_change_logs(
max_depth: u32,
max_buffer_size: u32,
leafs: &[[u8; 32]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should be leaves

Comment on lines 202 to 208
let Ok(rollup) = rollup
.map(|r| bincode::deserialize::<Rollup>(r.rollup_binary_bincode.as_slice()))
.transpose()
.map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))
else {
continue;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't fully understand when this code would be run to deserialize a rollup. It seems like it would only be on an error case because it wouldn't be populated until download_rollup occurs?


pub async fn persist_rollups(&self) {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! LGTM with the lock (option 3) in latest rev.

Comment on lines 115 to 123
let r = rollup_to_verify::Entity::find()
.filter(rollup_to_verify::Column::FileHash.eq(metadata_hash.clone()))
.one(setup.db.as_ref())
.await
.unwrap()
.unwrap();

assert_eq!(r.file_hash, metadata_hash);
assert_eq!(r.url, metadata_url);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt about checking all the expected columns that should have been inserted?

Comment on lines 190 to 202
let query = rollup_to_verify::Entity::insert(rollup_to_verify)
.on_conflict(
OnConflict::columns([rollup_to_verify::Column::FileHash])
.update_columns([rollup_to_verify::Column::Url])
.update_columns([rollup_to_verify::Column::Signature])
.update_columns([rollup_to_verify::Column::DownloadAttempts])
.update_columns([rollup_to_verify::Column::RollupFailStatus])
.update_columns([rollup_to_verify::Column::RollupPersistingState])
.update_columns([rollup_to_verify::Column::CreatedAtSlot])
.to_owned(),
)
.build(DbBackend::Postgres);
setup.db.execute(query).await.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should staker column be incorporated into test now?

}
&RollupPersistingState::SuccessfullyValidate => {
if let Some(r) = &rollup {
// TODO: Add retry?
Copy link
Contributor

@danenbm danenbm Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we do a retry for normal bubblegum processing, but did you want to add retry here? Seems like we don't need it since we don't do it for existing bubblegum txn processing.

Copy link
Contributor

@danenbm danenbm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there are significant changes, I believe I've completed my reviews and once the last open comments are resolved the change looks good to me. I really like the thoughtful design and good testing that was added for this feature.

Next steps for this PR would be:

  1. Resolve merge conflicts with main, release dependencies and update this PR to use released packages.
  2. Send PR out to the wider RPC community, discuss on the RPC channel we have, get approval and then merge to main.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as overall naming goes, would batch_operation_persistor (and in general batch_operation_*) be a better fit than rollup_persistor? Or are there other options discussed for naming besides rollup?

Comment on lines 598 to 611
bubblegum::mint_v1::mint_v1(
&rolled_mint.into(),
&InstructionBundle {
txn_id: &signature,
program: Default::default(),
instruction: None,
inner_ix: None,
keys: &[],
slot,
},
txn,
"CreateTreeWithRoot",
false,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we've already talked about this indirectly in a previous comment, but wanted to consider it directly.

The transactions sent for finalize_merkle_tree_with_root will all have same sequence number of 1. The normal backfilling process for a tree searches for gaps in the sequence numbers to detect missing transactions. In this case, the batch mint/rollup is downloaded and verified by the indexing process so later gap filling is irrelevant.

The end result is there will be multiple leaves for the same tree that share the same sequence number. I cannot think of any issues with that. But wanted to leave the comment just to share the thought process, in case you can think of any system issues with the shared sequence numbers or backfilling.

@RequescoS RequescoS changed the title feat: rollup indexing feat: batch mint indexing Jul 26, 2024
RequescoS and others added 4 commits July 26, 2024 10:08
* feat: add creators and collection bath mint verifications

* style: fmt

* Update blockbuster/blockbuster/src/programs/bubblegum/mod.rs

Co-authored-by: Stanislav Cherviakov <stchervyakov@gmail.com>

* chore: dependency change

* chore: renaming

---------

Co-authored-by: Stanislav Cherviakov <stchervyakov@gmail.com>
* feat: add skip batch mint indexing feature

* fix: small fixes

* chore: add comment and variable renaming
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants