Skip to content

Commit

Permalink
feat(ops): add account backfill to ops cli by program or for a single…
Browse files Browse the repository at this point in the history
… account (#140)
  • Loading branch information
kespinola authored Apr 5, 2024
1 parent b9bcbda commit d5f1967
Show file tree
Hide file tree
Showing 18 changed files with 461 additions and 130 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@ publish.workspace = true

[dependencies]
anyhow = { workspace = true }
backon = { workspace = true }
solana-account-decoder = { workspace = true }
solana-client = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }
thiserror = { workspace = true }
figment = { workspace = true }
plerkle_messenger = { workspace = true }
tokio = { workspace = true }
clap = { workspace = true, features = ["derive", "cargo", "env"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }

Expand Down
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
mod db;
mod metrics;
mod plerkle_messenger_queue;
mod solana_rpc;

pub use db::*;
pub use metrics::*;
pub use plerkle_messenger_queue::*;
pub use solana_rpc::*;
180 changes: 180 additions & 0 deletions core/src/plerkle_messenger_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use anyhow::Result;
use clap::Parser;
use figment::value::{Dict, Value};
use plerkle_messenger::{
Messenger, MessengerConfig, MessengerType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
};
use std::num::TryFromIntError;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{mpsc::error::TrySendError, Mutex};

#[derive(Clone, Debug, Parser)]
pub struct QueueArgs {
#[arg(long, env)]
pub messenger_redis_url: String,
#[arg(long, env, default_value = "100")]
pub messenger_redis_batch_size: String,
#[arg(long, env, default_value = "25")]
pub messenger_queue_connections: u64,
}

impl From<QueueArgs> for MessengerConfig {
fn from(args: QueueArgs) -> Self {
let mut connection_config = Dict::new();

connection_config.insert(
"redis_connection_str".to_string(),
Value::from(args.messenger_redis_url),
);
connection_config.insert(
"batch_size".to_string(),
Value::from(args.messenger_redis_batch_size),
);
connection_config.insert(
"pipeline_size_bytes".to_string(),
Value::from(1u128.to_string()),
);

Self {
messenger_type: MessengerType::Redis,
connection_config,
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum QueuePoolError {
#[error("messenger")]
Messenger(#[from] plerkle_messenger::MessengerError),
#[error("tokio try send to channel")]
TrySendMessengerChannel(#[from] TrySendError<Box<dyn Messenger>>),
#[error("revc messenger connection")]
RecvMessengerConnection,
#[error("try from int")]
TryFromInt(#[from] TryFromIntError),
#[error("tokio send to channel")]
SendMessengerChannel(#[from] mpsc::error::SendError<Box<dyn Messenger>>),
}

#[derive(Debug, Clone)]
pub struct QueuePool {
tx: mpsc::Sender<Box<dyn plerkle_messenger::Messenger>>,
rx: Arc<Mutex<mpsc::Receiver<Box<dyn plerkle_messenger::Messenger>>>>,
}

impl QueuePool {
pub async fn try_from_config(config: QueueArgs) -> anyhow::Result<Self, QueuePoolError> {
let size = usize::try_from(config.messenger_queue_connections)?;
let (tx, rx) = mpsc::channel(size);

for _ in 0..config.messenger_queue_connections {
let messenger_config: MessengerConfig = config.clone().into();
let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?;

let streams = [
(plerkle_messenger::ACCOUNT_STREAM, 100_000_000),
(plerkle_messenger::ACCOUNT_BACKFILL_STREAM, 100_000_000),
(plerkle_messenger::SLOT_STREAM, 100_000),
(plerkle_messenger::TRANSACTION_STREAM, 10_000_000),
(plerkle_messenger::TRANSACTION_BACKFILL_STREAM, 10_000_000),
(plerkle_messenger::BLOCK_STREAM, 100_000),
];

for &(key, size) in &streams {
messenger.add_stream(key).await?;
messenger.set_buffer_size(key, size).await;
}

tx.try_send(messenger)?;
}

Ok(Self {
tx,
rx: Arc::new(Mutex::new(rx)),
})
}

/// Pushes account backfill data to the appropriate stream.
///
/// This method sends account backfill data to the `ACCOUNT_BACKFILL_STREAM`.
/// It is used for backfilling account information in the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the account backfill data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_account_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(ACCOUNT_BACKFILL_STREAM, bytes).await
}

/// Pushes transaction backfill data to the appropriate stream.
///
/// This method sends transaction backfill data to the `TRANSACTION_BACKFILL_STREAM`.
/// It is used for backfilling transaction information in the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the transaction backfill data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_transaction_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(TRANSACTION_BACKFILL_STREAM, bytes).await
}

/// Pushes account data to the appropriate stream.
///
/// This method sends account data to the `ACCOUNT_STREAM`.
/// It is used for pushing real-time account updates to the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the account data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_account(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(ACCOUNT_STREAM, bytes).await
}

/// Pushes transaction data to the appropriate stream.
///
/// This method sends transaction data to the `TRANSACTION_STREAM`.
/// It is used for pushing real-time transaction updates to the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the transaction data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_transaction(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(TRANSACTION_STREAM, bytes).await
}

async fn push(&self, stream_key: &'static str, bytes: &[u8]) -> Result<(), QueuePoolError> {
let mut rx = self.rx.lock().await;
let mut messenger = rx
.recv()
.await
.ok_or(QueuePoolError::RecvMessengerConnection)?;

messenger.send(stream_key, bytes).await?;

self.tx.send(messenger).await?;

Ok(())
}
}
25 changes: 25 additions & 0 deletions ops/src/bubblegum/rpc.rs → core/src/solana_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ impl Rpc {
.await
}

pub async fn get_account(
&self,
pubkey: &Pubkey,
) -> Result<
solana_client::rpc_response::Response<std::option::Option<solana_sdk::account::Account>>,
ClientError,
> {
(|| async {
self.0
.get_account_with_config(
pubkey,
RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..RpcAccountInfoConfig::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await
}

pub async fn get_program_accounts(
&self,
program: &Pubkey,
Expand Down
30 changes: 30 additions & 0 deletions ops/src/account/account_details.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use anyhow::Result;
use das_core::Rpc;
use solana_sdk::{account::Account, pubkey::Pubkey};

pub struct AccountDetails<'a> {
pub account: Account,
pub slot: u64,
pub pubkey: &'a Pubkey,
}

impl<'a> AccountDetails<'a> {
pub fn new(account: Account, slot: u64, pubkey: &'a Pubkey) -> Self {
Self {
account,
slot,
pubkey,
}
}

pub async fn fetch(rpc: &Rpc, pubkey: &'a Pubkey) -> Result<Self> {
let account_response = rpc.get_account(pubkey).await?;
let slot = account_response.context.slot;

let account = account_response
.value
.ok_or_else(|| anyhow::anyhow!("Account not found for pubkey: {}", pubkey))?;

Ok(Self::new(account, slot, pubkey))
}
}
32 changes: 32 additions & 0 deletions ops/src/account/cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::{program, single};
use anyhow::Result;
use clap::{Args, Subcommand};

#[derive(Debug, Clone, Subcommand)]
pub enum Commands {
/// The 'program' command is used to backfill the index against on-chain accounts owned by a program.
#[clap(name = "program")]
Program(program::Args),
/// The 'single' command is used to backfill the index against a single account.
#[clap(name = "single")]
Single(single::Args),
}

#[derive(Debug, Clone, Args)]
pub struct AccountCommand {
#[clap(subcommand)]
pub action: Commands,
}

pub async fn subcommand(subcommand: AccountCommand) -> Result<()> {
match subcommand.action {
Commands::Program(args) => {
program::run(args).await?;
}
Commands::Single(args) => {
single::run(args).await?;
}
}

Ok(())
}
6 changes: 6 additions & 0 deletions ops/src/account/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod account_details;
mod cmd;
mod program;
mod single;

pub use cmd::*;
Loading

0 comments on commit d5f1967

Please sign in to comment.