hum: fix negative missing observations
hum: improve decrement missing obs

hum: cleanup

Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
bingyuyap committed Sep 2, 2024
1 parent 3b59063 commit ee51cde
Showing 2 changed files with 67 additions and 9 deletions.
41 changes: 34 additions & 7 deletions fly/cmd/historical_uptime/main.go
@@ -92,10 +92,8 @@ var (
 
 const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
 
-var (
-    // guardianChainHeights indexes current chain height by chain id and guardian name
-    guardianChainHeights = make(common.GuardianChainHeights)
-)
+// guardianChainHeights indexes current chain height by chain id and guardian name
+var guardianChainHeights = make(common.GuardianChainHeights)
 
 func loadEnvVars() {
     err := godotenv.Load() // By default loads .env
@@ -186,7 +184,6 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error)
         }
     }
     err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
-
     if err != nil {
         promLogger.Error("ScrapeAndSendLocalMetrics error", zap.Error(err))
         continue
@@ -237,7 +234,7 @@ func initObservationScraper(db *bigtable.BigtableDB, logger *zap.Logger, errC chan error)
             historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messageObservations)
 
             // Update the metrics with the final count of missing observations
-            historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations)
+            historical_uptime.UpdateMetrics(logger, guardianMissedObservations, guardianMissingObservations)
         }
     }
 })
@@ -262,9 +259,12 @@ func main()
     defer rootCtxCancel()
 
     // Inbound observations
-    obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
+    obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
     batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
 
+    // Add channel capacity checks
+    go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC)
+
     // Heartbeat updates
     heartbeatC := make(chan *gossipv1.Heartbeat, 50)
 
@@ -406,5 +406,32 @@ func main()
     <-rootCtx.Done()
     logger.Info("root context cancelled, exiting...")
     // TODO: wait for things to shut down gracefully
 }
 
+func monitorChannelCapacity[T any](ctx context.Context, logger *zap.Logger, channelName string, ch <-chan T) {
+    ticker := time.NewTicker(5 * time.Second)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            capacity := cap(ch)
+            length := len(ch)
+            utilization := float64(length) / float64(capacity) * 100
+
+            logger.Info("Channel capacity check",
+                zap.String("channel", channelName),
+                zap.Int("capacity", capacity),
+                zap.Int("length", length),
+                zap.Float64("utilization_percentage", utilization))
+
+            if utilization > 80 {
+                logger.Warn("Channel near capacity, potential for dropped messages",
+                    zap.String("channel", channelName),
+                    zap.Float64("utilization_percentage", utilization))
+            }
+        }
+    }
+}
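Note: the new monitorChannelCapacity helper is generic over the channel's element type, so the same goroutine can watch any buffered channel. A minimal sketch of how it could be exercised in isolation, assuming the function from the diff above lives in the same package; the channel name "demo", the 100-slot buffer, and the fill loop are illustrative, not part of the commit:

package main

import (
    "context"
    "time"

    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewDevelopment()
    defer logger.Sync()

    // Run long enough for at least two of the monitor's 5-second ticks.
    ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
    defer cancel()

    // Hypothetical stand-in for obsvC: a small buffered channel.
    ch := make(chan int, 100)
    go monitorChannelCapacity(ctx, logger, "demo", ch)

    // Fill past the 80% threshold so the next tick logs the warning.
    for i := 0; i < 90; i++ {
        ch <- i
    }
    <-ctx.Done()
}

Polling len and cap on a timer is only an approximation, but it is cheap and needs no cooperation from producers or consumers, which fits a monitor whose obsvC buffer was just raised from 1024 to 20000 to guard against silent message drops.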
35 changes: 33 additions & 2 deletions fly/pkg/historical_uptime/helpers.go
@@ -44,6 +44,9 @@ func InitializeMissingObservationsCount(logger *zap.Logger, messages []*types.Message
 }
 
 func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObservations map[string]map[string]int, messageObservations map[types.MessageID][]*types.Observation) {
+    // Keep track of processed observations to avoid duplicates
+    processed := make(map[string]map[string]bool)
+
     for messageID, observations := range messageObservations {
         chainID, err := messageID.ChainID()
         if err != nil {
Expand All @@ -57,14 +60,42 @@ func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObserv
logger.Error("Unknown guardian address", zap.String("guardianAddr", observation.GuardianAddr))
continue
}
guardianMissingObservations[guardianName][chainID]--

// Check if we've already processed this guardian for this message
if processed[string(messageID)] == nil {
processed[string(messageID)] = make(map[string]bool)
}
if processed[string(messageID)][guardianName] {
logger.Warn("Duplicate observation", zap.String("messageID", string(messageID)), zap.String("guardian", guardianName))
continue
}

// Mark as processed
processed[string(messageID)][guardianName] = true

// Safely decrement the count
if guardianMissingObservations[guardianName] == nil {
guardianMissingObservations[guardianName] = make(map[string]int)
}
if guardianMissingObservations[guardianName][chainID] > 0 {
guardianMissingObservations[guardianName][chainID]--
} else {
logger.Warn("Attempted to decrement below zero",
zap.String("guardian", guardianName),
zap.String("chainID", chainID),
zap.Int("currentCount", guardianMissingObservations[guardianName][chainID]))
}
}
}
}

func UpdateMetrics(guardianMissedObservations *prometheus.CounterVec, guardianMissingObservations map[string]map[string]int) {
func UpdateMetrics(logger *zap.Logger, guardianMissedObservations *prometheus.CounterVec, guardianMissingObservations map[string]map[string]int) {
for guardianName, chains := range guardianMissingObservations {
for chainID, missingCount := range chains {
if missingCount < 0 {
logger.Warn("Skipping negative missing count", zap.String("chainID", chainID), zap.Int("missingCount", missingCount))
continue
}
guardianMissedObservations.WithLabelValues(guardianName, chainID).Add(float64(missingCount))
}
}
Expand Down
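Note: the substance of the fix is a dedup-and-clamp pattern: each (message, guardian) pair decrements the missing-observation count at most once, and a count is never pushed below zero. A self-contained sketch of just that pattern, with illustrative names (the real function additionally resolves guardian addresses to names and derives the chain ID from the message ID):

package main

import "fmt"

// decrementOnce decrements counts[guardian][chain] at most once per
// (messageID, guardian) pair and never below zero, the same guards
// the commit adds to DecrementMissingObservationsCount.
func decrementOnce(counts map[string]map[string]int, processed map[string]map[string]bool, messageID, guardian, chain string) {
    if processed[messageID] == nil {
        processed[messageID] = make(map[string]bool)
    }
    if processed[messageID][guardian] {
        return // duplicate observation: already counted for this message
    }
    processed[messageID][guardian] = true

    if counts[guardian] == nil {
        counts[guardian] = make(map[string]int)
    }
    if counts[guardian][chain] > 0 {
        counts[guardian][chain]-- // safe decrement, clamped at zero
    }
}

func main() {
    counts := map[string]map[string]int{"guardian-a": {"2": 1}}
    processed := make(map[string]map[string]bool)

    decrementOnce(counts, processed, "msg-1", "guardian-a", "2")
    decrementOnce(counts, processed, "msg-1", "guardian-a", "2") // duplicate: ignored
    decrementOnce(counts, processed, "msg-2", "guardian-a", "2") // already zero: clamped

    fmt.Println(counts["guardian-a"]["2"]) // prints 0, never -1
}

The negative-count guard added to UpdateMetrics is also defensive on the Prometheus side: Counter.Add panics when passed a negative value, so a count that slipped below zero would otherwise crash the metrics update rather than merely skew it.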
