Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add lru cache for delivered certificates and known signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Apr 25, 2024
1 parent 922e26d commit 57d260e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 51 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy_static = "1"
rand = { version = "0.8", default-features = false }
rand_core = { version = "0.6", default-features = false }
rand_distr = { version = "0.4", default-features = false }
lru = "0.12.3"

# Async & Tokio related
async-stream = { version = "0.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ topos-config = { path = "../topos-config/" }
topos-metrics = { path = "../topos-metrics/" }
topos-tce-storage = { path = "../topos-tce-storage/" }
topos-crypto = { path = "../topos-crypto" }
lru.workspace = true

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] }
Expand Down
73 changes: 26 additions & 47 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
use crate::event::ProtocolEvents;
use crate::{DoubleEchoCommand, SubscriptionsView};
use lru::LruCache;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::{types::ValidatorId, uci::CertificateId};
use topos_crypto::messages::{MessageSigner, Signature};
use topos_tce_storage::store::ReadStore;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

pub mod broadcast_state;

Expand All @@ -49,14 +50,15 @@ pub struct DoubleEcho {
/// List of approved validators through smart contract and/or genesis
pub validators: HashSet<ValidatorId>,
pub validator_store: Arc<ValidatorStore>,
pub known_signatures: HashSet<Signature>,
pub known_signatures: LruCache<Signature, ()>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,

pub task_manager_cancellation: CancellationToken,
}

impl DoubleEcho {
pub const MAX_BUFFER_SIZE: usize = 1024 * 20;
pub const KNOWN_SIGNATURES_CACHE_SIZE: usize = 15 * 10_000;

#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -86,7 +88,9 @@ impl DoubleEcho {
},
shutdown,
validator_store,
known_signatures: HashSet::new(),
known_signatures: LruCache::new(
NonZeroUsize::new(Self::KNOWN_SIGNATURES_CACHE_SIZE).unwrap(),
),
broadcast_sender,
task_manager_cancellation: CancellationToken::new(),
}
Expand Down Expand Up @@ -167,7 +171,7 @@ impl DoubleEcho {
continue;
}

self.known_signatures.insert(signature);
self.known_signatures.push(signature, ());

self.handle_echo(certificate_id, validator_id, signature).await
},
Expand All @@ -184,7 +188,6 @@ impl DoubleEcho {
continue;
}


let mut payload = Vec::new();
payload.extend_from_slice(certificate_id.as_array());
payload.extend_from_slice(validator_id.as_bytes());
Expand All @@ -194,7 +197,7 @@ impl DoubleEcho {
continue;
}

self.known_signatures.insert(signature);
self.known_signatures.push(signature, ());

self.handle_ready(certificate_id, validator_id, signature).await
},
Expand Down Expand Up @@ -230,26 +233,14 @@ impl DoubleEcho {
validator_id: ValidatorId,
signature: Signature,
) {
match self.validator_store.get_certificate(&certificate_id) {
Err(storage_error) => error!(
"Unable to get the Certificate {} due to {:?}",
&certificate_id, storage_error
),
Ok(Some(_)) => debug!(
"Certificate {} already delivered, ignoring echo",
&certificate_id
),
Ok(None) => {
let _ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Echo {
validator_id,
certificate_id,
signature,
})
.await;
}
}
let _ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Echo {
validator_id,
certificate_id,
signature,
})
.await;
}

pub async fn handle_ready(
Expand All @@ -258,25 +249,13 @@ impl DoubleEcho {
validator_id: ValidatorId,
signature: Signature,
) {
match self.validator_store.get_certificate(&certificate_id) {
Err(storage_error) => error!(
"Unable to get the Certificate {} due to {:?}",
&certificate_id, storage_error
),
Ok(Some(_)) => debug!(
"Certificate {} already delivered, ignoring echo",
&certificate_id
),
Ok(None) => {
let _ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Ready {
validator_id,
certificate_id,
signature,
})
.await;
}
}
let _ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Ready {
validator_id,
certificate_id,
signature,
})
.await;
}
}
20 changes: 19 additions & 1 deletion crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::event::ProtocolEvents;
use futures::stream::FuturesUnordered;
use futures::Future;
use futures::StreamExt;
use lru::LruCache;
use std::collections::HashMap;
use std::future::IntoFuture;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -52,11 +54,14 @@ pub struct TaskManager {
pub thresholds: ReliableBroadcastParams,
pub validator_id: ValidatorId,
pub validator_store: Arc<ValidatorStore>,
pub delivered_certificates: LruCache<CertificateId, ()>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
pub latest_pending_id: PendingCertificateId,
}

impl TaskManager {
pub const DELIVERED_CERTIFICATES_CACHE_SIZE: usize = 20_000;

#[allow(clippy::too_many_arguments)]
pub fn new(
message_receiver: mpsc::Receiver<DoubleEchoCommand>,
Expand All @@ -79,6 +84,9 @@ impl TaskManager {
message_signer,
thresholds,
validator_store,
delivered_certificates: LruCache::new(
NonZeroUsize::new(Self::DELIVERED_CERTIFICATES_CACHE_SIZE).unwrap(),
),
broadcast_sender,
latest_pending_id: 0,
}
Expand Down Expand Up @@ -120,6 +128,11 @@ impl TaskManager {
Some(msg) = self.message_receiver.recv() => {
match msg {
DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => {
if self.delivered_certificates.contains(&certificate_id) {
trace!("Received message for certificate {} that has already been delivered", certificate_id);
continue;
}

if let Some(task_context) = self.tasks.get(&certificate_id) {
_ = task_context.sink.send(msg).await;
} else {
Expand All @@ -130,6 +143,11 @@ impl TaskManager {
};
}
DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => {
if self.delivered_certificates.contains(&cert.id) {
trace!("Received message for certificate {} that has already been delivered", cert.id);
continue;
}

trace!("Received broadcast message for certificate {} ", cert.id);

self.create_task(cert, need_gossip, pending_id)
Expand All @@ -139,10 +157,10 @@ impl TaskManager {

Some((certificate_id, status)) = self.running_tasks.next() => {
if let TaskStatus::Success = status {
self.delivered_certificates.put(certificate_id, ());
trace!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();

} else {
error!("Task for certificate {} finished unsuccessfully", certificate_id);
}
Expand Down

0 comments on commit 57d260e

Please sign in to comment.