From 88476339d02d3008eb55ab2781b6266b2b1072b9 Mon Sep 17 00:00:00 2001 From: bingyuyap Date: Tue, 3 Sep 2024 15:12:37 +0800 Subject: [PATCH] hum: fix test Signed-off-by: bingyuyap --- fly/cmd/historical_uptime/main.go | 1 + fly/pkg/historical_uptime/helpers.go | 9 +++++---- fly/pkg/historical_uptime/process_observation_test.go | 7 ++++++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go index ca55590d..e818522b 100644 --- a/fly/cmd/historical_uptime/main.go +++ b/fly/cmd/historical_uptime/main.go @@ -264,6 +264,7 @@ func main() { // Add channel capacity checks go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC) + go monitorChannelCapacity(rootCtx, logger, "batchObsvC", batchObsvC) // Heartbeat updates heartbeatC := make(chan *gossipv1.Heartbeat, 50) diff --git a/fly/pkg/historical_uptime/helpers.go b/fly/pkg/historical_uptime/helpers.go index 9edeb87e..fb8801fc 100644 --- a/fly/pkg/historical_uptime/helpers.go +++ b/fly/pkg/historical_uptime/helpers.go @@ -45,7 +45,7 @@ func InitializeMissingObservationsCount(logger *zap.Logger, messages []*types.Me func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObservations map[string]map[string]int, messageObservations map[types.MessageID][]*types.Observation) { // Keep track of processed observations to avoid duplicates - processed := make(map[string]map[string]bool) + processed := make(map[string]map[string]struct{}) for messageID, observations := range messageObservations { chainID, err := messageID.ChainID() @@ -63,15 +63,16 @@ func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObserv // Check if we've already processed this guardian for this message if processed[string(messageID)] == nil { - processed[string(messageID)] = make(map[string]bool) + processed[string(messageID)] = make(map[string]struct{}) } - if processed[string(messageID)][guardianName] { + + if _, exists := processed[string(messageID)][guardianName]; exists { logger.Warn("Duplicate observation", zap.String("messageID", string(messageID)), zap.String("guardian", guardianName)) continue } // Mark as processed - processed[string(messageID)][guardianName] = true + processed[string(messageID)][guardianName] = struct{}{} // Safely decrement the count if guardianMissingObservations[guardianName] == nil { diff --git a/fly/pkg/historical_uptime/process_observation_test.go b/fly/pkg/historical_uptime/process_observation_test.go index 96826ab7..f0025512 100644 --- a/fly/pkg/historical_uptime/process_observation_test.go +++ b/fly/pkg/historical_uptime/process_observation_test.go @@ -33,7 +33,12 @@ func processObservation(t *testing.T, observations []node_common.MsgWithTimeStam // Process each observation for _, o := range observations { - err := ProcessObservation(*database, logger, o) + err := ProcessObservation(*database, logger, o.Timestamp, o.Msg.Addr, &gossipv1.Observation{ + Hash: o.Msg.Hash, + Signature: o.Msg.Signature, + TxHash: o.Msg.TxHash, + MessageId: o.Msg.MessageId, + }) if err != nil { t.Errorf("failed to process observation: %v", err) }