Skip to content

Commit

Permalink
checkpoint - implementation, moved methods to app. need to migrate / …
Browse files Browse the repository at this point in the history
…add testing.
  • Loading branch information
Crystal Lemire committed Nov 10, 2023
1 parent 9476f01 commit a80613a
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 258 deletions.
52 changes: 41 additions & 11 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"math"
"math/big"
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
Expand Down Expand Up @@ -182,7 +182,8 @@ import (

var (
// DefaultNodeHome default home directories for the application daemon
DefaultNodeHome string
DefaultNodeHome string
MaximumDaemonUnhealthyDuration = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -290,6 +291,8 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client

HealthMonitor *daemonservertypes.HealthMonitor
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -589,6 +592,11 @@ func New(
bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider)
app.Server.WithBridgeEventManager(bridgeEventManager)

app.HealthMonitor = daemonservertypes.NewHealthMonitor(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
Expand All @@ -600,9 +608,6 @@ func New(

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.Server.ExpectLiquidationsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs),
)
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
if err := app.LiquidationsClient.Start(
Expand All @@ -615,13 +620,13 @@ func New(
); err != nil {
panic(err)
}
app.MonitorDaemon(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
}()
}

// Non-validating full-nodes have no need to run the price daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled {
exchangeQueryConfig := constants.StaticExchangeQueryConfig
app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs))
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
// are retrieved via third-party APIs like Binance and then are encoded in-memory and
// periodically sent via gRPC to a shared socket with the server.
Expand All @@ -637,16 +642,16 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.MonitorDaemon(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
// app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs))
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.BridgeClient = bridgeclient.NewClient(logger)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -657,6 +662,7 @@ func New(
); err != nil {
panic(err)
}
app.MonitorDaemon(app.BridgeClient, MaximumDaemonUnhealthyDuration)
}()
}

Expand All @@ -676,9 +682,8 @@ func New(
}
}()
// Don't panic if metrics daemon loops are delayed. Use maximum value.
app.Server.ExpectMetricsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32),
)
// TODO(CORE-666): Refactor metrics daemon and track health here
// app.MonitorDaemon(app.MetricsDaemon, MaximumDaemonUnhealthyDuration)
metricsclient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1222,6 +1227,31 @@ func New(
return app
}

// MonitorDaemon registers a daemon service with the update monitor. If the daemon does not register, the method will
// panic.
func (app *App) MonitorDaemon(
healthCheckableDaemon daemontypes.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
) {
if err := app.HealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maximumAcceptableUpdateDelay",
maximumAcceptableUpdateDelay,
)
panic(err)
}
}

// DisableHealthMonitor disables the health monitor for testing.
func (app *App) DisableHealthMonitor() {
app.HealthMonitor.DisableForTesting()
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
Expand Down
12 changes: 11 additions & 1 deletion protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package client_test
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/app"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client"
Expand Down Expand Up @@ -219,6 +220,7 @@ type PriceDaemonIntegrationTestSuite struct {
exchangeServer *pricefeed.ExchangeServer
daemonServer *daemonserver.Server
exchangePriceCache *pricefeedserver_types.MarketToExchangePrices
healthMonitor *servertypes.HealthMonitor

pricesMockQueryServer *mocks.QueryServer
pricesGrpcServer *grpc.Server
Expand Down Expand Up @@ -278,7 +280,13 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
&daemontypes.FileHandlerImpl{},
s.daemonFlags.Shared.SocketAddress,
)
s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs))

s.healthMonitor = servertypes.NewHealthMonitor(
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache)

Expand Down Expand Up @@ -329,6 +337,8 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration)
s.Require().NoError(err)
}

// expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout.
Expand Down
11 changes: 0 additions & 11 deletions protocol/daemons/server/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge"
)
Expand All @@ -23,14 +20,6 @@ func (server *Server) WithBridgeEventManager(
return server
}

// ExpectBridgeDaemon registers the bridge daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay)
}

// AddBridgeEvents stores any bridge events recognized by the daemon
// in a go-routine safe slice.
func (s *Server) AddBridgeEvents(
Expand Down
17 changes: 0 additions & 17 deletions protocol/daemons/server/liquidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
Expand All @@ -26,14 +23,6 @@ func (server *Server) WithLiquidatableSubaccountIds(
return server
}

// ExpectLiquidationsDaemon registers the liquidations daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectLiquidationsDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.LiquidationsDaemonServiceName, maximumAcceptableUpdateDelay)
}

// LiquidateSubaccounts stores the list of potentially liquidatable subaccount ids
// in a go-routine safe slice.
func (s *Server) LiquidateSubaccounts(
Expand All @@ -47,12 +36,6 @@ func (s *Server) LiquidateSubaccounts(
metrics.Received,
metrics.Count,
)
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
if err := s.reportResponse(types.LiquidationsDaemonServiceName); err != nil {
s.logger.Error("Failed to report liquidations response to update monitor", "error", err)
}

s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds)
return &api.LiquidateSubaccountsResponse{}, nil
}
15 changes: 0 additions & 15 deletions protocol/daemons/server/metrics.go

This file was deleted.

15 changes: 0 additions & 15 deletions protocol/daemons/server/pricefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package server
import (
"context"
errorsmod "cosmossdk.io/errors"
servertypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

gometrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -31,14 +30,6 @@ func (server *Server) WithPriceFeedMarketToExchangePrices(
return server
}

// ExpectPricefeedDaemon registers the pricefeed daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectPricefeedDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(servertypes.PricefeedDaemonServiceName, maximumAcceptableUpdateDelay)
}

// UpdateMarketPrices updates prices from exchanges for each market provided.
func (s *Server) UpdateMarketPrices(
ctx context.Context,
Expand All @@ -52,12 +43,6 @@ func (s *Server) UpdateMarketPrices(
metrics.Latency,
)

// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
if err := s.reportResponse(servertypes.PricefeedDaemonServiceName); err != nil {
panic(err)
}

if s.marketToExchange == nil {
panic(
errorsmod.Wrapf(
Expand Down
64 changes: 9 additions & 55 deletions protocol/daemons/server/server.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package server

import (
gometrics "github.com/armon/go-metrics"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/telemetry"
bridgeapi "github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/constants"
liquidationapi "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
pricefeedapi "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"net"
"syscall"
"time"
)

// Server struct defines the shared gRPC server for all daemons.
Expand All @@ -26,7 +22,9 @@ type Server struct {
fileHandler daemontypes.FileHandler
socketAddress string

updateMonitor *types.HealthMonitor
// healthCheckMonitor monitors the health of all registered daemon services that implement the HealthCheckable
// interface.
healthCheckMonitor *types.HealthMonitor

BridgeServer
PriceFeedServer
Expand All @@ -43,64 +41,20 @@ func NewServer(
socketAddress string,
) *Server {
return &Server{
logger: logger,
gsrv: grpcServer,
fileHandler: fileHandler,
socketAddress: socketAddress,
updateMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, logger),
logger: logger,
gsrv: grpcServer,
fileHandler: fileHandler,
socketAddress: socketAddress,
healthCheckMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, types.HealthCheckPollFrequency, logger),
}
}

// Stop stops the daemon server's gRPC service.
func (server *Server) Stop() {
server.updateMonitor.Stop()
server.healthCheckMonitor.Stop()
server.gsrv.Stop()
}

// DisableUpdateMonitoringForTesting disables the update monitor for testing purposes. This is needed in integration
// tests that do not run the full protocol.
func (server *Server) DisableUpdateMonitoringForTesting() {
server.updateMonitor.DisableForTesting()
}

// registerDaemon registers a daemon service with the update monitor.
func (server *Server) registerDaemon(
daemonKey string,
maximumAcceptableUpdateDelay time.Duration,
) {
if err := server.updateMonitor.RegisterDaemonService(daemonKey, maximumAcceptableUpdateDelay); err != nil {
server.logger.Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
daemonKey,
"maximumAcceptableUpdateDelay",
maximumAcceptableUpdateDelay,
)
panic(err)
}
}

// reportResponse reports a response from a daemon service with the update monitor. This is used to
// ensure that the daemon continues to operate. If the update monitor does not see a response from a
// registered daemon within the maximumAcceptableUpdateDelay, it will cause the protocol to panic.
func (server *Server) reportResponse(
daemonKey string,
) error {
telemetry.IncrCounterWithLabels(
[]string{
metrics.DaemonServer,
metrics.ValidResponse,
},
1,
[]gometrics.Label{
metrics.GetLabelForStringValue(metrics.Daemon, daemonKey),
},
)
return server.updateMonitor.RegisterValidResponse(daemonKey)
}

// Start clears the current socket and establishes a new socket connection
// on the local filesystem.
// See URL for more information: https://eli.thegreenplace.net/2019/unix-domain-sockets-in-go/
Expand Down
Loading

0 comments on commit a80613a

Please sign in to comment.