diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go
index e818522b..ae6f2785 100644
--- a/fly/cmd/historical_uptime/main.go
+++ b/fly/cmd/historical_uptime/main.go
@@ -259,7 +259,7 @@ func main() {
 	defer rootCtxCancel()
 
 	// Inbound observations
-	obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
+	obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
 	batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
 
 	// Add channel capacity checks
@@ -304,23 +304,41 @@ func main() {
 		}
 	}()
 
-	// WIP(bing): add metrics for guardian observations
+	batchSize := 100
+	observationBatch := make([]*types.Observation, 0, batchSize)
+
 	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
 		for {
 			select {
 			case <-rootCtx.Done():
 				return
-			case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
-				obs := &gossipv1.Observation{
-					Hash:      o.Msg.Hash,
-					Signature: o.Msg.Signature,
-					TxHash:    o.Msg.TxHash,
-					MessageId: o.Msg.MessageId,
-				}
-				historical_uptime.ProcessObservation(*db, logger, o.Timestamp, o.Msg.Addr, obs)
+			case o := <-obsvC:
+				obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Signature, o.Timestamp, o.Msg.Addr)
+
+				observationBatch = append(observationBatch, obs)
+
+				// If the batch has reached batchSize, process it now
+				if len(observationBatch) >= batchSize {
+					historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+					observationBatch = observationBatch[:0] // Clear the batch
+				}
 			case batch := <-batchObsvC:
-				for _, o := range batch.Msg.Observations {
-					historical_uptime.ProcessObservation(*db, logger, batch.Timestamp, batch.Msg.Addr, o)
+				// Process immediately, since these observations already arrive grouped
+				batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
+				for _, signedObs := range batch.Msg.Observations {
+					obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, batch.Msg.Addr)
+					batchObservations = append(batchObservations, obs)
+				}
+				historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)
+
+			case <-ticker.C:
+				// On every tick, process whatever the batch has accumulated
+				if len(observationBatch) > 0 {
+					historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+					observationBatch = observationBatch[:0] // Clear the batch
+				}
 			}
 		}
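Review note: the new consumer loop is a standard size-or-interval batcher. The sketch below isolates that pattern so the control flow is easier to follow; `runBatcher`, `flushFn`, and the `int` payload are illustrative stand-ins, not code from this PR. One behavioural difference is deliberate: on shutdown the sketch flushes the partial batch, whereas the loop above drops anything still buffered in `observationBatch` when `rootCtx` is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// flushFn stands in for historical_uptime.ProcessObservationBatch.
// The callee must finish with (or copy) the slice before returning,
// because the caller reuses the backing array via batch[:0].
type flushFn func(batch []int)

// runBatcher drains items from in, flushing either when the batch reaches
// size or when the ticker fires, whichever comes first.
func runBatcher(ctx context.Context, in <-chan int, size int, interval time.Duration, flush flushFn) {
	batch := make([]int, 0, size)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			if len(batch) > 0 {
				flush(batch) // final flush so buffered items are not lost
			}
			return
		case v := <-in:
			batch = append(batch, v)
			if len(batch) >= size {
				flush(batch)
				batch = batch[:0]
			}
		case <-ticker.C:
			if len(batch) > 0 {
				flush(batch)
				batch = batch[:0]
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	in := make(chan int, 8)
	for i := 0; i < 5; i++ {
		in <- i
	}
	// Flushes [0 1 2] on the size trigger, then [3 4] on the interval trigger.
	runBatcher(ctx, in, 3, 10*time.Millisecond, func(b []int) { fmt.Println("flush", b) })
}
```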
diff --git a/fly/pkg/bigtable/cache.go b/fly/pkg/bigtable/cache.go
new file mode 100644
index 00000000..898bfc25
--- /dev/null
+++ b/fly/pkg/bigtable/cache.go
@@ -0,0 +1,73 @@
+// Package bigtable provides a cache implementation for storing and retrieving messages and observations.
+package bigtable
+
+import (
+	"sync"
+
+	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
+)
+
+// ObservationCache is a thread-safe cache for storing messages and observations.
+type ObservationCache struct {
+	Messages     map[types.MessageID]*types.Message                // Stores messages indexed by their ID
+	Observations map[types.MessageID]map[string]*types.Observation // Stores observations indexed by message ID and guardian address
+	mu           sync.RWMutex                                      // Mutex for ensuring thread safety
+}
+
+// Lock acquires a write lock on the cache.
+func (c *ObservationCache) Lock() {
+	c.mu.Lock()
+}
+
+// Unlock releases the write lock on the cache.
+func (c *ObservationCache) Unlock() {
+	c.mu.Unlock()
+}
+
+// RLock acquires a read lock on the cache.
+func (c *ObservationCache) RLock() {
+	c.mu.RLock()
+}
+
+// RUnlock releases the read lock on the cache.
+func (c *ObservationCache) RUnlock() {
+	c.mu.RUnlock()
+}
+
+// GetMessage retrieves a message from the cache by its ID.
+// It returns the message and a boolean indicating whether the message was found.
+func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	message, exists := c.Messages[messageID]
+	return message, exists
+}
+
+// SetMessage adds or updates a message in the cache.
+func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.Message) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.Messages[messageID] = message
+}
+
+// GetObservation retrieves an observation from the cache by message ID and guardian address.
+// It returns the observation and a boolean indicating whether the observation was found.
+func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if observations, exists := c.Observations[messageID]; exists {
+		observation, exists := observations[guardianAddr]
+		return observation, exists
+	}
+	return nil, false
+}
+
+// SetObservation adds or updates an observation in the cache.
+func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if _, exists := c.Observations[messageID]; !exists {
+		c.Observations[messageID] = make(map[string]*types.Observation)
+	}
+	c.Observations[messageID][guardianAddr] = observation
+}
diff --git a/fly/pkg/bigtable/message.go b/fly/pkg/bigtable/message.go
index a11a480f..3565c3a8 100644
--- a/fly/pkg/bigtable/message.go
+++ b/fly/pkg/bigtable/message.go
@@ -30,6 +30,9 @@ func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile,
 		client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
 	} else if useBigtableEmulator && emulatorHost != "" {
 		client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
+		if setupErr := SetupEmulator(); setupErr != nil {
+			return nil, setupErr
+		}
 	} else {
 		return nil, errors.New("invalid Bigtable configuration, if using emulator, set emulatorHost, else set credentialsFile")
 	}
diff --git a/fly/pkg/bigtable/message_test.go b/fly/pkg/bigtable/message_test.go
index 30c5a7d1..83cc6f5c 100644
--- a/fly/pkg/bigtable/message_test.go
+++ b/fly/pkg/bigtable/message_test.go
@@ -265,32 +265,3 @@ func TestGetUnprocessedMessagesBeforeCutOffTime(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, messageIndexes, 0)
 }
-
-func TestSaveMessages(t *testing.T) {
-	ctx := context.Background()
-	defer ClearTables()
-
-	messages := []*types.Message{}
-	for i := 0; i < 10; i++ {
-		messageID := utils.GenerateRandomID()
-		message := types.Message{
-			MessageID:      types.MessageID(messageID),
-			LastObservedAt: time.Now(),
-			MetricsChecked: true,
-		}
-		messages = append(messages, &message)
-	}
-
-	err := db.SaveMessages(ctx, messages)
-	assert.NoError(t, err)
-
-	// Verify that the messages are saved correctly
-	for _, message := range messages {
-		msg, err := db.GetMessage(ctx, message.MessageID)
-		assert.NoError(t, err)
-		assert.NotNil(t, msg)
-		assert.Equal(t, message.MessageID, msg.MessageID)
-		assert.Equal(t, message.LastObservedAt.Unix(), msg.LastObservedAt.Unix())
-		assert.Equal(t, message.MetricsChecked, msg.MetricsChecked)
-	}
-}
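A note on the `ObservationCache` introduced in cache.go above: it exposes both coarse-grained locking (`Lock`/`RLock`, used by `ProcessObservation` to make a multi-step read-modify-write atomic) and per-call locking helpers (`GetMessage`, `SetMessage`, and friends). It also has no constructor, so callers must initialize both maps themselves; process_observation.go does this at package level. A minimal usage sketch, where the message ID and guardian address are placeholders and only the types and methods come from this PR:

```go
package main

import (
	"fmt"

	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
)

func main() {
	// Both maps must be initialized by the caller; SetMessage/SetObservation
	// write into them directly and would panic on nil maps.
	cache := &bigtable.ObservationCache{
		Messages:     make(map[types.MessageID]*types.Message),
		Observations: make(map[types.MessageID]map[string]*types.Observation),
	}

	id := types.MessageID("1/chain1/1")
	guardian := "0xfF6CB952589BDE862c25Ef4392132fb9D4A42157" // placeholder guardian address

	cache.SetMessage(id, &types.Message{MessageID: id})
	cache.SetObservation(id, guardian, &types.Observation{
		MessageID:    id,
		GuardianAddr: guardian,
	})

	if obs, ok := cache.GetObservation(id, guardian); ok {
		fmt.Println("cached observation for", obs.GuardianAddr)
	}
}
```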
a/fly/pkg/bigtable/observation.go
+++ b/fly/pkg/bigtable/observation.go
@@ -116,29 +116,6 @@ func (db *BigtableDB) bigtableRowToObservation(row bigtable.Row) (*types.Observa
 	return &observation, nil
 }
 
-func (db *BigtableDB) GetObservation(ctx context.Context, messageID, guardianAddr string) (*types.Observation, error) {
-	tableName := ObservationTableName
-	rowKey := messageID + "_" + guardianAddr
-
-	table := db.client.Open(tableName)
-	row, err := table.ReadRow(ctx, rowKey)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read observation: %v", err)
-	}
-
-	if len(row) == 0 {
-		return nil, fmt.Errorf("observation not found: %s", rowKey)
-	}
-
-	var observation *types.Observation
-	observation, err = db.bigtableRowToObservation(row)
-	if err != nil {
-		return nil, fmt.Errorf("failed to convert row to observation: %v", err)
-	}
-
-	return observation, nil
-}
-
 func (db *BigtableDB) GetObservationsByMessageID(ctx context.Context, messageID string) ([]*types.Observation, error) {
 	tableName := ObservationTableName
 	prefix := messageID + "_"
diff --git a/fly/pkg/bigtable/operations.go b/fly/pkg/bigtable/operations.go
new file mode 100644
index 00000000..fae8b9e1
--- /dev/null
+++ b/fly/pkg/bigtable/operations.go
@@ -0,0 +1,113 @@
+package bigtable
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"cloud.google.com/go/bigtable"
+	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
+	"go.uber.org/zap"
+)
+
+// ApplyBulk applies bulk mutations to Bigtable.
+// It can perform both inserts and updates:
+// - If a row doesn't exist, it will be inserted
+// - If a row already exists, the specified columns will be updated
+func (db *BigtableDB) ApplyBulk(ctx context.Context, tableName string, rowKeys []string, muts []*bigtable.Mutation) error {
+	if len(rowKeys) != len(muts) {
+		return fmt.Errorf("mismatch between number of row keys (%d) and mutations (%d)", len(rowKeys), len(muts))
+	}
+
+	table := db.client.Open(tableName)
+	errs, err := table.ApplyBulk(ctx, rowKeys, muts)
+	if err != nil {
+		return fmt.Errorf("failed to apply bulk mutations: %v", err)
+	}
+
+	for i, err := range errs {
+		if err != nil {
+			return fmt.Errorf("failed to apply mutation for row %s: %v", rowKeys[i], err)
+		}
+	}
+
+	return nil
+}
+
+// FlushCache writes the cached messages and observations to Bigtable.
+func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache *ObservationCache) error {
+	// Prepare bulk mutations for messages
+	messageMuts := make([]*bigtable.Mutation, 0, len(cache.Messages))
+	messageRows := make([]string, 0, len(cache.Messages))
+
+	// Prepare bulk mutations for observations
+	observationMuts := make([]*bigtable.Mutation, 0)
+	observationRows := make([]string, 0)
+
+	for messageID, message := range cache.Messages {
+		// Prepare message mutation
+		messageMut, err := createMessageMutation(message)
+		if err != nil {
+			logger.Error("Failed to create message mutation", zap.String("messageID", string(messageID)), zap.Error(err))
+			continue
+		}
+		messageMuts = append(messageMuts, messageMut)
+		messageRows = append(messageRows, string(messageID))
+
+		// Prepare observation mutations
+		for _, observation := range cache.Observations[messageID] {
+			observationMut, observationRow, err := createObservationMutation(observation)
+			if err != nil {
+				logger.Error("Failed to create observation mutation", zap.String("messageID", string(messageID)), zap.String("guardianAddr", observation.GuardianAddr), zap.Error(err))
+				continue
+			}
+			observationMuts = append(observationMuts, observationMut)
+			observationRows = append(observationRows, observationRow)
+		}
+	}
+
+	err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts)
+	if err != nil {
+		logger.Error("Failed to apply bulk mutations for messages", zap.Error(err))
+		return err
+	}
+
+	err = db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts)
+	if err != nil {
+		logger.Error("Failed to apply bulk mutations for observations", zap.Error(err))
+		return err
+	}
+
+	return nil
+}
+
+// createMessageMutation builds the mutation that updates a message's lastObservedAt and metricsChecked columns.
+func createMessageMutation(message *types.Message) (*bigtable.Mutation, error) {
+	lastObservedAtBytes, err := message.LastObservedAt.UTC().MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	mut := bigtable.NewMutation()
+	mut.Set("messageData", "lastObservedAt", bigtable.Timestamp(0), lastObservedAtBytes)
+	mut.Set("messageData", "metricsChecked", bigtable.Timestamp(0), []byte(strconv.FormatBool(message.MetricsChecked)))
+
+	return mut, nil
+}
+
+// createObservationMutation builds the mutation that updates an observation's data and returns the observation's row key.
+func createObservationMutation(observation *types.Observation) (*bigtable.Mutation, string, error) {
+	rowKey := string(observation.MessageID) + "_" + observation.GuardianAddr
+
+	timeBinary, err := observation.ObservedAt.UTC().MarshalBinary()
+	if err != nil {
+		return nil, "", err
+	}
+
+	mut := bigtable.NewMutation()
+	mut.Set("observationData", "signature", bigtable.Timestamp(0), []byte(observation.Signature))
+	mut.Set("observationData", "observedAt", bigtable.Timestamp(0), timeBinary)
+	mut.Set("observationData", "status", bigtable.Timestamp(0), []byte(strconv.Itoa(int(observation.Status))))
+
+	return mut, rowKey, nil
+}
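Two details in operations.go worth calling out. First, the Bigtable client's `Table.ApplyBulk` returns both a per-row error slice and an overall error; the wrapper above folds the first per-row failure into a single error. Second, writing every cell at the fixed `bigtable.Timestamp(0)`, as `createMessageMutation` and `createObservationMutation` do, makes each write overwrite the previous cell version instead of accumulating versions. The sketch below shows the underlying client call directly; the project, instance, table, and column names are placeholders, not values from this PR:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigtable"
)

func main() {
	ctx := context.Background()

	client, err := bigtable.NewClient(ctx, "my-project", "my-instance") // placeholder names
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	table := client.Open("messages")

	// One mutation per row key; ApplyBulk requires the slices to line up.
	rowKeys := []string{"1/chain1/1", "1/chain1/2"}
	muts := make([]*bigtable.Mutation, 0, len(rowKeys))
	for range rowKeys {
		mut := bigtable.NewMutation()
		// Fixed timestamp 0 so each write replaces the previous cell
		// rather than adding a new version of it.
		mut.Set("messageData", "metricsChecked", bigtable.Timestamp(0), []byte("false"))
		muts = append(muts, mut)
	}

	// errs holds one entry per row; err reports a wholesale failure.
	errs, err := table.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		log.Fatalf("bulk apply failed: %v", err)
	}
	for i, e := range errs {
		if e != nil {
			log.Printf("row %s failed: %v", rowKeys[i], e)
		}
	}
}
```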
diff --git a/fly/pkg/bigtable/test_setup.go b/fly/pkg/bigtable/test_setup.go
index 5a165b50..544ace44 100644
--- a/fly/pkg/bigtable/test_setup.go
+++ b/fly/pkg/bigtable/test_setup.go
@@ -25,6 +25,9 @@ func SetupEmulator() error {
 	}
 	defer adminClient.Close()
 
+	// Clear everything and then recreate the tables for a clean slate
+	CleanUp()
+
 	tables := []struct {
 		name           string
 		columnFamilies []string
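Since `NewBigtableDB` now runs `SetupEmulator` in emulator mode (see message.go above), and `SetupEmulator` now wipes and recreates the tables, every emulator-backed run starts from a clean slate. A sketch of what test wiring can look like under this scheme; the project and instance names are placeholders (the emulator accepts any), and an emulator is assumed to be listening on localhost:8086 as in the tests in this PR:

```go
package bigtable_test

import (
	"context"
	"log"
	"os"
	"testing"

	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
)

// Hypothetical TestMain: in emulator mode, NewBigtableDB itself calls
// SetupEmulator, which clears and recreates the tables before any test runs.
func TestMain(m *testing.M) {
	db, err := bigtable.NewBigtableDB(context.Background(), "project", "instance", "", "localhost:8086", true)
	if err != nil {
		log.Fatalf("failed to create Bigtable client: %v", err)
	}

	// os.Exit skips deferred calls, so close the client explicitly.
	code := m.Run()
	db.Close()
	os.Exit(code)
}
```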
diff --git a/fly/pkg/historical_uptime/process_observation.go b/fly/pkg/historical_uptime/process_observation.go
index 19dd8953..79be7041 100644
--- a/fly/pkg/historical_uptime/process_observation.go
+++ b/fly/pkg/historical_uptime/process_observation.go
@@ -3,53 +3,119 @@ package historical_uptime
 import (
 	"context"
 	"encoding/hex"
-	"fmt"
 	"time"
 
-	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
 	eth_common "github.com/ethereum/go-ethereum/common"
 	"github.com/wormhole-foundation/wormhole-monitor/fly/common"
 	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
+	"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
 	"go.uber.org/zap"
 )
 
-// createNewObservation creates a new observation from the given observation
-func createNewObservation(timestamp time.Time, addr []byte, o *gossipv1.Observation) types.Observation {
-	ga := eth_common.BytesToAddress(addr).String()
-	return types.Observation{
-		MessageID:    types.MessageID(o.MessageId),
-		GuardianAddr: ga,
-		Signature:    hex.EncodeToString(o.Signature),
-		ObservedAt:   timestamp,
-		Status:       types.OnTime,
-	}
-}
-
-// checkObservationTime checks if the observation is late
-func checkObservationTime(message *types.Message, newObservation types.Observation) types.Observation {
-	nextExpiry := message.LastObservedAt.Add(common.ExpiryDuration)
-	if newObservation.ObservedAt.After(nextExpiry) {
-		newObservation.Status = types.Late
-	}
-	return newObservation
-}
-
-// ProcessObservation processes an observation and updates the database accordingly.
-// If the message does not exist in the database, it will be created.
-// If the message exists, the observation will be appended to the message.
-// If the observation is late, observation status will be set to Late.
-func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, timestamp time.Time, addr []byte, o *gossipv1.Observation) error {
-	newObservation := createNewObservation(timestamp, addr, o)
-
-	message, err := db.GetMessage(context.TODO(), types.MessageID(o.MessageId))
-	if err != nil {
-		fmt.Printf("failed to get message: %v", err)
-		return err
-	}
-	if message != nil {
-		newObservation = checkObservationTime(message, newObservation)
-	}
-
-	return db.SaveObservationAndUpdateMessage(context.Background(), &newObservation)
-}
+var cache = &bigtable.ObservationCache{
+	Messages:     make(map[types.MessageID]*types.Message),
+	Observations: make(map[types.MessageID]map[string]*types.Observation),
+}
+
+// ProcessObservationBatch processes a batch of observations and flushes the cache to the database.
+func ProcessObservationBatch(db bigtable.BigtableDB, logger *zap.Logger, batch []*types.Observation) error {
+	for _, o := range batch {
+		ProcessObservation(db, logger, o)
+	}
+
+	return FlushCache(db, logger)
+}
+
+// FlushCache writes the cached observations and messages to the database and clears the cache.
+func FlushCache(db bigtable.BigtableDB, logger *zap.Logger) error {
+	ctx := context.Background()
+
+	err := db.FlushCache(ctx, logger, cache)
+	if err != nil {
+		return err
+	}
+
+	// Clear the cache after flushing
+	cache.Messages = make(map[types.MessageID]*types.Message)
+	cache.Observations = make(map[types.MessageID]map[string]*types.Observation)
+
+	return nil
+}
+
+// ProcessObservation processes a single observation, updating the cache and checking observation times.
+func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
+	cache.Lock()
+	defer cache.Unlock()
+
+	// Check if the message exists in the cache; if not, try to fetch it from the database
+	message, exists := cache.Messages[o.MessageID]
+	if !exists {
+		// Try to get the message from the database
+		dbMessage, err := db.GetMessage(context.Background(), o.MessageID)
+		if err != nil {
+			logger.Error("Failed to get message from database", zap.Error(err))
+		}
+		if dbMessage != nil {
+			message = dbMessage
+			cache.Messages[o.MessageID] = message
+		} else {
+			// Create a new message if it doesn't exist in the database
+			message = &types.Message{
+				MessageID:      o.MessageID,
+				LastObservedAt: o.ObservedAt,
+				MetricsChecked: false,
+			}
+			// Cache the message: observations for the same message tend to arrive
+			// around the same time, so this reduces database calls
+			cache.Messages[o.MessageID] = message
+		}
+	}
+
+	checkObservationTime(message, o)
+
+	// Initialize the observations map for this message if it doesn't exist
+	if cache.Observations[o.MessageID] == nil {
+		cache.Observations[o.MessageID] = make(map[string]*types.Observation)
+		// Try to get existing observations from the database.
+		// This dedupes observations: we do not want to persist duplicates
+		dbObservations, err := db.GetObservationsByMessageID(context.Background(), string(o.MessageID))
+		if err != nil {
+			logger.Error("Failed to get observations from database", zap.Error(err))
+		}
+		for _, dbObs := range dbObservations {
+			cache.Observations[o.MessageID][dbObs.GuardianAddr] = dbObs
+		}
+	}
+
+	// Add the new observation if it doesn't exist
+	if _, exists := cache.Observations[o.MessageID][o.GuardianAddr]; !exists {
+		cache.Observations[o.MessageID][o.GuardianAddr] = o
+
+		// Update LastObservedAt only if it's a new observation and within the expiry duration
+		if o.ObservedAt.After(message.LastObservedAt) &&
+			!o.ObservedAt.After(message.LastObservedAt.Add(common.ExpiryDuration)) {
+			message.LastObservedAt = o.ObservedAt
+		}
+	}
+}
+
+// CreateNewObservation creates a new observation from the given parameters.
+func CreateNewObservation(messageID string, signature []byte, timestamp time.Time, addr []byte) *types.Observation {
+	ga := eth_common.BytesToAddress(addr).String()
+	return &types.Observation{
+		MessageID:    types.MessageID(messageID),
+		GuardianAddr: ga,
+		Signature:    hex.EncodeToString(signature),
+		ObservedAt:   timestamp,
+		Status:       types.OnTime,
+	}
+}
+
+// checkObservationTime checks if the observation is late based on the message's last observed time.
+func checkObservationTime(message *types.Message, newObservation *types.Observation) {
+	nextExpiry := message.LastObservedAt.UTC().Add(common.ExpiryDuration)
+	if newObservation.ObservedAt.UTC().After(nextExpiry) {
+		newObservation.Status = types.Late
+	}
}
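The late/on-time decision and the `LastObservedAt` update rule are the subtle parts of this file: an observation is Late when it lands more than `common.ExpiryDuration` after the message's `LastObservedAt`, and `LastObservedAt` only advances for new, in-window observations, so late arrivals never stretch the window. A tiny self-contained sketch of the predicate; the 30-hour value is an assumption (the real constant lives in fly/common and is not shown in this diff, but 30h is consistent with the "1 hour later"/"31 hours later" test cases below):

```go
package main

import (
	"fmt"
	"time"
)

// expiryDuration stands in for common.ExpiryDuration; 30h is assumed.
const expiryDuration = 30 * time.Hour

// isLate mirrors checkObservationTime: an observation is late when it
// arrives after the message's LastObservedAt plus the expiry window.
func isLate(lastObservedAt, observedAt time.Time) bool {
	return observedAt.UTC().After(lastObservedAt.UTC().Add(expiryDuration))
}

func main() {
	first := time.Now()
	fmt.Println(isLate(first, first.Add(time.Hour)))    // false: within the window
	fmt.Println(isLate(first, first.Add(31*time.Hour))) // true: past the window
}
```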
diff --git a/fly/pkg/historical_uptime/process_observation_test.go b/fly/pkg/historical_uptime/process_observation_test.go
index f0025512..6da064f1 100644
--- a/fly/pkg/historical_uptime/process_observation_test.go
+++ b/fly/pkg/historical_uptime/process_observation_test.go
@@ -23,7 +23,7 @@ const (
 	EmulatorHost = "localhost:8086"
 )
 
-func processObservation(t *testing.T, observations []node_common.MsgWithTimeStamp[gossipv1.SignedObservation], expectedObservations expectedObservation, expectedLastObservedAt time.Time) {
+func processObservation(t *testing.T, observationBatches [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation], expectedObservations expectedObservation, expectedLastObservedAt time.Time) {
 	database, err := bigtable.NewBigtableDB(context.TODO(), ProjectID, InstanceID, "", EmulatorHost, true)
 	if err != nil {
 		t.Errorf("failed to create Bigtable client: %v", err)
@@ -31,22 +31,27 @@ func processObservation(t *testing.T, observations []node_common.MsgWithTimeStam
 	logger := ipfslog.Logger("historical-uptime").Desugar()
 	defer database.Close()
 
-	// Process each observation
-	for _, o := range observations {
-		err := ProcessObservation(*database, logger, o.Timestamp, o.Msg.Addr, &gossipv1.Observation{
-			Hash:      o.Msg.Hash,
-			Signature: o.Msg.Signature,
-			TxHash:    o.Msg.TxHash,
-			MessageId: o.Msg.MessageId,
-		})
+	// Process observations batch by batch;
+	// between batches, the cache is flushed to the DB
+	for _, obsBatch := range observationBatches {
+		observationBatch := make([]*types.Observation, 0, len(obsBatch))
+
+		// Convert SignedObservations to Observations
+		for _, so := range obsBatch {
+			o := CreateNewObservation(so.Msg.MessageId, so.Msg.Signature, so.Timestamp, so.Msg.Addr)
+			observationBatch = append(observationBatch, o)
+		}
+
+		// Process the batch of observations
+		err = ProcessObservationBatch(*database, logger, observationBatch)
 		if err != nil {
-			t.Errorf("failed to process observation: %v", err)
+			t.Errorf("failed to process observation batch: %v", err)
 		}
 	}
 
 	dbObservations, err := database.GetObservationsByMessageID(context.TODO(), "1/chain1/1")
 	if err != nil {
-		t.Errorf("failed to get message: %v", err)
+		t.Errorf("failed to get observations: %v", err)
 	}
 
 	if len(dbObservations) != len(expectedObservations) {
@@ -66,7 +71,7 @@ func processObservation(t *testing.T, observations []node_common.MsgWithTimeStam
 	}
 
 	if !message.LastObservedAt.UTC().Equal(expectedLastObservedAt.UTC()) {
-		t.Errorf("expected last observed at to be %s, got %s", expectedLastObservedAt, message.LastObservedAt)
+		t.Errorf("expected last observed at to be %s, got %s", expectedLastObservedAt.UTC(), message.LastObservedAt.UTC())
 	}
 }
 
@@ -100,16 +105,17 @@ func TestMain(m *testing.M) {
 // go test -v pkg/historical_uptime/process_observation_test.go pkg/historical_uptime/process_observation.go
 func TestProcessObservation(t *testing.T) {
 	expectedLastObservedAt := time.Now()
 
 	testCases := []struct {
-		name                   string
-		input                  []node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
+		name string
+		// Observation batches; each inner slice is processed (and flushed) as one batch
+		input                  [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
 		expectedObservations   expectedObservation
 		expectedLastObservedAt time.Time
 	}{
 		{
 			name: "normal test case",
-			input: []node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{
+			input: [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{{
 				{
 					Msg: &gossipv1.SignedObservation{
 						MessageId: "1/chain1/1",
@@ -126,7 +132,7 @@
 					},
 					Timestamp: expectedLastObservedAt,
 				},
-			},
+			}},
 			expectedObservations: expectedObservation{
 				"0xfF6CB952589BDE862c25Ef4392132fb9D4A42157": types.OnTime,
 				"0x114De8460193bdf3A2fCf81f86a09765F4762fD1": types.OnTime,
@@ -136,7 +142,7 @@
 		},
 		{
 			name: "duplicated observations",
-			input: []node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{
+			input: [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{{
 				{
 					Msg: &gossipv1.SignedObservation{
 						MessageId: "1/chain1/1",
@@ -161,7 +167,7 @@
 					},
 					Timestamp: time.Now().Add(time.Hour * 1), // 1 hour later
 				},
-			},
+			}},
 			expectedObservations: expectedObservation{
 				"0xfF6CB952589BDE862c25Ef4392132fb9D4A42157": types.OnTime,
 				"0x114De8460193bdf3A2fCf81f86a09765F4762fD1": types.OnTime,
@@ -170,7 +176,7 @@
 		},
 		{
 			name: "late observations",
-			input: []node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{
+			input: [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{{
 				{
 					Msg: &gossipv1.SignedObservation{
 						MessageId: "1/chain1/1",
@@ -195,7 +201,7 @@
 					},
 					Timestamp: time.Now().Add(time.Hour * 31), // 31 hours later
 				},
-			},
+			}},
 			expectedObservations: expectedObservation{
 				"0xfF6CB952589BDE862c25Ef4392132fb9D4A42157": types.OnTime,
 				"0x114De8460193bdf3A2fCf81f86a09765F4762fD1": types.OnTime,
@@ -203,6 +209,85 @@
 			},
 			expectedLastObservedAt: expectedLastObservedAt,
 		},
+		// To test that flushing, and processing after a flush, work correctly.
+		// Timestamps derive from expectedLastObservedAt rather than repeated
+		// time.Now() calls so the final comparison is deterministic.
+		{
+			name: "flush and continue",
+			input: [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{
+				{
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0xfF6CB952589BDE862c25Ef4392132fb9D4A42157").Bytes(),
+							Signature: []byte("signature1"),
+						},
+						Timestamp: expectedLastObservedAt.Add(-6 * time.Second),
+					},
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0x114De8460193bdf3A2fCf81f86a09765F4762fD1").Bytes(),
+							Signature: []byte("signature2"),
+						},
+						Timestamp: expectedLastObservedAt.Add(-5 * time.Second),
+					},
+				},
+				{
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0x107A0086b32d7A0977926A205131d8731D39cbEB").Bytes(),
+							Signature: []byte("signature3"),
+						},
+						Timestamp: expectedLastObservedAt,
+					},
+				},
+			},
+			expectedObservations: expectedObservation{
+				"0xfF6CB952589BDE862c25Ef4392132fb9D4A42157": types.OnTime,
+				"0x114De8460193bdf3A2fCf81f86a09765F4762fD1": types.OnTime,
+				"0x107A0086b32d7A0977926A205131d8731D39cbEB": types.OnTime,
+			},
+			expectedLastObservedAt: expectedLastObservedAt,
+		},
+		// To test that processing after a flush dedupes observations correctly
+		{
+			name: "flush and continue - duplicated observations",
+			input: [][]node_common.MsgWithTimeStamp[gossipv1.SignedObservation]{
+				{
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0xfF6CB952589BDE862c25Ef4392132fb9D4A42157").Bytes(),
+							Signature: []byte("signature1"),
+						},
+						Timestamp: expectedLastObservedAt.Add(-6 * time.Second),
+					},
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0x114De8460193bdf3A2fCf81f86a09765F4762fD1").Bytes(),
+							Signature: []byte("signature2"),
+						},
+						Timestamp: expectedLastObservedAt,
+					},
+				},
+				{
+					{
+						Msg: &gossipv1.SignedObservation{
+							MessageId: "1/chain1/1",
+							Addr:      eth_common.HexToAddress("0xfF6CB952589BDE862c25Ef4392132fb9D4A42157").Bytes(),
+							Signature: []byte("signature1"),
+						},
+						Timestamp: expectedLastObservedAt.Add(1 * time.Second),
+					},
+				},
+			},
+			expectedObservations: expectedObservation{
+				"0xfF6CB952589BDE862c25Ef4392132fb9D4A42157": types.OnTime,
+				"0x114De8460193bdf3A2fCf81f86a09765F4762fD1": types.OnTime,
+			},
+			expectedLastObservedAt: expectedLastObservedAt,
+		},
 	}
 
 	for _, tc := range testCases {