Skip to content

Commit

Permalink
Merge pull request #28 from talaia-labs/handle-reorgs-height
Browse files Browse the repository at this point in the history
Adds basic reorg handling
  • Loading branch information
sr-gi authored Mar 11, 2022
2 parents f923509 + 71747fd commit 009e3be
Show file tree
Hide file tree
Showing 7 changed files with 1,295 additions and 776 deletions.
313 changes: 161 additions & 152 deletions teos/src/carrier.rs

Large diffs are not rendered by default.

51 changes: 38 additions & 13 deletions teos/src/dbm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use teos_common::UserId;

use crate::extended_appointment::{compute_appointment_slots, ExtendedAppointment, UUID};
use crate::gatekeeper::UserInfo;
use crate::responder::TransactionTracker;
use crate::watcher::Breach;
use crate::responder::{ConfirmationStatus, TransactionTracker};

/// Packs the errors than can raise when interacting with the underlying database.
#[derive(Debug)]
pub enum Error {
AlreadyExists,
MissingForeignKey,
MissingField,
NotFound,
Unknown(SqliteError),
}
Expand Down Expand Up @@ -91,6 +91,8 @@ impl DBM {
UUID INT PRIMARY KEY,
dispute_tx BLOB NOT NULL,
penalty_tx BLOB NOT NULL,
height INT NOT NULL,
confirmed BOOL NOT NULL,
FOREIGN KEY(UUID)
REFERENCES appointments(UUID)
ON DELETE CASCADE
Expand Down Expand Up @@ -455,13 +457,18 @@ impl DBM {
uuid: UUID,
tracker: &TransactionTracker,
) -> Result<(), Error> {
let query = "INSERT INTO trackers (UUID, dispute_tx, penalty_tx) VALUES (?1, ?2, ?3)";
let (height, confirmed) = tracker.status.to_db_data().ok_or(Error::MissingField)?;

let query =
"INSERT INTO trackers (UUID, dispute_tx, penalty_tx, height, confirmed) VALUES (?1, ?2, ?3, ?4, ?5)";
match self.store_data(
query,
params![
uuid.serialize(),
tracker.dispute_tx.serialize(),
tracker.penalty_tx.serialize(),
height,
confirmed,
],
) {
Ok(x) => {
Expand All @@ -486,12 +493,15 @@ impl DBM {
let dispute_tx = deserialize::<Transaction>(&raw_dispute_tx).unwrap();
let raw_penalty_tx: Vec<u8> = row.get(2).unwrap();
let penalty_tx = deserialize::<Transaction>(&raw_penalty_tx).unwrap();
let raw_userid: Vec<u8> = row.get(3).unwrap();
let height: u32 = row.get(3).unwrap();
let confirmed: bool = row.get(4).unwrap();
let raw_userid: Vec<u8> = row.get(5).unwrap();
let user_id = UserId::deserialize(&raw_userid).unwrap();

Ok(TransactionTracker {
dispute_tx,
penalty_tx,
status: ConfirmationStatus::from_db_data(height, confirmed),
user_id,
})
})
Expand All @@ -514,12 +524,19 @@ impl DBM {
let dispute_tx = deserialize::<Transaction>(&raw_dispute_tx).unwrap();
let raw_penalty_tx: Vec<u8> = row.get(2).unwrap();
let penalty_tx = deserialize::<Transaction>(&raw_penalty_tx).unwrap();
let raw_userid: Vec<u8> = row.get(3).unwrap();
let height: u32 = row.get(3).unwrap();
let confirmed: bool = row.get(4).unwrap();
let raw_userid: Vec<u8> = row.get(5).unwrap();
let user_id = UserId::deserialize(&raw_userid).unwrap();

trackers.insert(
uuid,
TransactionTracker::new(Breach::new(dispute_tx, penalty_tx), user_id),
TransactionTracker {
dispute_tx,
penalty_tx,
status: ConfirmationStatus::from_db_data(height, confirmed),
user_id,
},
);
}

Expand Down Expand Up @@ -754,7 +771,8 @@ mod tests {
let mut dbm = DBM::in_memory().unwrap();
let uuid = generate_uuid();
let appointment = generate_dummy_appointment(None);
let tracker = get_random_tracker(appointment.user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(appointment.user_id, ConfirmationStatus::ConfirmedIn(100));

// Add the user and link an appointment (this is usually done once the appointment)
// is added after the user creation, but for the test purpose it can be done all at once.
Expand Down Expand Up @@ -905,7 +923,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(100));
dbm.store_tracker(uuid, &tracker).unwrap();

// We should get all the appointments back except from the triggered one
Expand Down Expand Up @@ -970,7 +989,8 @@ mod tests {
let mut dbm = DBM::in_memory().unwrap();
let uuid = generate_uuid();
let appointment = generate_dummy_appointment(None);
let tracker = get_random_tracker(appointment.user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(appointment.user_id, ConfirmationStatus::ConfirmedIn(21));

let info = UserInfo::new(21, 42);

Expand Down Expand Up @@ -1053,7 +1073,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::ConfirmedIn(21));
assert!(matches!(dbm.store_tracker(uuid, &tracker), Ok { .. }));
assert_eq!(dbm.load_tracker(uuid).unwrap(), tracker);
}
Expand All @@ -1069,7 +1090,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));
assert!(matches!(dbm.store_tracker(uuid, &tracker), Ok { .. }));

// Try to store it again, but it shouldn't go through
Expand All @@ -1085,7 +1107,9 @@ mod tests {

let uuid = generate_uuid();
let user_id = get_random_user_id();
let tracker = get_random_tracker(user_id);

// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));

assert!(matches!(
dbm.store_tracker(uuid, &tracker),
Expand Down Expand Up @@ -1114,7 +1138,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));
dbm.store_tracker(uuid, &tracker).unwrap();
trackers.insert(uuid, tracker);
}
Expand Down
78 changes: 50 additions & 28 deletions teos/src/gatekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use lightning::chain;
use lightning_block_sync::{poll::ValidatedBlockHeader, BlockHeaderData};

use teos_common::constants::ENCRYPTED_BLOB_MAX_SIZE;
use teos_common::cryptography;
Expand Down Expand Up @@ -75,7 +74,7 @@ pub(crate) struct MaxSlotsReached;
#[derive(Debug)]
pub struct Gatekeeper {
/// last known block header by the [Gatekeeper].
last_known_block_header: Mutex<BlockHeaderData>,
last_known_block_height: AtomicU32,
/// Number of slots new subscriptions get by default.
subscription_slots: u32,
/// Expiry time new subscription get by default, in blocks (starting from the block the subscription is requested).
Expand All @@ -91,15 +90,15 @@ pub struct Gatekeeper {
impl Gatekeeper {
/// Creates a new [Gatekeeper] instance.
pub fn new(
last_known_block_header: ValidatedBlockHeader,
last_known_block_height: u32,
subscription_slots: u32,
subscription_duration: u32,
expiry_delta: u32,
dbm: Arc<Mutex<DBM>>,
) -> Self {
let registered_users = dbm.lock().unwrap().load_all_users();
Gatekeeper {
last_known_block_header: Mutex::new(*last_known_block_header.deref()),
last_known_block_height: AtomicU32::new(last_known_block_height),
subscription_slots,
subscription_duration,
expiry_delta,
Expand Down Expand Up @@ -159,7 +158,7 @@ impl Gatekeeper {
&self,
user_id: UserId,
) -> Result<RegistrationReceipt, MaxSlotsReached> {
let block_count = self.last_known_block_header.lock().unwrap().height;
let block_count = self.last_known_block_height.load(Ordering::Acquire);

// TODO: For now, new calls to `add_update_user` add subscription_slots to the current count and reset the expiry time
let mut registered_users = self.registered_users.lock().unwrap();
Expand Down Expand Up @@ -238,7 +237,7 @@ impl Gatekeeper {
Err(AuthenticationFailure("User not found.")),
|user_info| {
Ok((
self.last_known_block_header.lock().unwrap().height
self.last_known_block_height.load(Ordering::Acquire)
>= user_info.subscription_expiry,
user_info.subscription_expiry,
))
Expand Down Expand Up @@ -316,19 +315,17 @@ impl chain::Listen for Gatekeeper {
.retain(|id, _| !outdated_users.contains(id));
self.dbm.lock().unwrap().batch_remove_users(&outdated_users);

// Update last known block
*self.last_known_block_header.lock().unwrap() = BlockHeaderData {
header: block.header,
height,
chainwork: block.header.work(),
};
// Update last known block height
self.last_known_block_height
.store(height, Ordering::Release);
}

/// FIXME: To be implemented.
/// This will handle reorgs on the [Gatekeeper].
#[allow(unused_variables)]
/// Handles reorgs in the [Gatekeeper]. Simply updates the last_known_block_height.
fn block_disconnected(&self, header: &bitcoin::BlockHeader, height: u32) {
todo!()
log::warn!("Block disconnected: {}", header.block_hash());
// There's nothing to be done here but updating the last known block
self.last_known_block_height
.store(height - 1, Ordering::Release);
}
}

Expand All @@ -355,8 +352,8 @@ mod tests {
&& self.subscription_duration == other.subscription_duration
&& self.expiry_delta == other.expiry_delta
&& *self.registered_users.lock().unwrap() == *other.registered_users.lock().unwrap()
&& *self.last_known_block_header.lock().unwrap()
== *other.last_known_block_header.lock().unwrap()
&& self.last_known_block_height.load(Ordering::Relaxed)
== other.last_known_block_height.load(Ordering::Relaxed)
}
}
impl Eq for Gatekeeper {}
Expand Down Expand Up @@ -385,9 +382,8 @@ mod tests {
}

fn init_gatekeeper(chain: &Blockchain) -> Gatekeeper {
let tip = chain.tip();
let dbm = Arc::new(Mutex::new(DBM::in_memory().unwrap()));
Gatekeeper::new(tip, SLOTS, DURATION, EXPIRY_DELTA, dbm)
Gatekeeper::new(chain.get_block_count(), SLOTS, DURATION, EXPIRY_DELTA, dbm)
}

#[test]
Expand All @@ -396,7 +392,13 @@ mod tests {
let chain = Blockchain::default().with_height(START_HEIGHT);
let dbm = Arc::new(Mutex::new(DBM::in_memory().unwrap()));

let gatekeeper = Gatekeeper::new(chain.tip(), SLOTS, DURATION, EXPIRY_DELTA, dbm.clone());
let gatekeeper = Gatekeeper::new(
chain.get_block_count(),
SLOTS,
DURATION,
EXPIRY_DELTA,
dbm.clone(),
);
assert!(gatekeeper.is_fresh());

// If we add some users and appointments to the system and create a new Gatekeeper reusing the same db
Expand All @@ -419,7 +421,8 @@ mod tests {
}

// Create a new GK reusing the same DB and check that the data is loaded
let another_gk = Gatekeeper::new(chain.tip(), SLOTS, DURATION, EXPIRY_DELTA, dbm);
let another_gk =
Gatekeeper::new(chain.get_block_count(), SLOTS, DURATION, EXPIRY_DELTA, dbm);
assert!(!another_gk.is_fresh());
assert_eq!(gatekeeper, another_gk);
}
Expand Down Expand Up @@ -474,7 +477,9 @@ mod tests {

// Let generate a new block and add the user again to check that both the slots and expiry are updated.
chain.generate(None);
*gatekeeper.last_known_block_header.lock().unwrap() = *chain.tip().deref();
gatekeeper
.last_known_block_height
.store(chain.get_block_count(), Ordering::Relaxed);
let updated_receipt = gatekeeper.add_update_user(user_id).unwrap();

assert_eq!(
Expand Down Expand Up @@ -801,7 +806,7 @@ mod tests {
fn test_block_connected() {
// block_connected in the Gatekeeper is used to keep track of time in order to manage the users' subscription expiry.
// Remove users that get outdated at the new block's height from registered_users and the database.
let chain = Blockchain::default().with_height(START_HEIGHT);
let mut chain = Blockchain::default().with_height(START_HEIGHT);
let gatekeeper = init_gatekeeper(&chain);

// Check that users are outdated when the expected height if hit
Expand All @@ -814,7 +819,7 @@ mod tests {
}

// Connect a new block. Outdated users are deleted
gatekeeper.block_connected(chain.blocks.last().unwrap(), chain.tip().height + 1);
gatekeeper.block_connected(&chain.generate(None), chain.get_block_count());

// Check that users have been removed from registered_users and the database
for user_id in &[user1_id, user2_id, user3_id] {
Expand All @@ -831,8 +836,25 @@ mod tests {

// Check that the last_known_block_header has been properly updated
assert_eq!(
gatekeeper.last_known_block_header.lock().unwrap().header,
chain.tip().header
gatekeeper.last_known_block_height.load(Ordering::Relaxed),
chain.get_block_count()
);
}

#[test]
fn test_block_disconnected() {
// Block disconnected simply updates the last known block
let chain = Blockchain::default().with_height(START_HEIGHT);
let gatekeeper = init_gatekeeper(&chain);
let height = chain.get_block_count();

let last_known_block_header = chain.tip();
let prev_block_header = chain.at_height((height - 1) as usize);

gatekeeper.block_disconnected(&last_known_block_header.header, height);
assert_eq!(
gatekeeper.last_known_block_height.load(Ordering::Relaxed),
prev_block_header.height
);
}
}
17 changes: 6 additions & 11 deletions teos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,20 @@ async fn main() {

// Build components
let gatekeeper = Arc::new(Gatekeeper::new(
tip,
tip.height,
conf.subscription_slots,
conf.subscription_duration,
conf.expiry_delta,
dbm.clone(),
));

let carrier = Carrier::new(rpc, bitcoind_reachable.clone());
let responder = Arc::new(Responder::new(
carrier,
gatekeeper.clone(),
dbm.clone(),
tip,
));
let carrier = Carrier::new(rpc, bitcoind_reachable.clone(), tip.deref().height);
let responder = Arc::new(Responder::new(carrier, gatekeeper.clone(), dbm.clone()));
let watcher = Arc::new(Watcher::new(
gatekeeper.clone(),
responder.clone(),
last_n_blocks,
tip,
tip.height,
tower_sk,
UserId(tower_pk),
dbm.clone(),
Expand All @@ -204,7 +199,7 @@ async fn main() {

// The ordering here actually matters. Listeners are called by order, and we want the gatekeeper to be called
// last, so both the Watcher and the Responder can query the necessary data from it during data deletion.
let listener = &(watcher.clone(), &(responder.clone(), gatekeeper));
let listener = &(watcher.clone(), &(responder, gatekeeper));
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(tip, poller, cache, listener);
let mut chain_monitor = ChainMonitor::new(
Expand All @@ -223,7 +218,7 @@ async fn main() {

// Build interfaces
let rpc_api = Arc::new(InternalAPI::new(
watcher.clone(),
watcher,
bitcoind_reachable.clone(),
shutdown_trigger,
));
Expand Down
Loading

0 comments on commit 009e3be

Please sign in to comment.