diff --git a/infra/price-pusher/config/config.offchain.dev.yaml b/infra/price-pusher/config/config.offchain.dev.yaml index b78e1bae..70df30dd 100644 --- a/infra/price-pusher/config/config.offchain.dev.yaml +++ b/infra/price-pusher/config/config.offchain.dev.yaml @@ -31,6 +31,24 @@ - 1000SATS/USD - ONDO/USD - STRK/USD + - BCH/USD + - WBTC/USD + - WSTETH/USD + - STETH/USD + - NSTR/USD + - LORDS/USD + - ZEND/USD + - EKUBO/USD + - SHIB/USD + - CRV/USD + - BROTHER/USDPLUS + - POPCAT/USD + - SEI/USD + - FTM/USD + - GOAT/USD + - MOODENG/USD + - BONK/USD + - LUSD/USD future: - BTC/USDT @@ -62,7 +80,16 @@ - NEAR/USDT - 1000SATS/USDT - ONDO/USDT - - STRK/USD + - STRK/USDT + - BCH/USDT + - SHIB/USDT + - CRV/USDT + - POPCAT/USDT + - SEI/USDT + - FTM/USDT + - GOAT/USDT + - MOODENG/USDT + - BONK/USDT time_difference: 1 price_deviation: 0.025 \ No newline at end of file diff --git a/pragma-sdk/pragma_sdk/common/fetchers/fetcher_client.py b/pragma-sdk/pragma_sdk/common/fetchers/fetcher_client.py index f3031727..7cdfafcf 100644 --- a/pragma-sdk/pragma_sdk/common/fetchers/fetcher_client.py +++ b/pragma-sdk/pragma_sdk/common/fetchers/fetcher_client.py @@ -1,4 +1,5 @@ import asyncio +import time from typing import List import logging @@ -47,7 +48,7 @@ class FetcherClient: .. code-block:: python - await fc.fetch(timeout_duration=20) # Denominated in seconds (default=10) + await fc.fetch(timeout_duration=20) # Denominated in seconds """ __fetchers: List[FetcherInterfaceT] = [] @@ -90,18 +91,58 @@ async def fetch( :param timeout_duration: Timeout duration for each fetcher :return: List of fetched data """ + start_time = time.time() tasks = [] + + # Create a timeout for both connection and individual operations timeout = aiohttp.ClientTimeout( - total=timeout_duration - ) # 20 seconds per request - async with aiohttp.ClientSession(timeout=timeout) as session: - for fetcher in self.fetchers: - data = fetcher.fetch(session) - tasks.append(data) + total=None, # No timeout for the entire session + connect=timeout_duration, + sock_read=timeout_duration, + sock_connect=timeout_duration, + ) + + async with aiohttp.ClientSession( + timeout=timeout, connector=aiohttp.TCPConnector(limit=0) + ) as session: + tasks = [] + for idx, fetcher in enumerate(self.fetchers): + + async def wrapped_fetch(f, i): + try: + # Add timeout to the individual fetch operation + async with asyncio.timeout(timeout_duration): + fetch_start = time.time() + result = await f.fetch(session) + fetch_time = time.time() - fetch_start + logger.debug( + f"Fetcher {i} ({f.__class__.__name__}) completed in {fetch_time:.2f}s" + ) + return result + except asyncio.TimeoutError: + logger.error( + f"Fetcher {i} ({f.__class__.__name__}) timed out after {timeout_duration}s" + ) + return PublisherFetchError(f"Timeout after {timeout_duration}s") + except Exception as e: + logger.error( + f"Fetcher {i} ({f.__class__.__name__}) failed: {str(e)}" + ) + raise + + tasks.append(wrapped_fetch(fetcher, idx)) + + gather_start = time.time() result = await asyncio.gather(*tasks, return_exceptions=return_exceptions) - result = [val for subl in result for val in subl] # type: ignore[misc,union-attr] + logger.info(f"Gathered all results in {time.time() - gather_start:.2f}s") + + result = [r if isinstance(r, list) else [r] for r in result] + result = [val for subl in result for val in subl] # flatten + if filter_exceptions: result = [ subl for subl in result if not isinstance(subl, BaseException) ] - return result # type: ignore[union-attr, misc, return-value] + return result + + await asyncio.sleep(0) # Graceful shutdown diff --git a/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/binance.py b/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/binance.py index 52156032..a405f30b 100644 --- a/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/binance.py +++ b/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/binance.py @@ -1,3 +1,4 @@ +import asyncio import json import time @@ -21,7 +22,7 @@ class BinanceFutureFetcher(FetcherInterfaceT): async def fetch_pair( # type: ignore[override] self, pair: Pair, session: ClientSession - ) -> List[FutureEntry] | PublisherFetchError: + ) -> FutureEntry | PublisherFetchError: url = self.format_url(pair) async with session.get(url) as resp: if resp.status == 404: @@ -42,14 +43,10 @@ async def fetch_pair( # type: ignore[override] async def fetch( self, session: ClientSession ) -> List[Entry | PublisherFetchError | BaseException]: - entries: List[Entry | PublisherFetchError | BaseException] = [] + entries = [] for pair in self.pairs: - entries_or_error = await self.fetch_pair(pair, session) - if isinstance(entries_or_error, PublisherFetchError): - entries.append(entries_or_error) - else: - entries.extend(entries_or_error) - return entries + entries.append(asyncio.ensure_future(self.fetch_pair(pair, session))) + return list(await asyncio.gather(*entries, return_exceptions=True)) def format_url(self, pair: Pair) -> str: return ( @@ -79,14 +76,12 @@ def _construct( else: expiry_timestamp = int(0) - return [ - FutureEntry( - pair_id=pair.id, - price=price_int, - volume=int(volume), - timestamp=int(time.time()), - source=self.SOURCE, - publisher=self.publisher, - expiry_timestamp=expiry_timestamp * 1000, - ) - ] + return FutureEntry( + pair_id=pair.id, + price=price_int, + volume=int(volume), + timestamp=int(time.time()), + source=self.SOURCE, + publisher=self.publisher, + expiry_timestamp=expiry_timestamp * 1000, + ) diff --git a/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/okx.py b/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/okx.py index c4fa1e7c..2f072a2e 100644 --- a/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/okx.py +++ b/pragma-sdk/pragma_sdk/common/fetchers/future_fetchers/okx.py @@ -1,3 +1,4 @@ +import asyncio import json import time @@ -39,9 +40,8 @@ def format_expiry_timestamp_url(self, instrument_id: str) -> str: async def fetch_pair( # type: ignore[override] self, pair: Pair, session: ClientSession - ) -> PublisherFetchError | List[Entry]: + ) -> PublisherFetchError | FutureEntry: url = self.format_url(pair) - future_entries: List[Entry] = [] async with session.get(url) as resp: if resp.status == 404: return PublisherFetchError(f"No data found for {pair} from OKX") @@ -58,23 +58,15 @@ async def fetch_pair( # type: ignore[override] or result["msg"] == "Instrument ID does not exist" ): return PublisherFetchError(f"No data found for {pair} from OKX") - result_len = len(result["data"]) - if result_len >= 1: - for i in range(0, result_len): - future_entries.append(self._construct(pair, result["data"][i], 0)) - return future_entries + return self._construct(pair, result["data"][0], 0) async def fetch( self, session: ClientSession ) -> List[Entry | PublisherFetchError | BaseException]: - entries: List[Entry | PublisherFetchError | BaseException] = [] + entries = [] for pair in self.pairs: - future_entries = await self.fetch_pair(pair, session) - if isinstance(future_entries, list): - entries.extend(future_entries) - else: - entries.append(future_entries) - return entries + entries.append(asyncio.ensure_future(self.fetch_pair(pair, session))) + return list(await asyncio.gather(*entries, return_exceptions=True)) def format_url(self, pair: Pair) -> str: url = f"{self.BASE_URL}?instType=SWAP&uly={pair.base_currency.id}-{pair.quote_currency.id}" diff --git a/pragma-sdk/pragma_sdk/offchain/client.py b/pragma-sdk/pragma_sdk/offchain/client.py index 2653e884..e8bf7df9 100644 --- a/pragma-sdk/pragma_sdk/offchain/client.py +++ b/pragma-sdk/pragma_sdk/offchain/client.py @@ -179,7 +179,7 @@ async def _create_entries( "signature": [str(s) for s in sig], "entries": Entry.offchain_serialize_entries(entries), } - print(data) + # print(data) async with aiohttp.ClientSession() as session: start = time.time() diff --git a/pragma-sdk/pragma_sdk/supported_assets.yaml b/pragma-sdk/pragma_sdk/supported_assets.yaml index 1ed3933e..67b10567 100644 --- a/pragma-sdk/pragma_sdk/supported_assets.yaml +++ b/pragma-sdk/pragma_sdk/supported_assets.yaml @@ -10,7 +10,7 @@ starknet_address: '0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7' - name: 'Solana' - decimals: 18 + decimals: 9 ticker: 'SOL' coingecko_id: 'solana' @@ -71,6 +71,11 @@ starknet_address: '0x03fe2b97c1fd336e750087d68b9b867997fd64a2661ff3ca5a7c771641e8e7ac' ethereum_address: '0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599' +- name: 'Bitcoin Cash' + decimals: 8 + ticker: 'BCH' + coingecko_id: 'bitcoin-cash' + - name: 'Wrapped Lido Staked Ether' decimals: 18 ticker: 'WSTETH' @@ -134,8 +139,13 @@ ticker: 'MATIC' coingecko_id: 'matic-network' +- name: 'Worldcoin' + decimals: 18 + ticker: 'WLD' + coingecko_id: 'worldcoin-wld' + - name: 'Ripple' - decimals: 6 + decimals: 15 ticker: 'XRP' coingecko_id: 'ripple' @@ -145,7 +155,7 @@ coingecko_id: 'dogecoin' - name: '1000 Pepe' - decimals: 18 + decimals: 15 ticker: '1000PEPE' coingecko_id: '1000-pepe' @@ -164,10 +174,10 @@ ticker: 'SUI' coingecko_id: 'sui' -- name: 'Wrapped IchiFarm' - decimals: 18 +- name: 'dogwifhat' + decimals: 6 ticker: 'WIF' - coingecko_id: 'ichifarm-wrapped' + coingecko_id: 'dogwifcoin' - name: 'Celestia' decimals: 18 @@ -205,15 +215,20 @@ ticker: 'OP' coingecko_id: 'optimism' -- name: 'Ordinal Dogecoin' - decimals: 18 +- name: 'Aptos' + decimals: 8 + ticker: 'APT' + coingecko_id: 'aptos' + +- name: 'Ordinals' + decimals: 8 ticker: 'ORDI' - coingecko_id: 'ordinals-dogecoin' + coingecko_id: 'ordinals' -- name: 'Japan Token' +- name: 'Filecoin' decimals: 18 - ticker: 'JTO' - coingecko_id: 'japan-token' + ticker: 'FIL' + coingecko_id: 'filecoin' - name: 'Jupiter' decimals: 9 @@ -231,6 +246,24 @@ coingecko_id: 'okb' ethereum_address: '0x75231f58b43240c9718dd58b4967c5114342a86c' +- name: 'ApeCoin' + decimals: 18 + ticker: 'APE' + coingecko_id: 'apecoin' + ethereum_address: '0x4d224452801aced8b2f0aebe155379bb5d594381' + +- name: 'Shiba Inu' + decimals: 18 + ticker: 'SHIB' + coingecko_id: 'shiba-inu' + ethereum_address: '0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce' + +- name: 'Curve DAO' + decimals: 18 + ticker: 'CRV' + coingecko_id: 'curve-dao-token' + ethereum_address: '0xd533a949740bb3306d119cc777fa900ba034cd52' + - name: 'Cosmos' decimals: 6 ticker: 'ATOM' @@ -251,3 +284,38 @@ ticker: 'BROTHER' coingecko_id: 'starknet-brother' starknet_address: '0x03b405a98c9e795d427fe82cdeeeed803f221b52471e3a757574a2b4180793ee' + +- name: 'Cardano' + decimals: 6 + ticker: 'ADA' + coingecko_id: 'cardano' + +- name: 'Popcat' + decimals: 9 + ticker: 'POPCAT' + coingecko_id: 'popcat' + +- name: 'Sei' + decimals: 6 + ticker: 'SEI' + coingecko_id: 'sei-network' + +- name: 'Fantom' + decimals: 18 + ticker: 'FTM' + coingecko_id: 'fantom' + +- name: 'Goatseus Maximus' + decimals: 6 + ticker: 'GOAT' + coingecko_id: 'goatseus-maximus' + +- name: 'Moo Deng' + decimals: 6 + ticker: 'MOODENG' + coingecko_id: 'moo-deng' + +- name: 'Bonk' + decimals: 5 + ticker: 'BONK' + coingecko_id: 'bonk' diff --git a/pragma-sdk/tests/fetcher_test.py b/pragma-sdk/tests/fetcher_test.py index 856074f4..40bb26e2 100644 --- a/pragma-sdk/tests/fetcher_test.py +++ b/pragma-sdk/tests/fetcher_test.py @@ -2,12 +2,12 @@ import argparse import aiohttp from datetime import datetime -from typing import Type, Optional +from typing import Type, Optional, List from pragma_sdk.common.types.pair import Pair from pragma_sdk.common.types.currency import Currency from pragma_sdk.common.configs.asset_config import AssetConfig -from pragma_sdk.common.types.entry import SpotEntry +from pragma_sdk.common.types.entry import SpotEntry, FutureEntry from pragma_sdk.common.exceptions import PublisherFetchError from pragma_sdk.common.fetchers.interface import FetcherInterfaceT from pragma_sdk.common.utils import felt_to_str @@ -20,94 +20,135 @@ python fetcher_test.py DefillamaFetcher BROTHER USDPLUS ``` """ + + async def test_fetcher( fetcher_class: Type[FetcherInterfaceT], - base_currency: str, - quote_currency: str, + pairs: List[Pair], api_key: Optional[str] = None, + is_future: bool = False, ) -> None: """ - Test a price fetcher for a specific currency pair. + Test a price fetcher for multiple currency pairs. Args: fetcher_class: The fetcher class to test - base_currency: Base currency ticker (e.g., "BTC") - quote_currency: Quote currency ticker (e.g., "USD") + pairs: List of currency pairs to fetch api_key: Optional API key for the fetcher + is_future: Whether the fetcher is a future fetcher """ - # Create the currency pair - base = Currency.from_asset_config(AssetConfig.from_ticker(base_currency)) - quote = Currency.from_asset_config(AssetConfig.from_ticker(quote_currency)) - pair = Pair(base, quote) + fetcher = fetcher_class(pairs=pairs, publisher="TEST") - # Initialize the fetcher - fetcher = fetcher_class(pairs=[pair], publisher="TEST") - - # Set API key if provided if api_key and hasattr(fetcher, "headers"): fetcher.headers["Authorization"] = f"Bearer {api_key}" - print(f"\nTesting {fetcher.__class__.__name__} for pair: {pair}") + print( + f"\nTesting {fetcher.__class__.__name__} for pairs: {[str(pair) for pair in pairs]}" + ) print("-" * 50) try: async with aiohttp.ClientSession() as session: start_time = datetime.now() results = await fetcher.fetch(session) - result = results[0] end_time = datetime.now() - - if isinstance(result, SpotEntry): - print("✅ Successfully fetched price:") - print(f" Price: {result.price}") - print( - f" Human readable price: {result.price / (10 ** pair.decimals())}" - ) - print(f" Timestamp: {result.base.timestamp}") - print( - f" Human readable time: {datetime.fromtimestamp(result.base.timestamp)}" - ) - print(f" Volume: {result.volume}") - print(f" Source: {felt_to_str(result.base.source)}") - print(f" Publisher: {felt_to_str(result.base.publisher)}") - print(f" Fetch time: {(end_time - start_time).total_seconds():.3f}s") - elif isinstance(result, PublisherFetchError): - print("❌ Error fetching price:") - print(f" Error message: {str(result)}") - else: - print("❌ Unexpected result type:") - print(f" {type(result)}: {result}") + fetch_time = (end_time - start_time).total_seconds() + + print(f"Fetch completed in {fetch_time:.3f}s") + print(f"Number of results: {len(results)}") + + for idx, result in enumerate(results): + print(f"\nResult {idx + 1}:") + if isinstance(result, (SpotEntry, FutureEntry)): + pair = pairs[idx] + print("✅ Successfully fetched price:") + print(f" Pair: {pair}") + print(f" Price: {result.price}") + print( + f" Human readable price: {result.price / (10 ** pair.decimals())}" + ) + print(f" Timestamp: {result.base.timestamp}") + print( + f" Human readable time: {datetime.fromtimestamp(result.base.timestamp)}" + ) + print(f" Volume: {result.volume}") + print(f" Source: {felt_to_str(result.base.source)}") + print(f" Publisher: {felt_to_str(result.base.publisher)}") + if isinstance(result, FutureEntry): + print( + f" Expiry: {datetime.fromtimestamp(result.expiry_timestamp)}" + ) + elif isinstance(result, PublisherFetchError): + print("❌ Error fetching price:") + print(f" Error message: {str(result)}") + else: + print("❌ Unexpected result type:") + print(f" {type(result)}: {result}") except Exception as e: print("❌ Exception occurred:") print(f" {type(e).__name__}: {str(e)}") +def parse_pair(pair_str: str) -> Pair: + """Parse a pair string in format 'BTC/USD' into a Pair object.""" + try: + base, quote = pair_str.split("/") + base_currency = Currency.from_asset_config(AssetConfig.from_ticker(base)) + quote_currency = Currency.from_asset_config(AssetConfig.from_ticker(quote)) + return Pair(base_currency, quote_currency) + except ValueError: + raise ValueError( + f"Invalid pair format: {pair_str}. Expected format: BASE/QUOTE (e.g., BTC/USD)" + ) + + def main(): parser = argparse.ArgumentParser( - description="Test a price fetcher for a specific currency pair" + description="Test a price fetcher for multiple currency pairs" + ) + parser.add_argument( + "fetcher", type=str, help="Fetcher class name (e.g., BinanceFutureFetcher)" ) parser.add_argument( - "fetcher", type=str, help="Fetcher class name (e.g., GeckoTerminalFetcher)" + "pairs", + type=str, + nargs="+", + help="Currency pairs in format BASE/QUOTE (e.g., BTC/USD ETH/USD)", ) - parser.add_argument("base", type=str, help="Base currency ticker (e.g., BTC)") - parser.add_argument("quote", type=str, help="Quote currency ticker (e.g., USD)") parser.add_argument( "--api-key", type=str, help="API key for the fetcher", default=None ) + parser.add_argument( + "--future", action="store_true", help="Use future fetcher module path" + ) args = parser.parse_args() - # Import the fetcher class dynamically try: - # This assumes the fetcher is in the same directory - # You might need to modify this to import from different locations + # Parse all pairs + pairs = [parse_pair(pair_str) for pair_str in args.pairs] + + # Handle the module name conversion module_name = args.fetcher.lower() - if module_name.endswith("fetcher"): - module_name = module_name[:-7] + # For future fetchers, we want 'binance' from 'BinanceFutureFetcher' + if args.future: + if module_name.endswith("futurefetcher"): + module_name = module_name.replace("futurefetcher", "") + else: + if module_name.endswith("fetcher"): + module_name = module_name.replace("fetcher", "") + + # Construct the correct import path + if args.future: + import_path = f"pragma_sdk.common.fetchers.future_fetchers.{module_name}" + else: + import_path = f"pragma_sdk.common.fetchers.fetchers.{module_name}" + + print(f"Attempting to import from: {import_path}") fetcher_module = __import__( - f"pragma_sdk.common.fetchers.fetchers.{module_name}", + import_path, fromlist=[args.fetcher], ) fetcher_class = getattr(fetcher_module, args.fetcher) @@ -115,16 +156,18 @@ def main(): asyncio.run( test_fetcher( fetcher_class=fetcher_class, - base_currency=args.base, - quote_currency=args.quote, + pairs=pairs, api_key=args.api_key, + is_future=args.future, ) ) except ImportError as e: - print(f"❌ Could not import fetcher class '{args.fetcher}', {e}") - except AttributeError: + print(f"❌ Could not import fetcher class '{args.fetcher}' from {import_path}") + print(f" Error: {str(e)}") + except AttributeError as e: print(f"❌ Could not find fetcher class '{args.fetcher}' in module") + print(f" Error: {str(e)}") except Exception as e: print(f"❌ Error: {type(e).__name__}: {str(e)}") diff --git a/price-pusher/config/config.example.yaml b/price-pusher/config/config.example.yaml index a878abcd..fd5ddf1e 100644 --- a/price-pusher/config/config.example.yaml +++ b/price-pusher/config/config.example.yaml @@ -2,7 +2,8 @@ spot: - BTC/USD - ETH/USD - - BROTHER/USDPLUS - + future: + - BTC/USD + - ETH/USD time_difference: 1 price_deviation: 0.025 \ No newline at end of file diff --git a/price-pusher/price_pusher/configs/fetchers.py b/price-pusher/price_pusher/configs/fetchers.py index 61a191c2..54813c3e 100644 --- a/price-pusher/price_pusher/configs/fetchers.py +++ b/price-pusher/price_pusher/configs/fetchers.py @@ -54,7 +54,5 @@ class FetcherWithApiKeyConfig: # Configuration for fetchers that may require API keys. FETCHERS_WITH_API_KEY: Dict[FetcherInterfaceT, FetcherWithApiKeyConfig] = { - DefillamaFetcher: FetcherWithApiKeyConfig( - env_api_key="DEFI_LLAMA_API_KEY", optional=True - ), + DefillamaFetcher: FetcherWithApiKeyConfig(env_api_key="DEFI_LLAMA_API_KEY", optional=True), } diff --git a/price-pusher/price_pusher/core/poller.py b/price-pusher/price_pusher/core/poller.py index b3fee0f3..07dfd4d5 100644 --- a/price-pusher/price_pusher/core/poller.py +++ b/price-pusher/price_pusher/core/poller.py @@ -72,7 +72,7 @@ async def _fetch_action(self) -> None: Call the fetcher client `fetch` function to try to retrieve all entries from fetchers. """ new_entries = await self.fetcher_client.fetch( - filter_exceptions=True, return_exceptions=True, timeout_duration=20 + filter_exceptions=True, return_exceptions=True, timeout_duration=3 ) return new_entries