hum: batch process obs
hum: fix tests

hum: add tests for flushing

hum: clean up

hum: final clean up

Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
bingyuyap committed Sep 4, 2024
1 parent d522ad1 commit 28f6551
Showing 9 changed files with 419 additions and 112 deletions.
38 changes: 27 additions & 11 deletions fly/cmd/historical_uptime/main.go
@@ -259,7 +259,7 @@ func main() {
 	defer rootCtxCancel()
 
 	// Inbound observations
-	obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
+	obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
 	batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
 
 	// Add channel capacity checks
@@ -303,23 +303,39 @@ func main() {
 		}
 	}()
 
-	// WIP(bing): add metrics for guardian observations
+	batchSize := 100
+	observationBatch := make([]*types.Observation, 0, batchSize)
+
 	go func() {
+		ticker := time.NewTicker(5 * time.Second)
 		for {
 			select {
 			case <-rootCtx.Done():
 				return
-			case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
-				obs := &gossipv1.Observation{
-					Hash:      o.Msg.Hash,
-					Signature: o.Msg.Signature,
-					TxHash:    o.Msg.TxHash,
-					MessageId: o.Msg.MessageId,
+			case o := <-obsvC:
+				obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)
+
+				observationBatch = append(observationBatch, obs)
+
+				// if it reaches batchSize then process this batch
+				if len(observationBatch) >= batchSize {
+					historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+					observationBatch = observationBatch[:0] // Clear the batch
 				}
-				historical_uptime.ProcessObservation(*db, logger, o.Timestamp, o.Msg.Addr, obs)
 			case batch := <-batchObsvC:
-				for _, o := range batch.Msg.Observations {
-					historical_uptime.ProcessObservation(*db, logger, batch.Timestamp, batch.Msg.Addr, o)
+				// process immediately since batches are in group
+				batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
+				for _, signedObs := range batch.Msg.Observations {
+					obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
+					batchObservations = append(batchObservations, obs)
 				}
+				historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)
+
+			case <-ticker.C:
+				// for every interval, process the batch
+				if len(observationBatch) > 0 {
+					historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+					observationBatch = observationBatch[:0] // Clear the batch
+				}
 			}
 		}
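The new select loop is a standard accumulate-and-flush batching pattern: observations are appended to a slice and flushed either when the slice reaches batchSize or when the 5-second ticker fires. Below is a minimal, self-contained sketch of that pattern; the string channel and flushBatch function are hypothetical stand-ins for the historical_uptime observation types and ProcessObservationBatch, not the actual APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// flushBatch is a stand-in for historical_uptime.ProcessObservationBatch.
func flushBatch(batch []string) {
	fmt.Printf("flushing %d observations\n", len(batch))
}

// runBatcher accumulates items and flushes them either when the batch reaches
// batchSize or when the ticker interval elapses, mirroring the select loop above.
func runBatcher(ctx context.Context, obsvC <-chan string, batchSize int, interval time.Duration) {
	batch := make([]string, 0, batchSize)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case o := <-obsvC:
			batch = append(batch, o)
			if len(batch) >= batchSize {
				flushBatch(batch)
				batch = batch[:0] // reuse the backing array
			}
		case <-ticker.C:
			if len(batch) > 0 {
				flushBatch(batch)
				batch = batch[:0]
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	obsvC := make(chan string, 1024)
	go runBatcher(ctx, obsvC, 100, time.Second)

	for i := 0; i < 250; i++ {
		obsvC <- fmt.Sprintf("obs-%d", i)
	}
	<-ctx.Done()
}

One property the sketch shares with the loop above: anything still buffered in the channel or sitting in a partial batch when the context is cancelled is dropped rather than flushed.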
73 changes: 73 additions & 0 deletions fly/pkg/bigtable/cache.go
@@ -0,0 +1,73 @@
// Package bigtable provides a cache implementation for storing and retrieving messages and observations.
package bigtable

import (
	"sync"

	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
)

// ObservationCache is a thread-safe cache for storing messages and observations.
type ObservationCache struct {
	Messages     map[types.MessageID]*types.Message                // Stores messages indexed by their ID
	Observations map[types.MessageID]map[string]*types.Observation // Stores observations indexed by message ID and guardian address
	mu           sync.RWMutex                                       // Mutex for ensuring thread-safety
}

// Lock acquires a write lock on the cache.
func (c *ObservationCache) Lock() {
	c.mu.Lock()
}

// Unlock releases the write lock on the cache.
func (c *ObservationCache) Unlock() {
	c.mu.Unlock()
}

// RLock acquires a read lock on the cache.
func (c *ObservationCache) RLock() {
	c.mu.RLock()
}

// RUnlock releases the read lock on the cache.
func (c *ObservationCache) RUnlock() {
	c.mu.RUnlock()
}

// GetMessage retrieves a message from the cache by its ID.
// It returns the message and a boolean indicating whether the message was found.
func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	message, exists := c.Messages[messageID]
	return message, exists
}

// SetMessage adds or updates a message in the cache.
func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.Message) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.Messages[messageID] = message
}

// GetObservation retrieves an observation from the cache by message ID and guardian address.
// It returns the observation and a boolean indicating whether the observation was found.
func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if observations, exists := c.Observations[messageID]; exists {
		observation, exists := observations[guardianAddr]
		return observation, exists
	}
	return nil, false
}

// SetObservation adds or updates an observation in the cache.
func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, exists := c.Observations[messageID]; !exists {
		c.Observations[messageID] = make(map[string]*types.Observation)
	}
	c.Observations[messageID][guardianAddr] = observation
}
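A short usage sketch of ObservationCache follows. It assumes the import path github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable (matching the types import above) and only the types.Message fields exercised elsewhere in this commit; the message ID and guardian address values are placeholders.

package main

import (
	"fmt"
	"time"

	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
)

func main() {
	// The zero value has nil maps, so initialise them before use.
	cache := &bigtable.ObservationCache{
		Messages:     make(map[types.MessageID]*types.Message),
		Observations: make(map[types.MessageID]map[string]*types.Observation),
	}

	id := types.MessageID("1/example-emitter/42") // placeholder message ID
	cache.SetMessage(id, &types.Message{
		MessageID:      id,
		LastObservedAt: time.Now().UTC(),
		MetricsChecked: false,
	})

	if msg, found := cache.GetMessage(id); found {
		fmt.Println("cached:", msg.MessageID, msg.LastObservedAt)
	}

	// Reads for a guardian that has not reported yet simply return found == false.
	if _, found := cache.GetObservation(id, "0xplaceholder-guardian"); !found {
		fmt.Println("no observation for this guardian yet")
	}
}

The per-call Get/Set methods lock internally; the exported Lock/RLock helpers are for callers that need to guard multi-step access such as iterating the maps directly (FlushCache in operations.go below reads them without taking the lock itself).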
1 change: 1 addition & 0 deletions fly/pkg/bigtable/message.go
@@ -30,6 +30,7 @@ func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile,
 		client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
 	} else if useBigtableEmulator && emulatorHost != "" {
 		client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
+		SetupEmulator()
 	} else {
 		return nil, errors.New("invalid Bigtable configuration, if using emulator, set emulatorHost, else set credentialsFile")
 	}
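For context, the branch being modified selects between real credentials and the local emulator purely from configuration. The following standalone sketch shows the same selection logic using the same cloud.google.com/go/bigtable and google.golang.org/api/option calls; newClient and the project/instance/port values are hypothetical, and this is not the fly wrapper itself (whose full constructor signature is truncated above).

package main

import (
	"context"
	"errors"
	"log"

	"cloud.google.com/go/bigtable"
	"google.golang.org/api/option"
)

// newClient mirrors the branch above: a credentials file for a real instance,
// or an unauthenticated endpoint override when pointing at the emulator.
func newClient(ctx context.Context, projectID, instanceID, credentialsFile, emulatorHost string, useEmulator bool) (*bigtable.Client, error) {
	switch {
	case credentialsFile != "":
		return bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
	case useEmulator && emulatorHost != "":
		return bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
	default:
		return nil, errors.New("invalid Bigtable configuration, if using emulator, set emulatorHost, else set credentialsFile")
	}
}

func main() {
	client, err := newClient(context.Background(), "test-project", "test-instance", "", "localhost:8086", true)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}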
29 changes: 0 additions & 29 deletions fly/pkg/bigtable/message_test.go
@@ -265,32 +265,3 @@ func TestGetUnprocessedMessagesBeforeCutOffTime(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, messageIndexes, 0)
 }
-
-func TestSaveMessages(t *testing.T) {
-	ctx := context.Background()
-	defer ClearTables()
-
-	messages := []*types.Message{}
-	for i := 0; i < 10; i++ {
-		messageID := utils.GenerateRandomID()
-		message := types.Message{
-			MessageID:      types.MessageID(messageID),
-			LastObservedAt: time.Now(),
-			MetricsChecked: true,
-		}
-		messages = append(messages, &message)
-	}
-
-	err := db.SaveMessages(ctx, messages)
-	assert.NoError(t, err)
-
-	// Verify that the messages are saved correctly
-	for _, message := range messages {
-		msg, err := db.GetMessage(ctx, message.MessageID)
-		assert.NoError(t, err)
-		assert.NotNil(t, msg)
-		assert.Equal(t, message.MessageID, msg.MessageID)
-		assert.Equal(t, message.LastObservedAt.Unix(), msg.LastObservedAt.Unix())
-		assert.Equal(t, message.MetricsChecked, msg.MetricsChecked)
-	}
-}
23 changes: 0 additions & 23 deletions fly/pkg/bigtable/observation.go
@@ -116,29 +116,6 @@ func (db *BigtableDB) bigtableRowToObservation(row bigtable.Row) (*types.Observa
 	return &observation, nil
 }
 
-func (db *BigtableDB) GetObservation(ctx context.Context, messageID, guardianAddr string) (*types.Observation, error) {
-	tableName := ObservationTableName
-	rowKey := messageID + "_" + guardianAddr
-
-	table := db.client.Open(tableName)
-	row, err := table.ReadRow(ctx, rowKey)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read observation: %v", err)
-	}
-
-	if len(row) == 0 {
-		return nil, fmt.Errorf("observation not found: %s", rowKey)
-	}
-
-	var observation *types.Observation
-	observation, err = db.bigtableRowToObservation(row)
-	if err != nil {
-		return nil, fmt.Errorf("failed to convert row to observation: %v", err)
-	}
-
-	return observation, nil
-}
-
 func (db *BigtableDB) GetObservationsByMessageID(ctx context.Context, messageID string) ([]*types.Observation, error) {
 	tableName := ObservationTableName
 	prefix := messageID + "_"
113 changes: 113 additions & 0 deletions fly/pkg/bigtable/operations.go
@@ -0,0 +1,113 @@
package bigtable

import (
	"context"
	"fmt"
	"strconv"

	"cloud.google.com/go/bigtable"
	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
	"go.uber.org/zap"
)

// Method to apply bulk mutations to bigtable
// This method can perform both inserts and updates:
// - If a row doesn't exist, it will be inserted
// - If a row already exists, the specified columns will be updated
func (db *BigtableDB) ApplyBulk(ctx context.Context, tableName string, rowKeys []string, muts []*bigtable.Mutation) error {
	if len(rowKeys) != len(muts) {
		return fmt.Errorf("mismatch between number of row keys (%d) and mutations (%d)", len(rowKeys), len(muts))
	}

	table := db.client.Open(tableName)
	errs, err := table.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		return fmt.Errorf("failed to apply bulk mutations: %v", err)
	}

	for i, err := range errs {
		if err != nil {
			return fmt.Errorf("failed to apply mutation for row %s: %v", rowKeys[i], err)
		}
	}

	return nil
}

// Takes the cached data and flushes it to Bigtable
func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache *ObservationCache) error {
	// Prepare bulk mutations for messages
	messageMuts := make([]*bigtable.Mutation, 0, len(cache.Messages))
	messageRows := make([]string, 0, len(cache.Messages))

	// Prepare bulk mutations for observations
	observationMuts := make([]*bigtable.Mutation, 0)
	observationRows := make([]string, 0)

	for messageID, message := range cache.Messages {
		// Prepare message mutation
		messageMut, err := createMessageMutation(message)
		if err != nil {
			logger.Error("Failed to create message mutation", zap.String("messageID", string(messageID)), zap.Error(err))
			continue
		}
		messageMuts = append(messageMuts, messageMut)
		messageRows = append(messageRows, string(messageID))

		// Prepare observation mutations
		for _, observation := range cache.Observations[messageID] {
			observationMut, observationRow, err := createObservationMutation(observation)
			if err != nil {
				logger.Error("Failed to create observation mutation", zap.String("messageID", string(messageID)), zap.String("guardianAddr", observation.GuardianAddr), zap.Error(err))
				continue
			}
			observationMuts = append(observationMuts, observationMut)
			observationRows = append(observationRows, observationRow)
		}
	}

	err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts)
	if err != nil {
		logger.Error("Failed to apply bulk mutations for messages", zap.Error(err))
		return err
	}

	err = db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts)
	if err != nil {
		logger.Error("Failed to apply bulk mutations for observations", zap.Error(err))
		return err
	}

	return nil
}

// Mutation to update lastObservedAt and metricsChecked
func createMessageMutation(message *types.Message) (*bigtable.Mutation, error) {
	lastObservedAtBytes, err := message.LastObservedAt.UTC().MarshalBinary()
	if err != nil {
		return nil, err
	}

	mut := bigtable.NewMutation()
	mut.Set("messageData", "lastObservedAt", bigtable.Timestamp(0), lastObservedAtBytes)
	mut.Set("messageData", "metricsChecked", bigtable.Timestamp(0), []byte(strconv.FormatBool(message.MetricsChecked)))

	return mut, nil
}

// Mutation to update observation data
func createObservationMutation(observation *types.Observation) (*bigtable.Mutation, string, error) {
	rowKey := string(observation.MessageID) + "_" + observation.GuardianAddr

	timeBinary, err := observation.ObservedAt.UTC().MarshalBinary()
	if err != nil {
		return nil, "", err
	}

	mut := bigtable.NewMutation()
	mut.Set("observationData", "signature", bigtable.Timestamp(0), []byte(observation.Signature))
	mut.Set("observationData", "observedAt", bigtable.Timestamp(0), timeBinary)
	mut.Set("observationData", "status", bigtable.Timestamp(0), []byte(strconv.Itoa(int(observation.Status))))

	return mut, rowKey, nil
}
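A sketch of driving FlushCache once data has been cached is shown below. It assumes the fly/pkg/bigtable import path used above; the *BigtableDB is taken as a parameter because NewBigtableDB's full argument list is not shown in this diff, and the message ID is a placeholder.

package bigtableexample

import (
	"context"
	"time"

	"go.uber.org/zap"

	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
)

// flushExample caches a single message and writes it out with one FlushCache
// call, which internally issues two ApplyBulk requests (messages, then observations).
// db is passed in because NewBigtableDB's constructor arguments are not shown here.
func flushExample(ctx context.Context, db *bigtable.BigtableDB, logger *zap.Logger) error {
	cache := &bigtable.ObservationCache{
		Messages:     make(map[types.MessageID]*types.Message),
		Observations: make(map[types.MessageID]map[string]*types.Observation),
	}

	id := types.MessageID("1/example-emitter/7") // placeholder message ID
	cache.SetMessage(id, &types.Message{
		MessageID:      id,
		LastObservedAt: time.Now().UTC(),
		MetricsChecked: false,
	})

	// A message with no observations is still flushed; the observation loop in
	// FlushCache simply ranges over an empty (nil) map for this message ID.
	return db.FlushCache(ctx, logger, cache)
}

Note that ApplyBulk returns on the first failed per-row error after the bulk request has been sent, so a failed flush can leave some rows written and others not; callers needing stronger guarantees would have to retry the whole cache.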
3 changes: 3 additions & 0 deletions fly/pkg/bigtable/test_setup.go
@@ -25,6 +25,9 @@ func SetupEmulator() error {
 	}
 	defer adminClient.Close()
 
+	// Clear everything and then recreate the tables for a clean slate
+	CleanUp()
+
 	tables := []struct {
 		name           string
 		columnFamilies []string