From ee51cdef06a8f96c6553f5bcaa20895ed67c9b27 Mon Sep 17 00:00:00 2001 From: bingyuyap Date: Tue, 3 Sep 2024 03:38:17 +0800 Subject: [PATCH] hum: fix negative missing observations hum: improve decrement missing obs hum: cleanup Signed-off-by: bingyuyap --- fly/cmd/historical_uptime/main.go | 41 +++++++++++++++++++++++----- fly/pkg/historical_uptime/helpers.go | 35 ++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go index 8b877131..ca55590d 100644 --- a/fly/cmd/historical_uptime/main.go +++ b/fly/cmd/historical_uptime/main.go @@ -92,10 +92,8 @@ var ( const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet) -var ( - // guardianChainHeights indexes current chain height by chain id and guardian name - guardianChainHeights = make(common.GuardianChainHeights) -) +// guardianChainHeights indexes current chain height by chain id and guardian name +var guardianChainHeights = make(common.GuardianChainHeights) func loadEnvVars() { err := godotenv.Load() // By default loads .env @@ -186,7 +184,6 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error) } } err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger) - if err != nil { promLogger.Error("ScrapeAndSendLocalMetrics error", zap.Error(err)) continue @@ -237,7 +234,7 @@ func initObservationScraper(db *bigtable.BigtableDB, logger *zap.Logger, errC ch historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messageObservations) // Update the metrics with the final count of missing observations - historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations) + historical_uptime.UpdateMetrics(logger, guardianMissedObservations, guardianMissingObservations) } } }) @@ -262,9 +259,12 @@ func main() { defer rootCtxCancel() // Inbound observations - obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024) + obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000) batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024) + // Add channel capacity checks + go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC) + // Heartbeat updates heartbeatC := make(chan *gossipv1.Heartbeat, 50) @@ -406,5 +406,32 @@ func main() { <-rootCtx.Done() logger.Info("root context cancelled, exiting...") // TODO: wait for things to shut down gracefully +} +func monitorChannelCapacity[T any](ctx context.Context, logger *zap.Logger, channelName string, ch <-chan T) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + capacity := cap(ch) + length := len(ch) + utilization := float64(length) / float64(capacity) * 100 + + logger.Info("Channel capacity check", + zap.String("channel", channelName), + zap.Int("capacity", capacity), + zap.Int("length", length), + zap.Float64("utilization_percentage", utilization)) + + if utilization > 80 { + logger.Warn("Channel near capacity, potential for dropped messages", + zap.String("channel", channelName), + zap.Float64("utilization_percentage", utilization)) + } + } + } } diff --git a/fly/pkg/historical_uptime/helpers.go b/fly/pkg/historical_uptime/helpers.go index f0efe6c0..9edeb87e 100644 --- a/fly/pkg/historical_uptime/helpers.go +++ b/fly/pkg/historical_uptime/helpers.go @@ -44,6 +44,9 @@ func InitializeMissingObservationsCount(logger *zap.Logger, messages []*types.Me } func 
DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObservations map[string]map[string]int, messageObservations map[types.MessageID][]*types.Observation) { + // Keep track of processed observations to avoid duplicates + processed := make(map[string]map[string]bool) + for messageID, observations := range messageObservations { chainID, err := messageID.ChainID() if err != nil { @@ -57,14 +60,42 @@ func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObserv logger.Error("Unknown guardian address", zap.String("guardianAddr", observation.GuardianAddr)) continue } - guardianMissingObservations[guardianName][chainID]-- + + // Check if we've already processed this guardian for this message + if processed[string(messageID)] == nil { + processed[string(messageID)] = make(map[string]bool) + } + if processed[string(messageID)][guardianName] { + logger.Warn("Duplicate observation", zap.String("messageID", string(messageID)), zap.String("guardian", guardianName)) + continue + } + + // Mark as processed + processed[string(messageID)][guardianName] = true + + // Safely decrement the count + if guardianMissingObservations[guardianName] == nil { + guardianMissingObservations[guardianName] = make(map[string]int) + } + if guardianMissingObservations[guardianName][chainID] > 0 { + guardianMissingObservations[guardianName][chainID]-- + } else { + logger.Warn("Attempted to decrement below zero", + zap.String("guardian", guardianName), + zap.String("chainID", chainID), + zap.Int("currentCount", guardianMissingObservations[guardianName][chainID])) + } } } } -func UpdateMetrics(guardianMissedObservations *prometheus.CounterVec, guardianMissingObservations map[string]map[string]int) { +func UpdateMetrics(logger *zap.Logger, guardianMissedObservations *prometheus.CounterVec, guardianMissingObservations map[string]map[string]int) { for guardianName, chains := range guardianMissingObservations { for chainID, missingCount := range chains { + if missingCount < 0 { + logger.Warn("Skipping negative missing count", zap.String("chainID", chainID), zap.Int("missingCount", missingCount)) + continue + } guardianMissedObservations.WithLabelValues(guardianName, chainID).Add(float64(missingCount)) } }
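
Reviewer note, not part of the patch: monitorChannelCapacity computes len(ch)/cap(ch), so if it were ever pointed at an unbuffered channel the capacity would be 0 and the float division would yield NaN, meaning the 80% warning could never fire (obsvC is buffered at 20000, so the patch is fine as written). A minimal sketch of a guard, using hypothetical names:

package main

import "fmt"

// channelUtilization returns the fill percentage of a buffered channel,
// and false when the channel is unbuffered (cap == 0), where the ratio
// would otherwise come out as NaN.
func channelUtilization[T any](ch <-chan T) (float64, bool) {
	capacity := cap(ch)
	if capacity == 0 {
		return 0, false
	}
	return float64(len(ch)) / float64(capacity) * 100, true
}

func main() {
	buffered := make(chan int, 4)
	buffered <- 1
	buffered <- 2
	if u, ok := channelUtilization(buffered); ok {
		fmt.Printf("utilization: %.0f%%\n", u) // 50%
	}

	unbuffered := make(chan int)
	if _, ok := channelUtilization(unbuffered); !ok {
		fmt.Println("unbuffered channel: skipping utilization check")
	}
}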
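
Reviewer note, not part of the patch: the negative-count skip added to UpdateMetrics also protects the Prometheus side, since Counter.Add panics when given a negative value. A self-contained sketch of that behavior, with a hypothetical metric name:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Hypothetical counter for this sketch; the real metric lives in the fly package.
	missed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "guardian_missed_observations_total",
		Help: "Number of observations a guardian missed, by chain.",
	}, []string{"guardian", "chain"})

	counts := map[string]float64{"solana": 3, "ethereum": -1}
	for chain, n := range counts {
		if n < 0 {
			// Counter.Add panics on negative values, so skip and log instead.
			fmt.Printf("skipping negative count for %s\n", chain)
			continue
		}
		missed.WithLabelValues("guardian-0", chain).Add(n)
	}
}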