Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hum: batch process obs #371

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Add channel capacity checks
Expand Down Expand Up @@ -303,23 +303,39 @@ func main() {
}
}()

// WIP(bing): add metrics for guardian observations
batchSize := 100
observationBatch := make([]*types.Observation, 0, batchSize)

go func() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-rootCtx.Done():
bingyuyap marked this conversation as resolved.
Show resolved Hide resolved
return
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := &gossipv1.Observation{
Hash: o.Msg.Hash,
Signature: o.Msg.Signature,
TxHash: o.Msg.TxHash,
MessageId: o.Msg.MessageId,
case o := <-obsvC:
bingyuyap marked this conversation as resolved.
Show resolved Hide resolved
obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)

observationBatch = append(observationBatch, obs)

// if it reaches batchSize then process this batch
if len(observationBatch) >= batchSize {
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
observationBatch = observationBatch[:0] // Clear the batch
}
historical_uptime.ProcessObservation(*db, logger, o.Timestamp, o.Msg.Addr, obs)
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
historical_uptime.ProcessObservation(*db, logger, batch.Timestamp, batch.Msg.Addr, o)
// Process immediately: a batch message already carries a group of observations.
bingyuyap marked this conversation as resolved.
Show resolved Hide resolved
batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
for _, signedObs := range batch.Msg.Observations {
obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
batchObservations = append(batchObservations, obs)
}
historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)

case <-ticker.C:
// for every interval, process the batch
if len(observationBatch) > 0 {
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
observationBatch = observationBatch[:0] // Clear the batch
}
}
}
Expand Down
73 changes: 73 additions & 0 deletions fly/pkg/bigtable/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Package bigtable provides Bigtable-backed storage for messages and observations, together with an in-memory cache used to stage them before they are flushed.
package bigtable

import (
"sync"

"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
)

// ObservationCache is an in-memory cache of messages and their observations,
// guarded by a sync.RWMutex.
//
// GetMessage, SetMessage, GetObservation and SetObservation take the lock
// themselves. The maps are exported fields, so code that reads or writes them
// directly is not protected unless it holds the lock via Lock/RLock.
type ObservationCache struct {
	Messages     map[types.MessageID]*types.Message                // Messages indexed by their ID
	Observations map[types.MessageID]map[string]*types.Observation // Observations indexed by message ID, then by guardian address
	mu           sync.RWMutex                                      // Guards Messages and Observations
}

// Lock acquires the cache's write lock.
//
// Do not call GetMessage, SetMessage, GetObservation or SetObservation while
// holding it: those methods lock the same mutex, and sync.RWMutex is not
// reentrant, so the call would deadlock.
func (c *ObservationCache) Lock() {
	c.mu.Lock()
}

// Unlock releases the write lock previously acquired with Lock.
func (c *ObservationCache) Unlock() {
	c.mu.Unlock()
}

// RLock acquires the cache's read lock.
//
// Avoid calling the cache's Get*/Set* methods while holding it: they lock the
// same mutex, and sync.RWMutex does not support recursive locking.
func (c *ObservationCache) RLock() {
	c.mu.RLock()
}

// RUnlock releases a read lock previously acquired with RLock.
func (c *ObservationCache) RUnlock() {
	c.mu.RUnlock()
}

// GetMessage looks up the cached message stored under messageID.
// The boolean result reports whether an entry for that ID is present.
// The lookup runs under the cache's read lock.
func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	msg, ok := c.Messages[messageID]
	return msg, ok
}

// SetMessage adds or replaces the cached message stored under messageID.
// The write runs under the cache's write lock.
//
// The Messages map is allocated on first use, so a zero-value
// ObservationCache can be written to without panicking on a nil map.
func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.Message) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.Messages == nil {
		c.Messages = make(map[types.MessageID]*types.Message)
	}
	c.Messages[messageID] = message
}

// GetObservation retrieves an observation from the cache by message ID and guardian address.
// It returns the observation and a boolean indicating whether the observation was found.
func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if observations, exists := c.Observations[messageID]; exists {
observation, exists := observations[guardianAddr]
bingyuyap marked this conversation as resolved.
Show resolved Hide resolved
return observation, exists
}
return nil, false
}

// SetObservation adds or replaces the cached observation recorded for
// messageID by the guardian identified by guardianAddr.
// The write runs under the cache's write lock.
//
// Both the outer Observations map and the per-message inner map are allocated
// on first use, so a zero-value ObservationCache (or an entry holding a nil
// inner map) can be written to without panicking.
func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.Observations == nil {
		c.Observations = make(map[types.MessageID]map[string]*types.Observation)
	}
	byGuardian := c.Observations[messageID]
	if byGuardian == nil {
		byGuardian = make(map[string]*types.Observation)
		c.Observations[messageID] = byGuardian
	}
	byGuardian[guardianAddr] = observation
}
1 change: 1 addition & 0 deletions fly/pkg/bigtable/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile,
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
} else if useBigtableEmulator && emulatorHost != "" {
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
SetupEmulator()
} else {
return nil, errors.New("invalid Bigtable configuration, if using emulator, set emulatorHost, else set credentialsFile")
}
Expand Down
29 changes: 0 additions & 29 deletions fly/pkg/bigtable/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,32 +265,3 @@ func TestGetUnprocessedMessagesBeforeCutOffTime(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, messageIndexes, 0)
}

// TestSaveMessages saves ten messages with random IDs through db.SaveMessages
// and then reads each one back with db.GetMessage, checking that the ID, the
// last-observed time (compared at one-second resolution) and the
// metrics-checked flag match what was saved. Tables are cleared on exit.
func TestSaveMessages(t *testing.T) {
	ctx := context.Background()
	defer ClearTables()

	const count = 10
	messages := make([]*types.Message, 0, count)
	for len(messages) < count {
		messages = append(messages, &types.Message{
			MessageID:      types.MessageID(utils.GenerateRandomID()),
			LastObservedAt: time.Now(),
			MetricsChecked: true,
		})
	}

	saveErr := db.SaveMessages(ctx, messages)
	assert.NoError(t, saveErr)

	// Read every message back and compare it with what was written.
	for _, want := range messages {
		got, err := db.GetMessage(ctx, want.MessageID)
		assert.NoError(t, err)
		assert.NotNil(t, got)
		assert.Equal(t, want.MessageID, got.MessageID)
		assert.Equal(t, want.LastObservedAt.Unix(), got.LastObservedAt.Unix())
		assert.Equal(t, want.MetricsChecked, got.MetricsChecked)
	}
}
23 changes: 0 additions & 23 deletions fly/pkg/bigtable/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,29 +116,6 @@ func (db *BigtableDB) bigtableRowToObservation(row bigtable.Row) (*types.Observa
return &observation, nil
}

// GetObservation reads a single observation from the observation table.
// The row key is messageID and guardianAddr joined with an underscore.
//
// It returns an error if the read fails, if the row is empty (reported as
// "observation not found"), or if the row cannot be converted into an
// observation. Underlying errors are wrapped with %w so callers can inspect
// them with errors.Is / errors.As.
func (db *BigtableDB) GetObservation(ctx context.Context, messageID, guardianAddr string) (*types.Observation, error) {
	rowKey := messageID + "_" + guardianAddr

	table := db.client.Open(ObservationTableName)
	row, err := table.ReadRow(ctx, rowKey)
	if err != nil {
		return nil, fmt.Errorf("failed to read observation: %w", err)
	}
	if len(row) == 0 {
		return nil, fmt.Errorf("observation not found: %s", rowKey)
	}

	observation, err := db.bigtableRowToObservation(row)
	if err != nil {
		return nil, fmt.Errorf("failed to convert row to observation: %w", err)
	}
	return observation, nil
}

func (db *BigtableDB) GetObservationsByMessageID(ctx context.Context, messageID string) ([]*types.Observation, error) {
tableName := ObservationTableName
prefix := messageID + "_"
Expand Down
113 changes: 113 additions & 0 deletions fly/pkg/bigtable/operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package bigtable

import (
"context"
"fmt"
"strconv"

"cloud.google.com/go/bigtable"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
"go.uber.org/zap"
)

// ApplyBulk applies mutations to tableName through the table's bulk API.
// rowKeys[i] is paired with muts[i], so both slices must have the same length.
//
// The call can perform both inserts and updates:
//   - if a row doesn't exist, it will be inserted;
//   - if a row already exists, the specified columns will be updated.
//
// It returns an error if the slice lengths differ, if the bulk call itself
// fails, or if any individual row mutation fails; in the last case only the
// first failing row is reported. Underlying errors are wrapped with %w so
// callers can inspect them with errors.Is / errors.As. Empty input is a no-op.
func (db *BigtableDB) ApplyBulk(ctx context.Context, tableName string, rowKeys []string, muts []*bigtable.Mutation) error {
	if len(rowKeys) != len(muts) {
		return fmt.Errorf("mismatch between number of row keys (%d) and mutations (%d)", len(rowKeys), len(muts))
	}
	if len(rowKeys) == 0 {
		// Nothing to write.
		return nil
	}

	table := db.client.Open(tableName)
	rowErrs, err := table.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		return fmt.Errorf("failed to apply bulk mutations: %w", err)
	}

	// rowErrs is indexed like rowKeys; report the first row that failed.
	for i, rowErr := range rowErrs {
		if rowErr != nil {
			return fmt.Errorf("failed to apply mutation for row %s: %w", rowKeys[i], rowErr)
		}
	}

	return nil
}

// FlushCache writes the contents of cache to Bigtable using two bulk calls:
// one for the message table, then one for the observation table.
//
// Only observations whose message ID is present in cache.Messages are
// written. A message or observation whose mutation cannot be built is logged
// and skipped; a skipped message also skips its observations. If the message
// bulk write fails, the observation write is not attempted.
//
// The cache is not cleared by this function.
//
// NOTE(review): cache.Messages and cache.Observations are read directly
// without taking the cache lock here, so the caller presumably has to prevent
// concurrent modification while flushing; confirm against callers.
func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache *ObservationCache) error {
	var (
		messageRows     = make([]string, 0, len(cache.Messages))
		messageMuts     = make([]*bigtable.Mutation, 0, len(cache.Messages))
		observationRows = []string{}
		observationMuts = []*bigtable.Mutation{}
	)

	for id, msg := range cache.Messages {
		msgMut, err := createMessageMutation(msg)
		if err != nil {
			logger.Error("Failed to create message mutation", zap.String("messageID", string(id)), zap.Error(err))
			continue
		}
		messageRows = append(messageRows, string(id))
		messageMuts = append(messageMuts, msgMut)

		// Collect the observations recorded for this message.
		for _, obs := range cache.Observations[id] {
			obsMut, obsRow, err := createObservationMutation(obs)
			if err != nil {
				logger.Error("Failed to create observation mutation", zap.String("messageID", string(id)), zap.String("guardianAddr", obs.GuardianAddr), zap.Error(err))
				continue
			}
			observationRows = append(observationRows, obsRow)
			observationMuts = append(observationMuts, obsMut)
		}
	}

	if err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts); err != nil {
		logger.Error("Failed to apply bulk mutations for messages", zap.Error(err))
		return err
	}

	if err := db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts); err != nil {
		logger.Error("Failed to apply bulk mutations for observations", zap.Error(err))
		return err
	}

	return nil
}

// createMessageMutation builds the mutation that sets the lastObservedAt and
// metricsChecked columns of the "messageData" family for message.
//
// lastObservedAt is stored as the MarshalBinary encoding of the UTC time;
// metricsChecked is stored as the text "true" or "false". Both cells are
// written with timestamp 0. An error is returned only if the time cannot be
// marshalled.
func createMessageMutation(message *types.Message) (*bigtable.Mutation, error) {
	const family = "messageData"

	observedAt, err := message.LastObservedAt.UTC().MarshalBinary()
	if err != nil {
		return nil, err
	}
	checked := []byte(strconv.FormatBool(message.MetricsChecked))

	m := bigtable.NewMutation()
	m.Set(family, "lastObservedAt", bigtable.Timestamp(0), observedAt)
	m.Set(family, "metricsChecked", bigtable.Timestamp(0), checked)
	return m, nil
}

// createObservationMutation builds the mutation and row key for observation.
//
// The row key is the message ID and guardian address joined with an
// underscore. The mutation sets the signature, observedAt and status columns
// of the "observationData" family, all with timestamp 0: observedAt is the
// MarshalBinary encoding of the UTC time and status is the decimal text of
// its integer value. An error is returned only if the time cannot be
// marshalled.
func createObservationMutation(observation *types.Observation) (*bigtable.Mutation, string, error) {
	const family = "observationData"

	key := string(observation.MessageID) + "_" + observation.GuardianAddr

	observedAt, err := observation.ObservedAt.UTC().MarshalBinary()
	if err != nil {
		return nil, "", err
	}
	status := []byte(strconv.Itoa(int(observation.Status)))

	m := bigtable.NewMutation()
	m.Set(family, "signature", bigtable.Timestamp(0), []byte(observation.Signature))
	m.Set(family, "observedAt", bigtable.Timestamp(0), observedAt)
	m.Set(family, "status", bigtable.Timestamp(0), status)
	return m, key, nil
}
3 changes: 3 additions & 0 deletions fly/pkg/bigtable/test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func SetupEmulator() error {
}
defer adminClient.Close()

// Clear everything and then recreate the tables for a clean slate
CleanUp()

tables := []struct {
name string
columnFamilies []string
Expand Down
Loading
Loading