Commit 86d7567
hum: address pr
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
bingyuyap committed Sep 4, 2024
1 parent 4c7985b commit 86d7567
Showing 9 changed files with 273 additions and 51 deletions.
database/fast-transfer-schema.sql: 13 additions, 0 deletions
@@ -5,6 +5,8 @@ DROP TABLE IF EXISTS fast_transfer_settlements;
 DROP TABLE IF EXISTS auction_logs;
 DROP TABLE IF EXISTS auction_history_mapping;
 DROP TABLE IF EXISTS redeem_swaps;
+DROP TABLE IF EXISTS chains;
+DROP TABLE IF EXISTS token_infos;
 
 DROP TYPE IF EXISTS FastTransferStatus;
 DROP TYPE IF EXISTS FastTransferProtocol;
@@ -113,3 +115,14 @@ CREATE TABLE chains (
     id INTEGER PRIMARY KEY,
     name VARCHAR(255) NOT NULL UNIQUE
 )
+
+-- Token Infos table to store information about different tokens
+-- A normalized table for us to reference token info for analytics purposes
+CREATE TABLE token_infos (
+    name VARCHAR(255) NOT NULL,
+    chain_id INTEGER NOT NULL,
+    decimals INTEGER NOT NULL,
+    symbol VARCHAR(255) NOT NULL,
+    token_address VARCHAR(255) NOT NULL,
+    PRIMARY KEY (token_address, chain_id)
+);
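
Since (token_address, chain_id) is the primary key, writers can insert idempotently and analytics jobs can look a token up by address and chain. A hypothetical usage sketch with placeholder values, assuming Postgres (which the schema's DROP TYPE statements suggest):

-- Hypothetical example: idempotent insert, then a lookup for analytics.
INSERT INTO token_infos (name, chain_id, decimals, symbol, token_address)
VALUES ('USD Coin', 1, 6, 'USDC', '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48')
ON CONFLICT (token_address, chain_id) DO NOTHING;

SELECT name, symbol, decimals
FROM token_infos
WHERE token_address = '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'
  AND chain_id = 1;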
fly/cmd/historical_uptime/main.go: 39 additions, 8 deletions
@@ -5,8 +5,11 @@ import (
     "fmt"
     "log"
     "os"
+    "os/signal"
     "strconv"
     "strings"
+    "sync"
+    "syscall"
     "time"
 
     node_common "github.com/certusone/wormhole/node/pkg/common"
@@ -288,6 +291,7 @@ func main() {
     if err != nil {
         logger.Fatal("Failed to create bigtable db", zap.Error(err))
     }
+
     promErrC := make(chan error)
     // Start Prometheus scraper
     initPromScraper(promRemoteURL, logger, promErrC)
@@ -307,15 +311,35 @@ func main() {
     batchSize := 100
     observationBatch := make([]*types.Observation, 0, batchSize)
 
+    sigChan := make(chan os.Signal, 1)
+    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+
+    // to make sure that we wait til observation cleanup is done
+    var wg sync.WaitGroup
+
+    // rootCtx might not cancel if shutdown abruptly
+    go func() {
+        <-sigChan
+        logger.Info("Received signal, initiating shutdown")
+        rootCtxCancel()
+    }()
+
+    wg.Add(1)
     go func() {
+        defer wg.Done()
         ticker := time.NewTicker(5 * time.Second)
         defer ticker.Stop()
 
         for {
             select {
             case <-rootCtx.Done():
+                if len(observationBatch) > 0 {
+                    historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+                }
+                logger.Info("Observation cleanup completed.")
                 return
-            case o := <-obsvC:
+            case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
                 obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)
 
                 observationBatch = append(observationBatch, obs)
 
                 // if it reaches batchSize then process this batch
@@ -324,13 +348,16 @@ func main() {
                     observationBatch = observationBatch[:0] // Clear the batch
                 }
             case batch := <-batchObsvC:
-                // process immediately since batches are in group
-                batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
                 for _, signedObs := range batch.Msg.Observations {
                     obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
-                    batchObservations = append(batchObservations, obs)
+                    observationBatch = append(observationBatch, obs)
+
+                    // if it reaches batchSize then process this batch
+                    if len(observationBatch) >= batchSize {
+                        historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
+                        observationBatch = observationBatch[:0] // Clear the batch
+                    }
                 }
-                historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)
 
             case <-ticker.C:
                 // for every interval, process the batch
@@ -421,8 +448,12 @@ func main() {
         supervisor.WithPropagatePanic)
 
     <-rootCtx.Done()
-    logger.Info("root context cancelled, exiting...")
-    // TODO: wait for things to shut down gracefully
+    logger.Info("Root context cancelled, starting cleanup...")
+
+    // Wait for all goroutines to complete their cleanup
+    wg.Wait()
+
+    logger.Info("All cleanup completed. Exiting...")
 }
 
 func monitorChannelCapacity[T any](ctx context.Context, logger *zap.Logger, channelName string, ch <-chan T) {
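
The shutdown flow this file gains is: OS signal -> rootCtxCancel() -> the worker flushes its batch and calls wg.Done() -> main's wg.Wait() returns. A self-contained sketch of the same pattern, with illustrative names rather than the repo's types:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    rootCtx, rootCtxCancel := context.WithCancel(context.Background())

    // Translate SIGINT/SIGTERM into a context cancellation.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-sigChan
        rootCtxCancel()
    }()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-rootCtx.Done():
                fmt.Println("flushing remaining work before exit")
                return
            case <-ticker.C:
                fmt.Println("periodic flush")
            }
        }
    }()

    <-rootCtx.Done()
    wg.Wait() // block until the worker has flushed and returned
    fmt.Println("all cleanup completed")
}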
fly/pkg/bigtable/cache.go: 8 additions, 12 deletions
@@ -34,6 +34,14 @@ func (c *ObservationCache) RUnlock() {
     c.mu.RUnlock()
 }
 
+// NewObservationCache creates and returns a new ObservationCache instance.
+func NewObservationCache() *ObservationCache {
+    return &ObservationCache{
+        Messages:     make(map[types.MessageID]*types.Message),
+        Observations: make(map[types.MessageID]map[string]*types.Observation),
+    }
+}
+
 // GetMessage retrieves a message from the cache by its ID.
 // It returns the message and a boolean indicating whether the message was found.
 func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
@@ -50,18 +58,6 @@ func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.Message) {
     c.Messages[messageID] = message
 }
 
-// GetObservation retrieves an observation from the cache by message ID and guardian address.
-// It returns the observation and a boolean indicating whether the observation was found.
-func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
-    c.mu.RLock()
-    defer c.mu.RUnlock()
-    if observations, exists := c.Observations[messageID]; exists {
-        observation, exists := observations[guardianAddr]
-        return observation, exists
-    }
-    return nil, false
-}
-
 // SetObservation adds or updates an observation in the cache.
 func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
     c.mu.Lock()
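
With NewObservationCache in place, callers get a fully initialized cache instead of building the struct literal by hand (as process_observation.go did before this commit). A hypothetical usage sketch; it assumes types.MessageID is string-backed and types.Message carries a MessageID field, neither of which this commit shows:

// Hypothetical caller; GetMessage and SetMessage lock internally,
// so no explicit Lock/Unlock is needed for this sequence.
cache := bigtable.NewObservationCache()
id := types.MessageID("1/abc123/42")
cache.SetMessage(id, &types.Message{MessageID: id})
if msg, ok := cache.GetMessage(id); ok {
    _ = msg // use the cached message
}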
fly/pkg/historical_uptime/process_observation.go: 8 additions, 10 deletions
@@ -13,15 +13,15 @@ import (
     "go.uber.org/zap"
 )
 
-var cache = &bigtable.ObservationCache{
-    Messages:     make(map[types.MessageID]*types.Message),
-    Observations: make(map[types.MessageID]map[string]*types.Observation),
-}
+var cache = bigtable.NewObservationCache()
 
 // ProcessObservationBatch processes a batch of observations and flushes the cache to the database.
 func ProcessObservationBatch(db bigtable.BigtableDB, logger *zap.Logger, batch []*types.Observation) error {
+    cache.Lock()
+    defer cache.Unlock()
+
     for _, o := range batch {
-        ProcessObservation(db, logger, o)
+        ProcessObservationAlreadyLocked(db, logger, o)
     }
 
     return FlushCache(db, logger)
@@ -43,11 +43,9 @@ func FlushCache(db bigtable.BigtableDB, logger *zap.Logger) error {
     return nil
 }
 
-// ProcessObservation processes a single observation, updating the cache and checking observation times.
-func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
-    cache.Lock()
-    defer cache.Unlock()
-
+// ProcessObservationAlreadyLocked processes a single observation, updating the cache and checking observation times.
+// This function assumes that the cache lock has already been acquired.
+func ProcessObservationAlreadyLocked(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
     // Check if the message exists in the cache, if not, try to fetch from the database
     message, exists := cache.Messages[o.MessageID]
     if !exists {
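
The rename makes the locking contract explicit: ProcessObservationBatch takes the cache lock once, and ProcessObservationAlreadyLocked assumes it is already held, avoiding a lock/unlock per observation and any accidental re-lock of a non-reentrant mutex. A minimal self-contained sketch of that convention, with illustrative types rather than the repo's:

package main

import "sync"

type store struct {
    mu   sync.Mutex
    data map[string]int
}

// AddBatch acquires the lock once for the whole batch.
func (s *store) AddBatch(items []string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for _, it := range items {
        s.addAlreadyLocked(it)
    }
}

// addAlreadyLocked assumes s.mu is held. It must not lock again:
// sync.Mutex is not reentrant, so re-locking here would deadlock.
func (s *store) addAlreadyLocked(item string) {
    s.data[item]++
}

func main() {
    s := &store{data: make(map[string]int)}
    s.AddBatch([]string{"a", "b", "a"})
}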
watcher/src/fastTransfer/swapLayer/parser.ts: 20 additions, 2 deletions
@@ -1,18 +1,31 @@
 import { ethers } from 'ethers';
 import { TransferCompletion } from '../types';
 import { parseVaa } from '@wormhole-foundation/wormhole-monitor-common';
+import { Knex } from 'knex';
+import { Chain, chainToChainId } from '@wormhole-foundation/sdk-base';
+import { TokenInfoManager } from '../../utils/TokenInfoManager';
 
 class SwapLayerParser {
   private provider: ethers.providers.JsonRpcProvider;
   private swapLayerAddress: string;
   private swapLayerInterface: ethers.utils.Interface;
 
-  constructor(provider: ethers.providers.JsonRpcProvider, swapLayerAddress: string) {
+  private tokenInfoManager: TokenInfoManager | null = null;
+
+  constructor(
+    provider: ethers.providers.JsonRpcProvider,
+    swapLayerAddress: string,
+    db: Knex | null,
+    chain: Chain
+  ) {
     this.provider = provider;
     this.swapLayerAddress = swapLayerAddress;
     this.swapLayerInterface = new ethers.utils.Interface([
       'event Redeemed(address indexed recipient, address outputToken, uint256 outputAmount, uint256 relayingFee)',
     ]);
+
+    if (db) {
+      this.tokenInfoManager = new TokenInfoManager(db, chainToChainId(chain), provider);
+    }
   }
 
   async parseSwapLayerTransaction(
@@ -59,6 +72,11 @@ class SwapLayerParser {
 
     if (!swapEvent) return null;
 
+    // if we have the tokenInfoManager inited, persist the token info
+    // this ensures we have the token decimal and name for analytics purposes
+    if (this.tokenInfoManager)
+      await this.tokenInfoManager.saveTokenInfoIfNotExist(swapEvent.args.outputToken);
+
     return {
       tx_hash: txHash,
       recipient: swapEvent.args.recipient,
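
Constructing the parser now takes a Knex handle (or null to skip token-info persistence) and a Chain name. A hypothetical wiring sketch; the RPC URL, connection string, and contract address are placeholders, and it assumes SwapLayerParser is exported from this module:

import knex from 'knex';
import { ethers } from 'ethers';

const provider = new ethers.providers.JsonRpcProvider('https://eth-rpc.example.com');
const db = knex({ client: 'pg', connection: process.env.PG_CONNECTION_STRING });

// Passing db enables TokenInfoManager, which persists name/symbol/decimals
// for each outputToken the parser sees; passing null disables that step.
const parser = new SwapLayerParser(provider, '0xSwapLayerAddressHere', db, 'Ethereum');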
watcher/src/fastTransfer/types.ts: 8 additions, 0 deletions
@@ -156,3 +156,11 @@ export type TransferCompletion = {
   // on Solana Swap Layer, this acts as a link between complete_{transfer, swap}_payload and release_inbound
   staged_inbound?: string;
 };
+
+export type TokenInfo = {
+  name: string;
+  chain_id: number;
+  decimals: number;
+  symbol: string;
+  token_address: string;
+};
(3 of the 9 changed files are not shown in this view.)