Skip to content

Commit

Permalink
historical_uptime: add db cleanup to optimise memory
Browse files Browse the repository at this point in the history
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
  • Loading branch information
bingyuyap committed Feb 12, 2024
1 parent 56682dd commit 3ea71d7
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 49 deletions.
49 changes: 41 additions & 8 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
rootCtx context.Context
rootCtxCancel context.CancelFunc

dataDir string
p2pNetworkID string
p2pPort uint
p2pBootstrap string
Expand Down Expand Up @@ -74,6 +75,7 @@ func loadEnvVars() {
if err != nil {
log.Fatal("Error loading .env file")
}
dataDir = verifyEnvVar("DATA_DIR")
p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID")
port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32)
if err != nil {
Expand All @@ -95,7 +97,7 @@ func verifyEnvVar(key string) string {
return value
}

func initPromScraper(promRemoteURL string, logger *zap.Logger) {
func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error) {
usingPromRemoteWrite := promRemoteURL != ""
if usingPromRemoteWrite {
var info promremotew.PromTelemetryInfo
Expand All @@ -106,7 +108,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}

promLogger := logger.With(zap.String("component", "prometheus_scraper"))
errC := make(chan error)

node_common.StartRunnable(rootCtx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

Expand Down Expand Up @@ -144,8 +146,8 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}
}

func initObservationScraper(db *db.Database, logger *zap.Logger) {
node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error) {
node_common.StartRunnable(rootCtx, errC, false, "observation_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

for {
Expand Down Expand Up @@ -175,6 +177,25 @@ func initObservationScraper(db *db.Database, logger *zap.Logger) {
})
}

// initDatabaseCleanUp registers a "db_cleanup" runnable through
// node_common.StartRunnable, bound to rootCtx and reporting on errC, that
// periodically strips observations from stored messages.
//
// On every tick of common.DatabaseCleanUpInterval it calls
// db.RemoveObservationsByIndex(true, common.ExpiryDuration), which walks the
// metricsChecked=true index and clears the observations of messages last
// observed more than common.ExpiryDuration ago. The first pass happens one
// full interval after start-up, since a ticker does not fire immediately.
//
// A failed pass is logged and retried on the next tick; it does not end the
// runnable. The runnable returns nil once its context is cancelled.
func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) {
	node_common.StartRunnable(rootCtx, errC, false, "db_cleanup", func(ctx context.Context) error {
		t := time.NewTicker(common.DatabaseCleanUpInterval)
		// Stop the ticker when the runnable exits so its resources are
		// released instead of lingering after cancellation.
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return nil
			case <-t.C:
				if err := db.RemoveObservationsByIndex(true, common.ExpiryDuration); err != nil {
					logger.Error("RemoveObservationsByIndex error", zap.Error(err))
				}
			}
		}
	})
}

func main() {
loadEnvVars()
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
Expand Down Expand Up @@ -228,11 +249,23 @@ func main() {
}
gst.Set(&gs)

db := db.OpenDb(logger, nil)

db := db.OpenDb(logger, &dataDir)
promErrC := make(chan error)
// Start Prometheus scraper
initPromScraper(promRemoteURL, logger)
initObservationScraper(db, logger)
initPromScraper(promRemoteURL, logger, promErrC)
initObservationScraper(db, logger, promErrC)
initDatabaseCleanUp(db, logger, promErrC)

go func() {
for {
select {
case <-rootCtx.Done():
return
case err := <-promErrC:
logger.Error("error from prometheus scraper", zap.Error(err))
}
}
}()

// WIP(bing): add metrics for guardian observations
go func() {
Expand Down
3 changes: 3 additions & 0 deletions fly/common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ import (

const (
// ExpiryDuration is the age threshold handed to RemoveObservationsByIndex by
// the db_cleanup runnable: messages whose LastObservedAt is older than this
// have their observations removed.
ExpiryDuration = 30 * time.Hour
// DatabaseCleanUpInterval is the ticker period of the db_cleanup runnable,
// i.e. how often the observation clean-up pass runs.
// NOTE(review): this is longer than ExpiryDuration, so a message may keep its
// observations for up to roughly ExpiryDuration + DatabaseCleanUpInterval
// after it was last observed - confirm that is intended.
DatabaseCleanUpInterval = 48 * time.Hour

// MessageUpdateBatchSize is the maximum number of messages that
// batchUpdateMessages writes in a single database transaction.
MessageUpdateBatchSize = 100
)
145 changes: 117 additions & 28 deletions fly/pkg/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/wormhole-foundation/wormhole-monitor/fly/common"
"github.com/dgraph-io/badger/v3"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
Expand Down Expand Up @@ -154,42 +155,91 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
})
}

// QueryMessagesByIndex retrieves messages based on indexed attributes.
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
var messages []*Message
now := time.Now()
// batchUpdateMessages persists messages in consecutive chunks of at most
// common.MessageUpdateBatchSize entries, handing each chunk to
// updateMessagesBatch. It stops at, and returns, the first error; chunks
// written before the failure are not rolled back here.
func (db *Database) batchUpdateMessages(messages []*Message) error {
	pending := messages
	for len(pending) > 0 {
		// Take a full batch, or whatever is left for the final chunk.
		size := common.MessageUpdateBatchSize
		if size > len(pending) {
			size = len(pending)
		}

		if err := db.updateMessagesBatch(pending[:size]); err != nil {
			return err
		}
		pending = pending[size:]
	}
	return nil
}

// Start a read-only transaction
err := db.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // Only keys are needed
// updateMessagesBatch writes every message in messagesBatch inside one
// read-write transaction. Each message is JSON-encoded and stored under its
// MessageID as the key. The first marshal or write failure is wrapped and
// returned from the transaction function.
//
// NOTE(review): only the primary record is written here; whether secondary
// index keys also need refreshing depends on what SaveMessage does, which is
// not visible from this function - confirm.
func (db *Database) updateMessagesBatch(messagesBatch []*Message) error {
	writeAll := func(txn *badger.Txn) error {
		for idx := range messagesBatch {
			msg := messagesBatch[idx]

			encoded, marshalErr := json.Marshal(msg)
			if marshalErr != nil {
				return fmt.Errorf("failed to marshal message: %w", marshalErr)
			}

			setErr := txn.Set([]byte(msg.MessageID), encoded)
			if setErr != nil {
				return fmt.Errorf("failed to save message: %w", setErr)
			}
		}
		return nil
	}

	return db.db.Update(writeAll)
}

// iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
func (db *Database) iterateIndex(metricsChecked bool, callback func(item *badger.Item) error) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // Only keys are needed
return db.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(opts)
defer it.Close()

metricsCheckedPrefix := fmt.Sprintf("metricsChecked|%t|", metricsChecked)

// Iterate over the metricsChecked index
for it.Seek([]byte(metricsCheckedPrefix)); it.ValidForPrefix([]byte(metricsCheckedPrefix)); it.Next() {
item := it.Item()
key := item.Key()

// Extract MessageID from the key and query lastObservedAt index
_, messageID, err := parseMetricsCheckedIndexKey(key)
if err != nil {
return fmt.Errorf("failed to parse index key: %w", err)
if err := callback(it.Item()); err != nil {
return err
}
}
return nil
})
}

message, err := db.GetMessage(messageID)
if err != nil {
return fmt.Errorf("failed to get message by ID: %w", err)
}

// Check if the last observed timestamp is before the specified hours
if message.LastObservedAt.Before(now.Add(-cutOffTime)) {
message.MetricsChecked = true
db.SaveMessage(message)
messages = append(messages, message)
}
// processMessage loads the message identified by messageID and, when its
// LastObservedAt is strictly before now-cutOffTime, runs updateFunc on it.
//
// It returns the (possibly mutated) message only when the message is past the
// cut-off and updateFunc reports true; otherwise it returns (nil, nil).
// updateFunc is not invoked for messages that are still within the cut-off.
// A lookup failure is wrapped and returned as the error.
func (db *Database) processMessage(messageID string, now time.Time, cutOffTime time.Duration, updateFunc func(*Message) bool) (*Message, error) {
	msg, err := db.GetMessage(messageID)
	if err != nil {
		return nil, fmt.Errorf("failed to get message by ID: %w", err)
	}

	threshold := now.Add(-cutOffTime)
	if !msg.LastObservedAt.Before(threshold) {
		// Still within the cut-off window: leave the message alone.
		return nil, nil
	}
	if !updateFunc(msg) {
		return nil, nil
	}
	return msg, nil
}

// QueryMessagesByIndex retrieves messages based on indexed attributes.
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
var messagesToUpdate []*Message
now := time.Now()

err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
key := item.Key()
_, messageID, err := parseMetricsCheckedIndexKey(key)
if err != nil {
return fmt.Errorf("failed to parse index key: %w", err)
}

message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
m.MetricsChecked = true
return true
})

if err != nil {
return err
}

if message != nil {
messagesToUpdate = append(messagesToUpdate, message)
}

return nil
Expand All @@ -199,5 +249,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
return nil, fmt.Errorf("failed to query messages: %w", err)
}

return messages, nil
if err := db.batchUpdateMessages(messagesToUpdate); err != nil {
return nil, fmt.Errorf("failed to batch update messages: %w", err)
}

return messagesToUpdate, nil
}

// RemoveObservationsByIndex clears the Observations of every message listed in
// the metricsChecked index for the given flag value whose LastObservedAt is
// older than cutOffTime, measured against the time the call started.
//
// Matching messages are collected during a read-only pass over the index and
// then written back in batches via batchUpdateMessages. The first error from
// either phase is returned.
func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime time.Duration) error {
	var stale []*Message
	startedAt := time.Now()

	// dropObservations is the mutation applied to each expired message.
	dropObservations := func(m *Message) bool {
		m.Observations = nil
		return true
	}

	// visit handles one index entry: resolve the message ID from the key,
	// then queue the message for rewriting if it is past the cut-off.
	visit := func(item *badger.Item) error {
		_, id, parseErr := parseMetricsCheckedIndexKey(item.Key())
		if parseErr != nil {
			return fmt.Errorf("failed to parse index key: %w", parseErr)
		}

		msg, procErr := db.processMessage(id, startedAt, cutOffTime, dropObservations)
		if procErr != nil {
			return procErr
		}
		if msg != nil {
			stale = append(stale, msg)
		}
		return nil
	}

	if err := db.iterateIndex(metricsChecked, visit); err != nil {
		return err
	}
	return db.batchUpdateMessages(stale)
}
58 changes: 45 additions & 13 deletions fly/pkg/db/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,65 @@ func TestQueryMessagesByIndex(t *testing.T) {
db := OpenDb(zap.NewNop(), nil)
defer db.db.Close()

// Create a message with LastObservedAt set to 30 hours ago
message0 := getMessage(0, time.Now().Add(-30*time.Hour), false)
// Store the time for consistent comparison
observedTime := time.Now().Add(-30 * time.Hour)

// Create messages
message0 := getMessage(0, observedTime, false)
err := db.SaveMessage(message0)
require.NoError(t, err)

// Create a message with LastObservedAt set to 30 hours ago but with metrics checked
message1 := getMessage(1, time.Now().Add(-30*time.Hour), true)
message1 := getMessage(1, observedTime, true)
err = db.SaveMessage(message1)
require.NoError(t, err)

// Create a message with LastObservedAt set to 10 hours ago
message2 := getMessage(2, time.Now().Add(-10*time.Hour), false)
err = db.SaveMessage(message2)
require.NoError(t, err)

// Query messages
result, err := db.QueryMessagesByIndex(false, 30*time.Hour)
require.NoError(t, err)
require.Len(t, result, 1, "expected 1 message")

// Check if the message0 is in the result
require.Equal(t, message0.MessageID, result[0].MessageID)
require.True(t, observedTime.Equal(result[0].LastObservedAt), "message0 should be found in the result set")
}

func TestRemoveObservationsByIndex(t *testing.T) {
db := OpenDb(zap.NewNop(), nil)
defer db.db.Close()

length := len(result)
require.Equal(t, 1, length, "expected 1 message")
testCases := []struct {
messageID int
timeOffset time.Duration
metricsChecked bool
expectEmpty bool
}{
{0, -49 * time.Hour, true, true},
{1, -40 * time.Hour, true, false},
{2, -50 * time.Hour, false, false},
{3, -72 * time.Hour, true, true},
{4, -96 * time.Hour, true, true},
}

for _, tc := range testCases {
message := getMessage(tc.messageID, time.Now().Add(tc.timeOffset), tc.metricsChecked)
err := db.SaveMessage(message)
require.NoError(t, err)
}

err := db.RemoveObservationsByIndex(true, 48*time.Hour)
require.NoError(t, err)

found := false
for _, msg := range result {
if msg.MessageID == message0.MessageID && msg.LastObservedAt.Equal(message0.LastObservedAt) && msg.MetricsChecked == true {
found = true
break
for _, tc := range testCases {
messageFromDb, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID))
require.NoError(t, err)
if tc.expectEmpty {
require.Empty(t, messageFromDb.Observations, "expected observations to be removed for message", tc.messageID)
} else {
require.NotEmpty(t, messageFromDb.Observations, "expected observations to be present for message", tc.messageID)
}
}
require.True(t, found, "message found in the result set")
}

0 comments on commit 3ea71d7

Please sign in to comment.