Skip to content

Commit

Permalink
Merge pull request #61 from spectrum-finance/dev-1035-blacklist-spect…
Browse files Browse the repository at this point in the history
…rum-production-pools-for

Blacklist Spectrum pools for v1
  • Loading branch information
kettlebell authored May 24, 2023
2 parents f90f3fa + 60eed6e commit c6696bf
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
29 changes: 28 additions & 1 deletion spectrum-offchain-lm/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashSet;
use std::sync::{Arc, Once};

use clap::{arg, Parser};
use ergo_lib::ergo_chain_types::Digest32;
use ergo_lib::ergotree_ir::chain::address::{AddressEncoder, NetworkPrefix};
use ergo_lib::ergotree_ir::chain::token::TokenId;
use futures::channel::mpsc;
use futures::future::ready;
use futures::stream::select_all;
Expand Down Expand Up @@ -41,6 +44,7 @@ use crate::data::bundle::IndexedStakingBundle;
use crate::data::funding::{ExecutorWallet, FundingUpdate};
use crate::data::pool::Pool;
use crate::data::AsBox;
use crate::data::PoolId;
use crate::data::{
order::{Order, OrderProto},
OrderId,
Expand Down Expand Up @@ -147,9 +151,21 @@ async fn main() {

let default_handler = NoopDefaultHandler;

// The following pool ids are from Spectrum test and production pools.
let blacklisted_entities = generate_pool_blacklist(&[
"f61da4f7d651fc7a1c1bb586c91ec1fcea1ef9611461fd437176c49d9db37bb2",
"8a82a413c451fec826c8d39e87b95b6104d7de30e5d883a3c5ba4236d44b5837",
"8d49ef70ab015d79cb9ab523adf3ccb0b7d05534c598e4ddf8acb8b2b420b463",
"1b3d37d78650dd8527fa02f8783d9b98490df3b464dd44af0e0593ceb4717702",
"af629d8e63d08a9770bc543f807bdb82dcda942d4e21d506771f975dc2b3fd3a",
"24e9f9a3e0aa89092d8690941900323dea2ee3603ca7368c0c35175259df6930",
]);
// pools
let (pool_snd, pool_recv) = mpsc::unbounded::<Confirmed<StateUpdate<AsBox<Pool>>>>();
let pool_han = ConfirmedUpdateHandler::<_, AsBox<Pool>, _>::new(pool_snd, Arc::clone(&pools));

let pool_han =
ConfirmedUpdateHandler::<_, AsBox<Pool>, _>::new(pool_snd, Arc::clone(&pools), blacklisted_entities);

let pool_update_stream = boxed(entity_tracking_stream(pool_recv, Arc::clone(&pools)));

// bundles
Expand Down Expand Up @@ -265,3 +281,14 @@ struct AppArgs {
#[arg(long, short)]
log4rs_path: Option<String>,
}

fn generate_pool_blacklist(base16_encodings: &[&str]) -> HashSet<PoolId> {
base16_encodings
.iter()
.map(|encoding| {
PoolId::from(TokenId::from(
Digest32::try_from(String::from(*encoding)).unwrap(),
))
})
.collect()
}
55 changes: 45 additions & 10 deletions spectrum-offchain/src/event_sink/handlers/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,39 @@ use crate::event_sink::handlers::types::TryFromBox;
use crate::event_sink::types::EventHandler;
use crate::event_source::data::LedgerTxEvent;

pub struct ConfirmedUpdateHandler<TSink, TEntity, TRepo> {
pub struct ConfirmedUpdateHandler<TSink, TEntity, TRepo>
where
TEntity: OnChainEntity + TryFromBox + Clone,
TEntity::TEntityId: Clone,
{
pub topic: TSink,
pub entities: Arc<Mutex<TRepo>>,
pub blacklisted_entities: HashSet<TEntity::TEntityId>,
pub pd: PhantomData<TEntity>,
}

impl<TSink, TEntity, TRepo> ConfirmedUpdateHandler<TSink, TEntity, TRepo> {
pub fn new(topic: TSink, entities: Arc<Mutex<TRepo>>) -> Self {
impl<TSink, TEntity, TRepo> ConfirmedUpdateHandler<TSink, TEntity, TRepo>
where
TEntity: OnChainEntity + TryFromBox + Clone,
TEntity::TEntityId: Clone,
{
pub fn new(
topic: TSink,
entities: Arc<Mutex<TRepo>>,
blacklisted_entities: HashSet<TEntity::TEntityId>,
) -> Self {
Self {
topic,
entities,
blacklisted_entities,
pd: Default::default(),
}
}
}

async fn extract_transitions<TEntity, TRepo>(
entities: Arc<Mutex<TRepo>>,
blacklisted_entities: &HashSet<TEntity::TEntityId>,
tx: Transaction,
) -> Vec<EitherOrBoth<TEntity, TEntity>>
where
Expand All @@ -52,18 +67,25 @@ where
let entities = entities.lock().await;
if entities.may_exist(state_id).await {
if let Some(entity) = entities.get_state(state_id).await {
consumed_entities.insert(entity.get_self_ref(), entity);
let entity_id = entity.get_self_ref();
if !blacklisted_entities.contains(&entity_id) {
consumed_entities.insert(entity_id, entity);
}
}
}
}
let mut created_entities = HashMap::<TEntity::TEntityId, TEntity>::new();
for bx in &tx.outputs {
if let Some(entity) = TEntity::try_from_box(bx.clone()) {
created_entities.insert(entity.get_self_ref(), entity);
let entity_id = entity.get_self_ref();
if !blacklisted_entities.contains(&entity_id) {
created_entities.insert(entity_id.clone(), entity);
}
}
}
let consumed_keys = consumed_entities.keys().cloned().collect::<HashSet<_>>();
let created_keys = created_entities.keys().cloned().collect::<HashSet<_>>();

consumed_keys
.union(&created_keys)
.flat_map(|k| {
Expand All @@ -86,7 +108,9 @@ where
async fn try_handle(&mut self, ev: LedgerTxEvent) -> Option<LedgerTxEvent> {
let res = match ev {
LedgerTxEvent::AppliedTx { tx, timestamp } => {
let transitions = extract_transitions(Arc::clone(&self.entities), tx.clone()).await;
let transitions =
extract_transitions(Arc::clone(&self.entities), &self.blacklisted_entities, tx.clone())
.await;
let num_transitions = transitions.len();
let is_success = num_transitions > 0;
for tr in transitions {
Expand All @@ -100,7 +124,9 @@ where
}
}
LedgerTxEvent::UnappliedTx(tx) => {
let transitions = extract_transitions(Arc::clone(&self.entities), tx.clone()).await;
let transitions =
extract_transitions(Arc::clone(&self.entities), &self.blacklisted_entities, tx.clone())
.await;
let num_transitions = transitions.len();
let is_success = num_transitions > 0;
for tr in transitions {
Expand All @@ -122,9 +148,14 @@ where
}
}

pub struct UnconfirmedUpgradeHandler<TSink, TEntity, TRepo> {
pub struct UnconfirmedUpgradeHandler<TSink, TEntity, TRepo>
where
TEntity: OnChainEntity + TryFromBox + Clone,
TEntity::TEntityId: Clone,
{
pub topic: TSink,
pub entities: Arc<Mutex<TRepo>>,
pub blacklisted_entities: HashSet<TEntity::TEntityId>,
pub pd: PhantomData<TEntity>,
}

Expand All @@ -140,7 +171,9 @@ where
async fn try_handle(&mut self, ev: MempoolUpdate) -> Option<MempoolUpdate> {
let res = match ev {
MempoolUpdate::TxAccepted(tx) => {
let transitions = extract_transitions(Arc::clone(&self.entities), tx.clone()).await;
let transitions =
extract_transitions(Arc::clone(&self.entities), &self.blacklisted_entities, tx.clone())
.await;
let is_success = !transitions.is_empty();
for tr in transitions {
let _ = self.topic.feed(Unconfirmed(StateUpdate::Transition(tr))).await;
Expand All @@ -152,7 +185,9 @@ where
}
}
MempoolUpdate::TxWithdrawn(tx) => {
let transitions = extract_transitions(Arc::clone(&self.entities), tx.clone()).await;
let transitions =
extract_transitions(Arc::clone(&self.entities), &self.blacklisted_entities, tx.clone())
.await;
let is_success = !transitions.is_empty();
for tr in transitions {
let _ = self
Expand Down

0 comments on commit c6696bf

Please sign in to comment.