Skip to content

Commit

Permalink
hum: add message index bulk mutation that was hum
Browse files Browse the repository at this point in the history
hum: update MAX_CHAIN_ID

Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
  • Loading branch information
bingyuyap committed Sep 23, 2024
1 parent 81bd43d commit d090941
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
10 changes: 5 additions & 5 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ var (
)
)

const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
// [0, MAX_CHAIN_ID] is the range of chain id that we will track for the uptime monitor
// in this case it's snaxchain since it's the largest mainnet chain idj
const MAX_CHAIN_ID = vaa.ChainIDSnaxchain

// guardianChainHeights indexes current chain height by chain id and guardian name
var guardianChainHeights = make(common.GuardianChainHeights)
Expand Down Expand Up @@ -187,10 +189,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error)
case <-t.C:
recordGuardianHeightDifferences()

for i := 1; i < 36; i++ {
if i == PYTHNET_CHAIN_ID {
continue
}
for i := 1; i <= int(MAX_CHAIN_ID); i++ {
chainName := vaa.ChainID(i).String()
if strings.HasPrefix(chainName, "unknown chain ID:") {
continue
Expand Down Expand Up @@ -230,6 +229,7 @@ func initObservationScraper(db *bigtable.BigtableDB, logger *zap.Logger, errC ch
messageObservations := make(map[types.MessageID][]*types.Observation)

messages, err := db.GetUnprocessedMessagesBeforeCutOffTime(ctx, time.Now().Add(-common.ExpiryDuration))
logger.Info("Number of unprocessed messages", zap.Int("count", len(messages)))
if err != nil {
logger.Error("QueryMessagesByIndex error", zap.Error(err))
continue
Expand Down
23 changes: 23 additions & 0 deletions fly/pkg/bigtable/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache
observationMuts := make([]*bigtable.Mutation, 0)
observationRows := make([]string, 0)

// Prepare bulk mutations for message indexes
indexMuts := make([]*bigtable.Mutation, 0, len(cache.Messages))
indexRows := make([]string, 0, len(cache.Messages))

for messageID, message := range cache.Messages {
// Prepare message mutation
messageMut, err := createMessageMutation(message)
Expand All @@ -64,20 +68,39 @@ func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache
observationMuts = append(observationMuts, observationMut)
observationRows = append(observationRows, observationRow)
}

// Prepare message index mutation
indexMut := bigtable.NewMutation()
indexMut.Set("indexData", "placeholder", bigtable.Now(), nil)
indexMuts = append(indexMuts, indexMut)
indexRows = append(indexRows, string(messageID))
}

// Apply bulk mutations for messages
err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts)
if err != nil {
logger.Error("Failed to apply bulk mutations for messages", zap.Error(err))
return err
}

// Apply bulk mutations for observations
err = db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts)
if err != nil {
logger.Error("Failed to apply bulk mutations for observations", zap.Error(err))
return err
}

// Apply bulk mutations for message indexes
err = db.ApplyBulk(ctx, MessageIndexTableName, indexRows, indexMuts)
if err != nil {
logger.Error("Failed to apply bulk mutations for message indexes", zap.Error(err))
return err
}

logger.Info("Successfully applied bulk mutations for messages", zap.Int("count", len(messageMuts)))
logger.Info("Successfully applied bulk mutations for observations", zap.Int("count", len(observationMuts)))
logger.Info("Successfully applied bulk mutations for message indexes", zap.Int("count", len(indexMuts)))

return nil
}

Expand Down

0 comments on commit d090941

Please sign in to comment.