Skip to content

Commit

Permalink
feat(code): Add persistence to the block store (#514)
Browse files Browse the repository at this point in the history
* feat(code): Add persistence to the block store

* Fix clippy warnings

* Revert log message change

* scripts: Delete pre-existing database before spawning node
  • Loading branch information
romac authored Nov 4, 2024
1 parent 1d800c3 commit a94bbb4
Show file tree
Hide file tree
Showing 16 changed files with 356 additions and 142 deletions.
17 changes: 16 additions & 1 deletion code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ prost-types = "0.13"
ractor = "0.12.4"
rand = { version = "0.8.5", features = ["std_rng"] }
rand_chacha = "0.3.1"
redb = "2.2.0"
seahash = "4.1"
serde = "1.0"
serde_json = "1.0"
Expand Down
5 changes: 4 additions & 1 deletion code/crates/cli/src/cmd/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl StartCmd {
pub async fn run(
&self,
cfg: Config,
home_dir: PathBuf,
private_key_file: PathBuf,
genesis_file: PathBuf,
) -> Result<()> {
Expand All @@ -45,10 +46,12 @@ impl StartCmd {
let (actor, handle) = match cfg.app {
App::Starknet => {
use malachite_starknet_app::spawn::spawn_node_actor;

let start_height = self
.start_height
.map(|height| malachite_starknet_app::types::Height::new(height, 1));
spawn_node_actor(cfg, genesis, private_key, start_height, None).await

spawn_node_actor(cfg, home_dir, genesis, private_key, start_height, None).await
}
};

Expand Down
1 change: 1 addition & 0 deletions code/crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn start(args: &Args, cfg: Config, cmd: &StartCmd) -> Result<()> {
let rt = builder.enable_all().build()?;
rt.block_on(cmd.run(
cfg,
args.get_home_dir()?,
args.get_priv_validator_key_file_path()?,
args.get_genesis_file_path()?,
))
Expand Down
1 change: 0 additions & 1 deletion code/crates/starknet/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#![allow(unexpected_cfgs)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

pub mod codec;
pub mod node;
pub mod spawn;

Expand Down
8 changes: 6 additions & 2 deletions code/crates/starknet/app/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::path::PathBuf;
use std::time::Duration;

use libp2p_identity::ecdsa;
Expand Down Expand Up @@ -28,10 +29,11 @@ use malachite_starknet_host::mock::context::MockContext;
use malachite_starknet_host::mock::host::{MockHost, MockParams};
use malachite_starknet_host::types::{Address, Height, PrivateKey, ValidatorSet};

use crate::codec::ProtobufCodec;
use malachite_starknet_host::codec::ProtobufCodec;

pub async fn spawn_node_actor(
cfg: NodeConfig,
home_dir: PathBuf,
initial_validator_set: ValidatorSet,
private_key: PrivateKey,
start_height: Option<Height>,
Expand All @@ -54,6 +56,7 @@ pub async fn spawn_node_actor(

// Spawn the host actor
let host = spawn_host_actor(
home_dir,
&cfg,
&address,
&initial_validator_set,
Expand Down Expand Up @@ -242,6 +245,7 @@ async fn spawn_gossip_mempool_actor(
}

async fn spawn_host_actor(
home_dir: PathBuf,
cfg: &NodeConfig,
address: &Address,
initial_validator_set: &ValidatorSet,
Expand All @@ -266,7 +270,7 @@ async fn spawn_host_actor(
initial_validator_set.clone(),
);

StarknetHost::spawn(mock_host, mempool, gossip_consensus, metrics)
StarknetHost::spawn(home_dir, mock_host, mempool, gossip_consensus, metrics)
.await
.unwrap()
}
7 changes: 6 additions & 1 deletion code/crates/starknet/host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,28 @@ rust-version.workspace = true
[dependencies]
malachite-actors.workspace = true
malachite-blocksync.workspace = true
malachite-consensus.workspace = true
malachite-gossip-consensus.workspace = true
malachite-common.workspace = true
malachite-metrics.workspace = true
malachite-config.workspace = true
malachite-gossip-mempool.workspace = true
malachite-proto.workspace = true
malachite-starknet-p2p-types.workspace = true
malachite-starknet-p2p-proto.workspace = true

bytes = { workspace = true, features = ["serde"] }

starknet-core.workspace = true
async-trait.workspace = true
bytesize.workspace = true
derive-where.workspace = true
itertools.workspace = true
libp2p-identity.workspace = true
eyre.workspace = true
prost.workspace = true
ractor.workspace = true
rand.workspace = true
redb.workspace = true
sha3.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
57 changes: 30 additions & 27 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::path::PathBuf;
use std::sync::Arc;

use eyre::eyre;

use itertools::Itertools;
use ractor::{async_trait, Actor, ActorProcessingErr, SpawnErr};
use rand::RngCore;
use sha3::Digest;
Expand Down Expand Up @@ -45,13 +45,13 @@ pub struct HostState {
next_stream_id: StreamId,
}

impl Default for HostState {
fn default() -> Self {
impl HostState {
fn new(home_dir: PathBuf) -> Self {
Self {
height: Height::new(0, 0),
round: Round::Nil,
proposer: None,
block_store: BlockStore::default(),
block_store: BlockStore::new(home_dir.join("blocks.db")).unwrap(),
part_store: PartStore::default(),
part_streams_map: PartStreamsMap::default(),
next_stream_id: StreamId::default(),
Expand All @@ -78,6 +78,7 @@ impl StarknetHost {
}

pub async fn spawn(
home_dir: PathBuf,
host: MockHost,
mempool: MempoolRef,
gossip_consensus: GossipConsensusRef<MockContext>,
Expand All @@ -86,7 +87,7 @@ impl StarknetHost {
let (actor_ref, _) = Actor::spawn(
None,
Self::new(host, mempool, gossip_consensus, metrics),
HostState::default(),
HostState::new(home_dir),
)
.await?;

Expand Down Expand Up @@ -191,8 +192,8 @@ impl StarknetHost {
let all_parts = state.part_store.all_parts(height, round);

trace!(
count = state.part_store.blocks_stored(),
"The store has blocks"
count = state.part_store.blocks_count(),
"Blocks for which we have parts"
);

// TODO: Do more validations, e.g. there is no higher tx proposal part,
Expand All @@ -213,18 +214,19 @@ impl StarknetHost {
self.build_value_from_parts(&all_parts, height, round)
}

fn store_block(&self, state: &mut HostState) {
let max_height = state.block_store.store_keys().last().unwrap_or_default();
async fn prune_block_store(&self, state: &mut HostState) {
let max_height = state.block_store.last_height().unwrap_or_default();
let max_retain_blocks = self.host.params().max_retain_blocks as u64;

let min_number_blocks: u64 = std::cmp::min(
self.host.params().max_retain_blocks as u64,
max_height.as_u64(),
);

let retain_height =
Height::new(max_height.as_u64() - min_number_blocks, max_height.fork_id);
// Compute the height to retain blocks higher than
let retain_height = max_height.as_u64().saturating_sub(max_retain_blocks);
if retain_height == 0 {
// No need to prune anything, since we would retain every blocks
return;
}

state.block_store.prune(retain_height);
let retain_height = Height::new(retain_height, max_height.fork_id);
state.block_store.prune(retain_height).await;
}
}

Expand Down Expand Up @@ -262,8 +264,7 @@ impl Actor for StarknetHost {
}

HostMsg::GetEarliestBlockHeight { reply_to } => {
let earliest_block_height =
state.block_store.store_keys().next().unwrap_or_default();
let earliest_block_height = state.block_store.first_height().unwrap_or_default();
reply_to.send(earliest_block_height)?;
Ok(())
}
Expand Down Expand Up @@ -405,7 +406,10 @@ impl Actor for StarknetHost {
}

// Build the block from proposal parts and commits and store it
state.block_store.store(&proposal, &all_txes, &commits);
state
.block_store
.store(&proposal, &all_txes, &commits)
.await;

// Update metrics
let block_size: usize = all_parts.iter().map(|p| p.size_bytes()).sum();
Expand Down Expand Up @@ -436,7 +440,7 @@ impl Actor for StarknetHost {
state.part_store.prune(state.height);

// Store the block
self.store_block(state);
self.prune_block_store(state).await;

// Notify the mempool to remove corresponding txs
self.mempool.cast(MempoolMsg::Update { tx_hashes })?;
Expand All @@ -455,13 +459,12 @@ impl Actor for StarknetHost {
HostMsg::GetDecidedBlock { height, reply_to } => {
debug!(%height, "Received request for block");

match state.block_store.store.get(&height).cloned() {
match state.block_store.get(height).await {
None => {
warn!(
%height,
"No block found, available blocks: {}",
state.block_store.store_keys().format(", ")
);
let min = state.block_store.first_height().unwrap_or_default();
let max = state.block_store.last_height().unwrap_or_default();

warn!(%height, "No block for this height, available blocks: {min}..={max}");

reply_to.send(None)?;
}
Expand Down
Loading

0 comments on commit a94bbb4

Please sign in to comment.