diff --git a/protocol/app/app.go b/protocol/app/app.go index f01876607b..6a6b04fb61 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -5,13 +5,13 @@ import ( "encoding/json" "errors" "io" - "math" "math/big" "net/http" "os" "path/filepath" "runtime/debug" "sync" + "time" autocliv1 "cosmossdk.io/api/cosmos/autocli/v1" reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1" @@ -290,6 +290,8 @@ type App struct { PriceFeedClient *pricefeedclient.Client LiquidationsClient *liquidationclient.Client BridgeClient *bridgeclient.Client + + DaemonHealthMonitor *daemonservertypes.HealthMonitor } // assertAppPreconditions assert invariants required for an application to start. @@ -589,22 +591,27 @@ func New( bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider) app.Server.WithBridgeEventManager(bridgeEventManager) + app.DaemonHealthMonitor = daemonservertypes.NewHealthMonitor( + daemonservertypes.DaemonStartupGracePeriod, + daemonservertypes.HealthCheckPollFrequency, + app.Logger(), + daemonFlags.Shared.PanicOnDaemonFailureEnabled, + ) // Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC // service is started because daemons depend on the gRPC service being available. If a node is initialized // with a genesis time in the future, then the gRPC service will not be available until the genesis time, the // daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon // monitoring service will panic. app.startDaemons = func() { + maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second // Start server for handling gRPC messages from daemons. go app.Server.Start() // Start liquidations client for sending potentially liquidatable subaccounts to the application. if daemonFlags.Liquidation.Enabled { - app.Server.ExpectLiquidationsDaemon( - daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs), - ) app.LiquidationsClient = liquidationclient.NewClient(logger) go func() { + app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, maxDaemonUnhealthyDuration) if err := app.LiquidationsClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -621,7 +628,6 @@ func New( // Non-validating full-nodes have no need to run the price daemon. if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled { exchangeQueryConfig := constants.StaticExchangeQueryConfig - app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs)) // Start pricefeed client for sending prices for the pricefeed server to consume. These prices // are retrieved via third-party APIs like Binance and then are encoded in-memory and // periodically sent via gRPC to a shared socket with the server. @@ -637,16 +643,15 @@ func New( constants.StaticExchangeDetails, &pricefeedclient.SubTaskRunnerImpl{}, ) + app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, maxDaemonUnhealthyDuration) } // Start Bridge Daemon. // Non-validating full-nodes have no need to run the bridge daemon. if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled { - // TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI - // environments. 
- // app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs)) + app.BridgeClient = bridgeclient.NewClient(logger) go func() { - app.BridgeClient = bridgeclient.NewClient(logger) + app.RegisterDaemonWithHealthMonitor(app.BridgeClient, maxDaemonUnhealthyDuration) if err := app.BridgeClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -663,6 +668,9 @@ func New( // Start the Metrics Daemon. // The metrics daemon is purely used for observability. It should never bring the app down. // TODO(CLOB-960) Don't start this goroutine if telemetry is disabled + // Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check + // for this service. The task loop does not produce any errors because the telemetry calls themselves are + // not error-returning, so in effect this daemon would never become unhealthy. go func() { defer func() { if r := recover(); r != nil { @@ -675,10 +683,6 @@ func New( ) } }() - // Don't panic if metrics daemon loops are delayed. Use maximum value. - app.Server.ExpectMetricsDaemon( - daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32), - ) metricsclient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -1222,6 +1226,31 @@ func New( return app } +// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring +// the health of the daemon. If the daemon does not register, the method will panic. +func (app *App) RegisterDaemonWithHealthMonitor( + healthCheckableDaemon daemontypes.HealthCheckable, + maxDaemonUnhealthyDuration time.Duration, +) { + if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil { + app.Logger().Error( + "Failed to register daemon service with update monitor", + "error", + err, + "service", + healthCheckableDaemon.ServiceName(), + "maxDaemonUnhealthyDuration", + maxDaemonUnhealthyDuration, + ) + panic(err) + } +} + +// DisableHealthMonitorForTesting disables the health monitor for testing. +func (app *App) DisableHealthMonitorForTesting() { + app.DaemonHealthMonitor.DisableForTesting() +} + // hydrateMemStores hydrates the memStores used for caching state. func (app *App) hydrateMemStores() { // Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`. diff --git a/protocol/app/app_test.go b/protocol/app/app_test.go index 42fc06dd7c..ded9ee8b8d 100644 --- a/protocol/app/app_test.go +++ b/protocol/app/app_test.go @@ -1,10 +1,12 @@ package app_test import ( + "github.com/dydxprotocol/v4-chain/protocol/mocks" "gopkg.in/typ.v4/slices" "reflect" "strings" "testing" + "time" delaymsgmodule "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg" @@ -222,3 +224,18 @@ func TestModuleBasics(t *testing.T) { actualFieldTypes := getMapFieldsAndTypes(reflect.ValueOf(basic_manager.ModuleBasics)) require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected") } + +func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) { + app := testapp.DefaultTestApp(nil) + hc := &mocks.HealthCheckable{} + hc.On("ServiceName").Return("test-service") + hc.On("HealthCheck").Return(nil) + + app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) + // The second registration should fail, causing a panic. 
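+	// RegisterDaemonWithHealthMonitor delegates to DaemonHealthMonitor.RegisterService and panics on any
+	// registration error, including a duplicate service name.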
+ require.PanicsWithError( + t, + "service test-service already registered", + func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) }, + ) +} diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 21333d95f1..0603997801 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -2,7 +2,6 @@ package flags import ( "fmt" - "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/spf13/cast" diff --git a/protocol/daemons/flags/flags.go b/protocol/daemons/flags/flags.go index eebdf7e631..e386f379ab 100644 --- a/protocol/daemons/flags/flags.go +++ b/protocol/daemons/flags/flags.go @@ -9,7 +9,9 @@ import ( // List of CLI flags for Server and Client. const ( // Flag names - FlagUnixSocketAddress = "unix-socket-address" + FlagUnixSocketAddress = "unix-socket-address" + FlagPanicOnDaemonFailureEnabled = "panic-on-daemon-failure-enabled" + FlagMaxDaemonUnhealthySeconds = "max-daemon-unhealthy-seconds" FlagPriceDaemonEnabled = "price-daemon-enabled" FlagPriceDaemonLoopDelayMs = "price-daemon-loop-delay-ms" @@ -28,6 +30,10 @@ const ( type SharedFlags struct { // SocketAddress is the location of the unix socket to communicate with the daemon gRPC service. SocketAddress string + // PanicOnDaemonFailureEnabled toggles whether the daemon should panic on failure. + PanicOnDaemonFailureEnabled bool + // MaxDaemonUnhealthySeconds is the maximum allowable duration for which a daemon can be unhealthy. + MaxDaemonUnhealthySeconds uint32 } // BridgeFlags contains configuration flags for the Bridge Daemon. @@ -74,7 +80,9 @@ func GetDefaultDaemonFlags() DaemonFlags { if defaultDaemonFlags == nil { defaultDaemonFlags = &DaemonFlags{ Shared: SharedFlags{ - SocketAddress: "/tmp/daemons.sock", + SocketAddress: "/tmp/daemons.sock", + PanicOnDaemonFailureEnabled: true, + MaxDaemonUnhealthySeconds: 5 * 60, // 5 minutes. }, Bridge: BridgeFlags{ Enabled: true, @@ -109,8 +117,18 @@ func AddDaemonFlagsToCmd( cmd.Flags().String( FlagUnixSocketAddress, df.Shared.SocketAddress, - "Socket address for the price daemon to send updates to, if not set "+ - "will establish default location to ingest price updates from", + "Socket address for the daemons to send updates to, if not set "+ + "will establish default location to ingest daemon updates from", + ) + cmd.Flags().Bool( + FlagPanicOnDaemonFailureEnabled, + df.Shared.PanicOnDaemonFailureEnabled, + "Enables panicking when a daemon fails.", + ) + cmd.Flags().Uint32( + FlagMaxDaemonUnhealthySeconds, + df.Shared.MaxDaemonUnhealthySeconds, + "Maximum allowable duration for which a daemon can be unhealthy.", ) // Bridge Daemon. @@ -178,6 +196,16 @@ func GetDaemonFlagValuesFromOptions( result.Shared.SocketAddress = v } } + if option := appOpts.Get(FlagPanicOnDaemonFailureEnabled); option != nil { + if v, err := cast.ToBoolE(option); err == nil { + result.Shared.PanicOnDaemonFailureEnabled = v + } + } + if option := appOpts.Get(FlagMaxDaemonUnhealthySeconds); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.Shared.MaxDaemonUnhealthySeconds = v + } + } // Bridge Daemon. 
if option := appOpts.Get(FlagBridgeDaemonEnabled); option != nil { diff --git a/protocol/daemons/flags/flags_test.go b/protocol/daemons/flags/flags_test.go index 5c79395a39..04191032f6 100644 --- a/protocol/daemons/flags/flags_test.go +++ b/protocol/daemons/flags/flags_test.go @@ -17,6 +17,8 @@ func TestAddDaemonFlagsToCmd(t *testing.T) { flags.AddDaemonFlagsToCmd(&cmd) tests := []string{ flags.FlagUnixSocketAddress, + flags.FlagPanicOnDaemonFailureEnabled, + flags.FlagMaxDaemonUnhealthySeconds, flags.FlagBridgeDaemonEnabled, flags.FlagBridgeDaemonLoopDelayMs, @@ -41,6 +43,8 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) { optsMap := make(map[string]interface{}) optsMap[flags.FlagUnixSocketAddress] = "test-socket-address" + optsMap[flags.FlagPanicOnDaemonFailureEnabled] = false + optsMap[flags.FlagMaxDaemonUnhealthySeconds] = uint32(1234) optsMap[flags.FlagBridgeDaemonEnabled] = true optsMap[flags.FlagBridgeDaemonLoopDelayMs] = uint32(1111) @@ -64,6 +68,12 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) { // Shared. require.Equal(t, optsMap[flags.FlagUnixSocketAddress], r.Shared.SocketAddress) + require.Equal(t, optsMap[flags.FlagPanicOnDaemonFailureEnabled], r.Shared.PanicOnDaemonFailureEnabled) + require.Equal( + t, + optsMap[flags.FlagMaxDaemonUnhealthySeconds], + r.Shared.MaxDaemonUnhealthySeconds, + ) // Bridge Daemon. require.Equal(t, optsMap[flags.FlagBridgeDaemonEnabled], r.Bridge.Enabled) @@ -81,7 +91,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) { require.Equal(t, optsMap[flags.FlagPriceDaemonLoopDelayMs], r.Price.LoopDelayMs) } -func TestGetDaemonFlagValuesFromOptions_Defaul(t *testing.T) { +func TestGetDaemonFlagValuesFromOptions_Default(t *testing.T) { mockOpts := mocks.AppOptions{} mockOpts.On("Get", mock.Anything). Return(func(key string) interface{} { diff --git a/protocol/daemons/metrics/client/client.go b/protocol/daemons/metrics/client/client.go index a714a63713..5e86debedd 100644 --- a/protocol/daemons/metrics/client/client.go +++ b/protocol/daemons/metrics/client/client.go @@ -19,6 +19,9 @@ var ( // Start begins a job that periodically: // 1) Emits metrics about app version and git commit. +// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check +// for this service. The task loop does not produce any errors because the telemetry calls themselves are +// not error-returning, so in effect this daemon would never become unhealthy. 
func Start( ctx context.Context, logger log.Logger, diff --git a/protocol/daemons/pricefeed/client/client_integration_test.go b/protocol/daemons/pricefeed/client/client_integration_test.go index 99f67b37aa..45d32a28a4 100644 --- a/protocol/daemons/pricefeed/client/client_integration_test.go +++ b/protocol/daemons/pricefeed/client/client_integration_test.go @@ -219,6 +219,7 @@ type PriceDaemonIntegrationTestSuite struct { exchangeServer *pricefeed.ExchangeServer daemonServer *daemonserver.Server exchangePriceCache *pricefeedserver_types.MarketToExchangePrices + healthMonitor *servertypes.HealthMonitor pricesMockQueryServer *mocks.QueryServer pricesGrpcServer *grpc.Server @@ -278,7 +279,14 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() { &daemontypes.FileHandlerImpl{}, s.daemonFlags.Shared.SocketAddress, ) - s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs)) + + s.healthMonitor = servertypes.NewHealthMonitor( + servertypes.DaemonStartupGracePeriod, + servertypes.HealthCheckPollFrequency, + log.TestingLogger(), + flags.GetDefaultDaemonFlags().Shared.PanicOnDaemonFailureEnabled, // Use default behavior for testing + ) + s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge) s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache) @@ -329,6 +337,11 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() { testExchangeToQueryDetails, &client.SubTaskRunnerImpl{}, ) + err := s.healthMonitor.RegisterService( + s.pricefeedDaemon, + time.Duration(s.daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second, + ) + s.Require().NoError(err) } // expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout. diff --git a/protocol/daemons/server/bridge.go b/protocol/daemons/server/bridge.go index 20fc68cd1a..f794e0bb86 100644 --- a/protocol/daemons/server/bridge.go +++ b/protocol/daemons/server/bridge.go @@ -2,10 +2,8 @@ package server import ( "context" - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "time" - "github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api" + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge" ) @@ -23,31 +21,21 @@ func (server *Server) WithBridgeEventManager( return server } -// ExpectBridgeDaemon registers the bridge daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay) -} - // AddBridgeEvents stores any bridge events recognized by the daemon // in a go-routine safe slice. func (s *Server) AddBridgeEvents( ctx context.Context, req *api.AddBridgeEventsRequest, ) ( - *api.AddBridgeEventsResponse, - error, + response *api.AddBridgeEventsResponse, + err error, ) { - // If the daemon is unable to report a response, there is either an error in the registration of - // this daemon, or another one. In either case, the protocol should panic. - // TODO(CORE-582): Re-enable this check once the bridge daemon is fixed in local / CI environments. 
- //if err := s.reportResponse(types.BridgeDaemonServiceName); err != nil { - // panic(err) - //} - if err := s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil { + if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil { return nil, err } + + // Capture valid responses in metrics. + s.reportValidResponse(types.BridgeDaemonServiceName) + return &api.AddBridgeEventsResponse{}, nil } diff --git a/protocol/daemons/server/liquidation.go b/protocol/daemons/server/liquidation.go index 9eb1045051..4cb44acfa1 100644 --- a/protocol/daemons/server/liquidation.go +++ b/protocol/daemons/server/liquidation.go @@ -2,11 +2,9 @@ package server import ( "context" - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "time" - "github.com/cosmos/cosmos-sdk/telemetry" "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" ) @@ -26,20 +24,15 @@ func (server *Server) WithLiquidatableSubaccountIds( return server } -// ExpectLiquidationsDaemon registers the liquidations daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectLiquidationsDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.LiquidationsDaemonServiceName, maximumAcceptableUpdateDelay) -} - // LiquidateSubaccounts stores the list of potentially liquidatable subaccount ids // in a go-routine safe slice. func (s *Server) LiquidateSubaccounts( ctx context.Context, req *api.LiquidateSubaccountsRequest, -) (*api.LiquidateSubaccountsResponse, error) { +) ( + response *api.LiquidateSubaccountsResponse, + err error, +) { telemetry.ModuleSetGauge( metrics.LiquidationDaemon, float32(len(req.SubaccountIds)), @@ -47,12 +40,11 @@ func (s *Server) LiquidateSubaccounts( metrics.Received, metrics.Count, ) - // If the daemon is unable to report a response, there is either an error in the registration of - // this daemon, or another one. In either case, the protocol should panic. - if err := s.reportResponse(types.LiquidationsDaemonServiceName); err != nil { - s.logger.Error("Failed to report liquidations response to update monitor", "error", err) - } s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds) + + // Capture valid responses in metrics. + s.reportValidResponse(types.LiquidationsDaemonServiceName) + return &api.LiquidateSubaccountsResponse{}, nil } diff --git a/protocol/daemons/server/metrics.go b/protocol/daemons/server/metrics.go deleted file mode 100644 index 2ab3826382..0000000000 --- a/protocol/daemons/server/metrics.go +++ /dev/null @@ -1,15 +0,0 @@ -package server - -import ( - "time" - - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" -) - -// ExpectMetricsDaemon registers the periodic metrics daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. 
-func (server *Server) ExpectMetricsDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.MetricsDaemonServiceName, maximumAcceptableUpdateDelay) -} diff --git a/protocol/daemons/server/pricefeed.go b/protocol/daemons/server/pricefeed.go index 30f6a2e614..d95af3dfb1 100644 --- a/protocol/daemons/server/pricefeed.go +++ b/protocol/daemons/server/pricefeed.go @@ -3,7 +3,7 @@ package server import ( "context" errorsmod "cosmossdk.io/errors" - servertypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" "time" gometrics "github.com/armon/go-metrics" @@ -12,7 +12,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/api" pricefeedmetrics "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/metrics" pricefeedtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed" - "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" ) @@ -31,19 +31,14 @@ func (server *Server) WithPriceFeedMarketToExchangePrices( return server } -// ExpectPricefeedDaemon registers the pricefeed daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectPricefeedDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(servertypes.PricefeedDaemonServiceName, maximumAcceptableUpdateDelay) -} - // UpdateMarketPrices updates prices from exchanges for each market provided. func (s *Server) UpdateMarketPrices( ctx context.Context, req *api.UpdateMarketPricesRequest, -) (*api.UpdateMarketPricesResponse, error) { +) ( + response *api.UpdateMarketPricesResponse, + err error, +) { // Measure latency in ingesting and handling gRPC price update. defer telemetry.ModuleMeasureSince( metrics.PricefeedServer, @@ -52,35 +47,35 @@ func (s *Server) UpdateMarketPrices( metrics.Latency, ) - // If the daemon is unable to report a response, there is either an error in the registration of - // this daemon, or another one. In either case, the protocol should panic. - if err := s.reportResponse(servertypes.PricefeedDaemonServiceName); err != nil { - panic(err) - } - + // This panic is an unexpected condition because we initialize the market price cache in app initialization before + // starting the server or daemons. if s.marketToExchange == nil { panic( errorsmod.Wrapf( - types.ErrServerNotInitializedCorrectly, + daemontypes.ErrServerNotInitializedCorrectly, "MarketToExchange not initialized", ), ) } - if err := validateMarketPricesUpdatesMessage(req); err != nil { + if err = validateMarketPricesUpdatesMessage(req); err != nil { // Log if failure occurs during an update. s.logger.Error("Failed to validate price update message", "error", err) return nil, err } s.marketToExchange.UpdatePrices(req.MarketPriceUpdates) + + // Capture valid responses in metrics. + s.reportValidResponse(types.PricefeedDaemonServiceName) + return &api.UpdateMarketPricesResponse{}, nil } // validateMarketPricesUpdatesMessage validates a `UpdateMarketPricesRequest`. 
func validateMarketPricesUpdatesMessage(req *api.UpdateMarketPricesRequest) error { if len(req.MarketPriceUpdates) == 0 { - return types.ErrPriceFeedMarketPriceUpdateEmpty + return daemontypes.ErrPriceFeedMarketPriceUpdateEmpty } for _, mpu := range req.MarketPriceUpdates { @@ -110,7 +105,7 @@ func validateMarketPriceUpdate(mpu *api.MarketPriceUpdate) error { for _, ep := range mpu.ExchangePrices { if ep.Price == constants.DefaultPrice { return generateSdkErrorForExchangePrice( - types.ErrPriceFeedInvalidPrice, + daemontypes.ErrPriceFeedInvalidPrice, ep, mpu.MarketId, ) @@ -118,7 +113,7 @@ func validateMarketPriceUpdate(mpu *api.MarketPriceUpdate) error { if ep.LastUpdateTime == nil { return generateSdkErrorForExchangePrice( - types.ErrPriceFeedLastUpdateTimeNotSet, + daemontypes.ErrPriceFeedLastUpdateTimeNotSet, ep, mpu.MarketId, ) diff --git a/protocol/daemons/server/server.go b/protocol/daemons/server/server.go index d146587731..60a34c1552 100644 --- a/protocol/daemons/server/server.go +++ b/protocol/daemons/server/server.go @@ -8,12 +8,10 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/daemons/constants" liquidationapi "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" pricefeedapi "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/api" - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "net" "syscall" - "time" ) // Server struct defines the shared gRPC server for all daemons. @@ -26,8 +24,6 @@ type Server struct { fileHandler daemontypes.FileHandler socketAddress string - updateMonitor *types.UpdateMonitor - BridgeServer PriceFeedServer LiquidationServer @@ -47,47 +43,16 @@ func NewServer( gsrv: grpcServer, fileHandler: fileHandler, socketAddress: socketAddress, - updateMonitor: types.NewUpdateFrequencyMonitor(types.DaemonStartupGracePeriod, logger), } } // Stop stops the daemon server's gRPC service. func (server *Server) Stop() { - server.updateMonitor.Stop() server.gsrv.Stop() } -// DisableUpdateMonitoringForTesting disables the update monitor for testing purposes. This is needed in integration -// tests that do not run the full protocol. -func (server *Server) DisableUpdateMonitoringForTesting() { - server.updateMonitor.DisableForTesting() -} - -// registerDaemon registers a daemon service with the update monitor. -func (server *Server) registerDaemon( - daemonKey string, - maximumAcceptableUpdateDelay time.Duration, -) { - if err := server.updateMonitor.RegisterDaemonService(daemonKey, maximumAcceptableUpdateDelay); err != nil { - server.logger.Error( - "Failed to register daemon service with update monitor", - "error", - err, - "service", - daemonKey, - "maximumAcceptableUpdateDelay", - maximumAcceptableUpdateDelay, - ) - panic(err) - } -} - -// reportResponse reports a response from a daemon service with the update monitor. This is used to -// ensure that the daemon continues to operate. If the update monitor does not see a response from a -// registered daemon within the maximumAcceptableUpdateDelay, it will cause the protocol to panic. -func (server *Server) reportResponse( - daemonKey string, -) error { +// reportValidResponse reports a valid request/response from a daemon service for metrics collection purposes. 
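+// This method cannot fail and performs no liveness bookkeeping; daemon health is tracked separately by the
+// HealthMonitor via per-service health checks, so this call only increments a telemetry counter labeled with the
+// daemon's service name.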
+func (server *Server) reportValidResponse(daemonKey string) { telemetry.IncrCounterWithLabels( []string{ metrics.DaemonServer, @@ -98,7 +63,6 @@ func (server *Server) reportResponse( metrics.GetLabelForStringValue(metrics.Daemon, daemonKey), }, ) - return server.updateMonitor.RegisterValidResponse(daemonKey) } // Start clears the current socket and establishes a new socket connection diff --git a/protocol/daemons/server/server_test.go b/protocol/daemons/server/server_test.go index 5188409d10..5d09b53d47 100644 --- a/protocol/daemons/server/server_test.go +++ b/protocol/daemons/server/server_test.go @@ -13,7 +13,6 @@ import ( "net" "os" "testing" - "time" ) const ( @@ -159,48 +158,6 @@ func TestStart_MixedInvalid(t *testing.T) { } } -func TestRegisterDaemon_DoesNotPanic(t *testing.T) { - grpcServer := &mocks.GrpcServer{} - grpcServer.On("Stop").Return().Once() - server := server.NewServer( - log.NewNopLogger(), - grpcServer, - &mocks.FileHandler{}, - grpc.SocketPath, - ) - defer server.Stop() - - require.NotPanics(t, func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }) -} - -func TestRegisterDaemon_DoubleRegistrationPanics(t *testing.T) { - grpcServer := &mocks.GrpcServer{} - grpcServer.On("Stop").Return().Once() - server := server.NewServer( - log.NewNopLogger(), - grpcServer, - &mocks.FileHandler{}, - grpc.SocketPath, - ) - defer server.Stop() - - // First registration should not panic. - require.NotPanics(t, func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }) - - // Second registration should panic. - require.PanicsWithError( - t, - "service pricefeed-daemon already registered", - func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }, - ) -} - func createServerWithMocks( t testing.TB, mockGrpcServer *mocks.GrpcServer, @@ -214,7 +171,6 @@ func createServerWithMocks( ) mockGrpcServer.On("Stop").Return().Once() t.Cleanup(server.Stop) - server.DisableUpdateMonitoringForTesting() return server } diff --git a/protocol/daemons/server/types/health_checker.go b/protocol/daemons/server/types/health_checker.go new file mode 100644 index 0000000000..16e6f0b900 --- /dev/null +++ b/protocol/daemons/server/types/health_checker.go @@ -0,0 +1,210 @@ +package types + +import ( + "github.com/cometbft/cometbft/libs/log" + "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" + "sync" + "time" +) + +// errorStreak tracks two relevant statistics for an error streak returned by a HealthCheckable - the timestamp of the +// beginning of the error streak, and the most recent error. This is useful for determining how long a service has been +// unhealthy, as well as the current state of the service. +type errorStreak struct { + startOfStreak time.Time + mostRecentError error +} + +// UpdateLastError updates the errorStreak to reflect the current error. If the startOfStreak timestamp is zero, this +// error the first error in a new error streak, so the startOfStreak timestamp is set to the current timestamp. +func (u *errorStreak) UpdateLastError(timestamp time.Time, err error) { + // If the startOfStreak is zero, this is the first update, so set the startOfStreak. + if u.startOfStreak.IsZero() { + u.startOfStreak = timestamp + } + + u.mostRecentError = err +} + +// Reset resets the errorStreak to its zero value, indicating that the service has no active error streak. 
+func (u *errorStreak) Reset() { + u.startOfStreak = time.Time{} + u.mostRecentError = nil +} + +// IsUnset returns true if the errorStreak is unset, indicating that the service has no active error streak. +func (u *errorStreak) IsUnset() bool { + return u.startOfStreak.IsZero() && u.mostRecentError == nil +} + +// StartOfStreak returns the timestamp of th start of the most recent error streak. +func (u *errorStreak) StartOfStreak() time.Time { + return u.startOfStreak +} + +// MostRecentError returns the most recent error associated with the current error streak. +func (u *errorStreak) MostRecentError() error { + return u.mostRecentError +} + +// healthCheckerMutableState tracks the current health state of the HealthCheckable, encapsulating all mutable state +// into a single struct for ease of synchronization. +type healthCheckerMutableState struct { + // lock is used to synchronize access to mutable state fields. + lock sync.Mutex + + // lastSuccessTimestamp is the startOfStreak of the most recent successful health check. + // Access to lastSuccessTimestamp is synchronized. + lastSuccessTimestamp time.Time + + // mostRecentErrorStreak tracks the beginning of the most recent streak, as well as the current error in the streak. + // It is updated on every error and reset every time the service sees a healthy response. + // This field is used to determine how long the daemon has been unhealthy. If the mostRecentErrorStreak is unset, + // then either the service has never been unhealthy, or the most recent error streak ended before it could trigger + // a callback. + // Access to mostRecentErrorStreak is synchronized. + mostRecentErrorStreak errorStreak + + // timer triggers a health check poll for a health-checkable service. + timer *time.Timer + + // stopped indicates whether the health checker has been stopped. Additional health checks cannot be scheduled + // after the health checker has been stopped. + stopped bool +} + +// ReportSuccess updates the health checker's mutable state to reflect a successful health check and schedules the next +// poll as an atomic operation. +func (u *healthCheckerMutableState) ReportSuccess(now time.Time) { + u.lock.Lock() + defer u.lock.Unlock() + + u.lastSuccessTimestamp = now + + // Whenever the service is healthy, reset the first failure in streak startOfStreak. + u.mostRecentErrorStreak.Reset() +} + +// ReportFailure updates the health checker's mutable state to reflect a failed health check and schedules the next +// poll as an atomic operation. The method returns the duration of the current failure streak. +func (u *healthCheckerMutableState) ReportFailure(now time.Time, err error) time.Duration { + u.lock.Lock() + defer u.lock.Unlock() + + u.mostRecentErrorStreak.UpdateLastError(now, err) + + return now.Sub(u.mostRecentErrorStreak.StartOfStreak()) +} + +// SchedulePoll schedules the next poll for the health-checkable service. If the service is stopped, the next poll +// will not be scheduled. This method is synchronized. +func (u *healthCheckerMutableState) SchedulePoll(nextPollDelay time.Duration) { + u.lock.Lock() + defer u.lock.Unlock() + + // Don't schedule a poll if the health checker has been stopped. + if u.stopped { + return + } + + // Schedule the next poll. + u.timer.Reset(nextPollDelay) +} + +// Stop stops the health checker. This method is synchronized. +func (u *healthCheckerMutableState) Stop() { + u.lock.Lock() + defer u.lock.Unlock() + + // Don't stop the health checker if it has already been stopped. 
+ if u.stopped { + return + } + + u.timer.Stop() + u.stopped = true +} + +// healthChecker encapsulates the logic for monitoring the health of a health-checkable service. +type healthChecker struct { + // mutableState is the mutable state of the health checker. Access to these fields is synchronized. + mutableState *healthCheckerMutableState + + // healthCheckable is the health-checkable service to be monitored. + healthCheckable types.HealthCheckable + + // pollFrequency is the frequency at which the health-checkable service is polled. + pollFrequency time.Duration + + // maxUnhealthyDuration is the maximum acceptable duration for a health-checkable service to + // remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the + // specified callback function. + maxUnhealthyDuration time.Duration + + // unhealthyCallback is the callback function to be executed if the health-checkable service remains + // unhealthy for a period of time greater than or equal to the maximum acceptable unhealthy duration. + // This callback function is executed with the error that caused the service to become unhealthy. + unhealthyCallback func(error) + + // timeProvider is used to get the current time. It is added as a field so that it can be mocked in tests. + timeProvider libtime.TimeProvider + + logger log.Logger +} + +// Poll executes a health check for the health-checkable service. If the service has been unhealthy for longer than the +// maximum acceptable unhealthy duration, the callback function is executed. +// This method is publicly exposed for testing. This method is synchronized. +func (hc *healthChecker) Poll() { + err := hc.healthCheckable.HealthCheck() + now := hc.timeProvider.Now() + + if err == nil { // Capture healthy response. + hc.mutableState.ReportSuccess(now) + } else { // Capture unhealthy response. + streakDuration := hc.mutableState.ReportFailure(now, err) + // If the service has been unhealthy for longer than the maximum acceptable unhealthy duration, execute the + // callback function. + if streakDuration >= hc.maxUnhealthyDuration { + hc.unhealthyCallback(err) + } + } + + // Schedule next poll. We schedule another poll whether the callback was invoked or not, as callbacks are not + // guaranteed to panic or otherwise halt the daemon. In such cases, we may end up invoking the callback several + // times once the service exceeds the maximum unhealthy duration. For example, a callback that emits error logs + // will continue to emit error logs every 5s until the service becomes healthy again. + hc.mutableState.SchedulePoll(hc.pollFrequency) +} + +// Stop stops the health checker. This method is not synchronized, as the timer does not need synchronization. +func (hc *healthChecker) Stop() { + hc.mutableState.Stop() +} + +// StartNewHealthChecker creates and starts a new health checker for a health-checkable service. 
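+// The checker waits startupGracePeriod before its first poll and then re-polls every pollFrequency, invoking
+// unhealthyCallback once the service has been unhealthy for at least maxUnhealthyDuration. Callers normally go
+// through HealthMonitor.RegisterService, which supplies most of these arguments; a minimal sketch of direct use,
+// assuming `daemon` implements types.HealthCheckable and `logger` is a log.Logger:
+//
+//	checker := StartNewHealthChecker(
+//		daemon,
+//		HealthCheckPollFrequency,
+//		PanicServiceNotResponding(daemon),
+//		&libtime.TimeProviderImpl{},
+//		5*time.Minute,
+//		DaemonStartupGracePeriod,
+//		logger,
+//	)
+//	defer checker.Stop()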
+func StartNewHealthChecker( + healthCheckable types.HealthCheckable, + pollFrequency time.Duration, + unhealthyCallback func(error), + timeProvider libtime.TimeProvider, + maxUnhealthyDuration time.Duration, + startupGracePeriod time.Duration, + logger log.Logger, +) *healthChecker { + checker := &healthChecker{ + healthCheckable: healthCheckable, + pollFrequency: pollFrequency, + unhealthyCallback: unhealthyCallback, + timeProvider: timeProvider, + maxUnhealthyDuration: maxUnhealthyDuration, + logger: logger, + mutableState: &healthCheckerMutableState{}, + } + + // The first poll is scheduled after the startup grace period to allow the service to initialize. + checker.mutableState.timer = time.AfterFunc(startupGracePeriod, checker.Poll) + + return checker +} diff --git a/protocol/daemons/server/types/health_checker_test.go b/protocol/daemons/server/types/health_checker_test.go new file mode 100644 index 0000000000..fc37b356bc --- /dev/null +++ b/protocol/daemons/server/types/health_checker_test.go @@ -0,0 +1,127 @@ +package types_test + +import ( + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestHealthChecker(t *testing.T) { + tests := map[string]struct { + healthCheckResponses []error + expectedUnhealthy error + }{ + "initialized: no callback triggered": { + healthCheckResponses: []error{}, + expectedUnhealthy: nil, + }, + "healthy, then unhealthy for < maximum unhealthy duration: no callback triggered": { + healthCheckResponses: []error{ + nil, + TestError1, + }, + expectedUnhealthy: nil, + }, + "unhealthy for < maximum unhealthy duration: no callback triggered": { + healthCheckResponses: []error{ + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + }, + expectedUnhealthy: nil, + }, + "unhealthy, healthy, unhealthy: no callback triggered": { + healthCheckResponses: []error{ + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + nil, + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + }, + expectedUnhealthy: nil, + }, + "unhealthy for maximum unhealthy duration: callback triggered": { + healthCheckResponses: []error{ + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + }, + expectedUnhealthy: TestError1, + }, + "unhealthy with multiple errors: most recent error returned": { + healthCheckResponses: []error{ + TestError1, + TestError1, + TestError1, + TestError1, + TestError1, + TestError2, + }, + expectedUnhealthy: TestError2, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Setup. + // Set up callback to track error passed to it. + callback, callbackError := callbackWithErrorPointer() + + // Set up health checkable service. + checkable := &mocks.HealthCheckable{} + for _, response := range test.healthCheckResponses { + checkable.On("HealthCheck").Return(response).Once() + } + + // Set up time provider to return a sequence of timestamps one second apart starting at Time0. + timeProvider := &mocks.TimeProvider{} + for i := range test.healthCheckResponses { + timeProvider.On("Now").Return(Time0.Add(time.Duration(i) * time.Second)).Once() + } + + healthChecker := types.StartNewHealthChecker( + checkable, + TestLargeDuration, // set to a >> value so that poll is never auto-triggered by the timer + callback, + timeProvider, + TestMaximumUnhealthyDuration, + types.DaemonStartupGracePeriod, + &mocks.Logger{}, + ) + + // Cleanup. 
+ defer func() { + healthChecker.Stop() + }() + + // Act - simulate the health checker polling for updates. + for i := 0; i < len(test.healthCheckResponses); i++ { + healthChecker.Poll() + } + + // Assert. + // Validate the expected polls occurred according to the mocks. + checkable.AssertExpectations(t) + timeProvider.AssertExpectations(t) + + // Validate the callback was called with the expected error. + if test.expectedUnhealthy == nil { + require.NoError(t, *callbackError) + } else { + require.ErrorContains(t, *callbackError, test.expectedUnhealthy.Error()) + } + }) + } +} diff --git a/protocol/daemons/server/types/health_monitor.go b/protocol/daemons/server/types/health_monitor.go new file mode 100644 index 0000000000..1ac3109589 --- /dev/null +++ b/protocol/daemons/server/types/health_monitor.go @@ -0,0 +1,238 @@ +package types + +import ( + cosmoslog "cosmossdk.io/log" + "fmt" + "github.com/cometbft/cometbft/libs/log" + "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" + "sync" + "time" +) + +const ( + // HealthCheckPollFrequency is the frequency at which the health-checkable service is polled. + HealthCheckPollFrequency = 5 * time.Second + + // HealthMonitorLogModuleName is the module name used for logging within the health monitor. + HealthMonitorLogModuleName = "daemon-health-monitor" +) + +// healthMonitorMutableState tracks all mutable state associated with the health monitor. This state is gathered into +// a single struct for ease of synchronization. +type healthMonitorMutableState struct { + sync.Mutex + + // serviceToHealthChecker maps daemon service names to their update metadata. + serviceToHealthChecker map[string]*healthChecker + // stopped indicates whether the monitor has been stopped. Additional daemon services cannot be registered + // after the monitor has been stopped. + stopped bool + // disabled indicates whether the monitor has been disabled. This is used to disable the monitor in testApp + // tests, where app.New is not executed. + disabled bool +} + +// newHealthMonitorMutableState creates a new health monitor mutable state. +func newHealthMonitorMutableState() *healthMonitorMutableState { + return &healthMonitorMutableState{ + serviceToHealthChecker: make(map[string]*healthChecker), + } +} + +// DisableForTesting disables the health monitor mutable state from receiving updates. This prevents the monitor +// from registering services when called before app initialization and is used for testing. +func (ms *healthMonitorMutableState) DisableForTesting() { + ms.Lock() + defer ms.Unlock() + + ms.disabled = true +} + +// Stop stops the update frequency monitor. This method is synchronized. +func (ms *healthMonitorMutableState) Stop() { + ms.Lock() + defer ms.Unlock() + + // Don't stop the monitor if it has already been stopped. + if ms.stopped { + return + } + + // Stop all health checkers. + for _, checker := range ms.serviceToHealthChecker { + checker.Stop() + } + + ms.stopped = true +} + +// RegisterHealthChecker registers a new health checker for a health-checkable with the health monitor. The health +// checker is lazily created using the provided function if needed. This method is synchronized. It returns an error if +// the service was already registered. 
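+// If the mutable state has been disabled for testing, or has already been stopped, no checker is created and nil is
+// returned; in the stopped case, a checkable that also implements Stoppable is proactively stopped so that
+// short-lived test networks shut down cleanly.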
+func (ms *healthMonitorMutableState) RegisterHealthChecker( + checkable types.HealthCheckable, + lazyHealthCheckerCreator func() *healthChecker, +) error { + stopService := false + + // If the monitor has already been stopped, we want to stop the checkable service before returning. + // However, we'd prefer not to stop the service within the critical section in order to prevent deadlocks. + // This defer will be called last, after the lock is released. + defer func() { + if stopService { + // If the service is stoppable, stop it. This helps us to clean up daemon services in test cases + // where the monitor is stopped before all daemon services have been registered. + if stoppable, ok := checkable.(Stoppable); ok { + stoppable.Stop() + } + } + }() + + // Enter into the critical section. + ms.Lock() + defer ms.Unlock() + + // Don't register daemon services if the monitor has been disabled. + if ms.disabled { + return nil + } + + // Don't register additional daemon services if the monitor has already been stopped. + // This could be a concern for short-running integration test cases, where the network + // stops before all daemon services have been registered. + if ms.stopped { + // Toggle the stopService flag to true so that the service is stopped after the lock is released. + stopService = true + return nil + } + + if _, ok := ms.serviceToHealthChecker[checkable.ServiceName()]; ok { + return fmt.Errorf("service %v already registered", checkable.ServiceName()) + } + + ms.serviceToHealthChecker[checkable.ServiceName()] = lazyHealthCheckerCreator() + return nil +} + +// HealthMonitor monitors the health of daemon services, which implement the HealthCheckable interface. If a +// registered health-checkable service sustains an unhealthy state for the maximum acceptable unhealthy duration, +// the monitor will execute a callback function. +type HealthMonitor struct { + mutableState *healthMonitorMutableState + + // These fields are initialized in NewHealthMonitor and are not modified after initialization. + logger log.Logger + // startupGracePeriod is the grace period before the monitor starts polling the health-checkable services. + startupGracePeriod time.Duration + // pollingFrequency is the frequency at which the health-checkable services are polled. + pollingFrequency time.Duration + // enablePanics is used to toggle between panics or error logs when a daemon sustains an unhealthy state past the + // maximum allowable duration. + enablePanics bool +} + +// NewHealthMonitor creates a new health monitor. +func NewHealthMonitor( + startupGracePeriod time.Duration, + pollingFrequency time.Duration, + logger log.Logger, + enablePanics bool, +) *HealthMonitor { + return &HealthMonitor{ + mutableState: newHealthMonitorMutableState(), + logger: logger.With(cosmoslog.ModuleKey, HealthMonitorLogModuleName), + startupGracePeriod: startupGracePeriod, + pollingFrequency: pollingFrequency, + enablePanics: enablePanics, + } +} + +func (hm *HealthMonitor) DisableForTesting() { + hm.mutableState.DisableForTesting() +} + +// RegisterServiceWithCallback registers a HealthCheckable with the health monitor. If the service +// stays unhealthy every time it is polled during the maximum acceptable unhealthy duration, the monitor will +// execute the callback function. +// This method is synchronized. The method returns an error if the service was already registered or the +// monitor has already been stopped. 
If the monitor has been stopped, this method will proactively stop the +// health-checkable service before returning. +func (hm *HealthMonitor) RegisterServiceWithCallback( + hc types.HealthCheckable, + maxUnhealthyDuration time.Duration, + callback func(error), +) error { + if maxUnhealthyDuration <= 0 { + return fmt.Errorf( + "health check registration failure for service %v: "+ + "maximum unhealthy duration %v must be positive", + hc.ServiceName(), + maxUnhealthyDuration, + ) + } + + return hm.mutableState.RegisterHealthChecker(hc, func() *healthChecker { + return StartNewHealthChecker( + hc, + hm.pollingFrequency, + callback, + &libtime.TimeProviderImpl{}, + maxUnhealthyDuration, + hm.startupGracePeriod, + hm.logger, + ) + }) +} + +// PanicServiceNotResponding returns a function that panics with a message indicating that the specified daemon +// service is not responding. This is ideal for creating a callback function when registering a daemon service. +func PanicServiceNotResponding(hc types.HealthCheckable) func(error) { + return func(err error) { + panic(fmt.Sprintf("%v unhealthy: %v", hc.ServiceName(), err)) + } +} + +// LogErrorServiceNotResponding returns a function that logs an error indicating that the specified service +// is not responding. This is ideal for creating a callback function when registering a health-checkable service. +func LogErrorServiceNotResponding(hc types.HealthCheckable, logger log.Logger) func(error) { + return func(err error) { + logger.Error( + "health-checked service is unhealthy", + "service", + hc.ServiceName(), + "error", + err, + ) + } +} + +// RegisterService registers a new health-checkable service with the health check monitor. If the service +// is unhealthy every time it is polled for a duration greater than or equal to the maximum acceptable unhealthy +// duration, the monitor will panic or log an error, depending on the app configuration via the +// `panic-on-daemon-failure-enabled` flag. +// This method is synchronized. It returns an error if the service was already registered or the monitor has +// already been stopped. If the monitor has been stopped, this method will proactively stop the health-checkable +// service before returning. +func (hm *HealthMonitor) RegisterService( + hc types.HealthCheckable, + maxDaemonUnhealthyDuration time.Duration, +) error { + // If the monitor is configured to panic, use the panic callback. Otherwise, use the error log callback. + // This behavior is configured via flag and defaults to panicking on daemon failure. + callback := LogErrorServiceNotResponding(hc, hm.logger) + if hm.enablePanics { + callback = PanicServiceNotResponding(hc) + } + + return hm.RegisterServiceWithCallback( + hc, + maxDaemonUnhealthyDuration, + callback, + ) +} + +// Stop stops the update frequency monitor. This method is synchronized. 
+func (hm *HealthMonitor) Stop() { + hm.mutableState.Stop() +} diff --git a/protocol/daemons/server/types/health_monitor_test.go b/protocol/daemons/server/types/health_monitor_test.go new file mode 100644 index 0000000000..8ba07f748b --- /dev/null +++ b/protocol/daemons/server/types/health_monitor_test.go @@ -0,0 +1,273 @@ +package types_test + +import ( + "fmt" + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" + daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var ( + TestError1 = fmt.Errorf("test error 1") + TestError2 = fmt.Errorf("test error 2") + + Time0 = time.Unix(0, 0) + Time1 = Time0.Add(time.Second) + Time2 = Time0.Add(2 * time.Second) + Time3 = Time0.Add(3 * time.Second) + Time4 = Time0.Add(4 * time.Second) + Time5 = Time0.Add(5 * time.Second) + + // Use a maximum unhealthy duration of 5 seconds for testing, simulating a poll frequency of 1s with timestamps. + TestMaximumUnhealthyDuration = 5 * time.Second + + // TestLargeDuration is used to ensure that the health checker does not trigger a callback through the timer. + TestLargeDuration = 5 * time.Minute + + SmallDuration = 10 * time.Millisecond +) + +// createTestMonitor creates a health monitor with a poll frequency of 10ms and a zero duration grace period. +func createTestMonitor() (*types.HealthMonitor, *mocks.Logger) { + logger := &mocks.Logger{} + logger.On("With", "module", "daemon-health-monitor").Return(logger).Once() + return types.NewHealthMonitor( + SmallDuration, + 10*time.Millisecond, + logger, + true, // enable panics here for stricter testing - a panic will definitely cause a test failure. + ), logger +} + +// mockFailingHealthCheckerWithError creates a mock health checkable service that returns the given error. +func mockFailingHealthCheckerWithError(name string, err error) *mocks.HealthCheckable { + hc := &mocks.HealthCheckable{} + hc.On("ServiceName").Return(name) + hc.On("HealthCheck").Return(err) + return hc +} + +// callbackWithErrorPointer returns a callback function and an error pointer that tracks the error passed to the +// callback. This can be used to validate that a service was considered unhealthy for the maximum allowable duration. +func callbackWithErrorPointer() (func(error), *error) { + var callbackError error + callback := func(err error) { + callbackError = err + } + return callback, &callbackError +} + +// The following tests may still intermittently fail or report false positives on an overloaded system as they rely +// on callbacks to execute before the termination of the `time.Sleep` call, which is not guaranteed. +func TestRegisterService_Healthy(t *testing.T) { + // Setup. + ufm, logger := createTestMonitor() + hc := mockFailingHealthCheckerWithError("test-service", nil) + + // Act. + err := ufm.RegisterService(hc, 50*time.Millisecond) + require.NoError(t, err) + + // Cleanup. + defer func() { + ufm.Stop() + }() + + // Give the monitor time to poll the health checkable service. Polls occur once every 10ms. + time.Sleep(100 * time.Millisecond) + + // Assert: no calls to the logger were made. 
+ mock.AssertExpectationsForObjects(t, hc, logger) +} + +func TestRegisterServiceWithCallback_Mixed(t *testing.T) { + tests := map[string]struct { + healthCheckResponse error + }{ + "healthy: no callback triggered": { + healthCheckResponse: nil, + }, + "unhealthy: callback triggered": { + healthCheckResponse: TestError1, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Setup. + ufm, logger := createTestMonitor() + hc := mockFailingHealthCheckerWithError("test-service", test.healthCheckResponse) + callback, callbackError := callbackWithErrorPointer() + + // Act. + err := ufm.RegisterServiceWithCallback( + hc, + 50*time.Millisecond, + callback, + ) + require.NoError(t, err) + + // Cleanup. + defer func() { + ufm.Stop() + }() + + // Give the monitor time to poll the health checkable service. Polls occur once every 10ms. + time.Sleep(100 * time.Millisecond) + + // Assert: no calls to the logger were made. + mock.AssertExpectationsForObjects(t, hc, logger) + + // Assert: the callback was called or not called as expected. + require.Equal(t, test.healthCheckResponse, *callbackError) + }) + } +} + +func TestHealthMonitor_DisablePanics_DoesNotPanic(t *testing.T) { + logger := &mocks.Logger{} + logger.On("With", "module", "daemon-health-monitor").Return(logger).Once() + logger.On( + "Error", + "health-checked service is unhealthy", + "service", + "test-service", + "error", + mock.Anything, + ).Return() + + hm := types.NewHealthMonitor( + SmallDuration, + 10*time.Millisecond, + logger, + false, + ) + + hc := mockFailingHealthCheckerWithError("test-service", TestError1) + + err := hm.RegisterService(hc, 10*time.Millisecond) + require.NoError(t, err) + + defer func() { + hm.Stop() + }() + + // A 100ms sleep should be sufficient for the health monitor to detect the unhealthy service and trigger a callback. + time.Sleep(100 * time.Millisecond) + + // Assert. + // This test is confirmed to panic when panics are not disabled - but because the panic occurs in a separate + // go-routine, it cannot be easily captured with an assert. Instead, we do not try to capture the panic, but + // assert that the logger was called with the expected arguments. + mock.AssertExpectationsForObjects(t, logger) +} + +func TestRegisterServiceWithCallback_DoubleRegistrationFails(t *testing.T) { + // Setup. + ufm, logger := createTestMonitor() + + hc := mockFailingHealthCheckerWithError("test-service", TestError1) + hc2 := mockFailingHealthCheckerWithError("test-service", TestError2) + + callback, callbackError := callbackWithErrorPointer() + + err := ufm.RegisterServiceWithCallback(hc, 50*time.Millisecond, callback) + require.NoError(t, err) + + // Register a service with the same name. This registration should fail. + err = ufm.RegisterServiceWithCallback(hc2, 50*time.Millisecond, callback) + require.ErrorContains(t, err, "service already registered") + + // Expect that the first service is still operating and will produce a callback after a sustained unhealthy period. + time.Sleep(100 * time.Millisecond) + ufm.Stop() + + // Assert no calls to the logger were made. + mock.AssertExpectationsForObjects(t, logger, hc) + hc2.AssertNotCalled(t, "HealthCheck") + + // Assert the callback was called with the expected error. + require.Equal(t, TestError1, *callbackError) +} + +// Create a struct that implements HealthCheckable and Stoppable to check that the monitor stops the service. +type stoppableFakeHealthChecker struct { + stopped bool +} + +// Implement stub methods to conform to interfaces. 
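+// HealthCheck always reports an error so the fake registers as permanently unhealthy; the no-op ReportSuccess and
+// ReportFailure methods exist only to satisfy the daemontypes.HealthCheckable interface assertion below.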
+func (f *stoppableFakeHealthChecker) ServiceName() string { return "test-service" } +func (f *stoppableFakeHealthChecker) HealthCheck() error { return fmt.Errorf("unhealthy") } +func (f *stoppableFakeHealthChecker) ReportSuccess() {} +func (f *stoppableFakeHealthChecker) ReportFailure(_ error) {} + +// Stop stub tracks whether the service was stopped. +func (f *stoppableFakeHealthChecker) Stop() { + f.stopped = true +} + +var _ types.Stoppable = (*stoppableFakeHealthChecker)(nil) +var _ daemontypes.HealthCheckable = (*stoppableFakeHealthChecker)(nil) + +func TestRegisterService_RegistrationFailsAfterStop(t *testing.T) { + ufm, _ := createTestMonitor() + ufm.Stop() + + stoppableHc := &stoppableFakeHealthChecker{} + hc2 := mockFailingHealthCheckerWithError("test-service-2", TestError1) + + // Register unhealthy services. These services are confirmed to trigger a panic if registered when the monitor is + // not stopped. + // Register a stoppable unhealthy service. + err := ufm.RegisterService(stoppableHc, 10*time.Millisecond) + require.Nil(t, err) + + // Register a non-stoppable unhealthy service. + err = ufm.RegisterService(hc2, 10*time.Millisecond) + require.Nil(t, err) + + // Since the max allowable unhealthy duration is 10ms, and the polling period is 10ms, 100ms is long enough to wait + // in order to trigger a panic if a service is polled. + time.Sleep(100 * time.Millisecond) + + // Assert that the monitor proactively stops any stoppable service that was registered after the monitor was + // stopped. + require.True(t, stoppableHc.stopped) +} + +func TestRegisterValidResponseWithCallback_NegativeUnhealthyDuration(t *testing.T) { + ufm, _ := createTestMonitor() + hc := mockFailingHealthCheckerWithError("test-service", TestError1) + err := ufm.RegisterServiceWithCallback(hc, -50*time.Millisecond, func(error) {}) + require.ErrorContains(t, err, "maximum unhealthy duration -50ms must be positive") +} + +func TestPanicServiceNotResponding(t *testing.T) { + panicFunc := types.PanicServiceNotResponding(&mocks.HealthCheckable{}) + require.Panics(t, func() { + panicFunc(TestError1) + }) +} + +func TestLogErrorServiceNotResponding(t *testing.T) { + logger := &mocks.Logger{} + hc := &mocks.HealthCheckable{} + + hc.On("ServiceName").Return("test-service") + logger.On( + "Error", + "health-checked service is unhealthy", + "service", + "test-service", + "error", + TestError1, + ).Return() + logFunc := types.LogErrorServiceNotResponding(hc, logger) + logFunc(TestError1) + + // Assert: the logger was called with the expected arguments. + mock.AssertExpectationsForObjects(t, logger) +} diff --git a/protocol/daemons/server/types/stoppable.go b/protocol/daemons/server/types/stoppable.go new file mode 100644 index 0000000000..fda69d34a8 --- /dev/null +++ b/protocol/daemons/server/types/stoppable.go @@ -0,0 +1,7 @@ +package types + +// Stoppable is an interface for a service that can be stopped. +// This is used to stop services registered with the health monitor. +type Stoppable interface { + Stop() +} diff --git a/protocol/daemons/server/types/update_monitor.go b/protocol/daemons/server/types/update_monitor.go deleted file mode 100644 index 7c437ff0d7..0000000000 --- a/protocol/daemons/server/types/update_monitor.go +++ /dev/null @@ -1,169 +0,0 @@ -package types - -import ( - "fmt" - "github.com/cometbft/cometbft/libs/log" - "sync" - "time" -) - -type updateMetadata struct { - timer *time.Timer - updateFrequency time.Duration -} - -// UpdateMonitor monitors the update frequency of daemon services. 
If a daemon service does not respond within -// the maximum acceptable update delay set when the daemon is registered, the monitor will log an error and halt the -// protocol. This was judged to be the best solution for network performance because it prevents any validator from -// participating in the network at all if a daemon service is not responding. -type UpdateMonitor struct { - // serviceToUpdateMetadata maps daemon service names to their update metadata. - serviceToUpdateMetadata map[string]updateMetadata - // stopped indicates whether the monitor has been stopped. Additional daemon services cannot be registered - // after the monitor has been stopped. - stopped bool - // disabled indicates whether the monitor has been disabled. This is used to disable the monitor in testApp - // tests, where app.New is not executed. - disabled bool - // lock is used to synchronize access to the monitor. - lock sync.Mutex - - // These fields are initialized in NewUpdateFrequencyMonitor and are not modified after initialization. - logger log.Logger - daemonStartupGracePeriod time.Duration -} - -// NewUpdateFrequencyMonitor creates a new update frequency monitor. -func NewUpdateFrequencyMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *UpdateMonitor { - return &UpdateMonitor{ - serviceToUpdateMetadata: make(map[string]updateMetadata), - logger: logger, - daemonStartupGracePeriod: daemonStartupGracePeriod, - } -} - -func (ufm *UpdateMonitor) DisableForTesting() { - ufm.lock.Lock() - defer ufm.lock.Unlock() - - ufm.disabled = true -} - -// RegisterDaemonServiceWithCallback registers a new daemon service with the update frequency monitor. If the daemon -// service fails to respond within the maximum acceptable update delay, the monitor will execute the callback function. -// This method is synchronized. The method returns an error if the daemon service was already registered or the -// monitor has already been stopped. -func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback( - service string, - maximumAcceptableUpdateDelay time.Duration, - callback func(), -) error { - ufm.lock.Lock() - defer ufm.lock.Unlock() - - if maximumAcceptableUpdateDelay <= 0 { - return fmt.Errorf( - "registration failure for service %v: maximum acceptable update delay %v must be positive", - service, - maximumAcceptableUpdateDelay, - ) - } - - // Don't register daemon services if the monitor has been disabled. - if ufm.disabled { - return nil - } - - // Don't register additional daemon services if the monitor has already been stopped. - // This could be a concern for short-running integration test cases, where the network - // stops before all daemon services have been registered. - if ufm.stopped { - return fmt.Errorf("registration failure for service %v: monitor has been stopped", service) - } - - if _, ok := ufm.serviceToUpdateMetadata[service]; ok { - return fmt.Errorf("service %v already registered", service) - } - - ufm.serviceToUpdateMetadata[service] = updateMetadata{ - timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUpdateDelay, callback), - updateFrequency: maximumAcceptableUpdateDelay, - } - return nil -} - -// PanicServiceNotResponding returns a function that panics with a message indicating that the specified daemon -// service is not responding. This is ideal for creating a callback function when registering a daemon service. 
-func PanicServiceNotResponding(service string) func() { - return func() { - panic(fmt.Sprintf("%v daemon not responding", service)) - } -} - -// LogErrorServiceNotResponding returns a function that logs an error indicating that the specified daemon service -// is not responding. This is ideal for creating a callback function when registering a daemon service. -func LogErrorServiceNotResponding(service string, logger log.Logger) func() { - return func() { - logger.Error( - "daemon not responding", - "service", - service, - ) - } -} - -// RegisterDaemonService registers a new daemon service with the update frequency monitor. If the daemon service -// fails to respond within the maximum acceptable update delay, the monitor will log an error. -// This method is synchronized. The method an error if the daemon service was already registered or the monitor has -// already been stopped. -func (ufm *UpdateMonitor) RegisterDaemonService( - service string, - maximumAcceptableUpdateDelay time.Duration, -) error { - return ufm.RegisterDaemonServiceWithCallback( - service, - maximumAcceptableUpdateDelay, - LogErrorServiceNotResponding(service, ufm.logger), - ) -} - -// Stop stops the update frequency monitor. This method is synchronized. -func (ufm *UpdateMonitor) Stop() { - ufm.lock.Lock() - defer ufm.lock.Unlock() - - // Don't stop the monitor if it has already been stopped. - if ufm.stopped { - return - } - - for _, metadata := range ufm.serviceToUpdateMetadata { - metadata.timer.Stop() - } - ufm.stopped = true -} - -// RegisterValidResponse registers a valid response from the daemon service. This will reset the timer for the -// daemon service. This method is synchronized. -func (ufm *UpdateMonitor) RegisterValidResponse(service string) error { - ufm.lock.Lock() - defer ufm.lock.Unlock() - - // Don't return an error if the monitor has been disabled. - if ufm.disabled { - return nil - } - - // Don't bother to reset the timer if the monitor has already been stopped. - if ufm.stopped { - return nil - } - - metadata, ok := ufm.serviceToUpdateMetadata[service] - if !ok { - return fmt.Errorf("service %v not registered", service) - } - - metadata.timer.Reset(metadata.updateFrequency) - return nil -} diff --git a/protocol/daemons/server/types/update_monitor_test.go b/protocol/daemons/server/types/update_monitor_test.go deleted file mode 100644 index c9abf1094f..0000000000 --- a/protocol/daemons/server/types/update_monitor_test.go +++ /dev/null @@ -1,189 +0,0 @@ -package types_test - -import ( - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "github.com/dydxprotocol/v4-chain/protocol/mocks" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "sync/atomic" - "testing" - "time" -) - -var ( - zeroDuration = 0 * time.Second -) - -func createTestMonitor() (*types.UpdateMonitor, *mocks.Logger) { - logger := &mocks.Logger{} - return types.NewUpdateFrequencyMonitor(zeroDuration, logger), logger -} - -// The following tests may still intermittently fail on an overloaded system as they rely -// on `time.Sleep`, which is not guaranteed to wake up after the specified amount of time. -func TestRegisterDaemonService_Success(t *testing.T) { - ufm, logger := createTestMonitor() - err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond) - require.NoError(t, err) - - // As long as responses come in before the 200ms deadline, no errors should be logged. 
- time.Sleep(80 * time.Millisecond) - require.NoError(t, ufm.RegisterValidResponse("test-service")) - time.Sleep(80 * time.Millisecond) - require.NoError(t, ufm.RegisterValidResponse("test-service")) - time.Sleep(80 * time.Millisecond) - - ufm.Stop() - // Assert: no calls to the logger were made. - mock.AssertExpectationsForObjects(t, logger) -} - -func TestRegisterDaemonService_SuccessfullyLogsError(t *testing.T) { - ufm, logger := createTestMonitor() - logger.On("Error", "daemon not responding", "service", "test-service").Once().Return() - err := ufm.RegisterDaemonService("test-service", 1*time.Millisecond) - require.NoError(t, err) - time.Sleep(2 * time.Millisecond) - ufm.Stop() - - // Assert: the logger was called with the expected arguments. - mock.AssertExpectationsForObjects(t, logger) -} - -func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) { - callbackCalled := atomic.Bool{} - - ufm, _ := createTestMonitor() - err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() { - callbackCalled.Store(true) - }) - require.NoError(t, err) - - // As long as responses come in before the 200ms deadline, no panic should occur. - time.Sleep(80 * time.Millisecond) - require.NoError(t, ufm.RegisterValidResponse("test-service")) - time.Sleep(80 * time.Millisecond) - require.NoError(t, ufm.RegisterValidResponse("test-service")) - time.Sleep(80 * time.Millisecond) - - require.False(t, callbackCalled.Load()) - - ufm.Stop() -} - -func TestRegisterDaemonService_DoubleRegistrationFails(t *testing.T) { - ufm, logger := createTestMonitor() - - err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond) - require.NoError(t, err) - - // Register the same daemon service again. This should fail, and 50ms update frequency should be ignored. - err = ufm.RegisterDaemonService("test-service", 50*time.Millisecond) - require.ErrorContains(t, err, "service already registered") - - // Confirm that the original 200ms update frequency is still in effect. 50ms would have triggered an error log. - // Note there is a possibility that 200ms will still cause an error log due to the semantics of Sleep, which is - // not guaranteed to sleep for exactly the specified duration. - time.Sleep(80 * time.Millisecond) - require.NoError(t, ufm.RegisterValidResponse("test-service")) - time.Sleep(80 * time.Millisecond) - ufm.Stop() - - // Assert no calls to the logger were made. - mock.AssertExpectationsForObjects(t, logger) -} - -func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T) { - // lock synchronizes callback flags and was added to avoid race test failures. - callback1Called := atomic.Bool{} - callback2Called := atomic.Bool{} - - ufm, _ := createTestMonitor() - // First registration should succeed. - err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() { - callback1Called.Store(true) - }) - require.NoError(t, err) - - // Register the same daemon service again. This should fail, and 50ms update frequency should be ignored. - err = ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() { - callback2Called.Store(true) - }) - require.ErrorContains(t, err, "service already registered") - - // Validate that the original callback is still in effect for the original 200ms update frequency. - // The 50ms update frequency should have invoked a callback if it were applied. 
- time.Sleep(80 * time.Millisecond) - require.False(t, callback1Called.Load()) - require.False(t, callback2Called.Load()) - - // Validate no issues with RegisterValidResponse after a double registration was attempted. - require.NoError(t, ufm.RegisterValidResponse("test-service")) - - // Sleep until the callback should be called. - time.Sleep(250 * time.Millisecond) - require.True(t, callback1Called.Load()) - require.False(t, callback2Called.Load()) - - ufm.Stop() -} - -func TestRegisterDaemonService_RegistrationFailsAfterStop(t *testing.T) { - ufm, logger := createTestMonitor() - ufm.Stop() - err := ufm.RegisterDaemonService("test-service", 50*time.Millisecond) - require.ErrorContains(t, err, "monitor has been stopped") - - // Any scheduled functions with error logs that were not cleaned up should trigger before this sleep finishes. - time.Sleep(100 * time.Millisecond) - mock.AssertExpectationsForObjects(t, logger) -} - -func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing.T) { - ufm, _ := createTestMonitor() - ufm.Stop() - - callbackCalled := atomic.Bool{} - - // Registering a daemon service with a callback should fail after the monitor has been stopped. - err := ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() { - callbackCalled.Store(true) - }) - require.ErrorContains(t, err, "monitor has been stopped") - - // Wait until after the callback duration has expired. The callback should not be called. - time.Sleep(75 * time.Millisecond) - - // Validate that the callback was not called. - require.False(t, callbackCalled.Load()) -} - -func TestRegisterValidResponse_NegativeUpdateDelay(t *testing.T) { - ufm, logger := createTestMonitor() - err := ufm.RegisterDaemonService("test-service", -50*time.Millisecond) - require.ErrorContains(t, err, "update delay -50ms must be positive") - - // Sanity check: no calls to the logger should have been made. - mock.AssertExpectationsForObjects(t, logger) -} - -func TestRegisterValidResponseWithCallback_NegativeUpdateDelay(t *testing.T) { - ufm, _ := createTestMonitor() - err := ufm.RegisterDaemonServiceWithCallback("test-service", -50*time.Millisecond, func() {}) - require.ErrorContains(t, err, "update delay -50ms must be positive") -} - -func TestPanicServiceNotResponding(t *testing.T) { - panicFunc := types.PanicServiceNotResponding("test-service") - require.Panics(t, panicFunc) -} - -func TestLogErrorServiceNotResponding(t *testing.T) { - logger := &mocks.Logger{} - logger.On("Error", "daemon not responding", "service", "test-service").Return() - logFunc := types.LogErrorServiceNotResponding("test-service", logger) - logFunc() - - // Assert: the logger was called with the expected arguments. - mock.AssertExpectationsForObjects(t, logger) -} diff --git a/protocol/daemons/server/types/util.go b/protocol/daemons/server/types/util.go deleted file mode 100644 index 59c07dcb6f..0000000000 --- a/protocol/daemons/server/types/util.go +++ /dev/null @@ -1,9 +0,0 @@ -package types - -import "time" - -// MaximumAcceptableUpdateDelay computes the maximum acceptable update delay for a daemon service as a -// multiple of the loop delay. 
-func MaximumAcceptableUpdateDelay(loopDelayMs uint32) time.Duration { - return MaximumLoopDelayMultiple * time.Duration(loopDelayMs) * time.Millisecond -} diff --git a/protocol/daemons/server/types/util_test.go b/protocol/daemons/server/types/util_test.go deleted file mode 100644 index 497faf19dd..0000000000 --- a/protocol/daemons/server/types/util_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package types - -import ( - "github.com/stretchr/testify/require" - "testing" - "time" -) - -func TestMaximumAcceptableUpdateDelay(t *testing.T) { - loopDelayMs := uint32(1000) - expected := time.Duration(MaximumLoopDelayMultiple*loopDelayMs) * time.Millisecond - actual := MaximumAcceptableUpdateDelay(loopDelayMs) - require.Equal(t, expected, actual) -} diff --git a/protocol/daemons/types/health_checkable.go b/protocol/daemons/types/health_checkable.go index da79405685..fd5c2246b0 100644 --- a/protocol/daemons/types/health_checkable.go +++ b/protocol/daemons/types/health_checkable.go @@ -22,6 +22,8 @@ type HealthCheckable interface { ReportFailure(err error) // ReportSuccess records a successful update. ReportSuccess() + // ServiceName returns the name of the service being monitored. This name is expected to be unique. + ServiceName() string } // timestampWithError couples a timestamp and error to make it easier to update them in tandem. @@ -65,6 +67,9 @@ type timeBoundedHealthCheckable struct { // logger is the logger used to log errors. logger log.Logger + + // serviceName is the name of the service being monitored. This field is read-only and not synchronized. + serviceName string } // NewTimeBoundedHealthCheckable creates a new HealthCheckable instance. @@ -76,12 +81,18 @@ func NewTimeBoundedHealthCheckable( hc := &timeBoundedHealthCheckable{ timeProvider: timeProvider, logger: logger, + serviceName: serviceName, } // Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error. hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName)) return hc } +// ServiceName returns the name of the service being monitored. +func (hc *timeBoundedHealthCheckable) ServiceName() string { + return hc.serviceName +} + // ReportSuccess records a successful update. This method is thread-safe. func (h *timeBoundedHealthCheckable) ReportSuccess() { h.Lock() diff --git a/protocol/docker-compose.yml b/protocol/docker-compose.yml index 203f802f66..c303801158 100644 --- a/protocol/docker-compose.yml +++ b/protocol/docker-compose.yml @@ -10,13 +10,15 @@ services: - --log_level # Note that only this validator has a log-level of `info`; other validators use `error` by default. # Change to `debug` for more verbose log-level. - - info + - info - --home - /dydxprotocol/chain/.alice - - --p2p.persistent_peers + - --p2p.persistent_peers - "17e5e45691f0d01449c84fd4ae87279578cdd7ec@dydxprotocold0:26656,b69182310be02559483e42c77b7b104352713166@dydxprotocold1:26656,47539956aaa8e624e0f1d926040e54908ad0eb44@dydxprotocold2:26656,5882428984d83b03d0c907c1f0af343534987052@dydxprotocold3:26656" - --bridge-daemon-eth-rpc-endpoint - "${ETH_RPC_ENDPOINT}" + - --max-daemon-unhealthy-seconds + - "4294967295" # Effectively disable the daemon monitor because bridge daemon is flaky in localnet. 
environment: # See https://docs.datadoghq.com/profiler/enabling/go/ for DD_ specific environment variables - DD_ENV=localnet_${USER} @@ -28,7 +30,7 @@ services: - "26657:26657" - "9090:9090" - "1317:1317" - + dydxprotocold1: image: local:dydxprotocol entrypoint: @@ -39,10 +41,12 @@ services: - error - --home - /dydxprotocol/chain/.bob - - --p2p.persistent_peers + - --p2p.persistent_peers - "17e5e45691f0d01449c84fd4ae87279578cdd7ec@dydxprotocold0:26656,b69182310be02559483e42c77b7b104352713166@dydxprotocold1:26656,47539956aaa8e624e0f1d926040e54908ad0eb44@dydxprotocold2:26656,5882428984d83b03d0c907c1f0af343534987052@dydxprotocold3:26656" - --bridge-daemon-eth-rpc-endpoint - "${ETH_RPC_ENDPOINT}" + - --max-daemon-unhealthy-seconds + - "4294967295" environment: # See https://docs.datadoghq.com/profiler/enabling/go/ for DD_ specific environment variables - DD_ENV=localnet_${USER} @@ -52,7 +56,7 @@ services: - ./localnet/dydxprotocol1:/dydxprotocol/chain/.bob/data ports: - "26658:26657" - + dydxprotocold2: image: local:dydxprotocol entrypoint: @@ -67,6 +71,8 @@ services: - "17e5e45691f0d01449c84fd4ae87279578cdd7ec@dydxprotocold0:26656,b69182310be02559483e42c77b7b104352713166@dydxprotocold1:26656,47539956aaa8e624e0f1d926040e54908ad0eb44@dydxprotocold2:26656,5882428984d83b03d0c907c1f0af343534987052@dydxprotocold3:26656" - --bridge-daemon-eth-rpc-endpoint - "${ETH_RPC_ENDPOINT}" + - --max-daemon-unhealthy-seconds + - "4294967295" environment: # See https://docs.datadoghq.com/profiler/enabling/go/ for DD_ specific environment variables - DD_ENV=localnet_${USER} @@ -74,7 +80,7 @@ services: - DAEMON_HOME=/dydxprotocol/chain/.carl volumes: - ./localnet/dydxprotocol2:/dydxprotocol/chain/.carl/data - + dydxprotocold3: image: local:dydxprotocol entrypoint: @@ -85,10 +91,12 @@ services: - error - --home - /dydxprotocol/chain/.dave - - --p2p.persistent_peers + - --p2p.persistent_peers - "17e5e45691f0d01449c84fd4ae87279578cdd7ec@dydxprotocold0:26656,b69182310be02559483e42c77b7b104352713166@dydxprotocold1:26656,47539956aaa8e624e0f1d926040e54908ad0eb44@dydxprotocold2:26656,5882428984d83b03d0c907c1f0af343534987052@dydxprotocold3:26656" - --bridge-daemon-eth-rpc-endpoint - "${ETH_RPC_ENDPOINT}" + - --max-daemon-unhealthy-seconds + - "4294967295" environment: # See https://docs.datadoghq.com/profiler/enabling/go/ for DD_ specific environment variables - DD_ENV=localnet_${USER} diff --git a/protocol/mocks/HealthCheckable.go b/protocol/mocks/HealthCheckable.go new file mode 100644 index 0000000000..4d69fd74b2 --- /dev/null +++ b/protocol/mocks/HealthCheckable.go @@ -0,0 +1,63 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// HealthCheckable is an autogenerated mock type for the HealthCheckable type +type HealthCheckable struct { + mock.Mock +} + +// HealthCheck provides a mock function with given fields: +func (_m *HealthCheckable) HealthCheck() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ReportFailure provides a mock function with given fields: err +func (_m *HealthCheckable) ReportFailure(err error) { + _m.Called(err) +} + +// ReportSuccess provides a mock function with given fields: +func (_m *HealthCheckable) ReportSuccess() { + _m.Called() +} + +// ServiceName provides a mock function with given fields: +func (_m *HealthCheckable) ServiceName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +type mockConstructorTestingTNewHealthCheckable interface { + mock.TestingT + Cleanup(func()) +} + +// NewHealthCheckable creates a new instance of HealthCheckable. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHealthCheckable(t mockConstructorTestingTNewHealthCheckable) *HealthCheckable { + mock := &HealthCheckable{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/protocol/mocks/Makefile b/protocol/mocks/Makefile index f7c549814c..b47321d1bf 100644 --- a/protocol/mocks/Makefile +++ b/protocol/mocks/Makefile @@ -21,6 +21,7 @@ mock-gen: @go run github.com/vektra/mockery/v2 --name=AppOptions --dir=$(GOPATH)/pkg/mod/github.com/dydxprotocol/cosmos-sdk@$(COSMOS_VERSION)/server/types --recursive --output=./mocks @go run github.com/vektra/mockery/v2 --name=Logger --dir=$(GOPATH)/pkg/mod/github.com/dydxprotocol/cometbft@$(COMETBFT_VERSION)/libs/log --filename=logger.go --recursive --output=./mocks @go run github.com/vektra/mockery/v2 --name=MsgRouter --dir=./lib --recursive --output=./mocks + @go run github.com/vektra/mockery/v2 --name=HealthCheckable --dir=./daemons/types --recursive --output=./mocks @go run github.com/vektra/mockery/v2 --name=PrepareBridgeKeeper --dir=./app/prepare --recursive --output=./mocks @go run github.com/vektra/mockery/v2 --name=PrepareClobKeeper --dir=./app/prepare --recursive --output=./mocks @go run github.com/vektra/mockery/v2 --name=PreparePerpetualsKeeper --dir=./app/prepare --recursive --output=./mocks diff --git a/protocol/testutil/app/app.go b/protocol/testutil/app/app.go index c228da42d4..fa902d41bf 100644 --- a/protocol/testutil/app/app.go +++ b/protocol/testutil/app/app.go @@ -490,7 +490,7 @@ func (tApp *TestApp) initChainIfNeeded() { } if tApp.builder.usesDefaultAppConfig { - tApp.App.Server.DisableUpdateMonitoringForTesting() + tApp.App.DisableHealthMonitorForTesting() } if tApp.builder.enableNonDeterminismChecks { diff --git a/protocol/x/clob/client/cli/cancel_order_cli_test.go b/protocol/x/clob/client/cli/cancel_order_cli_test.go index 9f148f36ba..de7afeafe4 100644 --- a/protocol/x/clob/client/cli/cancel_order_cli_test.go +++ b/protocol/x/clob/client/cli/cancel_order_cli_test.go @@ -8,6 +8,7 @@ import ( appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" daemonflags "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" "github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions" + "math" "math/big" "testing" @@ -73,6 +74,13 @@ func (s 
*CancelOrderIntegrationTestSuite) SetupTest() {
 			appOptions.Set(daemonflags.FlagPriceDaemonEnabled, false)
 			appOptions.Set(daemonflags.FlagBridgeDaemonEnabled, false)
 
+			// Effectively disable the health monitor panic timeout for these tests. This is necessary
+			// because all clob cli tests run in the same process and the total time to run is >> 5 minutes
+			// on CI, causing the panic to trigger for liquidations daemon goroutines that haven't been properly
+			// cleaned up after a test run.
+			// TODO(CORE-29): Remove this once the liquidations daemon is refactored to be stoppable.
+			appOptions.Set(daemonflags.FlagMaxDaemonUnhealthySeconds, math.MaxUint32)
+
 			// Make sure the daemon is using the correct GRPC address.
 			appOptions.Set(appflags.GrpcAddress, testval.AppConfig.GRPC.Address)
 		},
diff --git a/protocol/x/clob/client/cli/liquidations_cli_test.go b/protocol/x/clob/client/cli/liquidations_cli_test.go
index 01d02bf1a1..9c8223ae95 100644
--- a/protocol/x/clob/client/cli/liquidations_cli_test.go
+++ b/protocol/x/clob/client/cli/liquidations_cli_test.go
@@ -4,6 +4,7 @@ package cli_test
 
 import (
 	"fmt"
+	"math"
 	appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
 	"math/big"
 
@@ -74,6 +75,13 @@ func TestLiquidationOrderIntegrationTestSuite(t *testing.T) {
 			appOptions.Set(daemonflags.FlagPriceDaemonEnabled, false)
 			appOptions.Set(daemonflags.FlagBridgeDaemonEnabled, false)
 
+			// Effectively disable the health monitor panic timeout for these tests. This is necessary
+			// because all clob cli tests run in the same process and the total time to run is >> 5 minutes
+			// on CI, causing the panic to trigger for liquidations daemon goroutines that haven't been properly
+			// cleaned up after a test run.
+			// TODO(CORE-29): Remove this once the liquidations daemon is refactored to be stoppable.
+			appOptions.Set(daemonflags.FlagMaxDaemonUnhealthySeconds, math.MaxUint32)
+
 			// Make sure the daemon is using the correct GRPC address.
 			appOptions.Set(appflags.GrpcAddress, testval.AppConfig.GRPC.Address)
 
diff --git a/protocol/x/clob/client/cli/place_order_cli_test.go b/protocol/x/clob/client/cli/place_order_cli_test.go
index bf6832efe8..5bbd5511fc 100644
--- a/protocol/x/clob/client/cli/place_order_cli_test.go
+++ b/protocol/x/clob/client/cli/place_order_cli_test.go
@@ -5,6 +5,7 @@ package cli_test
 import (
 	"fmt"
 	appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
+	"math"
 	"math/big"
 	"testing"
 
@@ -68,6 +69,13 @@ func TestPlaceOrderIntegrationTestSuite(t *testing.T) {
 			appOptions.Set(daemonflags.FlagPriceDaemonEnabled, false)
 			appOptions.Set(daemonflags.FlagBridgeDaemonEnabled, false)
 
+			// Effectively disable the health monitor panic timeout for these tests. This is necessary
+			// because all clob cli tests run in the same process and the total time to run is >> 5 minutes
+			// on CI, causing the panic to trigger for liquidations daemon goroutines that haven't been properly
+			// cleaned up after a test run.
+			// TODO(CORE-29): Remove this once the liquidations daemon is refactored to be stoppable.
+			appOptions.Set(daemonflags.FlagMaxDaemonUnhealthySeconds, math.MaxUint32)
+
 			// Make sure the daemon is using the correct GRPC address.
 			appOptions.Set(appflags.GrpcAddress, testval.AppConfig.GRPC.Address)
 		},
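For reference, a minimal sketch of how a service plugs into the new health monitor, based only on the signatures exercised by the tests above (NewHealthMonitor, RegisterService, Stop, and the HealthCheckable methods); the toy daemon, durations, and no-op logger are illustrative assumptions, not part of this change.

package main

import (
	"errors"
	"sync"
	"time"

	"github.com/cometbft/cometbft/libs/log"

	servertypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
)

// toyDaemon is a hypothetical service used only for illustration; real daemons would embed a
// time-bounded HealthCheckable rather than hand-rolling this.
type toyDaemon struct {
	mu      sync.Mutex
	lastErr error
}

// The methods below cover the HealthCheckable surface shown in this diff.
func (d *toyDaemon) ServiceName() string { return "toy-daemon" }

func (d *toyDaemon) HealthCheck() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.lastErr
}

func (d *toyDaemon) ReportSuccess() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.lastErr = nil
}

func (d *toyDaemon) ReportFailure(err error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.lastErr = err
}

func main() {
	// Constructor arguments mirror the tests above: startup grace period, poll frequency,
	// logger, and whether to panic (true) or only log (false) on sustained unhealthiness.
	hm := servertypes.NewHealthMonitor(
		5*time.Second,
		10*time.Millisecond,
		log.NewNopLogger(),
		false,
	)
	defer hm.Stop()

	daemon := &toyDaemon{lastErr: errors.New("initializing")}

	// The service is flagged only after it reports errors for longer than the maximum
	// unhealthy duration passed at registration (5 minutes here, an arbitrary example value).
	if err := hm.RegisterService(daemon, 5*time.Minute); err != nil {
		panic(err)
	}

	// A daemon's task loop would call ReportSuccess / ReportFailure as it runs.
	daemon.ReportSuccess()
}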