From 3ea71d71e78a119dc286fd235a2787334a2c24bb Mon Sep 17 00:00:00 2001 From: bingyuyap Date: Sun, 4 Feb 2024 09:03:56 +0800 Subject: [PATCH] historical_uptime: add db cleanup to optimise memory Signed-off-by: bingyuyap --- fly/cmd/historical_uptime/main.go | 49 ++++++++-- fly/common/consts.go | 3 + fly/pkg/db/message.go | 145 ++++++++++++++++++++++++------ fly/pkg/db/message_test.go | 58 +++++++++--- 4 files changed, 206 insertions(+), 49 deletions(-) diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go index fcafe840..66c79dc8 100644 --- a/fly/cmd/historical_uptime/main.go +++ b/fly/cmd/historical_uptime/main.go @@ -31,6 +31,7 @@ var ( rootCtx context.Context rootCtxCancel context.CancelFunc + dataDir string p2pNetworkID string p2pPort uint p2pBootstrap string @@ -74,6 +75,7 @@ func loadEnvVars() { if err != nil { log.Fatal("Error loading .env file") } + dataDir = verifyEnvVar("DATA_DIR") p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID") port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32) if err != nil { @@ -95,7 +97,7 @@ func verifyEnvVar(key string) string { return value } -func initPromScraper(promRemoteURL string, logger *zap.Logger) { +func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error) { usingPromRemoteWrite := promRemoteURL != "" if usingPromRemoteWrite { var info promremotew.PromTelemetryInfo @@ -106,7 +108,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) { } promLogger := logger.With(zap.String("component", "prometheus_scraper")) - errC := make(chan error) + node_common.StartRunnable(rootCtx, errC, false, "prometheus_scraper", func(ctx context.Context) error { t := time.NewTicker(15 * time.Second) @@ -144,8 +146,8 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) { } } -func initObservationScraper(db *db.Database, logger *zap.Logger) { - node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error { +func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error) { + node_common.StartRunnable(rootCtx, errC, false, "observation_scraper", func(ctx context.Context) error { t := time.NewTicker(15 * time.Second) for { @@ -175,6 +177,25 @@ func initObservationScraper(db *db.Database, logger *zap.Logger) { }) } +func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) { + node_common.StartRunnable(rootCtx, errC, false, "db_cleanup", func(ctx context.Context) error { + t := time.NewTicker(common.DatabaseCleanUpInterval) + + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + err := db.RemoveObservationsByIndex(true, common.ExpiryDuration) + if err != nil { + logger.Error("RemoveObservationsByIndex error", zap.Error(err)) + } + } + } + }) + +} + func main() { loadEnvVars() p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1" @@ -228,11 +249,23 @@ func main() { } gst.Set(&gs) - db := db.OpenDb(logger, nil) - + db := db.OpenDb(logger, &dataDir) + promErrC := make(chan error) // Start Prometheus scraper - initPromScraper(promRemoteURL, logger) - initObservationScraper(db, logger) + initPromScraper(promRemoteURL, logger, promErrC) + initObservationScraper(db, logger, promErrC) + initDatabaseCleanUp(db, logger, promErrC) + + go func() { + for { + select { + case <-rootCtx.Done(): + return + case err := <-promErrC: + logger.Error("error from prometheus scraper", zap.Error(err)) + } + } + }() // WIP(bing): add metrics for guardian observations go func() { diff --git a/fly/common/consts.go b/fly/common/consts.go index 9c237056..94082fbe 100644 --- a/fly/common/consts.go +++ b/fly/common/consts.go @@ -6,4 +6,7 @@ import ( const ( ExpiryDuration = 30 * time.Hour + DatabaseCleanUpInterval = 48 * time.Hour + + MessageUpdateBatchSize = 100 ) diff --git a/fly/pkg/db/message.go b/fly/pkg/db/message.go index 6b39ad81..cd8868a3 100644 --- a/fly/pkg/db/message.go +++ b/fly/pkg/db/message.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/wormhole-foundation/wormhole-monitor/fly/common" "github.com/dgraph-io/badger/v3" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -154,42 +155,91 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob }) } -// QueryMessagesByIndex retrieves messages based on indexed attributes. -func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) { - var messages []*Message - now := time.Now() +// batchUpdateMessages performs batch updates on a slice of messages. +func (db *Database) batchUpdateMessages(messages []*Message) error { + for i := 0; i < len(messages); i += common.MessageUpdateBatchSize { + end := i + common.MessageUpdateBatchSize + if end > len(messages) { + end = len(messages) + } + if err := db.updateMessagesBatch(messages[i:end]); err != nil { + return err + } + } + return nil +} - // Start a read-only transaction - err := db.db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false // Only keys are needed +// updateMessagesBatch updates a batch of messages in a single transaction. +func (db *Database) updateMessagesBatch(messagesBatch []*Message) error { + return db.db.Update(func(txn *badger.Txn) error { + for _, message := range messagesBatch { + data, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + if err := txn.Set([]byte(message.MessageID), data); err != nil { + return fmt.Errorf("failed to save message: %w", err) + } + } + return nil + }) +} + +// iterateIndex iterates over a metricsChecked index and applies a callback function to each item. +func (db *Database) iterateIndex(metricsChecked bool, callback func(item *badger.Item) error) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false // Only keys are needed + return db.db.View(func(txn *badger.Txn) error { it := txn.NewIterator(opts) defer it.Close() metricsCheckedPrefix := fmt.Sprintf("metricsChecked|%t|", metricsChecked) - - // Iterate over the metricsChecked index for it.Seek([]byte(metricsCheckedPrefix)); it.ValidForPrefix([]byte(metricsCheckedPrefix)); it.Next() { - item := it.Item() - key := item.Key() - - // Extract MessageID from the key and query lastObservedAt index - _, messageID, err := parseMetricsCheckedIndexKey(key) - if err != nil { - return fmt.Errorf("failed to parse index key: %w", err) + if err := callback(it.Item()); err != nil { + return err } + } + return nil + }) +} - message, err := db.GetMessage(messageID) - if err != nil { - return fmt.Errorf("failed to get message by ID: %w", err) - } - // Check if the last observed timestamp is before the specified hours - if message.LastObservedAt.Before(now.Add(-cutOffTime)) { - message.MetricsChecked = true - db.SaveMessage(message) - messages = append(messages, message) - } +// processMessage retrieves a message from the database and applies an update function to it if the message is older than the cut-off time +func (db *Database) processMessage(messageID string, now time.Time, cutOffTime time.Duration, updateFunc func(*Message) bool) (*Message, error) { + message, err := db.GetMessage(messageID) + if err != nil { + return nil, fmt.Errorf("failed to get message by ID: %w", err) + } + + if message.LastObservedAt.Before(now.Add(-cutOffTime)) && updateFunc(message) { + return message, nil + } + return nil, nil +} + +// QueryMessagesByIndex retrieves messages based on indexed attributes. +func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) { + var messagesToUpdate []*Message + now := time.Now() + + err := db.iterateIndex(metricsChecked, func(item *badger.Item) error { + key := item.Key() + _, messageID, err := parseMetricsCheckedIndexKey(key) + if err != nil { + return fmt.Errorf("failed to parse index key: %w", err) + } + + message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool { + m.MetricsChecked = true + return true + }) + + if err != nil { + return err + } + + if message != nil { + messagesToUpdate = append(messagesToUpdate, message) } return nil @@ -199,5 +249,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du return nil, fmt.Errorf("failed to query messages: %w", err) } - return messages, nil + if err := db.batchUpdateMessages(messagesToUpdate); err != nil { + return nil, fmt.Errorf("failed to batch update messages: %w", err) + } + + return messagesToUpdate, nil +} + +// RemoveObservationsByIndex removes observations from messages based on indexed attributes. +func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime time.Duration) error { + var messagesToUpdate []*Message + now := time.Now() + + err := db.iterateIndex(metricsChecked, func(item *badger.Item) error { + key := item.Key() + _, messageID, err := parseMetricsCheckedIndexKey(key) + if err != nil { + return fmt.Errorf("failed to parse index key: %w", err) + } + + message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool { + m.Observations = nil + return true + }) + + if err != nil { + return err + } + + if message != nil { + messagesToUpdate = append(messagesToUpdate, message) + } + + return nil + }) + + if err != nil { + return err + } + + return db.batchUpdateMessages(messagesToUpdate) } diff --git a/fly/pkg/db/message_test.go b/fly/pkg/db/message_test.go index 3a2da607..a8f12e44 100644 --- a/fly/pkg/db/message_test.go +++ b/fly/pkg/db/message_test.go @@ -63,33 +63,65 @@ func TestQueryMessagesByIndex(t *testing.T) { db := OpenDb(zap.NewNop(), nil) defer db.db.Close() - // Create a message with LastObservedAt set to 30 hours ago - message0 := getMessage(0, time.Now().Add(-30*time.Hour), false) + // Store the time for consistent comparison + observedTime := time.Now().Add(-30 * time.Hour) + + // Create messages + message0 := getMessage(0, observedTime, false) err := db.SaveMessage(message0) require.NoError(t, err) - // Create a message with LastObservedAt set to 30 hours ago but with metrics checked - message1 := getMessage(1, time.Now().Add(-30*time.Hour), true) + message1 := getMessage(1, observedTime, true) err = db.SaveMessage(message1) require.NoError(t, err) - // Create a message with LastObservedAt set to 10 hours ago message2 := getMessage(2, time.Now().Add(-10*time.Hour), false) err = db.SaveMessage(message2) require.NoError(t, err) + // Query messages result, err := db.QueryMessagesByIndex(false, 30*time.Hour) require.NoError(t, err) + require.Len(t, result, 1, "expected 1 message") + + // Check if the message0 is in the result + require.Equal(t, message0.MessageID, result[0].MessageID) + require.True(t, observedTime.Equal(result[0].LastObservedAt), "message0 should be found in the result set") +} + +func TestRemoveObservationsByIndex(t *testing.T) { + db := OpenDb(zap.NewNop(), nil) + defer db.db.Close() - length := len(result) - require.Equal(t, 1, length, "expected 1 message") + testCases := []struct { + messageID int + timeOffset time.Duration + metricsChecked bool + expectEmpty bool + }{ + {0, -49 * time.Hour, true, true}, + {1, -40 * time.Hour, true, false}, + {2, -50 * time.Hour, false, false}, + {3, -72 * time.Hour, true, true}, + {4, -96 * time.Hour, true, true}, + } + + for _, tc := range testCases { + message := getMessage(tc.messageID, time.Now().Add(tc.timeOffset), tc.metricsChecked) + err := db.SaveMessage(message) + require.NoError(t, err) + } + + err := db.RemoveObservationsByIndex(true, 48*time.Hour) + require.NoError(t, err) - found := false - for _, msg := range result { - if msg.MessageID == message0.MessageID && msg.LastObservedAt.Equal(message0.LastObservedAt) && msg.MetricsChecked == true { - found = true - break + for _, tc := range testCases { + messageFromDb, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID)) + require.NoError(t, err) + if tc.expectEmpty { + require.Empty(t, messageFromDb.Observations, "expected observations to be removed for message", tc.messageID) + } else { + require.NotEmpty(t, messageFromDb.Observations, "expected observations to be present for message", tc.messageID) } } - require.True(t, found, "message found in the result set") }