diff --git a/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go b/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go index 8a8377b4dd..4f6f5760ed 100644 --- a/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go +++ b/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go @@ -3,7 +3,9 @@ package price_fetcher import ( "context" "fmt" + "github.com/cosmos/cosmos-sdk/telemetry" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + "math/rand" "sync" "time" @@ -183,6 +185,27 @@ func (pf *PriceFetcher) RunTaskLoop(requestHandler daemontypes.RequestHandler) { } } +// emitMarketAvailabilityMetrics emits telemetry that tracks whether a market was available when queried on an exchange. +// Success is tracked by (market, exchange) so that we can track the availability of each market on each exchange. +func emitMarketAvailabilityMetrics(exchangeId types.ExchangeId, id types.MarketId, available bool) { + success := metrics.Success + if !available { + success = metrics.Error + } + telemetry.IncrCounterWithLabels( + []string{ + metrics.PricefeedDaemon, + metrics.PriceFetcherQueryForMarket, + success, + }, + 1, + []gometrics.Label{ + pricefeedmetrics.GetLabelForExchangeId(exchangeId), + pricefeedmetrics.GetLabelForMarketId(id), + }, + ) +} + // runSubTask makes a single query to an exchange for market prices. This query can be for 1 or // n markets. // For single market exchanges, a task loop execution will execute multiple runSubTask goroutines, where @@ -237,11 +260,28 @@ func (pf *PriceFetcher) runSubTask( taskLoopDefinition.marketExponents, ) + // Emit metrics at the `AvailableMarketsSampleRate`. + emitMetricsSample := rand.Float64() < metrics.AvailableMarketsSampleRate + if err != nil { pf.writeToBufferedChannel(exchangeId, nil, err) + + // Since the query failed, report all markets as unavailable, according to the sampling rate. + if emitMetricsSample { + for _, marketId := range marketIds { + emitMarketAvailabilityMetrics(exchangeId, marketId, false) + } + } + return } + // Track which markets were available when queried, and which were not, for telemetry. + availableMarkets := make(map[types.MarketId]bool, len(marketIds)) + for _, marketId := range marketIds { + availableMarkets[marketId] = false + } + for _, price := range prices { // No price should validly be zero. A price of zero points to an error in the API queried. if price.Price == uint64(0) { @@ -269,8 +309,18 @@ func (pf *PriceFetcher) runSubTask( price.LastUpdatedAt, ) + // Report market as available. + availableMarkets[price.MarketId] = true + pf.writeToBufferedChannel(exchangeId, price, err) } + + // Emit metrics on this exchange's market availability according to the sampling rate. + if emitMetricsSample { + for marketId, available := range availableMarkets { + emitMarketAvailabilityMetrics(exchangeId, marketId, available) + } + } } // writeToBufferedChannel writes the (price, error) generated during querying to the price fetcher's diff --git a/protocol/lib/metrics/constants.go b/protocol/lib/metrics/constants.go index 7e03df2b34..f3ac29a071 100644 --- a/protocol/lib/metrics/constants.go +++ b/protocol/lib/metrics/constants.go @@ -343,6 +343,7 @@ const ( MarketUpdaterUpdateMarkets = "market_updater_update_markets" PriceEncoderPriceConversion = "price_encoder_price_conversion" PriceFetcherQueryExchange = "price_fetcher_query_exchange" + PriceFetcherQueryForMarket = "price_fetcher_query_for_market_sampled" PriceFetcherSubtaskLoop = "price_fetcher_subtask_loop" PriceFetcherSubtaskLoopAndSetCtxTimeout = "price_fetcher_subtask_loop_and_set_ctx_timeout" PriceUpdateCount = "price_update_count" @@ -389,4 +390,7 @@ const ( ValidatorVolumeQuoteQuantums = "validator_volume_quote_quantums" ) -const LatencyMetricSampleRate = 0.01 +const ( + LatencyMetricSampleRate = 0.01 + AvailableMarketsSampleRate = .1 +)