From bf26d8407ef99ba1fedd98d0fe45976ba6c9439e Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 13 Nov 2024 11:48:11 -0800 Subject: [PATCH 1/7] feat(observer): improve stall detection check --- .gitignore | 3 +- .python-version | 1 + README.md | 6 +- poetry.lock | 69 ++++++++++- pyproject.toml | 1 + pyth_observer/__init__.py | 11 +- pyth_observer/check/price_feed.py | 20 ++-- pyth_observer/check/publisher.py | 93 ++++++++++----- pyth_observer/check/stall_detection.py | 149 +++++++++++++++++++++++ pyth_observer/cli.py | 1 + pyth_observer/crosschain.py | 3 +- pyth_observer/event.py | 3 +- sample.config.yaml | 4 +- tests/test_checks_publisher.py | 159 +++++++++++++++++-------- 14 files changed, 427 insertions(+), 96 deletions(-) create mode 100644 .python-version create mode 100644 pyth_observer/check/stall_detection.py diff --git a/.gitignore b/.gitignore index 86a5a17..628452e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ __pycache__/ .envrc .coverage -.env \ No newline at end of file +.env +.vscode/ \ No newline at end of file diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..c8cfe39 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.10 diff --git a/README.md b/README.md index dc01a66..7480b34 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data. Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer -To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run: +To run Observer locally, you will need: +- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file). +- [Poetry](https://python-poetry.org), which handles package and virtualenv management. +Install dependencies and run the service: ```sh +$ poetry env use $(which python) # point Poetry to the pyenv python shim $ poetry install $ poetry run pyth-observer ``` diff --git a/poetry.lock b/poetry.lock index d74e01e..14ed276 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. [[package]] name = "aiodns" @@ -1114,6 +1114,70 @@ files = [ [package.dependencies] setuptools = "*" +[[package]] +name = "numpy" +version = "2.1.3" +description = "Fundamental package for array computing in Python" +optional = false +python-versions = ">=3.10" +files = [ + {file = "numpy-2.1.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c894b4305373b9c5576d7a12b473702afdf48ce5369c074ba304cc5ad8730dff"}, + {file = "numpy-2.1.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:b47fbb433d3260adcd51eb54f92a2ffbc90a4595f8970ee00e064c644ac788f5"}, + {file = "numpy-2.1.3-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:825656d0743699c529c5943554d223c021ff0494ff1442152ce887ef4f7561a1"}, + {file = "numpy-2.1.3-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:6a4825252fcc430a182ac4dee5a505053d262c807f8a924603d411f6718b88fd"}, + {file = "numpy-2.1.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e711e02f49e176a01d0349d82cb5f05ba4db7d5e7e0defd026328e5cfb3226d3"}, + {file = "numpy-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78574ac2d1a4a02421f25da9559850d59457bac82f2b8d7a44fe83a64f770098"}, + {file = "numpy-2.1.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c7662f0e3673fe4e832fe07b65c50342ea27d989f92c80355658c7f888fcc83c"}, + {file = "numpy-2.1.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:fa2d1337dc61c8dc417fbccf20f6d1e139896a30721b7f1e832b2bb6ef4eb6c4"}, + {file = "numpy-2.1.3-cp310-cp310-win32.whl", hash = "sha256:72dcc4a35a8515d83e76b58fdf8113a5c969ccd505c8a946759b24e3182d1f23"}, + {file = "numpy-2.1.3-cp310-cp310-win_amd64.whl", hash = "sha256:ecc76a9ba2911d8d37ac01de72834d8849e55473457558e12995f4cd53e778e0"}, + {file = "numpy-2.1.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4d1167c53b93f1f5d8a139a742b3c6f4d429b54e74e6b57d0eff40045187b15d"}, + {file = "numpy-2.1.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c80e4a09b3d95b4e1cac08643f1152fa71a0a821a2d4277334c88d54b2219a41"}, + {file = "numpy-2.1.3-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:576a1c1d25e9e02ed7fa5477f30a127fe56debd53b8d2c89d5578f9857d03ca9"}, + {file = "numpy-2.1.3-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:973faafebaae4c0aaa1a1ca1ce02434554d67e628b8d805e61f874b84e136b09"}, + {file = "numpy-2.1.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:762479be47a4863e261a840e8e01608d124ee1361e48b96916f38b119cfda04a"}, + {file = "numpy-2.1.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc6f24b3d1ecc1eebfbf5d6051faa49af40b03be1aaa781ebdadcbc090b4539b"}, + {file = "numpy-2.1.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:17ee83a1f4fef3c94d16dc1802b998668b5419362c8a4f4e8a491de1b41cc3ee"}, + {file = "numpy-2.1.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:15cb89f39fa6d0bdfb600ea24b250e5f1a3df23f901f51c8debaa6a5d122b2f0"}, + {file = "numpy-2.1.3-cp311-cp311-win32.whl", hash = "sha256:d9beb777a78c331580705326d2367488d5bc473b49a9bc3036c154832520aca9"}, + {file = "numpy-2.1.3-cp311-cp311-win_amd64.whl", hash = "sha256:d89dd2b6da69c4fff5e39c28a382199ddedc3a5be5390115608345dec660b9e2"}, + {file = "numpy-2.1.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:f55ba01150f52b1027829b50d70ef1dafd9821ea82905b63936668403c3b471e"}, + {file = "numpy-2.1.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:13138eadd4f4da03074851a698ffa7e405f41a0845a6b1ad135b81596e4e9958"}, + {file = "numpy-2.1.3-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:a6b46587b14b888e95e4a24d7b13ae91fa22386c199ee7b418f449032b2fa3b8"}, + {file = "numpy-2.1.3-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:0fa14563cc46422e99daef53d725d0c326e99e468a9320a240affffe87852564"}, + {file = "numpy-2.1.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8637dcd2caa676e475503d1f8fdb327bc495554e10838019651b76d17b98e512"}, + {file = "numpy-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2312b2aa89e1f43ecea6da6ea9a810d06aae08321609d8dc0d0eda6d946a541b"}, + {file = "numpy-2.1.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a38c19106902bb19351b83802531fea19dee18e5b37b36454f27f11ff956f7fc"}, + {file = "numpy-2.1.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:02135ade8b8a84011cbb67dc44e07c58f28575cf9ecf8ab304e51c05528c19f0"}, + {file = "numpy-2.1.3-cp312-cp312-win32.whl", hash = "sha256:e6988e90fcf617da2b5c78902fe8e668361b43b4fe26dbf2d7b0f8034d4cafb9"}, + {file = "numpy-2.1.3-cp312-cp312-win_amd64.whl", hash = "sha256:0d30c543f02e84e92c4b1f415b7c6b5326cbe45ee7882b6b77db7195fb971e3a"}, + {file = "numpy-2.1.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:96fe52fcdb9345b7cd82ecd34547fca4321f7656d500eca497eb7ea5a926692f"}, + {file = "numpy-2.1.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f653490b33e9c3a4c1c01d41bc2aef08f9475af51146e4a7710c450cf9761598"}, + {file = "numpy-2.1.3-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:dc258a761a16daa791081d026f0ed4399b582712e6fc887a95af09df10c5ca57"}, + {file = "numpy-2.1.3-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:016d0f6f5e77b0f0d45d77387ffa4bb89816b57c835580c3ce8e099ef830befe"}, + {file = "numpy-2.1.3-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c181ba05ce8299c7aa3125c27b9c2167bca4a4445b7ce73d5febc411ca692e43"}, + {file = "numpy-2.1.3-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5641516794ca9e5f8a4d17bb45446998c6554704d888f86df9b200e66bdcce56"}, + {file = "numpy-2.1.3-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:ea4dedd6e394a9c180b33c2c872b92f7ce0f8e7ad93e9585312b0c5a04777a4a"}, + {file = "numpy-2.1.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:b0df3635b9c8ef48bd3be5f862cf71b0a4716fa0e702155c45067c6b711ddcef"}, + {file = "numpy-2.1.3-cp313-cp313-win32.whl", hash = "sha256:50ca6aba6e163363f132b5c101ba078b8cbd3fa92c7865fd7d4d62d9779ac29f"}, + {file = "numpy-2.1.3-cp313-cp313-win_amd64.whl", hash = "sha256:747641635d3d44bcb380d950679462fae44f54b131be347d5ec2bce47d3df9ed"}, + {file = "numpy-2.1.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:996bb9399059c5b82f76b53ff8bb686069c05acc94656bb259b1d63d04a9506f"}, + {file = "numpy-2.1.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:45966d859916ad02b779706bb43b954281db43e185015df6eb3323120188f9e4"}, + {file = "numpy-2.1.3-cp313-cp313t-macosx_14_0_arm64.whl", hash = "sha256:baed7e8d7481bfe0874b566850cb0b85243e982388b7b23348c6db2ee2b2ae8e"}, + {file = "numpy-2.1.3-cp313-cp313t-macosx_14_0_x86_64.whl", hash = "sha256:a9f7f672a3388133335589cfca93ed468509cb7b93ba3105fce780d04a6576a0"}, + {file = "numpy-2.1.3-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7aac50327da5d208db2eec22eb11e491e3fe13d22653dce51b0f4109101b408"}, + {file = "numpy-2.1.3-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4394bc0dbd074b7f9b52024832d16e019decebf86caf909d94f6b3f77a8ee3b6"}, + {file = "numpy-2.1.3-cp313-cp313t-musllinux_1_1_x86_64.whl", hash = "sha256:50d18c4358a0a8a53f12a8ba9d772ab2d460321e6a93d6064fc22443d189853f"}, + {file = "numpy-2.1.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:14e253bd43fc6b37af4921b10f6add6925878a42a0c5fe83daee390bca80bc17"}, + {file = "numpy-2.1.3-cp313-cp313t-win32.whl", hash = "sha256:08788d27a5fd867a663f6fc753fd7c3ad7e92747efc73c53bca2f19f8bc06f48"}, + {file = "numpy-2.1.3-cp313-cp313t-win_amd64.whl", hash = "sha256:2564fbdf2b99b3f815f2107c1bbc93e2de8ee655a69c261363a1172a79a257d4"}, + {file = "numpy-2.1.3-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:4f2015dfe437dfebbfce7c85c7b53d81ba49e71ba7eadbf1df40c915af75979f"}, + {file = "numpy-2.1.3-pp310-pypy310_pp73-macosx_14_0_x86_64.whl", hash = "sha256:3522b0dfe983a575e6a9ab3a4a4dfe156c3e428468ff08ce582b9bb6bd1d71d4"}, + {file = "numpy-2.1.3-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c006b607a865b07cd981ccb218a04fc86b600411d83d6fc261357f1c0966755d"}, + {file = "numpy-2.1.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:e14e26956e6f1696070788252dcdff11b4aca4c3e8bd166e0df1bb8f315a67cb"}, + {file = "numpy-2.1.3.tar.gz", hash = "sha256:aa08e04e08aaf974d4458def539dece0d28146d866a39da5639596f4921fd761"}, +] + [[package]] name = "onecache" version = "0.3.1" @@ -1613,6 +1677,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -2038,4 +2103,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "68214551a9dc797c9890b689836f2ecdeea8536596bdbfc467b3b08fb404cca0" +content-hash = "f7bfd1509c9fc61ae0cc624b8b0c4b8f127ed6c71bb5d3db6195af9d1b45f592" diff --git a/pyproject.toml b/pyproject.toml index 45597df..4a00f1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ throttler = "1.2.1" types-pyyaml = "^6.0.12" types-pytz = "^2022.4.0.0" python-dotenv = "^1.0.1" +numpy = "^2.1.3" [tool.poetry.group.dev.dependencies] diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py index 8fa1d98..4dad505 100644 --- a/pyth_observer/__init__.py +++ b/pyth_observer/__init__.py @@ -77,6 +77,7 @@ async def run(self): coingecko_prices, coingecko_updates = await self.get_coingecko_prices() crosschain_prices = await self.get_crosschain_prices() + failed_checks = [] for product in products: # Skip tombstone accounts with blank metadata if "base" not in product.attrs: @@ -159,10 +160,16 @@ async def run(self): ) ) - await self.dispatch.run(states) + cur_failed_checks = await self.dispatch.run(states) + if cur_failed_checks: + failed_checks.extend(cur_failed_checks) + if failed_checks: + logger.error(f"Failed checks: {len(failed_checks)}") + else: + logger.info("All checks passed") logger.debug("Sleeping...") - await asyncio.sleep(5) + await asyncio.sleep(30) async def get_pyth_products(self) -> List[PythProductAccount]: logger.debug("Fetching Pyth product accounts...") diff --git a/pyth_observer/check/price_feed.py b/pyth_observer/check/price_feed.py index cd8bbb0..76260ad 100644 --- a/pyth_observer/check/price_feed.py +++ b/pyth_observer/check/price_feed.py @@ -32,17 +32,13 @@ class PriceFeedState: @runtime_checkable class PriceFeedCheck(Protocol): - def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): - ... + def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): ... - def state(self) -> PriceFeedState: - ... + def state(self) -> PriceFeedState: ... - def run(self) -> bool: - ... + def run(self) -> bool: ... - def error_message(self) -> dict: - ... + def error_message(self) -> dict: ... class PriceFeedOfflineCheck(PriceFeedCheck): @@ -281,8 +277,8 @@ def error_message(self) -> dict: PRICE_FEED_CHECKS = [ - PriceFeedCoinGeckoCheck, - PriceFeedCrossChainDeviationCheck, - PriceFeedCrossChainOnlineCheck, - PriceFeedOfflineCheck, + # PriceFeedCoinGeckoCheck, + # PriceFeedCrossChainDeviationCheck, + # PriceFeedCrossChainOnlineCheck, + # PriceFeedOfflineCheck, ] diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index 2c451b5..49f8444 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -1,16 +1,35 @@ +from collections import defaultdict, deque import time -from dataclasses import dataclass +from dataclasses import asdict, dataclass from datetime import datetime -from typing import Dict, Protocol, runtime_checkable +from typing import Dict, List, Optional, Protocol, runtime_checkable from zoneinfo import ZoneInfo - +from loguru import logger from pythclient.calendar import is_market_open from pythclient.pythaccounts import PythPriceStatus from pythclient.solana import SolanaPublicKey + +@dataclass +class PriceUpdate: + """Represents a single price with its timestamp (epoch seconds).""" + + timestamp: int + price: float + + PUBLISHER_EXCLUSION_DISTANCE = 25 +PUBLISHER_CACHE_MAX_LEN = 30 +"""Roughly 30 mins of updates, since the check runs about once a minute""" -PUBLISHER_CACHE = {} +PUBLISHER_CACHE: Dict[tuple[str, str], List[PriceUpdate]] = defaultdict( + lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN) +) +""" +Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in. +Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned. +Used by the PublisherStalledCheck to detect stalls in prices. +""" @dataclass @@ -35,17 +54,13 @@ class PublisherState: @runtime_checkable class PublisherCheck(Protocol): - def __init__(self, state: PublisherState, config: PublisherCheckConfig): - ... + def __init__(self, state: PublisherState, config: PublisherCheckConfig): ... - def state(self) -> PublisherState: - ... + def state(self) -> PublisherState: ... - def run(self) -> bool: - ... + def run(self) -> bool: ... - def error_message(self) -> dict: - ... + def error_message(self) -> dict: ... class PublisherWithinAggregateConfidenceCheck(PublisherCheck): @@ -240,6 +255,20 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig): self.__abandoned_time_limit: int = int(config["abandoned_time_limit"]) self.__max_slot_distance: int = int(config["max_slot_distance"]) + from pyth_observer.check.stall_detection import ( + StallDetectionResult, + StallDetector, + ) # noqa: deferred import to avoid circular import + + self.__detector = StallDetector( + stall_time_limit=self.__stall_time_limit, + noise_threshold=float(config.get("noise_threshold")), + min_noise_samples=int(config.get("min_noise_samples")), + ) + + # Keep track of last analysis for error reporting + self.__last_analysis: Optional[StallDetectionResult] = None + def state(self) -> PublisherState: return self.__state @@ -254,36 +283,46 @@ def run(self) -> bool: distance = self.__state.latest_block_slot - self.__state.slot + # Pass for redemption rates because they are expected to be static for long periods + if self.__state.asset_type == "Crypto Redemption Rate": + logger.info(f"Redemption rate: Skipping {self.__state.symbol}") + return True + # Pass when publisher is offline because PublisherOfflineCheck will be triggered if distance >= self.__max_slot_distance: return True - publisher_key = (self.__state.publisher_name, self.__state.symbol) current_time = int(time.time()) - previous_price, last_change_time = PUBLISHER_CACHE.get( - publisher_key, (None, None) - ) - if previous_price is None or self.__state.price != previous_price: - PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time) - return True + publisher_key = (self.__state.publisher_name, self.__state.symbol) + PUBLISHER_CACHE[publisher_key].append( + PriceUpdate(current_time, self.__state.price) + ), + updates = PUBLISHER_CACHE[publisher_key] - time_since_last_change = current_time - last_change_time - if time_since_last_change > self.__stall_time_limit: - if time_since_last_change > self.__abandoned_time_limit: - return True # Abandon this check after the abandoned time limit - return False + # Analyze for stalls + result = self.__detector.analyze_updates(list(updates)) + logger.debug(f"Stall detection result: {result}") + + self.__last_analysis = result # For error logging + + # If we've been stalled for too long, abandon this check + if result.is_stalled and result.duration > self.__abandoned_time_limit: + return True - return True + return not result.is_stalled def error_message(self) -> dict: + stall_duration = f"{self.__last_analysis.duration:.1f} seconds" return { - "msg": f"{self.__state.publisher_name} has been publishing the same price for too long.", + "msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}", "type": "PublisherStalledCheck", "publisher": self.__state.publisher_name, "symbol": self.__state.symbol, "price": self.__state.price, - "stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds", + "stall_type": self.__last_analysis.stall_type, + "stall_duration": stall_duration, + "analysis": asdict(self.__last_analysis), } diff --git a/pyth_observer/check/stall_detection.py b/pyth_observer/check/stall_detection.py new file mode 100644 index 0000000..4aa6a05 --- /dev/null +++ b/pyth_observer/check/stall_detection.py @@ -0,0 +1,149 @@ +from dataclasses import dataclass +from typing import List, Optional + +import numpy as np + +from pyth_observer.check.publisher import PriceUpdate + + +@dataclass +class StallDetectionResult: + """Results from stall detection analysis.""" + + is_stalled: bool + stall_type: Optional[ + str + ] # 'exact' for identical values, 'noisy' for artificial noise + base_price: Optional[float] + noise_magnitude: Optional[float] + duration: float # how long the price has been stalled + confidence: float + + @classmethod + def no_stall(cls) -> "StallDetectionResult": + """Create a StallDetectionResult instance indicating no stall detected.""" + return cls( + is_stalled=False, + stall_type=None, + base_price=None, + noise_magnitude=None, + duration=0.0, + confidence=0.0, + ) + + +class StallDetector: + """ + Detects price staleness by identifying both exact price repeats and artificial noise patterns. + + The detection strategy is based on the intuition that meaningful price movements must exceed + some minimum relative threshold. If a price very slightly fluctuates but doesn't exceed the + `noise_threshold` within `stall_time_limit`, then it's likely that it's just a static price + with artificial noise thrown in. + + Detection Logic: + 1. Exact Stalls: Prices within `stall_time_limit` are exactly equal (within float precision) + 2. Noisy Stalls: All price variations stay within a tiny relative `noise_threshold` (default 0.01%) + for longer than `stall_time_limit`. + + The `noise_threshold` (default 1e-4 or 0.01%) strategy is chosen because: + - Real price movements, even for very stable symbols, should exceed this threshold. + - Hard to circumvent. Avoiding detection would require larger variations, impacting the publisher's + price accuracy versus the aggregate. + - The threshold is relative to the base price, making it work across different price scales. + - It works across different noise patterns (random, sine wave, structured, etc.) + + Example: + - A $100 base price with all variations within ±$0.01 (0.01%) for 2+ minutes is likely stalled + - Natural price movements would occasionally exceed this tiny threshold + - Variations this small consistently over time suggest artificial noise + """ + + def __init__( + self, + stall_time_limit: float, + noise_threshold: float = 1e-4, + min_noise_samples: int = 5, + ): + """ + Initialize stall detector. + + Args: + stall_time_limit: Time in seconds before price is considered stalled + noise_threshold: Maximum relative noise magnitude (e.g., 1e-4 for 0.01%) + min_noise_updates: Minimum number of updates needed for noise detection + (doesn't apply to exact stall detection) + """ + self.stall_time_limit = stall_time_limit + self.noise_threshold = noise_threshold + self.min_noise_samples = min_noise_samples + + def analyze_updates(self, updates: List[PriceUpdate]) -> StallDetectionResult: + """ + Assumes that the cache has been recently updated since it takes the latest + cached timestamp as the current time. + + Args: + updates: List of price updates to analyze + + Returns: + StallDetectionResult with detection details + """ + # Need at least 2 samples + if not updates or len(updates) < 2: + return StallDetectionResult.no_stall() + + ## Check for exact stall + + # The latest 2 updates are sufficient to detect an exact stall + latest_updates = updates[-2:] + duration = latest_updates[1].timestamp - latest_updates[0].timestamp + if duration <= self.stall_time_limit: + return StallDetectionResult.no_stall() + elif latest_updates[1].price == latest_updates[0].price: + return StallDetectionResult( + is_stalled=True, + stall_type="exact", + base_price=latest_updates[1].price, + noise_magnitude=0.0, + duration=duration, + confidence=1.0, + ) + + ## Check for stalled price with artificial noise added in + + # Calculate relative deviations from base price + prices = np.array([u.price for u in updates]) + base_price = np.median(prices) + + if base_price == 0: + # Avoid division by zero + return StallDetectionResult.no_stall() + + relative_deviations = np.abs(prices - base_price) / abs(base_price) + max_relative_deviation = np.max(relative_deviations) + + # Check for artificial noise (variations below threshold) + if len(updates) < self.min_noise_samples: + # We need multiple samples to detect noise, pass until we have enough + return StallDetectionResult.no_stall() + + if max_relative_deviation <= self.noise_threshold: + confidence = 1.0 - (max_relative_deviation / self.noise_threshold) + return StallDetectionResult( + is_stalled=True, + stall_type="noisy", + base_price=base_price, + noise_magnitude=max_relative_deviation * base_price, + duration=duration, + confidence=confidence, + ) + + return StallDetectionResult( + is_stalled=False, + stall_type=None, + base_price=base_price, + noise_magnitude=max_relative_deviation * base_price, + duration=duration, + confidence=0.0, + ) diff --git a/pyth_observer/cli.py b/pyth_observer/cli.py index 1df6aed..b0e9cab 100644 --- a/pyth_observer/cli.py +++ b/pyth_observer/cli.py @@ -67,6 +67,7 @@ def run(config, publishers, coingecko_mapping, prometheus_port): logger.remove() logger.add( sys.stdout, + colorize=(os.environ.get("DEV_MODE")), serialize=(not os.environ.get("DEV_MODE")), level=os.environ.get("LOG_LEVEL", "INFO"), ) diff --git a/pyth_observer/crosschain.py b/pyth_observer/crosschain.py index ca08435..dd1b0b3 100644 --- a/pyth_observer/crosschain.py +++ b/pyth_observer/crosschain.py @@ -48,7 +48,8 @@ async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]: async with session.get( price_feeds_url + query_string, ) as response: - price_feeds += await response.json() + response_json = await response.json() + price_feeds.extend(response_json["parsed"]) # Return a dictionary of id -> {price, conf, expo} for fast lookup return { diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 3116a5d..a9bf7f5 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -27,8 +27,7 @@ class Event(Protocol): check: Check context: Context - async def send(self): - ... + async def send(self): ... class DatadogEvent(Event): diff --git a/sample.config.yaml b/sample.config.yaml index fba7ecf..c93b3d5 100644 --- a/sample.config.yaml +++ b/sample.config.yaml @@ -54,9 +54,11 @@ checks: resolution_threshold: 1 PublisherStalledCheck: enable: false - stall_time_limit: 30 + stall_time_limit: 120 abandoned_time_limit: 300 max_slot_distance: 25 + noise_threshold: 0.0001 + min_noise_samples: 10 alert_threshold: 1 resolution_threshold: 0 # Per-symbol config diff --git a/tests/test_checks_publisher.py b/tests/test_checks_publisher.py index 38365bc..c9fc638 100644 --- a/tests/test_checks_publisher.py +++ b/tests/test_checks_publisher.py @@ -1,29 +1,35 @@ +import random import time from unittest.mock import patch +import numpy as np +import pytest from pythclient.pythaccounts import PythPriceStatus from pythclient.solana import SolanaPublicKey from pyth_observer.check.publisher import ( PUBLISHER_CACHE, + PriceUpdate, PublisherPriceCheck, PublisherStalledCheck, PublisherState, ) -def make_state( +def make_publisher_state( pub_slot: int, pub_price: float, pub_conf: float, agg_slot: int, agg_price: float, agg_conf: float, + asset_type: str = "Crypto", + symbol: str = "Crypto.BTC/USD", ) -> PublisherState: return PublisherState( publisher_name="publisher", - symbol="Crypto.BTC/USD", - asset_type="Crypto", + symbol=symbol, + asset_type=asset_type, public_key=SolanaPublicKey("2hgu6Umyokvo8FfSDdMa9nDKhcdv9Q4VvGNhRCeSWeD3"), status=PythPriceStatus.TRADING, aggregate_status=PythPriceStatus.TRADING, @@ -50,64 +56,123 @@ def check_is_ok( ).run() # check triggering threshold for price difference - state1 = make_state(1, 100.0, 2.0, 1, 110.0, 1.0) + state1 = make_publisher_state(1, 100.0, 2.0, 1, 110.0, 1.0) assert check_is_ok(state1, 10, 25) assert not check_is_ok(state1, 6, 25) -def test_publisher_stalled_check(): - current_time = time.time() - - def simulate_time_pass(seconds): - nonlocal current_time - current_time += seconds - return current_time - - def setup_check(state, stall_time_limit, abandoned_time_limit, max_slot_distance): +class TestPublisherStalledCheck: + @pytest.fixture(autouse=True) + def setup(self): + """Clear cache and time simulation before each test""" + PUBLISHER_CACHE.clear() + self.current_time = int(time.time()) + yield + PUBLISHER_CACHE.clear() + + def simulate_time_pass(self, seconds: float) -> float: + self.current_time += seconds + return self.current_time + + def setup_check( + self, + state: PublisherState, + stall_time_limit: int = 5, + abandoned_time_limit: int = 25, + max_slot_distance: int = 25, + noise_threshold: float = 1e-4, + min_noise_samples: int = 10, + ) -> PublisherStalledCheck: check = PublisherStalledCheck( state, { "stall_time_limit": stall_time_limit, "abandoned_time_limit": abandoned_time_limit, "max_slot_distance": max_slot_distance, + "noise_threshold": noise_threshold, + "min_noise_samples": min_noise_samples, }, ) - PUBLISHER_CACHE[(state.publisher_name, state.symbol)] = ( - state.price, - current_time, + + # Seed the cache with the publisher state + PUBLISHER_CACHE[(state.publisher_name, state.symbol)].append( + PriceUpdate(self.current_time, state.price) ) + return check - def run_check(check, seconds, expected): - with patch("time.time", new=lambda: simulate_time_pass(seconds)): + def run_check(self, check: PublisherStalledCheck, seconds: float, expected: bool): + with patch("time.time", new=lambda: self.simulate_time_pass(seconds)): assert check.run() == expected - PUBLISHER_CACHE.clear() - state_a = make_state(1, 100.0, 2.0, 1, 100.0, 1.0) - check_a = setup_check(state_a, 5, 25, 25) - run_check(check_a, 5, True) # Should pass as it hits the limit exactly - - PUBLISHER_CACHE.clear() - state_b = make_state(1, 100.0, 2.0, 1, 100.0, 1.0) - check_b = setup_check(state_b, 5, 25, 25) - run_check(check_b, 6, False) # Should fail as it exceeds the limit - - PUBLISHER_CACHE.clear() - state_c = make_state(1, 100.0, 2.0, 1, 100.0, 1.0) - check_c = setup_check(state_c, 5, 25, 25) - run_check(check_c, 2, True) # Initial check should pass - state_c.price = 105.0 # Change the price - run_check(check_c, 3, True) # Should pass as price changes - state_c.price = 100.0 # Change back to original price - run_check(check_c, 4, True) # Should pass as price changes - run_check( - check_c, 8, False - ) # Should fail as price stalls for too long after last change - - # Adding a check for when the publisher is offline - PUBLISHER_CACHE.clear() - state_d = make_state(1, 100.0, 2.0, 1, 100.0, 1.0) - state_d.latest_block_slot = 25 - state_d.slot = 0 - check_d = setup_check(state_d, 5, 25, 25) - run_check(check_d, 10, True) # Should pass as the publisher is offline + def test_exact_stall_fails_check(self): + state_a = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check_a = self.setup_check(state_a, stall_time_limit=5) + self.run_check(check_a, 5, True) # Should pass as it hits the limit exactly + + PUBLISHER_CACHE.clear() + state_b = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check_b = self.setup_check(state_b, stall_time_limit=5) + self.run_check(check_b, 6, False) # Should fail as it exceeds the limit + + PUBLISHER_CACHE.clear() + state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check_c = self.setup_check(state_c, stall_time_limit=5) + self.run_check(check_c, 2, True) # Initial check should pass + state_c.price = 105.0 # Change the price + self.run_check(check_c, 3, True) # Should pass as price changes + state_c.price = 100.0 # Change back to original price + self.run_check(check_c, 4, True) # Should pass as price changes + self.run_check( + check_c, 8, False + ) # Should fail as price stalls for too long after last change + + # Adding a check for when the publisher is offline + PUBLISHER_CACHE.clear() + state_d = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + state_d.latest_block_slot = 25 + state_d.slot = 0 + check_d = self.setup_check(state_d, 5, 25, 25) + self.run_check(check_d, 10, True) # Should pass as the publisher is offline + + def test_artificially_noisy_stall_fails_check(self): + """Test detection of stalls with artificial noise""" + state = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check = self.setup_check(state, stall_time_limit=50, min_noise_samples=10) + + # Add prices with small artificial noise, exceeding stall_time_limit and min_noise_updates + for seconds in range(0, 55, 5): + noise = state.price * ( + 1e-6 * (random.random() - 0.5) + ) # Random noise within ±1e-4% + state.price = 100.0 + noise + # Should fail after 50 seconds and 10 samples + self.run_check(check, 30, seconds < 55) + + def test_normal_price_movement_passes_check(self): + """Test that normal price movements don't trigger stall detection""" + state = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check = self.setup_check(state, stall_time_limit=50, min_noise_samples=10) + + # Add prices with significant variations to simulate real + # price movements, exceeding stall_time_limit and min_noise_updates + for seconds in range(0, 55, 5): + state.price = 100.0 + (seconds * 0.001) # 0.1% change each time + self.run_check(check, 30, True) # Should always pass + + def test_redemption_rate_passes_check(self): + """Test that redemption rates are always allowed to be static""" + state = make_publisher_state( + 1, + 100.0, + 2.0, + 1, + 100.0, + 1.0, + asset_type="Crypto Redemption Rate", + symbol="Crypto.FUSDC/USDC.RR", + ) + check = self.setup_check(state, self.current_time) + + # Should pass even after long period without changes + self.run_check(check, 3600, True) # 1 hour From d39685c011b2a1669b7e2e811b3b28a4371af83e Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 13 Nov 2024 11:53:31 -0800 Subject: [PATCH 2/7] fix(observer): revert changes made for local debugging --- pyth_observer/__init__.py | 11 ++--------- pyth_observer/check/price_feed.py | 8 ++++---- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py index 4dad505..8fa1d98 100644 --- a/pyth_observer/__init__.py +++ b/pyth_observer/__init__.py @@ -77,7 +77,6 @@ async def run(self): coingecko_prices, coingecko_updates = await self.get_coingecko_prices() crosschain_prices = await self.get_crosschain_prices() - failed_checks = [] for product in products: # Skip tombstone accounts with blank metadata if "base" not in product.attrs: @@ -160,16 +159,10 @@ async def run(self): ) ) - cur_failed_checks = await self.dispatch.run(states) - if cur_failed_checks: - failed_checks.extend(cur_failed_checks) + await self.dispatch.run(states) - if failed_checks: - logger.error(f"Failed checks: {len(failed_checks)}") - else: - logger.info("All checks passed") logger.debug("Sleeping...") - await asyncio.sleep(30) + await asyncio.sleep(5) async def get_pyth_products(self) -> List[PythProductAccount]: logger.debug("Fetching Pyth product accounts...") diff --git a/pyth_observer/check/price_feed.py b/pyth_observer/check/price_feed.py index 76260ad..5b87805 100644 --- a/pyth_observer/check/price_feed.py +++ b/pyth_observer/check/price_feed.py @@ -277,8 +277,8 @@ def error_message(self) -> dict: PRICE_FEED_CHECKS = [ - # PriceFeedCoinGeckoCheck, - # PriceFeedCrossChainDeviationCheck, - # PriceFeedCrossChainOnlineCheck, - # PriceFeedOfflineCheck, + PriceFeedCoinGeckoCheck, + PriceFeedCrossChainDeviationCheck, + PriceFeedCrossChainOnlineCheck, + PriceFeedOfflineCheck, ] From 7e34307b71d820e2fe974c38c0bea605e02a588d Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 13 Nov 2024 13:33:22 -0800 Subject: [PATCH 3/7] fix(observer): fix pyright errors --- pyth_observer/check/price_feed.py | 12 ++++++++---- pyth_observer/check/publisher.py | 26 ++++++++++++-------------- pyth_observer/check/stall_detection.py | 10 +++++----- pyth_observer/cli.py | 1 - pyth_observer/event.py | 3 ++- tests/test_checks_publisher.py | 1 - 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/pyth_observer/check/price_feed.py b/pyth_observer/check/price_feed.py index 5b87805..cd8bbb0 100644 --- a/pyth_observer/check/price_feed.py +++ b/pyth_observer/check/price_feed.py @@ -32,13 +32,17 @@ class PriceFeedState: @runtime_checkable class PriceFeedCheck(Protocol): - def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): ... + def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): + ... - def state(self) -> PriceFeedState: ... + def state(self) -> PriceFeedState: + ... - def run(self) -> bool: ... + def run(self) -> bool: + ... - def error_message(self) -> dict: ... + def error_message(self) -> dict: + ... class PriceFeedOfflineCheck(PriceFeedCheck): diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index 49f8444..b05cf72 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -22,9 +22,7 @@ class PriceUpdate: PUBLISHER_CACHE_MAX_LEN = 30 """Roughly 30 mins of updates, since the check runs about once a minute""" -PUBLISHER_CACHE: Dict[tuple[str, str], List[PriceUpdate]] = defaultdict( - lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN) -) +PUBLISHER_CACHE = defaultdict(lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN)) """ Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in. Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned. @@ -54,13 +52,17 @@ class PublisherState: @runtime_checkable class PublisherCheck(Protocol): - def __init__(self, state: PublisherState, config: PublisherCheckConfig): ... + def __init__(self, state: PublisherState, config: PublisherCheckConfig): + ... - def state(self) -> PublisherState: ... + def state(self) -> PublisherState: + ... - def run(self) -> bool: ... + def run(self) -> bool: + ... - def error_message(self) -> dict: ... + def error_message(self) -> dict: + ... class PublisherWithinAggregateConfidenceCheck(PublisherCheck): @@ -256,19 +258,15 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig): self.__max_slot_distance: int = int(config["max_slot_distance"]) from pyth_observer.check.stall_detection import ( - StallDetectionResult, StallDetector, ) # noqa: deferred import to avoid circular import self.__detector = StallDetector( stall_time_limit=self.__stall_time_limit, - noise_threshold=float(config.get("noise_threshold")), - min_noise_samples=int(config.get("min_noise_samples")), + noise_threshold=float(config["noise_threshold"]), + min_noise_samples=int(config["min_noise_samples"]), ) - # Keep track of last analysis for error reporting - self.__last_analysis: Optional[StallDetectionResult] = None - def state(self) -> PublisherState: return self.__state @@ -297,7 +295,7 @@ def run(self) -> bool: publisher_key = (self.__state.publisher_name, self.__state.symbol) PUBLISHER_CACHE[publisher_key].append( PriceUpdate(current_time, self.__state.price) - ), + ) updates = PUBLISHER_CACHE[publisher_key] # Analyze for stalls diff --git a/pyth_observer/check/stall_detection.py b/pyth_observer/check/stall_detection.py index 4aa6a05..cfafa50 100644 --- a/pyth_observer/check/stall_detection.py +++ b/pyth_observer/check/stall_detection.py @@ -133,17 +133,17 @@ def analyze_updates(self, updates: List[PriceUpdate]) -> StallDetectionResult: return StallDetectionResult( is_stalled=True, stall_type="noisy", - base_price=base_price, - noise_magnitude=max_relative_deviation * base_price, + base_price=float(base_price), + noise_magnitude=float(max_relative_deviation * base_price), duration=duration, - confidence=confidence, + confidence=float(confidence), ) return StallDetectionResult( is_stalled=False, stall_type=None, - base_price=base_price, - noise_magnitude=max_relative_deviation * base_price, + base_price=float(base_price), + noise_magnitude=float(max_relative_deviation * base_price), duration=duration, confidence=0.0, ) diff --git a/pyth_observer/cli.py b/pyth_observer/cli.py index b0e9cab..1df6aed 100644 --- a/pyth_observer/cli.py +++ b/pyth_observer/cli.py @@ -67,7 +67,6 @@ def run(config, publishers, coingecko_mapping, prometheus_port): logger.remove() logger.add( sys.stdout, - colorize=(os.environ.get("DEV_MODE")), serialize=(not os.environ.get("DEV_MODE")), level=os.environ.get("LOG_LEVEL", "INFO"), ) diff --git a/pyth_observer/event.py b/pyth_observer/event.py index a9bf7f5..3116a5d 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -27,7 +27,8 @@ class Event(Protocol): check: Check context: Context - async def send(self): ... + async def send(self): + ... class DatadogEvent(Event): diff --git a/tests/test_checks_publisher.py b/tests/test_checks_publisher.py index c9fc638..7af8750 100644 --- a/tests/test_checks_publisher.py +++ b/tests/test_checks_publisher.py @@ -2,7 +2,6 @@ import time from unittest.mock import patch -import numpy as np import pytest from pythclient.pythaccounts import PythPriceStatus from pythclient.solana import SolanaPublicKey From bba309bc2e868c880bd4fde67e2d6f18961bb135 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 13 Nov 2024 17:41:08 -0800 Subject: [PATCH 4/7] fix(observer): fix ci and pr comments --- pyth_observer/check/publisher.py | 9 +++++---- pyth_observer/check/stall_detection.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index b05cf72..fd3470d 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -1,9 +1,10 @@ -from collections import defaultdict, deque import time +from collections import defaultdict, deque from dataclasses import asdict, dataclass from datetime import datetime -from typing import Dict, List, Optional, Protocol, runtime_checkable +from typing import Dict, Protocol, runtime_checkable from zoneinfo import ZoneInfo + from loguru import logger from pythclient.calendar import is_market_open from pythclient.pythaccounts import PythPriceStatus @@ -257,9 +258,9 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig): self.__abandoned_time_limit: int = int(config["abandoned_time_limit"]) self.__max_slot_distance: int = int(config["max_slot_distance"]) - from pyth_observer.check.stall_detection import ( + from pyth_observer.check.stall_detection import ( # noqa: deferred import to avoid circular import StallDetector, - ) # noqa: deferred import to avoid circular import + ) self.__detector = StallDetector( stall_time_limit=self.__stall_time_limit, diff --git a/pyth_observer/check/stall_detection.py b/pyth_observer/check/stall_detection.py index cfafa50..56135cb 100644 --- a/pyth_observer/check/stall_detection.py +++ b/pyth_observer/check/stall_detection.py @@ -28,7 +28,7 @@ def no_stall(cls) -> "StallDetectionResult": base_price=None, noise_magnitude=None, duration=0.0, - confidence=0.0, + confidence=1.0, ) From fdb4397702899cc978d16663398c417cb92aa37a Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 13 Nov 2024 17:50:07 -0800 Subject: [PATCH 5/7] chore(observer): remove unneeded log --- pyth_observer/check/publisher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index fd3470d..39e6676 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -284,7 +284,6 @@ def run(self) -> bool: # Pass for redemption rates because they are expected to be static for long periods if self.__state.asset_type == "Crypto Redemption Rate": - logger.info(f"Redemption rate: Skipping {self.__state.symbol}") return True # Pass when publisher is offline because PublisherOfflineCheck will be triggered From 33a149e2effdc587e96ba180199245f73c2f6d3e Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Fri, 15 Nov 2024 09:36:46 -0800 Subject: [PATCH 6/7] fix(observer): fix exact stall, add test --- pyth_observer/check/publisher.py | 12 ++++++++---- pyth_observer/check/stall_detection.py | 17 ++++++++++------- tests/test_checks_publisher.py | 18 ++++++++++++++++++ 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index 39e6676..6a20ad4 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -293,13 +293,17 @@ def run(self) -> bool: current_time = int(time.time()) publisher_key = (self.__state.publisher_name, self.__state.symbol) - PUBLISHER_CACHE[publisher_key].append( - PriceUpdate(current_time, self.__state.price) - ) updates = PUBLISHER_CACHE[publisher_key] + # Only cache new prices, let repeated prices grow stale. + # These will be caught as an exact stall in the detector. + is_repeated_price = updates and updates[-1].price == self.__state.price + cur_update = PriceUpdate(current_time, self.__state.price) + if not is_repeated_price: + PUBLISHER_CACHE[publisher_key].append(cur_update) + # Analyze for stalls - result = self.__detector.analyze_updates(list(updates)) + result = self.__detector.analyze_updates(list(updates), cur_update) logger.debug(f"Stall detection result: {result}") self.__last_analysis = result # For error logging diff --git a/pyth_observer/check/stall_detection.py b/pyth_observer/check/stall_detection.py index 56135cb..8fc1197 100644 --- a/pyth_observer/check/stall_detection.py +++ b/pyth_observer/check/stall_detection.py @@ -78,33 +78,36 @@ def __init__( self.noise_threshold = noise_threshold self.min_noise_samples = min_noise_samples - def analyze_updates(self, updates: List[PriceUpdate]) -> StallDetectionResult: + def analyze_updates( + self, updates: List[PriceUpdate], cur_update: PriceUpdate + ) -> StallDetectionResult: """ Assumes that the cache has been recently updated since it takes the latest cached timestamp as the current time. Args: updates: List of price updates to analyze + cur_update: The update currently being processed. If it's a repeated price, + the update won't be in `updates`, so we need it as a separate parameter. Returns: StallDetectionResult with detection details """ - # Need at least 2 samples - if not updates or len(updates) < 2: + # Need at least 1 sample + if not updates: return StallDetectionResult.no_stall() ## Check for exact stall # The latest 2 updates are sufficient to detect an exact stall - latest_updates = updates[-2:] - duration = latest_updates[1].timestamp - latest_updates[0].timestamp + duration = cur_update.timestamp - updates[-1].timestamp if duration <= self.stall_time_limit: return StallDetectionResult.no_stall() - elif latest_updates[1].price == latest_updates[0].price: + elif cur_update.price == updates[-1].price: return StallDetectionResult( is_stalled=True, stall_type="exact", - base_price=latest_updates[1].price, + base_price=cur_update.price, noise_magnitude=0.0, duration=duration, confidence=1.0, diff --git a/tests/test_checks_publisher.py b/tests/test_checks_publisher.py index 7af8750..4676b1f 100644 --- a/tests/test_checks_publisher.py +++ b/tests/test_checks_publisher.py @@ -114,6 +114,24 @@ def test_exact_stall_fails_check(self): check_b = self.setup_check(state_b, stall_time_limit=5) self.run_check(check_b, 6, False) # Should fail as it exceeds the limit + PUBLISHER_CACHE.clear() + state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check_c = self.setup_check(state_c, stall_time_limit=5) + self.run_check(check_c, 2, True) # Initial check should pass + state_c.price = 105.0 # Change the price + self.run_check(check_c, 3, True) # Should pass as price changes + state_c.price = 100.0 # Change back to original price + # Simulate a stall -- send the same price repeatedly. + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check( + check_c, 2, False + ) # Should fail since we breached the stall time limit + PUBLISHER_CACHE.clear() state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) check_c = self.setup_check(state_c, stall_time_limit=5) From c700390de8f15aa000495bc59a88a652e82c88dc Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Fri, 15 Nov 2024 09:43:14 -0800 Subject: [PATCH 7/7] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4a00f1c..6be8b0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ ignore_missing_imports = true [tool.poetry] name = "pyth-observer" -version = "0.2.15" +version = "0.3.0" description = "Alerts and stuff" authors = [] readme = "README.md"