diff --git a/build/Dockerfile b/build/Dockerfile
index 4b928875f..1187e8c3a 100644
--- a/build/Dockerfile
+++ b/build/Dockerfile
@@ -422,8 +422,6 @@ RUN < Result<(), Box> { let tempdir = tempfile::tempdir()?;
diff --git a/cmd/authority-claimer/src/checker.rs b/cmd/authority-claimer/src/checker.rs
index 6c4f16106..982623698 100644
--- a/cmd/authority-claimer/src/checker.rs
+++ b/cmd/authority-claimer/src/checker.rs
@@ -30,6 +30,7 @@ pub trait DuplicateChecker: Debug {
     async fn is_duplicated_rollups_claim(
         &mut self,
         rollups_claim: &RollupsClaim,
+        iconsensus: &Address,
     ) -> Result<bool, Self::Error>;
 }
 
@@ -40,15 +41,13 @@
 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
 struct Claim {
     application: Address,
-    first_index: u64,
-    last_index: u64,
-    epoch_hash: Hash,
+    last_block: u64,
+    claim_hash: Hash,
 }
 
 #[derive(Debug)]
 pub struct DefaultDuplicateChecker {
     provider: Arc<Provider<RetryClient<Http>>>,
-    iconsensus: IConsensus<Provider<RetryClient<Http>>>,
     from: EthersAddress,
     claims: HashSet<Claim>,
     confirmations: usize,
@@ -81,7 +80,6 @@ pub enum DuplicateCheckerError {
 impl DefaultDuplicateChecker {
     pub async fn new(
         http_endpoint: String,
-        iconsensus: Address,
         from: EthersAddress,
         confirmations: usize,
         genesis_block: u64,
@@ -94,19 +92,13 @@ impl DefaultDuplicateChecker {
             INITIAL_BACKOFF,
         );
         let provider = Arc::new(Provider::new(retry_client));
-        let iconsensus = IConsensus::new(
-            H160(iconsensus.inner().to_owned()),
-            provider.clone(),
-        );
-        let mut checker = Self {
+        let checker = Self {
             provider,
-            iconsensus,
             from,
             claims: HashSet::new(),
             confirmations,
             next_block_to_read: genesis_block,
         };
-        checker.update_claims().await?; // to allow failure during instantiation
         Ok(checker)
     }
 }
@@ -118,20 +110,23 @@ impl DuplicateChecker for DefaultDuplicateChecker {
     async fn is_duplicated_rollups_claim(
         &mut self,
         rollups_claim: &RollupsClaim,
+        iconsensus: &Address,
     ) -> Result<bool, Self::Error> {
-        self.update_claims().await?;
+        self.update_claims(iconsensus).await?;
         let claim = Claim {
             application: rollups_claim.dapp_address.clone(),
-            first_index: rollups_claim.first_index as u64,
-            last_index: rollups_claim.last_index as u64,
-            epoch_hash: rollups_claim.epoch_hash.clone(),
+            last_block: rollups_claim.last_block,
+            claim_hash: rollups_claim.output_merkle_root_hash.clone(),
         };
         Ok(self.claims.contains(&claim))
     }
 }
 
 impl DefaultDuplicateChecker {
-    async fn update_claims(&mut self) -> Result<(), DuplicateCheckerError> {
+    async fn update_claims(
+        &mut self,
+        iconsensus: &Address,
+    ) -> Result<(), DuplicateCheckerError> {
         let depth = self.confirmations as u64;
 
         let latest = self
@@ -153,8 +148,12 @@ impl DefaultDuplicateChecker {
             return Ok(());
         }
 
-        let claims = self
-            .iconsensus
+        let iconsensus = IConsensus::new(
+            H160(iconsensus.inner().to_owned()),
+            self.provider.clone(),
+        );
+
+        let claims = iconsensus
             .claim_submission_filter()
             .from_block(self.next_block_to_read)
             .to_block(latest)
@@ -173,9 +172,10 @@ impl DefaultDuplicateChecker {
         for claim_submission in claims.into_iter() {
            let claim = Claim {
                application: Address::new(claim_submission.app_contract.into()),
-                first_index: claim_submission.input_range.first_index,
-                last_index: claim_submission.input_range.last_index,
-                epoch_hash: Hash::new(claim_submission.epoch_hash),
+                last_block: claim_submission
+                    .last_processed_block_number
+                    .as_u64(),
+                claim_hash: Hash::new(claim_submission.claim),
            };
            self.claims.insert(claim);
        }
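The duplicate check above now keys on `(application, last_block, claim_hash)` instead of the old input range and epoch hash. A minimal standalone sketch of that set-membership semantics (stand-in array types replace the crate's `Address` and `Hash` newtypes; not part of the patch):

```rust
use std::collections::HashSet;

// Same shape as the patched `Claim` struct: equality and hashing cover all
// three fields, so a claim counts as a duplicate only on an exact triple match.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct Claim {
    application: [u8; 20], // stand-in for rollups_events::Address
    last_block: u64,
    claim_hash: [u8; 32], // stand-in for rollups_events::Hash
}

fn main() {
    let mut seen = HashSet::new();
    let claim = Claim {
        application: [0xfa; 20],
        last_block: 4242,
        claim_hash: [0x01; 32],
    };
    assert!(seen.insert(claim.clone())); // first sighting: not a duplicate
    assert!(!seen.insert(claim)); // exact same triple: duplicate
}
```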
diff --git a/cmd/authority-claimer/src/claimer.rs b/cmd/authority-claimer/src/claimer.rs
index c0dd8e80e..f75d0b740 100644
--- a/cmd/authority-claimer/src/claimer.rs
+++ b/cmd/authority-claimer/src/claimer.rs
@@ -2,10 +2,11 @@
 // SPDX-License-Identifier: Apache-2.0 (see LICENSE)
 
 use crate::{
-    checker::DuplicateChecker, listener::BrokerListener,
+    checker::DuplicateChecker, repository::Repository,
     sender::TransactionSender,
 };
 use async_trait::async_trait;
+use ethers::types::H256;
 use snafu::ResultExt;
 use std::fmt::Debug;
 use tracing::{info, trace};
@@ -25,13 +26,10 @@ pub trait Claimer: Sized + Debug {
 }
 
 #[derive(Debug, snafu::Snafu)]
-pub enum ClaimerError<
-    B: BrokerListener,
-    D: DuplicateChecker,
-    T: TransactionSender,
-> {
-    #[snafu(display("broker listener error"))]
-    BrokerListener { source: B::Error },
+pub enum ClaimerError<R: Repository, D: DuplicateChecker, T: TransactionSender>
+{
+    #[snafu(display("repository error"))]
+    Repository { source: R::Error },
 
     #[snafu(display("duplicated claim error"))]
     DuplicatedClaim { source: D::Error },
@@ -48,25 +46,25 @@
 /// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`.
 #[derive(Debug)]
 pub struct DefaultClaimer<
-    B: BrokerListener,
+    R: Repository,
     D: DuplicateChecker,
     T: TransactionSender,
 > {
-    broker_listener: B,
+    repository: R,
     duplicate_checker: D,
     transaction_sender: T,
 }
 
-impl<B: BrokerListener, D: DuplicateChecker, T: TransactionSender>
-    DefaultClaimer<B, D, T>
+impl<R: Repository, D: DuplicateChecker, T: TransactionSender>
+    DefaultClaimer<R, D, T>
 {
     pub fn new(
-        broker_listener: B,
+        repository: R,
         duplicate_checker: D,
         transaction_sender: T,
     ) -> Self {
         Self {
-            broker_listener,
+            repository,
             duplicate_checker,
             transaction_sender,
         }
@@ -74,40 +72,50 @@
 }
 
 #[async_trait]
-impl<B, D, T> Claimer for DefaultClaimer<B, D, T>
+impl<R, D, T> Claimer for DefaultClaimer<R, D, T>
 where
-    B: BrokerListener + Send + Sync + 'static,
+    R: Repository + Send + Sync + 'static,
     D: DuplicateChecker + Send + Sync + 'static,
     T: TransactionSender + Send + 'static,
 {
-    type Error = ClaimerError<B, D, T>;
+    type Error = ClaimerError<R, D, T>;
 
     async fn start(mut self) -> Result<(), Self::Error> {
         trace!("Starting the authority claimer loop");
         loop {
-            let rollups_claim = self
-                .broker_listener
-                .listen()
-                .await
-                .context(BrokerListenerSnafu)?;
-            trace!("Got a claim from the broker: {:?}", rollups_claim);
+            let (rollups_claim, iconsensus) =
+                self.repository.get_claim().await.context(RepositorySnafu)?;
+            info!("Received claim from the repository: {:?}", rollups_claim);
+            let tx_hash: H256;
+            let id = rollups_claim.id;
 
             let is_duplicated_rollups_claim = self
                 .duplicate_checker
-                .is_duplicated_rollups_claim(&rollups_claim)
+                .is_duplicated_rollups_claim(&rollups_claim, &iconsensus)
                 .await
                 .context(DuplicatedClaimSnafu)?;
             if is_duplicated_rollups_claim {
-                trace!("It was a duplicated claim");
+                info!("Duplicate claim detected: {:?}", rollups_claim);
+                // Updates the database so the claim leaves the queue
+                self.repository
+                    .update_claim(id, H256::zero())
+                    .await
+                    .context(RepositorySnafu)?;
                 continue;
             }
 
-            info!("Sending a new rollups claim");
-            self.transaction_sender = self
+            info!("Sending a new rollups claim transaction");
+            (tx_hash, self.transaction_sender) = self
                 .transaction_sender
-                .send_rollups_claim_transaction(rollups_claim)
+                .send_rollups_claim_transaction(rollups_claim, iconsensus)
+                .await
+                .context(TransactionSenderSnafu)?;
+
+            trace!("Updating claim data in repository");
+            self.repository
+                .update_claim(id, tx_hash)
                 .await
-                .context(TransactionSenderSnafu)?
+                .context(RepositorySnafu)?;
         }
    }
}
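Two patterns in the new loop are worth noting: duplicated claims are acknowledged by recording `H256::zero()` as their transaction hash so they leave the queue, and the transaction sender is consumed and handed back on every send. A hedged sketch of that by-value ownership pattern (simplified names, not the crate's types):

```rust
// The sender is moved into `send` and returned in a tuple with the tx id,
// mirroring `(tx_hash, self.transaction_sender) = ...` in the loop above.
struct Sender {
    nonce: u64,
}

impl Sender {
    fn send(mut self, payload: &str) -> (u64, Self) {
        self.nonce += 1;
        println!("sent {payload:?} with nonce {}", self.nonce);
        (self.nonce, self) // return ownership to the caller
    }
}

fn main() {
    let mut sender = Sender { nonce: 0 };
    for claim in ["claim-1", "claim-2"] {
        let tx_id: u64;
        // Destructuring assignment initializes `tx_id` and reassigns `sender`,
        // the same shape the claimer loop uses with `tx_hash`.
        (tx_id, sender) = sender.send(claim);
        println!("recorded tx {tx_id}");
    }
}
```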
diff --git a/cmd/authority-claimer/src/config.rs b/cmd/authority-claimer/src/config.rs
index 0edd348b4..ce4da458a 100644
--- a/cmd/authority-claimer/src/config.rs
+++ b/cmd/authority-claimer/src/config.rs
@@ -4,7 +4,7 @@
 use crate::{
     log::{LogConfig, LogEnvCliConfig},
     redacted::Redacted,
-    rollups_events::{Address, BrokerCLIConfig, BrokerConfig, HexArrayError},
+    rollups_events::HexArrayError,
 };
 use clap::{command, Parser};
 use eth_tx_manager::{
@@ -48,9 +48,8 @@ pub struct Config {
     pub tx_manager_config: TxManagerConfig,
     pub tx_signing_config: TxSigningConfig,
     pub tx_manager_priority: Priority,
-    pub broker_config: BrokerConfig,
     pub log_config: LogConfig,
-    pub iconsensus_address: Address,
+    pub postgres_endpoint: String,
     pub genesis_block: u64,
     pub http_server_port: u16,
 }
@@ -83,22 +82,16 @@ impl Config {
         let tx_signing_config =
             TxSigningConfig::try_from(cli_config.tx_signing_config)?;
 
-        let broker_config = BrokerConfig::from(cli_config.broker_config);
-
         let log_config = LogConfig::initialize(cli_config.log_config);
 
-        let iconsensus_address = cli_config
-            .iconsensus_address
-            .try_into()
-            .context(ParseIConsensusAddressSnafu)?;
+        let postgres_endpoint = cli_config.postgres_endpoint;
 
         Ok(Config {
             tx_manager_config,
             tx_signing_config,
             tx_manager_priority: Priority::Normal,
-            broker_config,
             log_config,
-            iconsensus_address,
+            postgres_endpoint,
             genesis_block: cli_config.genesis_block,
             http_server_port: cli_config.http_server_port,
         })
@@ -115,15 +108,12 @@ struct AuthorityClaimerCLI {
     #[command(flatten)]
     pub tx_signing_config: TxSigningCLIConfig,
 
-    #[command(flatten)]
-    pub broker_config: BrokerCLIConfig,
-
     #[command(flatten)]
     pub log_config: LogEnvCliConfig,
 
-    /// Address of the IConsensus contract
+    /// Postgres endpoint address
     #[arg(long, env)]
-    pub iconsensus_address: String,
+    pub postgres_endpoint: String,
 
     /// Genesis block for reading blockchain events
     #[arg(long, env, default_value_t = 1)]
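The CLI keeps clap's derive-plus-env pattern; only the flags change. A hedged, self-contained sketch of the new surface (assumes clap 4 with the `derive` and `env` features; field names match the patch, the struct name is illustrative):

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct AuthorityClaimerCli {
    /// Postgres endpoint address, e.g. postgres://user:pw@host:5432/db
    /// (read from --postgres-endpoint or the POSTGRES_ENDPOINT env var)
    #[arg(long, env)]
    postgres_endpoint: String,

    /// Genesis block for reading blockchain events
    #[arg(long, env, default_value_t = 1)]
    genesis_block: u64,
}

fn main() {
    let cli = AuthorityClaimerCli::parse();
    println!("claims come from {}", cli.postgres_endpoint);
}
```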
diff --git a/cmd/authority-claimer/src/lib.rs b/cmd/authority-claimer/src/lib.rs
index 9132f0f7d..b02735cf5 100644
--- a/cmd/authority-claimer/src/lib.rs
+++ b/cmd/authority-claimer/src/lib.rs
@@ -6,24 +6,21 @@ mod claimer;
 mod config;
 mod contracts;
 mod http_server;
-mod listener;
 pub mod log;
 mod metrics;
 mod redacted;
+mod repository;
 mod rollups_events;
 mod sender;
 mod signer;
 mod types;
 
-#[cfg(test)]
-mod test_fixtures;
-
 use checker::DefaultDuplicateChecker;
 use claimer::{Claimer, DefaultClaimer};
 pub use config::Config;
 use ethers::signers::Signer;
-use listener::DefaultBrokerListener;
 use metrics::AuthorityClaimerMetrics;
+use repository::DefaultRepository;
 use sender::DefaultTransactionSender;
 use signer::ConditionalSigner;
 use snafu::Error;
@@ -37,11 +34,9 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
 
     let chain_id = config.tx_manager_config.chain_id;
 
-    // Creating the broker listener.
-    trace!("Creating the broker listener");
-    let broker_listener =
-        DefaultBrokerListener::new(config.broker_config.clone(), chain_id)
-            .await?;
+    // Creating repository.
+    trace!("Creating the repository");
+    let repository = DefaultRepository::new(config.postgres_endpoint)?;
 
     // Creating the conditional signer.
     let conditional_signer =
@@ -52,7 +47,6 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
     trace!("Creating the duplicate checker");
     let duplicate_checker = DefaultDuplicateChecker::new(
         config.tx_manager_config.provider_http_endpoint.clone(),
-        config.iconsensus_address.clone(),
         from,
         config.tx_manager_config.default_confirmations,
         config.genesis_block,
@@ -65,7 +59,6 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
         config.tx_manager_config,
         config.tx_manager_priority,
         conditional_signer,
-        config.iconsensus_address,
         from,
         chain_id,
         metrics,
@@ -73,11 +66,8 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
     .await?;
 
     // Creating the claimer loop.
-    let claimer = DefaultClaimer::new(
-        broker_listener,
-        duplicate_checker,
-        transaction_sender,
-    );
+    let claimer =
+        DefaultClaimer::new(repository, duplicate_checker, transaction_sender);
     let claimer_handle = claimer.start();
 
     // Starting the HTTP server and the claimer loop.
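Note that `DefaultRepository::new` is called without `.await`: the pool it builds (see repository.rs below) is created lazily, so startup no longer blocks on an unreachable database. A sketch of the same sqlx pattern under the patch's pool settings (assumes sqlx with the `postgres` feature enabled):

```rust
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use std::time::Duration;

// `connect_lazy` returns immediately; connections are only opened on first
// acquire, which is why the repository constructor can stay synchronous.
fn make_pool(endpoint: &str) -> Result<Pool<Postgres>, sqlx::Error> {
    PgPoolOptions::new()
        .acquire_timeout(Duration::from_secs(15))
        .min_connections(2)
        .max_connections(10)
        .connect_lazy(endpoint)
}
```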
diff --git a/cmd/authority-claimer/src/listener.rs b/cmd/authority-claimer/src/listener.rs
deleted file mode 100644
index 358979250..000000000
--- a/cmd/authority-claimer/src/listener.rs
+++ /dev/null
@@ -1,212 +0,0 @@
-// (c) Cartesi and individual authors (see AUTHORS)
-// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
-
-use crate::rollups_events::{
-    Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream,
-    INITIAL_ID,
-};
-use async_trait::async_trait;
-use snafu::ResultExt;
-use std::fmt::Debug;
-
-/// The `BrokerListener` listens for new claims from the broker
-#[async_trait]
-pub trait BrokerListener: Debug {
-    type Error: snafu::Error + 'static;
-
-    /// Listen to claims
-    async fn listen(&mut self) -> Result<RollupsClaim, Self::Error>;
-}
-
-// ------------------------------------------------------------------------------------------------
-// DefaultBrokerListener
-// ------------------------------------------------------------------------------------------------
-
-#[derive(Debug)]
-pub struct DefaultBrokerListener {
-    broker: Broker,
-    stream: RollupsClaimsStream,
-    last_claim_id: String,
-}
-
-#[derive(Debug, snafu::Snafu)]
-pub enum BrokerListenerError {
-    #[snafu(display("broker error"))]
-    BrokerError { source: BrokerError },
-}
-
-impl DefaultBrokerListener {
-    pub async fn new(
-        broker_config: BrokerConfig,
-        chain_id: u64,
-    ) -> Result<Self, BrokerError> {
-        tracing::trace!("Connecting to the broker ({:?})", broker_config);
-        let broker = Broker::new(broker_config).await?;
-        let stream = RollupsClaimsStream::new(chain_id);
-        let last_claim_id = INITIAL_ID.to_string();
-        Ok(Self {
-            broker,
-            stream,
-            last_claim_id,
-        })
-    }
-}
-
-#[async_trait]
-impl BrokerListener for DefaultBrokerListener {
-    type Error = BrokerListenerError;
-
-    async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> {
-        tracing::trace!("Waiting for claim with id {}", self.last_claim_id);
-        let event = self
-            .broker
-            .consume_blocking(&self.stream, &self.last_claim_id)
-            .await
-            .context(BrokerSnafu)?;
-
-        self.last_claim_id = event.id;
-
-        Ok(event.payload)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::listener::{BrokerListener, DefaultBrokerListener};
-    use crate::redacted::{RedactedUrl, Url};
-    use crate::rollups_events::{
-        broker::BrokerEndpoint, BrokerConfig, BrokerError, RollupsClaim,
-    };
-    use crate::test_fixtures::BrokerFixture;
-    use backoff::ExponentialBackoffBuilder;
-    use std::time::Duration;
-    use testcontainers::clients::Cli;
-
-    // ------------------------------------------------------------------------------------------------
-    // Broker Mock
-    // ------------------------------------------------------------------------------------------------
-
-    pub async fn setup_broker(
-        docker: &Cli,
-        should_fail: bool,
-    ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> {
-        let fixture = BrokerFixture::setup(docker).await;
-
-        let redis_endpoint = if should_fail {
-            BrokerEndpoint::Single(RedactedUrl::new(
-                Url::parse("https://invalid.com").unwrap(),
-            ))
-        } else {
-            fixture.redis_endpoint().clone()
-        };
-
-        let config = BrokerConfig {
-            redis_endpoint,
-            consume_timeout: 300000,
-            backoff: ExponentialBackoffBuilder::new()
-                .with_initial_interval(Duration::from_millis(1000))
-                .with_max_elapsed_time(Some(Duration::from_millis(3000)))
-                .build(),
-        };
-        let broker =
-            DefaultBrokerListener::new(config, fixture.chain_id()).await?;
-        Ok((fixture, broker))
-    }
-
-    pub async fn produce_rollups_claims(
-        fixture: &BrokerFixture<'_>,
-        n: usize,
-        epoch_index_start: usize,
-    ) -> Vec<RollupsClaim> {
-        let mut rollups_claims = Vec::new();
-        for i in 0..n {
-            let mut rollups_claim = RollupsClaim::default();
-            rollups_claim.epoch_index = (i + epoch_index_start) as u64;
-            fixture.produce_rollups_claim(rollups_claim.clone()).await;
-            rollups_claims.push(rollups_claim);
-        }
-        rollups_claims
-    }
-
-    /// The last claim should trigger an `EndError` error.
-    pub async fn produce_last_claim(
-        fixture: &BrokerFixture<'_>,
-        epoch_index: usize,
-    ) -> Vec<RollupsClaim> {
-        produce_rollups_claims(fixture, 1, epoch_index).await
-    }
-
-    // ------------------------------------------------------------------------------------------------
-    // Listener Unit Tests
-    // ------------------------------------------------------------------------------------------------
-
-    #[tokio::test]
-    async fn instantiate_new_broker_listener_ok() {
-        let docker = Cli::default();
-        let _ = setup_broker(&docker, false).await;
-    }
-
-    #[tokio::test]
-    async fn instantiate_new_broker_listener_error() {
-        let docker = Cli::default();
-        let result = setup_broker(&docker, true).await;
-        assert!(result.is_err(), "setup_broker didn't fail as it should");
-        let error = result.err().unwrap().to_string();
-        assert_eq!(error, "error connecting to Redis");
-    }
-
-    #[tokio::test]
-    async fn start_broker_listener_with_one_claim_enqueued() {
-        let docker = Cli::default();
-        let (fixture, mut broker_listener) =
-            setup_broker(&docker, false).await.unwrap();
-        let n = 5;
-        produce_rollups_claims(&fixture, n, 0).await;
-        produce_last_claim(&fixture, n).await;
-        let result = broker_listener.listen().await;
-        assert!(result.is_ok());
-    }
-
-    #[tokio::test]
-    async fn start_broker_listener_with_claims_enqueued() {
-        let docker = Cli::default();
-        let (fixture, mut broker_listener) =
-            setup_broker(&docker, false).await.unwrap();
-        produce_last_claim(&fixture, 0).await;
-        let claim = broker_listener.listen().await;
-        assert!(claim.is_ok());
-    }
-
-    #[tokio::test]
-    async fn start_broker_listener_listener_with_no_claims_enqueued() {
-        let docker = Cli::default();
-        let (fixture, mut broker_listener) =
-            setup_broker(&docker, false).await.unwrap();
-        let n = 7;
-
-        let broker_listener_thread = tokio::spawn(async move {
-            println!("Spawned the broker-listener thread.");
-            let claim = broker_listener.listen().await;
-            assert!(claim.is_ok());
-        });
-
-        println!("Going to sleep for 1 second.");
-        tokio::time::sleep(Duration::from_secs(1)).await;
-
-        let x = 2;
-        println!("Creating {} claims.", x);
-        produce_rollups_claims(&fixture, x, 0).await;
-
-        println!("Going to sleep for 2 seconds.");
-        tokio::time::sleep(Duration::from_secs(2)).await;
-
-        let y = 5;
-        println!("Creating {} claims.", y);
-        produce_rollups_claims(&fixture, y, x).await;
-
-        assert_eq!(x + y, n);
-        produce_last_claim(&fixture, n).await;
-
-        broker_listener_thread.await.unwrap();
-    }
-}
diff --git a/cmd/authority-claimer/src/redacted.rs b/cmd/authority-claimer/src/redacted.rs
index 00158e8f4..476ac619b 100644
--- a/cmd/authority-claimer/src/redacted.rs
+++ b/cmd/authority-claimer/src/redacted.rs
@@ -38,6 +38,7 @@ fn redacts_debug_fmt() {
 #[derive(Clone)]
 pub struct RedactedUrl(Url);
 
+#[allow(dead_code)]
 impl RedactedUrl {
     pub fn new(url: Url) -> Self {
         Self(url)
diff --git a/cmd/authority-claimer/src/repository.rs b/cmd/authority-claimer/src/repository.rs
new file mode 100644
index 000000000..8101ddab2
--- /dev/null
+++ b/cmd/authority-claimer/src/repository.rs
@@ -0,0 +1,205 @@
+// (c) Cartesi and individual authors (see AUTHORS)
+// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+use async_trait::async_trait;
+use ethers::types::H256;
+use sea_query::{Alias, Expr, Func, Iden, Order, PostgresQueryBuilder, Query};
+use sea_query_binder::SqlxBinder;
+use snafu::{ResultExt, Snafu};
+use sqlx::{
+    pool::PoolConnection, postgres::PgPoolOptions, types::Decimal, Pool,
+    Postgres,
+};
+use std::sync::Arc;
+use std::{fmt::Debug, time::Duration};
+
+use crate::rollups_events::{Address, Hash, RollupsClaim};
+
+const REPOSITORY_MIN_CONNECTIONS: u32 = 2;
+const REPOSITORY_MAX_CONNECTIONS: u32 = 10;
+const REPOSITORY_ACQUIRE_TIMEOUT: Duration = Duration::new(15, 0);
+
+/// The `Repository` queries the database and gets an unsubmitted claim
+#[async_trait]
+pub trait Repository: Debug {
+    type Error: snafu::Error + 'static;
+
+    async fn get_claim(
+        &mut self,
+    ) -> Result<(RollupsClaim, Address), Self::Error>;
+
+    async fn update_claim(
+        &mut self,
+        id: u64,
+        tx_hash: H256,
+    ) -> Result<(), Self::Error>;
+}
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub(super)))]
+pub enum RepositoryError {
+    #[snafu(display("database error"))]
+    DatabaseSqlx { source: sqlx::Error },
+
+    #[snafu(display("int conversion error"))]
+    IntConversion { source: std::num::TryFromIntError },
+}
+
+#[derive(Clone, Debug)]
+pub struct DefaultRepository {
+    // Connection is not thread-safe, we use a connection pool
+    db_pool: Arc<Pool<Postgres>>,
+}
+
+impl DefaultRepository {
+    /// Create database connection pool, wait until database server is available
+    pub fn new(endpoint: String) -> Result<Self, RepositoryError> {
+        let connection = PgPoolOptions::new()
+            .acquire_timeout(REPOSITORY_ACQUIRE_TIMEOUT)
+            .min_connections(REPOSITORY_MIN_CONNECTIONS)
+            .max_connections(REPOSITORY_MAX_CONNECTIONS)
+            .connect_lazy(&endpoint)
+            .context(DatabaseSqlxSnafu)?;
+        Ok(Self {
+            db_pool: Arc::new(connection),
+        })
+    }
+
+    /// Obtain a connection from the connection pool
+    async fn conn(&self) -> PoolConnection<Postgres> {
+        self.db_pool
+            .acquire()
+            .await
+            .expect("No connections available in the pool")
+    }
+}
+
+#[async_trait]
+impl Repository for DefaultRepository {
+    type Error = RepositoryError;
+
+    async fn get_claim(
+        &mut self,
+    ) -> Result<(RollupsClaim, Address), Self::Error> {
+        let claim: RollupsClaim;
+        let iconsensus_address: Address;
+        let mut conn = self.conn().await;
+        let (sql, values) = Query::select()
+            .columns([
+                (Epoch::Table, Epoch::Id),
+                (Epoch::Table, Epoch::ClaimHash),
+                (Epoch::Table, Epoch::ApplicationAddress),
+                (Epoch::Table, Epoch::LastBlock),
+            ])
+            .column((Application::Table, Application::IconsensusAddress))
+            .from(Epoch::Table)
+            .inner_join(
+                Application::Table,
+                Expr::col((Epoch::Table, Epoch::ApplicationAddress))
+                    .equals((Application::Table, Application::ContractAddress)),
+            )
+            .and_where(Expr::col((Epoch::Table, Epoch::Status)).eq(
+                Func::cast_as("CLAIM_COMPUTED", Alias::new("\"EpochStatus\"")),
+            ))
+            .order_by(Epoch::Index, Order::Asc)
+            .order_by((Epoch::Table, Epoch::Id), Order::Asc)
+            .limit(1)
+            .build_sqlx(PostgresQueryBuilder);
+
+        loop {
+            let result = sqlx::query_as_with::<_, QueryResponse, _>(
+                &sql,
+                values.clone(),
+            )
+            .fetch_optional(&mut *conn)
+            .await
+            .context(DatabaseSqlxSnafu)?;
+
+            match result {
+                Some(row) => {
+                    claim = RollupsClaim {
+                        id: row.id.try_into().context(IntConversionSnafu)?,
+                        last_block: row.last_block.try_into().unwrap(),
+                        dapp_address: Address::new(
+                            row.application_address.try_into().unwrap(),
+                        ),
+                        output_merkle_root_hash: Hash::new(
+                            row.claim_hash.try_into().unwrap(),
+                        ),
+                    };
+                    iconsensus_address = Address::new(
+                        row.iconsensus_address.try_into().unwrap(),
+                    );
+                    break;
+                }
+                None => continue,
+            }
+        }
+
+        let _ = conn.close().await.context(DatabaseSqlxSnafu)?;
+        Ok((claim, iconsensus_address))
+    }
+
+    async fn update_claim(
+        &mut self,
+        id: u64,
+        tx_hash: H256,
+    ) -> Result<(), Self::Error> {
+        let mut conn = self.conn().await;
+        let (sql, values) = Query::update()
+            .table(Epoch::Table)
+            .values([
+                (
+                    Epoch::Status,
+                    Func::cast_as(
+                        "CLAIM_SUBMITTED",
+                        Alias::new("\"EpochStatus\""),
+                    )
+                    .into(),
+                ),
+                (
+                    Epoch::TransactionHash,
+                    tx_hash.as_fixed_bytes().to_vec().into(),
+                ),
+            ])
+            .and_where(Expr::col(Epoch::Id).eq(id))
+            .build_sqlx(PostgresQueryBuilder);
+
+        let _result = sqlx::query_with(&sql, values)
+            .execute(&mut *conn)
+            .await
+            .context(DatabaseSqlxSnafu)?;
+
+        let _ = conn.close().await.context(DatabaseSqlxSnafu)?;
+        Ok(())
+    }
+}
+
+#[derive(Iden)]
+enum Epoch {
+    Table,
+    Id,
+    Index,
+    Status,
+    LastBlock,
+    ClaimHash,
+    TransactionHash,
+    ApplicationAddress,
+}
+
+#[derive(Iden)]
+enum Application {
+    Table,
+    ContractAddress,
+    IconsensusAddress,
+}
+
+#[derive(sqlx::FromRow, Debug, Clone)]
+#[allow(dead_code)]
+struct QueryResponse {
+    id: i64,
+    last_block: Decimal,
+    claim_hash: Vec<u8>,
+    application_address: Vec<u8>,
+    iconsensus_address: Vec<u8>,
+}
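For reference, the `sea_query` builders above produce SQL roughly along these lines (an approximation; exact identifier quoting and parameter placeholders come from `build_sqlx(PostgresQueryBuilder)` at runtime, and the `Iden` derive snake_cases the enum variants):

```rust
// Illustrative only; the real strings are generated at runtime.
const GET_CLAIM_SQL: &str = r#"
SELECT "epoch"."id", "epoch"."claim_hash", "epoch"."application_address",
       "epoch"."last_block", "application"."iconsensus_address"
FROM "epoch"
INNER JOIN "application"
        ON "epoch"."application_address" = "application"."contract_address"
WHERE "epoch"."status" = CAST('CLAIM_COMPUTED' AS "EpochStatus")
ORDER BY "index" ASC, "epoch"."id" ASC
LIMIT 1
"#;

const UPDATE_CLAIM_SQL: &str = r#"
UPDATE "epoch"
SET "status" = CAST('CLAIM_SUBMITTED' AS "EpochStatus"),
    "transaction_hash" = $1
WHERE "id" = $2
"#;
```

One behavior worth noting in `get_claim`: when no `CLAIM_COMPUTED` epoch exists, the `loop`/`continue` reissues the query immediately, so the claimer polls the database until a claim appears.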
diff --git a/cmd/authority-claimer/src/rollups_events/broker.rs b/cmd/authority-claimer/src/rollups_events/broker.rs
deleted file mode 100644
index 51f31bd17..000000000
--- a/cmd/authority-claimer/src/rollups_events/broker.rs
+++ /dev/null
@@ -1,411 +0,0 @@
-// (c) Cartesi and individual authors (see AUTHORS)
-// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
-
-use crate::redacted::{RedactedUrl, Url};
-use backoff::{future::retry, ExponentialBackoff, ExponentialBackoffBuilder};
-use clap::Parser;
-use redis::{
-    aio::{ConnectionLike, ConnectionManager},
-    cluster::ClusterClient,
-    cluster_async::ClusterConnection,
-    streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply},
-    AsyncCommands, Client, Cmd, Pipeline, RedisError, RedisFuture, Value,
-};
-use serde::{de::DeserializeOwned, Serialize};
-use snafu::{ResultExt, Snafu};
-use std::{fmt, time::Duration};
-
-pub const INITIAL_ID: &str = "0";
-
-/// The `BrokerConnection` enum implements the `ConnectionLike` trait
-/// to satisfy the `AsyncCommands` trait bounds.
-/// As `AsyncCommands` requires its implementors to be `Sized`, we couldn't
-/// use a trait object instead.
-#[derive(Clone)]
-enum BrokerConnection {
-    ConnectionManager(ConnectionManager),
-    ClusterConnection(ClusterConnection),
-}
-
-impl ConnectionLike for BrokerConnection {
-    fn req_packed_command<'a>(
-        &'a mut self,
-        cmd: &'a Cmd,
-    ) -> RedisFuture<'a, Value> {
-        match self {
-            Self::ConnectionManager(connection) => {
-                connection.req_packed_command(cmd)
-            }
-            Self::ClusterConnection(connection) => {
-                connection.req_packed_command(cmd)
-            }
-        }
-    }
-
-    fn req_packed_commands<'a>(
-        &'a mut self,
-        cmd: &'a Pipeline,
-        offset: usize,
-        count: usize,
-    ) -> RedisFuture<'a, Vec<Value>> {
-        match self {
-            Self::ConnectionManager(connection) => {
-                connection.req_packed_commands(cmd, offset, count)
-            }
-            Self::ClusterConnection(connection) => {
-                connection.req_packed_commands(cmd, offset, count)
-            }
-        }
-    }
-
-    fn get_db(&self) -> i64 {
-        match self {
-            Self::ConnectionManager(connection) => connection.get_db(),
-            Self::ClusterConnection(connection) => connection.get_db(),
-        }
-    }
-}
-
-/// Client that connects to the broker
-#[derive(Clone)]
-pub struct Broker {
-    connection: BrokerConnection,
-    backoff: ExponentialBackoff,
-    consume_timeout: usize,
-}
-
-impl Broker {
-    /// Create a new client
-    /// The broker_address should be in the format redis://host:port/db.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn new(config: BrokerConfig) -> Result<Self, BrokerError> {
-        tracing::trace!(?config, "connecting to broker");
-
-        let connection = retry(config.backoff.clone(), || async {
-            match config.redis_endpoint.clone() {
-                BrokerEndpoint::Single(endpoint) => {
-                    tracing::trace!("creating Redis Client");
-                    let client = Client::open(endpoint.inner().as_str())?;
-
-                    tracing::trace!("creating Redis ConnectionManager");
-                    let connection = ConnectionManager::new(client).await?;
-
-                    Ok(BrokerConnection::ConnectionManager(connection))
-                }
-                BrokerEndpoint::Cluster(endpoints) => {
-                    tracing::trace!("creating Redis Cluster Client");
-                    let client = ClusterClient::new(
-                        endpoints
-                            .iter()
-                            .map(|endpoint| endpoint.inner().as_str())
-                            .collect::<Vec<_>>(),
-                    )?;
-                    tracing::trace!("connecting to Redis Cluster");
-                    let connection = client.get_async_connection().await?;
-                    Ok(BrokerConnection::ClusterConnection(connection))
-                }
-            }
-        })
-        .await
-        .context(ConnectionSnafu)?;
-
-        tracing::trace!("returning successful connection");
-        Ok(Self {
-            connection,
-            backoff: config.backoff,
-            consume_timeout: config.consume_timeout,
-        })
-    }
-
-    /// Produce an event and return its id
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn produce<S: BrokerStream>(
-        &mut self,
-        stream: &S,
-        payload: S::Payload,
-    ) -> Result<String, BrokerError> {
-        tracing::trace!("converting payload to JSON string");
-        let payload =
-            serde_json::to_string(&payload).context(InvalidPayloadSnafu)?;
-
-        let event_id = retry(self.backoff.clone(), || async {
-            tracing::trace!(
-                stream_key = stream.key(),
-                payload,
-                "producing event"
-            );
-            let event_id = self
-                .connection
-                .clone()
-                .xadd(stream.key(), "*", &[("payload", &payload)])
-                .await?;
-
-            Ok(event_id)
-        })
-        .await
-        .context(ConnectionSnafu)?;
-
-        tracing::trace!(event_id, "returning event id");
-        Ok(event_id)
-    }
-
-    /// Peek at the end of the stream
-    /// This function doesn't block; if there is no event in the stream it returns None.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn peek_latest<S: BrokerStream>(
-        &mut self,
-        stream: &S,
-    ) -> Result<Option<Event<S::Payload>>, BrokerError> {
-        let mut reply = retry(self.backoff.clone(), || async {
-            tracing::trace!(stream_key = stream.key(), "peeking at the stream");
-            let reply: StreamRangeReply = self
-                .connection
-                .clone()
-                .xrevrange_count(stream.key(), "+", "-", 1)
-                .await?;
-
-            Ok(reply)
-        })
-        .await
-        .context(ConnectionSnafu)?;
-
-        if let Some(event) = reply.ids.pop() {
-            tracing::trace!("parsing received event");
-            Some(event.try_into()).transpose()
-        } else {
-            tracing::trace!("stream is empty");
-            Ok(None)
-        }
-    }
-
-    #[tracing::instrument(level = "trace", skip_all)]
-    async fn _consume_blocking<S: BrokerStream>(
-        &mut self,
-        stream: &S,
-        last_consumed_id: &str,
-    ) -> Result<Event<S::Payload>, BrokerError> {
-        let mut reply = retry(self.backoff.clone(), || async {
-            tracing::trace!(
-                stream_key = stream.key(),
-                last_consumed_id,
-                "consuming event"
-            );
-            let opts = StreamReadOptions::default()
-                .count(1)
-                .block(self.consume_timeout);
-            let reply: StreamReadReply = self
-                .connection
-                .clone()
-                .xread_options(&[stream.key()], &[last_consumed_id], &opts)
-                .await?;
-
-            Ok(reply)
-        })
-        .await
-        .context(ConnectionSnafu)?;
-
-        tracing::trace!("checking for timeout");
-        let mut events = reply.keys.pop().ok_or(BrokerError::ConsumeTimeout)?;
-
-        tracing::trace!("checking if event was received");
-        let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?;
-
-        tracing::trace!("parsing received event");
-        event.try_into()
-    }
-
-    /// Consume the next event in stream
-    ///
-    /// This function blocks until a new event is available
-    /// and retries whenever a timeout happens instead of returning an error.
-    ///
-    /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn consume_blocking<S: BrokerStream>(
-        &mut self,
-        stream: &S,
-        last_consumed_id: &str,
-    ) -> Result<Event<S::Payload>, BrokerError> {
-        loop {
-            let result = self._consume_blocking(stream, last_consumed_id).await;
-
-            if let Err(BrokerError::ConsumeTimeout) = result {
-                tracing::trace!("consume timed out, retrying");
-            } else {
-                return result;
-            }
-        }
-    }
-
-    /// Consume the next event in stream without blocking
-    /// This function returns None if there are no more remaining events.
-    /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn consume_nonblocking<S: BrokerStream>(
-        &mut self,
-        stream: &S,
-        last_consumed_id: &str,
-    ) -> Result<Option<Event<S::Payload>>, BrokerError> {
-        let mut reply = retry(self.backoff.clone(), || async {
-            tracing::trace!(
-                stream_key = stream.key(),
-                last_consumed_id,
-                "consuming event (non-blocking)"
-            );
-            let opts = StreamReadOptions::default().count(1);
-            let reply: StreamReadReply = self
-                .connection
-                .clone()
-                .xread_options(&[stream.key()], &[last_consumed_id], &opts)
-                .await?;
-
-            Ok(reply)
-        })
-        .await
-        .context(ConnectionSnafu)?;
-
-        tracing::trace!("checking if event was received");
-        if let Some(mut events) = reply.keys.pop() {
-            let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?;
-            tracing::trace!("parsing received event");
-            Some(event.try_into()).transpose()
-        } else {
-            tracing::trace!("stream is empty");
-            Ok(None)
-        }
-    }
-}
-
-/// Custom implementation of Debug because ConnectionManager doesn't implement debug
-impl fmt::Debug for Broker {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Broker")
-            .field("consume_timeout", &self.consume_timeout)
-            .finish()
-    }
-}
-
-/// Trait that defines the type of a stream
-pub trait BrokerStream {
-    type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq;
-    fn key(&self) -> &str;
-}
-
-/// Event that goes through the broker
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub struct Event<P: Serialize + DeserializeOwned + Clone + Eq + PartialEq> {
-    pub id: String,
-    pub payload: P,
-}
-
-impl<P: Serialize + DeserializeOwned + Clone + Eq + PartialEq> TryFrom<StreamId>
-    for Event<P>
-{
-    type Error = BrokerError;
-
-    #[tracing::instrument(level = "trace", skip_all)]
-    fn try_from(stream_id: StreamId) -> Result<Event<P>, BrokerError> {
-        tracing::trace!("getting event payload");
-        let payload = stream_id
-            .get::<String>("payload")
-            .ok_or(BrokerError::InvalidEvent)?;
-        let id = stream_id.id;
-
-        tracing::trace!(id, payload, "received event");
-
-        tracing::trace!("parsing JSON payload");
-        let payload =
-            serde_json::from_str(&payload).context(InvalidPayloadSnafu)?;
-
-        tracing::trace!("returning event");
-        Ok(Event { id, payload })
-    }
-}
-
-#[derive(Debug, Snafu)]
-pub enum BrokerError {
-    #[snafu(display("error connecting to Redis"))]
-    ConnectionError { source: RedisError },
-
-    #[snafu(display("failed to consume event"))]
-    FailedToConsume,
-
-    #[snafu(display("timed out when consuming event"))]
-    ConsumeTimeout,
-
-    #[snafu(display("event in invalid format"))]
-    InvalidEvent,
-
-    #[snafu(display("error parsing event payload"))]
-    InvalidPayload { source: serde_json::Error },
-}
-
-#[derive(Debug, Parser)]
-#[command(name = "broker")]
-pub struct BrokerCLIConfig {
-    /// Redis address
-    #[arg(long, env, default_value = "redis://127.0.0.1:6379")]
-    redis_endpoint: String,
-
-    /// Address list of Redis cluster nodes, defined by a single string
-    /// separated by commas. If present, it supersedes `redis_endpoint`.
-    /// A single endpoint can be enough as the client will discover
-    /// other nodes automatically
-    #[arg(long, env, num_args = 1.., value_delimiter = ',')]
-    redis_cluster_endpoints: Option<Vec<String>>,
-
-    /// Timeout when consuming input events (in millis)
-    #[arg(long, env, default_value = "5000")]
-    broker_consume_timeout: usize,
-
-    /// The max elapsed time for backoff in ms
-    #[arg(long, env, default_value = "120000")]
-    broker_backoff_max_elapsed_duration: u64,
-}
-
-#[derive(Debug, Clone)]
-pub enum BrokerEndpoint {
-    Single(RedactedUrl),
-    Cluster(Vec<RedactedUrl>),
-}
-
-#[derive(Debug, Clone)]
-pub struct BrokerConfig {
-    pub redis_endpoint: BrokerEndpoint,
-    pub consume_timeout: usize,
-    pub backoff: ExponentialBackoff,
-}
-
-impl From<BrokerCLIConfig> for BrokerConfig {
-    fn from(cli_config: BrokerCLIConfig) -> BrokerConfig {
-        let max_elapsed_time = Duration::from_millis(
-            cli_config.broker_backoff_max_elapsed_duration,
-        );
-        let backoff = ExponentialBackoffBuilder::new()
-            .with_max_elapsed_time(Some(max_elapsed_time))
-            .build();
-        let redis_endpoint =
-            if let Some(endpoints) = cli_config.redis_cluster_endpoints {
-                let urls = endpoints
-                    .iter()
-                    .map(|endpoint| {
-                        RedactedUrl::new(
-                            Url::parse(endpoint)
-                                .expect("failed to parse Redis URL"),
-                        )
-                    })
-                    .collect();
-                BrokerEndpoint::Cluster(urls)
-            } else {
-                let url = Url::parse(&cli_config.redis_endpoint)
-                    .map(RedactedUrl::new)
-                    .expect("failed to parse Redis URL");
-                BrokerEndpoint::Single(url)
-            };
-        BrokerConfig {
-            redis_endpoint,
-            consume_timeout: cli_config.broker_consume_timeout,
-            backoff,
-        }
-    }
-}
diff --git a/cmd/authority-claimer/src/rollups_events/mod.rs b/cmd/authority-claimer/src/rollups_events/mod.rs
index bc4506b02..60ad85f27 100644
--- a/cmd/authority-claimer/src/rollups_events/mod.rs
+++ b/cmd/authority-claimer/src/rollups_events/mod.rs
@@ -1,15 +1,10 @@
 // (c) Cartesi and individual authors (see AUTHORS)
 // SPDX-License-Identifier: Apache-2.0 (see LICENSE)
 
-pub mod broker;
 pub mod common;
 pub mod rollups_claims;
 pub mod rollups_stream;
 
-pub use broker::{
-    Broker, BrokerCLIConfig, BrokerConfig, BrokerError, BrokerStream,
-    INITIAL_ID,
-};
 pub use common::{Address, Hash, HexArrayError};
-pub use rollups_claims::{RollupsClaim, RollupsClaimsStream};
+pub use rollups_claims::RollupsClaim;
 pub use rollups_stream::DAppMetadata;
diff --git a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs
index 2d216ed61..e2de524c1 100644
--- a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs
+++ b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs
@@ -1,45 +1,21 @@
 // (c) Cartesi and individual authors (see AUTHORS)
 // SPDX-License-Identifier: Apache-2.0 (see LICENSE)
 
-use super::{Address, BrokerStream, Hash};
+use super::{Address, Hash};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug)]
-pub struct RollupsClaimsStream {
-    key: String,
-}
-
-impl BrokerStream for RollupsClaimsStream {
-    type Payload = RollupsClaim;
-
-    fn key(&self) -> &str {
-        &self.key
-    }
-}
-
-impl RollupsClaimsStream {
-    pub fn new(chain_id: u64) -> Self {
-        Self {
-            key: format!("{{chain-{}}}:rollups-claims", chain_id),
-        }
-    }
-}
-
 /// Event generated when the Cartesi Rollups epoch finishes
 #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct RollupsClaim {
-    // DApp address
-    pub dapp_address: Address,
+    // Claim id
+    pub id: u64,
 
-    /// Epoch index
-    pub epoch_index: u64,
+    // Last block processed in the claim
+    pub last_block: u64,
 
-    /// Hash of the Epoch
-    pub epoch_hash: Hash,
-
-    /// Index of the first input of the Epoch
-    pub first_index: u128,
+    // DApp address
+    pub dapp_address: Address,
 
-    /// Index of the last input of the Epoch
-    pub last_index: u128,
+    /// Hash of the output merkle root
+    pub output_merkle_root_hash: Hash,
 }
diff --git a/cmd/authority-claimer/src/sender.rs b/cmd/authority-claimer/src/sender.rs
index a5f4ed937..1210ca6f6 100644
--- a/cmd/authority-claimer/src/sender.rs
+++ b/cmd/authority-claimer/src/sender.rs
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0 (see LICENSE)
 
 use crate::{
-    contracts::iconsensus::{IConsensus, InputRange},
+    contracts::iconsensus::IConsensus,
     metrics::AuthorityClaimerMetrics,
     rollups_events::{Address, DAppMetadata, RollupsClaim},
     signer::ConditionalSigner,
@@ -20,10 +20,8 @@ use eth_tx_manager::{
 use ethers::{
     self,
     middleware::SignerMiddleware,
-    providers::{
-        Http, HttpRateLimitRetryPolicy, MockProvider, Provider, RetryClient,
-    },
-    types::{NameOrAddress, H160},
+    providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient},
+    types::{NameOrAddress, H160, H256, U256},
 };
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{fmt::Debug, sync::Arc};
@@ -43,7 +41,8 @@ pub trait TransactionSender: Sized + Debug {
     async fn send_rollups_claim_transaction(
         self,
         rollups_claim: RollupsClaim,
-    ) -> Result<Self, Self::Error>;
+        iconsensus: Address,
+    ) -> Result<(H256, Self), Self::Error>;
 }
 
 // ------------------------------------------------------------------------------------------------
@@ -79,7 +78,6 @@ pub struct DefaultTransactionSender {
     confirmations: usize,
     priority: Priority,
     from: ethers::types::Address,
-    iconsensus: IConsensus<Provider<MockProvider>>,
     chain_id: u64,
     metrics: AuthorityClaimerMetrics,
 }
@@ -149,7 +147,6 @@ impl DefaultTransactionSender {
         tx_manager_config: TxManagerConfig,
         tx_manager_priority: Priority,
         conditional_signer: ConditionalSigner,
-        iconsensus: Address,
         from: ethers::types::Address,
         chain_id: u64,
         metrics: AuthorityClaimerMetrics,
@@ -164,18 +161,10 @@ impl DefaultTransactionSender {
         )
         .await?;
 
-        let iconsensus = {
-            let (provider, _mock) = Provider::mocked();
-            let provider = Arc::new(provider);
-            let address: H160 = iconsensus.into_inner().into();
-            IConsensus::new(address, provider)
-        };
-
         Ok(Self {
             tx_manager,
             confirmations: tx_manager_config.default_confirmations,
             priority: tx_manager_priority,
-            iconsensus,
             from,
             chain_id,
             metrics,
@@ -190,20 +179,23 @@ impl TransactionSender for DefaultTransactionSender {
     async fn send_rollups_claim_transaction(
         self,
         rollups_claim: RollupsClaim,
-    ) -> Result<Self, Self::Error> {
+        iconsensus: Address,
+    ) -> Result<(H256, Self), Self::Error> {
         let dapp_address = rollups_claim.dapp_address.clone();
 
+        let iconsensus = {
+            let (provider, _mock) = Provider::mocked();
+            let provider = Arc::new(provider);
+            let address: H160 = iconsensus.into_inner().into();
+            IConsensus::new(address, provider)
+        };
+
         let transaction = {
-            let input_range = InputRange {
-                first_index: rollups_claim.first_index as u64,
-                last_index: rollups_claim.last_index as u64,
-            };
-            let call = self
-                .iconsensus
+            let call = iconsensus
                 .submit_claim(
                     H160(dapp_address.inner().to_owned()),
-                    input_range,
-                    rollups_claim.epoch_hash.into_inner(),
+                    U256([rollups_claim.last_block, 0, 0, 0]),
+                    rollups_claim.output_merkle_root_hash.into_inner(),
                 )
                 .from(self.from);
             let to = match call.tx.to().context(InternalEthersSnafu)? {
@@ -232,8 +224,8 @@ impl TransactionSender for DefaultTransactionSender {
                 dapp_address,
             })
            .inc();
-        trace!("Claim transaction confirmed: `{:?}`", receipt);
+        info!("Claim transaction confirmed: `{:?}`", receipt);
 
-        Ok(Self { tx_manager, ..self })
+        Ok((receipt.transaction_hash, Self { tx_manager, ..self }))
     }
 }
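`submit_claim` now takes the last processed block as a `uint256`. In ethers-rs, `U256` wraps four little-endian `u64` limbs, so `U256([last_block, 0, 0, 0])` is exactly `last_block` widened to 256 bits; the mocked provider exists only so the contract binding can ABI-encode the call, since the tx-manager does the actual signing and sending. A small check of the limb encoding:

```rust
use ethers::types::U256;

fn main() {
    let last_block: u64 = 12_345;
    // Limbs are least-significant first, so this is just `last_block`.
    assert_eq!(U256([last_block, 0, 0, 0]), U256::from(last_block));
}
```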
diff --git a/cmd/authority-claimer/src/test_fixtures.rs b/cmd/authority-claimer/src/test_fixtures.rs
deleted file mode 100644
index 2e7dc0f69..000000000
--- a/cmd/authority-claimer/src/test_fixtures.rs
+++ /dev/null
@@ -1,154 +0,0 @@
-// (c) Cartesi and individual authors (see AUTHORS)
-// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
-
-use crate::{
-    redacted::{RedactedUrl, Url},
-    rollups_events::{
-        broker::BrokerEndpoint, common::ADDRESS_SIZE, Address, Broker,
-        BrokerConfig, DAppMetadata, RollupsClaim, RollupsClaimsStream,
-        INITIAL_ID,
-    },
-};
-use backoff::ExponentialBackoff;
-use testcontainers::{
-    clients::Cli, core::WaitFor, images::generic::GenericImage, Container,
-};
-use tokio::sync::Mutex;
-
-const CHAIN_ID: u64 = 0;
-const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]);
-const CONSUME_TIMEOUT: usize = 10_000; // ms
-
-pub struct BrokerFixture<'d> {
-    _node: Container<'d, GenericImage>,
-    client: Mutex<Broker>,
-    claims_stream: RollupsClaimsStream,
-    redis_endpoint: BrokerEndpoint,
-    chain_id: u64,
-}
-
-impl BrokerFixture<'_> {
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn setup(docker: &Cli) -> BrokerFixture<'_> {
-        tracing::info!("setting up redis fixture");
-
-        tracing::trace!("starting redis docker container");
-        let image = GenericImage::new("redis", "6.2").with_wait_for(
-            WaitFor::message_on_stdout("Ready to accept connections"),
-        );
-        let node = docker.run(image);
-        let port = node.get_host_port_ipv4(6379);
-        let redis_endpoint = BrokerEndpoint::Single(
-            Url::parse(&format!("redis://127.0.0.1:{}", port))
-                .map(RedactedUrl::new)
-                .expect("failed to parse Redis Url"),
-        );
-        let chain_id = CHAIN_ID;
-        let backoff = ExponentialBackoff::default();
-        let metadata = DAppMetadata {
-            chain_id,
-            dapp_address: DAPP_ADDRESS.clone(),
-        };
-        let claims_stream = RollupsClaimsStream::new(metadata.chain_id);
-        let config = BrokerConfig {
-            redis_endpoint: redis_endpoint.clone(),
-            consume_timeout: CONSUME_TIMEOUT,
-            backoff,
-        };
-
-        tracing::trace!(
-            ?redis_endpoint,
-            "connecting to redis with rollups_events crate"
-        );
-        let client = Mutex::new(
-            Broker::new(config)
-                .await
-                .expect("failed to connect to broker"),
-        );
-        BrokerFixture {
-            _node: node,
-            client,
-            claims_stream,
-            redis_endpoint,
-            chain_id,
-        }
-    }
-
-    pub fn redis_endpoint(&self) -> &BrokerEndpoint {
-        &self.redis_endpoint
-    }
-
-    pub fn chain_id(&self) -> u64 {
-        self.chain_id
-    }
-
-    /// Produce the claim given the hash
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) {
-        tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event");
-        {
-            let last_claim = self
-                .client
-                .lock()
-                .await
-                .peek_latest(&self.claims_stream)
-                .await
-                .expect("failed to get latest claim");
-            let epoch_index = match last_claim {
-                Some(event) => event.payload.epoch_index + 1,
-                None => 0,
-            };
-            assert_eq!(
-                rollups_claim.epoch_index, epoch_index,
-                "invalid epoch index",
-            );
-        }
-        self.client
-            .lock()
-            .await
-            .produce(&self.claims_stream, rollups_claim)
-            .await
-            .expect("failed to produce claim");
-    }
-
-    /// Obtain all produced claims
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn consume_all_claims(&self) -> Vec<RollupsClaim> {
-        tracing::trace!("consuming all rollups-claims events");
-        let mut claims = vec![];
-        let mut last_id = INITIAL_ID.to_owned();
-        while let Some(event) = self
-            .client
-            .lock()
-            .await
-            .consume_nonblocking(&self.claims_stream, &last_id)
-            .await
-            .expect("failed to consume claim")
-        {
-            claims.push(event.payload);
-            last_id = event.id;
-        }
-        claims
-    }
-
-    /// Obtain the first n produced claims
-    /// Panic in case of timeout
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub async fn consume_n_claims(&self, n: usize) -> Vec<RollupsClaim> {
-        tracing::trace!(n, "consuming n rollups-claims events");
-        let mut claims = vec![];
-        let mut last_id = INITIAL_ID.to_owned();
-        for _ in 0..n {
-            let event = self
-                .client
-                .lock()
-                .await
-                .consume_blocking(&self.claims_stream, &last_id)
-                .await
-                .expect("failed to consume claim");
-            claims.push(event.payload);
-            last_id = event.id
-        }
-        claims
-    }
-}
diff --git a/docs/config.md b/docs/config.md
index 7c067eaf5..67a4653ae 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -124,12 +124,6 @@ Address of the DApp's contract.
 
 * **Type:** `string`
 
-## `CARTESI_CONTRACTS_ICONSENSUS_ADDRESS`
-
-Address of the IConsensus contract.
-
-* **Type:** `string`
-
 ## `CARTESI_CONTRACTS_INPUT_BOX_ADDRESS`
 
 Address of the InputBox contract.
diff --git a/internal/node/config/config.go b/internal/node/config/config.go
index 0cb4f1db3..a67323e49 100644
--- a/internal/node/config/config.go
+++ b/internal/node/config/config.go
@@ -26,7 +26,6 @@ type NodeConfig struct {
 	EvmReaderRetryPolicyMaxDelay           Duration
 	BlockchainBlockTimeout                 int
 	ContractsApplicationAddress            string
-	ContractsIConsensusAddress             string
 	ContractsInputBoxAddress               string
 	ContractsInputBoxDeploymentBlockNumber int64
 	SnapshotDir                            string
@@ -88,7 +87,6 @@ func FromEnv() NodeConfig {
 	config.EvmReaderRetryPolicyMaxDelay = getEvmReaderRetryPolicyMaxDelay()
 	config.BlockchainBlockTimeout = getBlockchainBlockTimeout()
 	config.ContractsApplicationAddress = getContractsApplicationAddress()
-	config.ContractsIConsensusAddress = getContractsIconsensusAddress()
 	config.ContractsInputBoxAddress = getContractsInputBoxAddress()
 	config.ContractsInputBoxDeploymentBlockNumber = getContractsInputBoxDeploymentBlockNumber()
 	config.SnapshotDir = getSnapshotDir()
diff --git a/internal/node/config/generate/Config.toml b/internal/node/config/generate/Config.toml
index df75f01e1..a1b9dfe27 100644
--- a/internal/node/config/generate/Config.toml
+++ b/internal/node/config/generate/Config.toml
@@ -117,11 +117,6 @@ go-type = "string"
 description = """
 Address of the DApp's contract."""
 
-[contracts.CARTESI_CONTRACTS_ICONSENSUS_ADDRESS]
-go-type = "string"
-description = """
-Address of the IConsensus contract."""
-
 [contracts.CARTESI_CONTRACTS_INPUT_BOX_ADDRESS]
 go-type = "string"
 description = """
diff --git a/internal/node/config/generated.go b/internal/node/config/generated.go
index b92903195..24c899e2e 100644
--- a/internal/node/config/generated.go
+++ b/internal/node/config/generated.go
@@ -312,18 +312,6 @@ func getContractsApplicationAddress() string {
 	return val
 }
 
-func getContractsIconsensusAddress() string {
-	s, ok := os.LookupEnv("CARTESI_CONTRACTS_ICONSENSUS_ADDRESS")
-	if !ok {
-		panic("missing env var CARTESI_CONTRACTS_ICONSENSUS_ADDRESS")
-	}
-	val, err := toString(s)
-	if err != nil {
-		panic(fmt.Sprintf("failed to parse CARTESI_CONTRACTS_ICONSENSUS_ADDRESS: %v", err))
-	}
-	return val
-}
-
 func getContractsInputBoxAddress() string {
 	s, ok := os.LookupEnv("CARTESI_CONTRACTS_INPUT_BOX_ADDRESS")
 	if !ok {
diff --git a/internal/node/services.go b/internal/node/services.go
index eb5a098b9..2dd21d03b 100644
--- a/internal/node/services.go
+++ b/internal/node/services.go
@@ -21,26 +21,14 @@ type portOffset = int
 const (
 	portOffsetProxy = iota
 	portOffsetAuthorityClaimer
-	portOffsetRedis
 	portOffsetPostgraphile
 )
 
-const localhost = "127.0.0.1"
-
 // Get the port of the given service.
 func getPort(c config.NodeConfig, offset portOffset) int {
 	return c.HttpPort + int(offset)
 }
 
-// Get the redis endpoint based on whether the experimental sunodo validator mode is enabled.
-func getRedisEndpoint(c config.NodeConfig) string {
-	if c.ExperimentalSunodoValidatorEnabled {
-		return c.ExperimentalSunodoValidatorRedisEndpoint
-	} else {
-		return fmt.Sprintf("redis://%v:%v", localhost, getPort(c, portOffsetRedis))
-	}
-}
-
 // Create the RUST_LOG variable using the config log level.
 // If the log level is set to debug, set tracing log for the given rust module.
 func getRustLog(c config.NodeConfig, rustModule string) string {
@@ -72,8 +60,8 @@ func newAuthorityClaimer(c config.NodeConfig, workDir string) services.CommandSe
 	s.Env = append(s.Env, fmt.Sprintf("TX_CHAIN_IS_LEGACY=%v", c.LegacyBlockchainEnabled))
 	s.Env = append(s.Env, fmt.Sprintf("TX_DEFAULT_CONFIRMATIONS=%v", c.BlockchainFinalityOffset))
-	s.Env = append(s.Env, fmt.Sprintf("REDIS_ENDPOINT=%v", getRedisEndpoint(c)))
-	s.Env = append(s.Env, fmt.Sprintf("ICONSENSUS_ADDRESS=%v", c.ContractsIConsensusAddress))
+	s.Env = append(s.Env, fmt.Sprintf("POSTGRES_ENDPOINT=%v",
+		fmt.Sprintf("%v", c.PostgresEndpoint.Value)))
 	s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_ADDRESS=%v", c.ContractsInputBoxAddress))
 	s.Env = append(s.Env, fmt.Sprintf("GENESIS_BLOCK=%v", c.ContractsInputBoxDeploymentBlockNumber))
@@ -99,20 +87,6 @@ func newAuthorityClaimer(c config.NodeConfig, workDir string) services.CommandSe
 	return s
 }
 
-func newRedis(c config.NodeConfig, workDir string) services.CommandService {
-	var s services.CommandService
-	s.Name = "redis"
-	s.HealthcheckPort = getPort(c, portOffsetRedis)
-	s.Path = "redis-server"
-	s.Args = append(s.Args, "--port", fmt.Sprint(getPort(c, portOffsetRedis)))
-	// Disable persistence with --save and --appendonly config
-	s.Args = append(s.Args, "--save", "")
-	s.Args = append(s.Args, "--appendonly", "no")
-	s.Env = append(s.Env, os.Environ()...)
-	s.WorkDir = workDir
-	return s
-}
-
@@ -120,11 +94,6 @@ func newSupervisorService(
 	c config.NodeConfig,
 	workDir string,
 ) services.SupervisorService {
 	var s []services.Service
 
-	if !c.ExperimentalSunodoValidatorEnabled {
-		// add Redis first
-		s = append(s, newRedis(c, workDir))
-	}
-
 	// enable claimer if reader mode and sunodo validator mode are not enabled
 	if c.FeatureClaimerEnabled && !c.ExperimentalSunodoValidatorEnabled {
 		s = append(s, newAuthorityClaimer(c, workDir))
diff --git a/test/config.go b/test/config.go
index 520dcd866..7b8911390 100644
--- a/test/config.go
+++ b/test/config.go
@@ -52,7 +52,6 @@ func NewLocalNodeConfig(localPostgresEndpoint string, localBlockchainHttpEndpoin
 
 	//Contracts
 	nodeConfig.ContractsApplicationAddress = book.Application.Hex()
-	nodeConfig.ContractsIConsensusAddress = book.Authority.Hex()
 	nodeConfig.ContractsInputBoxAddress = book.InputBox.Hex()
 	nodeConfig.ContractsInputBoxDeploymentBlockNumber = LocalInputBoxDeploymentBlockNumber