Skip to content

Commit

Permalink
[CORE-354] - Add tracking for market availability in order to alert i…
Browse files Browse the repository at this point in the history
…n datadog (#706)
  • Loading branch information
clemire authored Oct 26, 2023
1 parent 32e52a5 commit eb1e497
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
50 changes: 50 additions & 0 deletions protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package price_fetcher
import (
"context"
"fmt"
"github.com/cosmos/cosmos-sdk/telemetry"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -183,6 +185,27 @@ func (pf *PriceFetcher) RunTaskLoop(requestHandler daemontypes.RequestHandler) {
}
}

// emitMarketAvailabilityMetrics emits telemetry that tracks whether a market was available when queried on an exchange.
// Success is tracked by (market, exchange) so that we can track the availability of each market on each exchange.
func emitMarketAvailabilityMetrics(exchangeId types.ExchangeId, id types.MarketId, available bool) {
success := metrics.Success
if !available {
success = metrics.Error
}
telemetry.IncrCounterWithLabels(
[]string{
metrics.PricefeedDaemon,
metrics.PriceFetcherQueryForMarket,
success,
},
1,
[]gometrics.Label{
pricefeedmetrics.GetLabelForExchangeId(exchangeId),
pricefeedmetrics.GetLabelForMarketId(id),
},
)
}

// runSubTask makes a single query to an exchange for market prices. This query can be for 1 or
// n markets.
// For single market exchanges, a task loop execution will execute multiple runSubTask goroutines, where
Expand Down Expand Up @@ -237,11 +260,28 @@ func (pf *PriceFetcher) runSubTask(
taskLoopDefinition.marketExponents,
)

// Emit metrics at the `AvailableMarketsSampleRate`.
emitMetricsSample := rand.Float64() < metrics.AvailableMarketsSampleRate

if err != nil {
pf.writeToBufferedChannel(exchangeId, nil, err)

// Since the query failed, report all markets as unavailable, according to the sampling rate.
if emitMetricsSample {
for _, marketId := range marketIds {
emitMarketAvailabilityMetrics(exchangeId, marketId, false)
}
}

return
}

// Track which markets were available when queried, and which were not, for telemetry.
availableMarkets := make(map[types.MarketId]bool, len(marketIds))
for _, marketId := range marketIds {
availableMarkets[marketId] = false
}

for _, price := range prices {
// No price should validly be zero. A price of zero points to an error in the API queried.
if price.Price == uint64(0) {
Expand Down Expand Up @@ -269,8 +309,18 @@ func (pf *PriceFetcher) runSubTask(
price.LastUpdatedAt,
)

// Report market as available.
availableMarkets[price.MarketId] = true

pf.writeToBufferedChannel(exchangeId, price, err)
}

// Emit metrics on this exchange's market availability according to the sampling rate.
if emitMetricsSample {
for marketId, available := range availableMarkets {
emitMarketAvailabilityMetrics(exchangeId, marketId, available)
}
}
}

// writeToBufferedChannel writes the (price, error) generated during querying to the price fetcher's
Expand Down
6 changes: 5 additions & 1 deletion protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ const (
MarketUpdaterUpdateMarkets = "market_updater_update_markets"
PriceEncoderPriceConversion = "price_encoder_price_conversion"
PriceFetcherQueryExchange = "price_fetcher_query_exchange"
PriceFetcherQueryForMarket = "price_fetcher_query_for_market_sampled"
PriceFetcherSubtaskLoop = "price_fetcher_subtask_loop"
PriceFetcherSubtaskLoopAndSetCtxTimeout = "price_fetcher_subtask_loop_and_set_ctx_timeout"
PriceUpdateCount = "price_update_count"
Expand Down Expand Up @@ -389,4 +390,7 @@ const (
ValidatorVolumeQuoteQuantums = "validator_volume_quote_quantums"
)

const LatencyMetricSampleRate = 0.01
const (
LatencyMetricSampleRate = 0.01
AvailableMarketsSampleRate = .1
)

0 comments on commit eb1e497

Please sign in to comment.