Skip to content

Commit

Permalink
exporter: Use market hours information in price filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
drozdziak1 committed Nov 24, 2023
1 parent 75497df commit c5bce62
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 38 deletions.
35 changes: 35 additions & 0 deletions integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"nasdaq_symbol": "AAPL",
"symbol": "Equity.US.AAPL/USD",
"base": "AAPL",
"market_hours": "America/New_York,C,C,C,C,C,C,C" # Should never be published due to all-closed market hours
},
"metadata": {"jump_id": "186", "jump_symbol": "AAPL", "price_exp": -5, "min_publishers": 1},
}
Expand Down Expand Up @@ -732,3 +733,37 @@ async def test_agent_migrate_config(self,
# Continue with the simple test case, which must succeed
await self.test_update_price_simple(client_no_spawn)
await client_no_spawn.close()

@pytest.mark.asyncio
async def test_agent_respects_market_hours(self, client: PythAgentClient):
'''
Similar to test_update_price_simple, but using AAPL_USD and
asserting that nothing is published due to the symbol's
all-closed market hours.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the AAPL/USD symbol
product = products[AAPL_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(2)

# Send another "update_price" request to trigger aggregation
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has been updated with the values from the first "update_price" request
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"
23 changes: 11 additions & 12 deletions src/agent/market_hours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
chrono::{
naive::NaiveTime,
DateTime,
Datelike,
Duration,
TimeZone,
Weekday,
Expand All @@ -27,7 +28,7 @@ lazy_static! {
}

/// Weekly market hours schedule
#[derive(Default, Debug, Eq, PartialEq)]
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct MarketHours {
pub timezone: Tz,
pub mon: MHKind,
Expand All @@ -53,13 +54,11 @@ impl MarketHours {
}
}

pub fn can_publish_at<Tz: TimeZone>(&self, when: &DateTime<Tz>) -> Result<bool> {
pub fn can_publish_at<Tz: TimeZone>(&self, when: &DateTime<Tz>) -> bool {
// Convert to time local to the market
let when_market_local = when.with_timezone(&self.timezone);

// NOTE(2023-11-21): Strangely enough, I couldn't find a
// method that gets the programmatic Weekday from a DateTime.
let market_weekday: Weekday = when_market_local.format("%A").to_string().parse()?;
let market_weekday: Weekday = when_market_local.date_naive().weekday();

let market_time = when_market_local.time();

Expand All @@ -73,7 +72,7 @@ impl MarketHours {
Weekday::Sun => self.sun.can_publish_at(market_time),
};

Ok(ret)
ret
}
}

Expand Down Expand Up @@ -163,7 +162,7 @@ impl FromStr for MarketHours {
}

/// Helper enum for denoting per-day schedules: time range, all-day open and all-day closed.
#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MHKind {
Open,
Closed,
Expand Down Expand Up @@ -368,9 +367,9 @@ mod tests {
dbg!(&ok_dt);
dbg!(&after_dt);

assert!(!mh.can_publish_at(before_dt)?);
assert!(mh.can_publish_at(ok_dt)?);
assert!(!mh.can_publish_at(after_dt)?);
assert!(!mh.can_publish_at(before_dt));
assert!(mh.can_publish_at(ok_dt));
assert!(!mh.can_publish_at(after_dt));
}

Ok(())
Expand Down Expand Up @@ -414,8 +413,8 @@ mod tests {
dbg!(&ok_dt);
dbg!(&bad_dt);

assert!(mh.can_publish_at(ok_dt)?);
assert!(!mh.can_publish_at(bad_dt)?);
assert!(mh.can_publish_at(ok_dt));
assert!(!mh.can_publish_at(bad_dt));
}

Ok(())
Expand Down
4 changes: 1 addition & 3 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use {
},
warp::{
hyper::StatusCode,
reply::{
self,
},
reply::{self,},
Filter,
Rejection,
Reply,
Expand Down
2 changes: 2 additions & 0 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
market_hours: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
Expand Down Expand Up @@ -1051,6 +1052,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
market_hours: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
Expand Down
45 changes: 31 additions & 14 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ use {
key_store,
network::Network,
},
crate::agent::remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
crate::agent::{
market_hours::MarketHours,
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
},
anyhow::{
anyhow,
Context,
Result,
},
bincode::Options,
chrono::Utc,
chrono::{
Local,
Utc,
},
futures_util::future::{
self,
join_all,
Expand Down Expand Up @@ -169,7 +175,7 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashSet<Pubkey>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
Expand Down Expand Up @@ -256,11 +262,11 @@ pub struct Exporter {
// Channel on which to send inflight transactions to the transaction monitor
inflight_transactions_tx: Sender<Signature>,

/// Permissioned symbols as read by the oracle module
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashSet<Pubkey>>>,
/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,

/// Currently known permissioned prices of this publisher
our_prices: HashSet<Pubkey>,
/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, MarketHours>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
Expand All @@ -284,7 +290,7 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashSet<Pubkey>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
Expand All @@ -301,7 +307,7 @@ impl Exporter {
network_state_rx,
inflight_transactions_tx,
publisher_permissions_rx,
our_prices: HashSet::new(),
our_prices: HashMap::new(),
dynamic_compute_unit_price_update_interval: tokio::time::interval(
time::Duration::from_secs(1),
),
Expand Down Expand Up @@ -468,13 +474,24 @@ impl Exporter {
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

let now = Local::now();

// Filter out price accounts we're not permissioned to update
Ok(fresh_updates
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if self.our_prices.contains(&key_from_id) {
true
if let Some(market_hours) = self.our_prices.get(&key_from_id) {
let ret = market_hours.can_publish_at(&now);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
"price_account" => key_from_id.to_string(),
"market_hours" => format!("{:?}", market_hours),
);
}

ret
} else {
// Note: This message is not an error. Some
// publishers have different permissions on
Expand Down Expand Up @@ -570,7 +587,7 @@ impl Exporter {
"Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.";
"publish_pubkey" => publish_pubkey.to_string(),
);
HashSet::new()
HashMap::new()
});
trace!(
self.logger,
Expand Down
50 changes: 41 additions & 9 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::agent::market_hours::MarketHours;
// This module is responsible for loading the current state of the
// on-chain Oracle program accounts from Solana.
use {
Expand Down Expand Up @@ -47,16 +48,16 @@ pub struct Data {
pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
pub product_accounts: HashMap<Pubkey, ProductEntry>,
pub price_accounts: HashMap<Pubkey, PriceEntry>,
/// publisher => {their permissioned price accounts}
pub publisher_permissions: HashMap<Pubkey, HashSet<Pubkey>>,
/// publisher => {their permissioned price accounts => market hours}
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, MarketHours>>,
}

impl Data {
fn new(
mapping_accounts: HashMap<Pubkey, MappingAccount>,
product_accounts: HashMap<Pubkey, ProductEntry>,
price_accounts: HashMap<Pubkey, PriceEntry>,
publisher_permissions: HashMap<Pubkey, HashSet<Pubkey>>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, MarketHours>>,
) -> Self {
Data {
mapping_accounts,
Expand All @@ -71,6 +72,7 @@ pub type MappingAccount = pyth_sdk_solana::state::MappingAccount;
#[derive(Debug, Clone)]
pub struct ProductEntry {
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub market_hours: MarketHours,
pub price_accounts: Vec<Pubkey>,
}
pub type PriceEntry = pyth_sdk_solana::state::PriceAccount;
Expand Down Expand Up @@ -134,7 +136,7 @@ pub fn spawn_oracle(
wss_url: &str,
rpc_timeout: Duration,
global_store_update_tx: mpsc::Sender<global::Update>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashSet<Pubkey>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,
key_store: KeyStore,
logger: Logger,
) -> Vec<JoinHandle<()>> {
Expand Down Expand Up @@ -349,7 +351,7 @@ struct Poller {
data_tx: mpsc::Sender<Data>,

/// Updates about permissioned price accounts from oracle to exporter
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashSet<Pubkey>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,

/// The RPC client to use to poll data from the RPC node
rpc_client: RpcClient,
Expand All @@ -369,7 +371,7 @@ struct Poller {
impl Poller {
pub fn new(
data_tx: mpsc::Sender<Data>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashSet<Pubkey>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketHours>>>,
rpc_url: &str,
rpc_timeout: Duration,
commitment: CommitmentLevel,
Expand Down Expand Up @@ -435,9 +437,20 @@ impl Poller {
for component in price_entry.comp {
let component_pub_entry = publisher_permissions
.entry(component.publisher)
.or_insert(HashSet::new());
.or_insert(HashMap::new());

component_pub_entry.insert(*price_key);
let market_hours = if let Some(prod_entry) = product_accounts.get(&price_entry.prod)
{
prod_entry.market_hours.clone()
} else {
warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7.";
"price" => price_key.to_string(),
"missing_product" => price_entry.prod.to_string(),
);
Default::default()
};

component_pub_entry.insert(*price_key, market_hours);
}
}

Expand Down Expand Up @@ -525,10 +538,29 @@ impl Poller {
let product = load_product_account(prod_acc.data.as_slice())
.context(format!("Could not parse product account {}", product_key))?;

let market_hours: MarketHours = if let Some((_mh_key, mh_val)) =
product.iter().find(|(k, _v)| *k == "market_hours")
{
mh_val.parse().unwrap_or_else(|err| {
warn!(
self.logger,
"Oracle: Product has market_hours defined but it could not be parsed. Falling back to 24/7 publishing.";
"product_key" => product_key.to_string(),
"market_hours" => mh_val,
);
debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err));
Default::default()
}
)
} else {
Default::default() // No market hours specified, meaning 24/7 publishing
};

product_entries.insert(
*product_key,
ProductEntry {
account_data: *product,
account_data: *product,
market_hours,
price_accounts: vec![],
},
);
Expand Down

0 comments on commit c5bce62

Please sign in to comment.