Skip to content

Commit

Permalink
hum: address pr
Browse files Browse the repository at this point in the history
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
  • Loading branch information
bingyuyap committed Sep 4, 2024
1 parent 28f6551 commit 34511fa
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 30 deletions.
47 changes: 39 additions & 8 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

node_common "github.com/certusone/wormhole/node/pkg/common"
Expand Down Expand Up @@ -287,6 +290,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create bigtable db", zap.Error(err))
}

promErrC := make(chan error)
// Start Prometheus scraper
initPromScraper(promRemoteURL, logger, promErrC)
Expand All @@ -306,15 +310,35 @@ func main() {
batchSize := 100
observationBatch := make([]*types.Observation, 0, batchSize)

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

// to make sure that we wait til observation cleanup is done
var wg sync.WaitGroup

// rootCtx might not cancel if shutdown abruptly
go func() {
<-sigChan
logger.Info("Received signal, initiating shutdown")
rootCtxCancel()
}()

wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-rootCtx.Done():
if len(observationBatch) > 0 {
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
}
logger.Info("Observation cleanup completed.")
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)

observationBatch = append(observationBatch, obs)

// if it reaches batchSize then process this batch
Expand All @@ -323,13 +347,16 @@ func main() {
observationBatch = observationBatch[:0] // Clear the batch
}
case batch := <-batchObsvC:
// process immediately since batches are in group
batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
for _, signedObs := range batch.Msg.Observations {
obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
batchObservations = append(batchObservations, obs)
observationBatch = append(observationBatch, obs)

// if it reaches batchSize then process this batch
if len(observationBatch) >= batchSize {
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
observationBatch = observationBatch[:0] // Clear the batch
}
}
historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)

case <-ticker.C:
// for every interval, process the batch
Expand Down Expand Up @@ -420,8 +447,12 @@ func main() {
supervisor.WithPropagatePanic)

<-rootCtx.Done()
logger.Info("root context cancelled, exiting...")
// TODO: wait for things to shut down gracefully
logger.Info("Root context cancelled, starting cleanup...")

// Wait for all goroutines to complete their cleanup
wg.Wait()

logger.Info("All cleanup completed. Exiting...")
}

func monitorChannelCapacity[T any](ctx context.Context, logger *zap.Logger, channelName string, ch <-chan T) {
Expand Down
20 changes: 8 additions & 12 deletions fly/pkg/bigtable/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func (c *ObservationCache) RUnlock() {
c.mu.RUnlock()
}

// NewObservationCache creates and returns a new ObservationCache instance.
func NewObservationCache() *ObservationCache {
return &ObservationCache{
Messages: make(map[types.MessageID]*types.Message),
Observations: make(map[types.MessageID]map[string]*types.Observation),
}
}

// GetMessage retrieves a message from the cache by its ID.
// It returns the message and a boolean indicating whether the message was found.
func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
Expand All @@ -50,18 +58,6 @@ func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.
c.Messages[messageID] = message
}

// GetObservation retrieves an observation from the cache by message ID and guardian address.
// It returns the observation and a boolean indicating whether the observation was found.
func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if observations, exists := c.Observations[messageID]; exists {
observation, exists := observations[guardianAddr]
return observation, exists
}
return nil, false
}

// SetObservation adds or updates an observation in the cache.
func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
c.mu.Lock()
Expand Down
18 changes: 8 additions & 10 deletions fly/pkg/historical_uptime/process_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
"go.uber.org/zap"
)

var cache = &bigtable.ObservationCache{
Messages: make(map[types.MessageID]*types.Message),
Observations: make(map[types.MessageID]map[string]*types.Observation),
}
var cache = bigtable.NewObservationCache()

// ProcessObservationBatch processes a batch of observations and flushes the cache to the database.
func ProcessObservationBatch(db bigtable.BigtableDB, logger *zap.Logger, batch []*types.Observation) error {
cache.Lock()
defer cache.Unlock()

for _, o := range batch {
ProcessObservation(db, logger, o)
ProcessObservationAlreadyLocked(db, logger, o)
}

return FlushCache(db, logger)
Expand All @@ -43,11 +43,9 @@ func FlushCache(db bigtable.BigtableDB, logger *zap.Logger) error {
return nil
}

// ProcessObservation processes a single observation, updating the cache and checking observation times.
func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
cache.Lock()
defer cache.Unlock()

// ProcessObservationAlreadyLocked processes a single observation, updating the cache and checking observation times.
// This function assumes that the cache lock has already been acquired.
func ProcessObservationAlreadyLocked(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
// Check if the message exists in the cache, if not, try to fetch from the database
message, exists := cache.Messages[o.MessageID]
if !exists {
Expand Down

0 comments on commit 34511fa

Please sign in to comment.