-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
hum: fix tests hum: add tests for flushing hum: clean up hum: final clean up Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
- Loading branch information
Showing
9 changed files
with
419 additions
and
112 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Package bigtable provides a cache implementation for storing and retrieving messages and observations. | ||
package bigtable | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types" | ||
) | ||
|
||
// ObservationCache is a thread-safe cache for storing messages and observations. | ||
type ObservationCache struct { | ||
Messages map[types.MessageID]*types.Message // Stores messages indexed by their ID | ||
Observations map[types.MessageID]map[string]*types.Observation // Stores observations indexed by message ID and guardian address | ||
mu sync.RWMutex // Mutex for ensuring thread-safety | ||
} | ||
|
||
// Lock acquires a write lock on the cache. | ||
func (c *ObservationCache) Lock() { | ||
c.mu.Lock() | ||
} | ||
|
||
// Unlock releases the write lock on the cache. | ||
func (c *ObservationCache) Unlock() { | ||
c.mu.Unlock() | ||
} | ||
|
||
// RLock acquires a read lock on the cache. | ||
func (c *ObservationCache) RLock() { | ||
c.mu.RLock() | ||
} | ||
|
||
// RUnlock releases the read lock on the cache. | ||
func (c *ObservationCache) RUnlock() { | ||
c.mu.RUnlock() | ||
} | ||
|
||
// GetMessage retrieves a message from the cache by its ID. | ||
// It returns the message and a boolean indicating whether the message was found. | ||
func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) { | ||
c.mu.RLock() | ||
defer c.mu.RUnlock() | ||
message, exists := c.Messages[messageID] | ||
return message, exists | ||
} | ||
|
||
// SetMessage adds or updates a message in the cache. | ||
func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.Message) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.Messages[messageID] = message | ||
} | ||
|
||
// GetObservation retrieves an observation from the cache by message ID and guardian address. | ||
// It returns the observation and a boolean indicating whether the observation was found. | ||
func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) { | ||
c.mu.RLock() | ||
defer c.mu.RUnlock() | ||
if observations, exists := c.Observations[messageID]; exists { | ||
observation, exists := observations[guardianAddr] | ||
return observation, exists | ||
} | ||
return nil, false | ||
} | ||
|
||
// SetObservation adds or updates an observation in the cache. | ||
func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
if _, exists := c.Observations[messageID]; !exists { | ||
c.Observations[messageID] = make(map[string]*types.Observation) | ||
} | ||
c.Observations[messageID][guardianAddr] = observation | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package bigtable | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
|
||
"cloud.google.com/go/bigtable" | ||
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// Method to apply bulk mutations to bigtable | ||
// This method can perform both inserts and updates: | ||
// - If a row doesn't exist, it will be inserted | ||
// - If a row already exists, the specified columns will be updated | ||
func (db *BigtableDB) ApplyBulk(ctx context.Context, tableName string, rowKeys []string, muts []*bigtable.Mutation) error { | ||
if len(rowKeys) != len(muts) { | ||
return fmt.Errorf("mismatch between number of row keys (%d) and mutations (%d)", len(rowKeys), len(muts)) | ||
} | ||
|
||
table := db.client.Open(tableName) | ||
errs, err := table.ApplyBulk(ctx, rowKeys, muts) | ||
if err != nil { | ||
return fmt.Errorf("failed to apply bulk mutations: %v", err) | ||
} | ||
|
||
for i, err := range errs { | ||
if err != nil { | ||
return fmt.Errorf("failed to apply mutation for row %s: %v", rowKeys[i], err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Takes the cached data and flush it to bigtable | ||
func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache *ObservationCache) error { | ||
// Prepare bulk mutations for messages | ||
messageMuts := make([]*bigtable.Mutation, 0, len(cache.Messages)) | ||
messageRows := make([]string, 0, len(cache.Messages)) | ||
|
||
// Prepare bulk mutations for observations | ||
observationMuts := make([]*bigtable.Mutation, 0) | ||
observationRows := make([]string, 0) | ||
|
||
for messageID, message := range cache.Messages { | ||
// Prepare message mutation | ||
messageMut, err := createMessageMutation(message) | ||
if err != nil { | ||
logger.Error("Failed to create message mutation", zap.String("messageID", string(messageID)), zap.Error(err)) | ||
continue | ||
} | ||
messageMuts = append(messageMuts, messageMut) | ||
messageRows = append(messageRows, string(messageID)) | ||
|
||
// Prepare observation mutations | ||
for _, observation := range cache.Observations[messageID] { | ||
observationMut, observationRow, err := createObservationMutation(observation) | ||
if err != nil { | ||
logger.Error("Failed to create observation mutation", zap.String("messageID", string(messageID)), zap.String("guardianAddr", observation.GuardianAddr), zap.Error(err)) | ||
continue | ||
} | ||
observationMuts = append(observationMuts, observationMut) | ||
observationRows = append(observationRows, observationRow) | ||
} | ||
} | ||
|
||
err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts) | ||
if err != nil { | ||
logger.Error("Failed to apply bulk mutations for messages", zap.Error(err)) | ||
return err | ||
} | ||
|
||
err = db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts) | ||
if err != nil { | ||
logger.Error("Failed to apply bulk mutations for observations", zap.Error(err)) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Mutation to update lastObservedAt and metricsChecked | ||
func createMessageMutation(message *types.Message) (*bigtable.Mutation, error) { | ||
lastObservedAtBytes, err := message.LastObservedAt.UTC().MarshalBinary() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mut := bigtable.NewMutation() | ||
mut.Set("messageData", "lastObservedAt", bigtable.Timestamp(0), lastObservedAtBytes) | ||
mut.Set("messageData", "metricsChecked", bigtable.Timestamp(0), []byte(strconv.FormatBool(message.MetricsChecked))) | ||
|
||
return mut, nil | ||
} | ||
|
||
// Mutation to update observation data | ||
func createObservationMutation(observation *types.Observation) (*bigtable.Mutation, string, error) { | ||
rowKey := string(observation.MessageID) + "_" + observation.GuardianAddr | ||
|
||
timeBinary, err := observation.ObservedAt.UTC().MarshalBinary() | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
|
||
mut := bigtable.NewMutation() | ||
mut.Set("observationData", "signature", bigtable.Timestamp(0), []byte(observation.Signature)) | ||
mut.Set("observationData", "observedAt", bigtable.Timestamp(0), timeBinary) | ||
mut.Set("observationData", "status", bigtable.Timestamp(0), []byte(strconv.Itoa(int(observation.Status)))) | ||
|
||
return mut, rowKey, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.