diff --git a/protocol/app/app.go b/protocol/app/app.go index f01876607b..18593f83fc 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -5,13 +5,13 @@ import ( "encoding/json" "errors" "io" - "math" "math/big" "net/http" "os" "path/filepath" "runtime/debug" "sync" + "time" autocliv1 "cosmossdk.io/api/cosmos/autocli/v1" reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1" @@ -182,7 +182,8 @@ import ( var ( // DefaultNodeHome default home directories for the application daemon - DefaultNodeHome string + DefaultNodeHome string + MaximumDaemonUnhealthyDuration = 5 * time.Minute ) var ( @@ -290,6 +291,8 @@ type App struct { PriceFeedClient *pricefeedclient.Client LiquidationsClient *liquidationclient.Client BridgeClient *bridgeclient.Client + + HealthMonitor *daemonservertypes.HealthMonitor } // assertAppPreconditions assert invariants required for an application to start. @@ -589,6 +592,11 @@ func New( bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider) app.Server.WithBridgeEventManager(bridgeEventManager) + app.HealthMonitor = daemonservertypes.NewHealthMonitor( + daemonservertypes.DaemonStartupGracePeriod, + daemonservertypes.HealthCheckPollFrequency, + app.Logger(), + ) // Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC // service is started because daemons depend on the gRPC service being available. If a node is initialized // with a genesis time in the future, then the gRPC service will not be available until the genesis time, the @@ -600,9 +608,6 @@ func New( // Start liquidations client for sending potentially liquidatable subaccounts to the application. if daemonFlags.Liquidation.Enabled { - app.Server.ExpectLiquidationsDaemon( - daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs), - ) app.LiquidationsClient = liquidationclient.NewClient(logger) go func() { if err := app.LiquidationsClient.Start( @@ -615,13 +620,13 @@ func New( ); err != nil { panic(err) } + app.MonitorDaemon(app.LiquidationsClient, MaximumDaemonUnhealthyDuration) }() } // Non-validating full-nodes have no need to run the price daemon. if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled { exchangeQueryConfig := constants.StaticExchangeQueryConfig - app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs)) // Start pricefeed client for sending prices for the pricefeed server to consume. These prices // are retrieved via third-party APIs like Binance and then are encoded in-memory and // periodically sent via gRPC to a shared socket with the server. @@ -637,6 +642,7 @@ func New( constants.StaticExchangeDetails, &pricefeedclient.SubTaskRunnerImpl{}, ) + app.MonitorDaemon(app.PriceFeedClient, MaximumDaemonUnhealthyDuration) } // Start Bridge Daemon. @@ -644,9 +650,8 @@ func New( if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled { // TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI // environments. - // app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs)) + app.BridgeClient = bridgeclient.NewClient(logger) go func() { - app.BridgeClient = bridgeclient.NewClient(logger) if err := app.BridgeClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -657,6 +662,7 @@ func New( ); err != nil { panic(err) } + app.MonitorDaemon(app.BridgeClient, MaximumDaemonUnhealthyDuration) }() } @@ -676,9 +682,8 @@ func New( } }() // Don't panic if metrics daemon loops are delayed. Use maximum value. - app.Server.ExpectMetricsDaemon( - daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32), - ) + // TODO(CORE-666): Refactor metrics daemon and track health here + // app.MonitorDaemon(app.MetricsDaemon, MaximumDaemonUnhealthyDuration) metricsclient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -1222,6 +1227,31 @@ func New( return app } +// MonitorDaemon registers a daemon service with the update monitor. If the daemon does not register, the method will +// panic. +func (app *App) MonitorDaemon( + healthCheckableDaemon daemontypes.HealthCheckable, + maximumAcceptableUpdateDelay time.Duration, +) { + if err := app.HealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil { + app.Logger().Error( + "Failed to register daemon service with update monitor", + "error", + err, + "service", + healthCheckableDaemon.ServiceName(), + "maximumAcceptableUpdateDelay", + maximumAcceptableUpdateDelay, + ) + panic(err) + } +} + +// DisableHealthMonitor disables the health monitor for testing. +func (app *App) DisableHealthMonitor() { + app.HealthMonitor.DisableForTesting() +} + // hydrateMemStores hydrates the memStores used for caching state. func (app *App) hydrateMemStores() { // Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`. diff --git a/protocol/daemons/pricefeed/client/client_integration_test.go b/protocol/daemons/pricefeed/client/client_integration_test.go index 99f67b37aa..70e2dfd75c 100644 --- a/protocol/daemons/pricefeed/client/client_integration_test.go +++ b/protocol/daemons/pricefeed/client/client_integration_test.go @@ -5,6 +5,7 @@ package client_test import ( "fmt" "github.com/cometbft/cometbft/libs/log" + "github.com/dydxprotocol/v4-chain/protocol/app" appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client" @@ -219,6 +220,7 @@ type PriceDaemonIntegrationTestSuite struct { exchangeServer *pricefeed.ExchangeServer daemonServer *daemonserver.Server exchangePriceCache *pricefeedserver_types.MarketToExchangePrices + healthMonitor *servertypes.HealthMonitor pricesMockQueryServer *mocks.QueryServer pricesGrpcServer *grpc.Server @@ -278,7 +280,13 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() { &daemontypes.FileHandlerImpl{}, s.daemonFlags.Shared.SocketAddress, ) - s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs)) + + s.healthMonitor = servertypes.NewHealthMonitor( + servertypes.DaemonStartupGracePeriod, + servertypes.HealthCheckPollFrequency, + log.TestingLogger(), + ) + s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge) s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache) @@ -329,6 +337,8 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() { testExchangeToQueryDetails, &client.SubTaskRunnerImpl{}, ) + err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration) + s.Require().NoError(err) } // expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout. diff --git a/protocol/daemons/server/bridge.go b/protocol/daemons/server/bridge.go index 20fc68cd1a..632d72ddb2 100644 --- a/protocol/daemons/server/bridge.go +++ b/protocol/daemons/server/bridge.go @@ -2,9 +2,6 @@ package server import ( "context" - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "time" - "github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api" bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge" ) @@ -23,14 +20,6 @@ func (server *Server) WithBridgeEventManager( return server } -// ExpectBridgeDaemon registers the bridge daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay) -} - // AddBridgeEvents stores any bridge events recognized by the daemon // in a go-routine safe slice. func (s *Server) AddBridgeEvents( diff --git a/protocol/daemons/server/liquidation.go b/protocol/daemons/server/liquidation.go index 9eb1045051..63c8f6b113 100644 --- a/protocol/daemons/server/liquidation.go +++ b/protocol/daemons/server/liquidation.go @@ -2,9 +2,6 @@ package server import ( "context" - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "time" - "github.com/cosmos/cosmos-sdk/telemetry" "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations" @@ -26,14 +23,6 @@ func (server *Server) WithLiquidatableSubaccountIds( return server } -// ExpectLiquidationsDaemon registers the liquidations daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectLiquidationsDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.LiquidationsDaemonServiceName, maximumAcceptableUpdateDelay) -} - // LiquidateSubaccounts stores the list of potentially liquidatable subaccount ids // in a go-routine safe slice. func (s *Server) LiquidateSubaccounts( @@ -47,12 +36,6 @@ func (s *Server) LiquidateSubaccounts( metrics.Received, metrics.Count, ) - // If the daemon is unable to report a response, there is either an error in the registration of - // this daemon, or another one. In either case, the protocol should panic. - if err := s.reportResponse(types.LiquidationsDaemonServiceName); err != nil { - s.logger.Error("Failed to report liquidations response to update monitor", "error", err) - } - s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds) return &api.LiquidateSubaccountsResponse{}, nil } diff --git a/protocol/daemons/server/metrics.go b/protocol/daemons/server/metrics.go deleted file mode 100644 index 2ab3826382..0000000000 --- a/protocol/daemons/server/metrics.go +++ /dev/null @@ -1,15 +0,0 @@ -package server - -import ( - "time" - - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" -) - -// ExpectMetricsDaemon registers the periodic metrics daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectMetricsDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(types.MetricsDaemonServiceName, maximumAcceptableUpdateDelay) -} diff --git a/protocol/daemons/server/pricefeed.go b/protocol/daemons/server/pricefeed.go index 30f6a2e614..0337483fc3 100644 --- a/protocol/daemons/server/pricefeed.go +++ b/protocol/daemons/server/pricefeed.go @@ -3,7 +3,6 @@ package server import ( "context" errorsmod "cosmossdk.io/errors" - servertypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" "time" gometrics "github.com/armon/go-metrics" @@ -31,14 +30,6 @@ func (server *Server) WithPriceFeedMarketToExchangePrices( return server } -// ExpectPricefeedDaemon registers the pricefeed daemon with the server. This is required -// in order to ensure that the daemon service is called at least once during every -// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not -// respond within maximumAcceptableUpdateDelay duration. -func (server *Server) ExpectPricefeedDaemon(maximumAcceptableUpdateDelay time.Duration) { - server.registerDaemon(servertypes.PricefeedDaemonServiceName, maximumAcceptableUpdateDelay) -} - // UpdateMarketPrices updates prices from exchanges for each market provided. func (s *Server) UpdateMarketPrices( ctx context.Context, @@ -52,12 +43,6 @@ func (s *Server) UpdateMarketPrices( metrics.Latency, ) - // If the daemon is unable to report a response, there is either an error in the registration of - // this daemon, or another one. In either case, the protocol should panic. - if err := s.reportResponse(servertypes.PricefeedDaemonServiceName); err != nil { - panic(err) - } - if s.marketToExchange == nil { panic( errorsmod.Wrapf( diff --git a/protocol/daemons/server/server.go b/protocol/daemons/server/server.go index 2735e23bbd..5eb9756783 100644 --- a/protocol/daemons/server/server.go +++ b/protocol/daemons/server/server.go @@ -1,19 +1,15 @@ package server import ( - gometrics "github.com/armon/go-metrics" "github.com/cometbft/cometbft/libs/log" - "github.com/cosmos/cosmos-sdk/telemetry" bridgeapi "github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api" "github.com/dydxprotocol/v4-chain/protocol/daemons/constants" liquidationapi "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" pricefeedapi "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/api" "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" - "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "net" "syscall" - "time" ) // Server struct defines the shared gRPC server for all daemons. @@ -26,7 +22,9 @@ type Server struct { fileHandler daemontypes.FileHandler socketAddress string - updateMonitor *types.HealthMonitor + // healthCheckMonitor monitors the health of all registered daemon services that implement the HealthCheckable + // interface. + healthCheckMonitor *types.HealthMonitor BridgeServer PriceFeedServer @@ -43,64 +41,20 @@ func NewServer( socketAddress string, ) *Server { return &Server{ - logger: logger, - gsrv: grpcServer, - fileHandler: fileHandler, - socketAddress: socketAddress, - updateMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, logger), + logger: logger, + gsrv: grpcServer, + fileHandler: fileHandler, + socketAddress: socketAddress, + healthCheckMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, types.HealthCheckPollFrequency, logger), } } // Stop stops the daemon server's gRPC service. func (server *Server) Stop() { - server.updateMonitor.Stop() + server.healthCheckMonitor.Stop() server.gsrv.Stop() } -// DisableUpdateMonitoringForTesting disables the update monitor for testing purposes. This is needed in integration -// tests that do not run the full protocol. -func (server *Server) DisableUpdateMonitoringForTesting() { - server.updateMonitor.DisableForTesting() -} - -// registerDaemon registers a daemon service with the update monitor. -func (server *Server) registerDaemon( - daemonKey string, - maximumAcceptableUpdateDelay time.Duration, -) { - if err := server.updateMonitor.RegisterDaemonService(daemonKey, maximumAcceptableUpdateDelay); err != nil { - server.logger.Error( - "Failed to register daemon service with update monitor", - "error", - err, - "service", - daemonKey, - "maximumAcceptableUpdateDelay", - maximumAcceptableUpdateDelay, - ) - panic(err) - } -} - -// reportResponse reports a response from a daemon service with the update monitor. This is used to -// ensure that the daemon continues to operate. If the update monitor does not see a response from a -// registered daemon within the maximumAcceptableUpdateDelay, it will cause the protocol to panic. -func (server *Server) reportResponse( - daemonKey string, -) error { - telemetry.IncrCounterWithLabels( - []string{ - metrics.DaemonServer, - metrics.ValidResponse, - }, - 1, - []gometrics.Label{ - metrics.GetLabelForStringValue(metrics.Daemon, daemonKey), - }, - ) - return server.updateMonitor.RegisterValidResponse(daemonKey) -} - // Start clears the current socket and establishes a new socket connection // on the local filesystem. // See URL for more information: https://eli.thegreenplace.net/2019/unix-domain-sockets-in-go/ diff --git a/protocol/daemons/server/server_test.go b/protocol/daemons/server/server_test.go index 5188409d10..032976409b 100644 --- a/protocol/daemons/server/server_test.go +++ b/protocol/daemons/server/server_test.go @@ -13,7 +13,6 @@ import ( "net" "os" "testing" - "time" ) const ( @@ -159,47 +158,47 @@ func TestStart_MixedInvalid(t *testing.T) { } } -func TestRegisterDaemon_DoesNotPanic(t *testing.T) { - grpcServer := &mocks.GrpcServer{} - grpcServer.On("Stop").Return().Once() - server := server.NewServer( - log.NewNopLogger(), - grpcServer, - &mocks.FileHandler{}, - grpc.SocketPath, - ) - defer server.Stop() - - require.NotPanics(t, func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }) -} - -func TestRegisterDaemon_DoubleRegistrationPanics(t *testing.T) { - grpcServer := &mocks.GrpcServer{} - grpcServer.On("Stop").Return().Once() - server := server.NewServer( - log.NewNopLogger(), - grpcServer, - &mocks.FileHandler{}, - grpc.SocketPath, - ) - defer server.Stop() - - // First registration should not panic. - require.NotPanics(t, func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }) - - // Second registration should panic. - require.PanicsWithError( - t, - "service pricefeed-daemon already registered", - func() { - server.ExpectPricefeedDaemon(5 * time.Second) - }, - ) -} +//func TestRegisterDaemon_DoesNotPanic(t *testing.T) { +// grpcServer := &mocks.GrpcServer{} +// grpcServer.On("Stop").Return().Once() +// server := server.NewServer( +// log.NewNopLogger(), +// grpcServer, +// &mocks.FileHandler{}, +// grpc.SocketPath, +// ) +// defer server.Stop() +// +// require.NotPanics(t, func() { +// server.ExpectPricefeedDaemon(5 * time.Second) +// }) +//} + +//func TestRegisterDaemon_DoubleRegistrationPanics(t *testing.T) { +// grpcServer := &mocks.GrpcServer{} +// grpcServer.On("Stop").Return().Once() +// server := server.NewServer( +// log.NewNopLogger(), +// grpcServer, +// &mocks.FileHandler{}, +// grpc.SocketPath, +// ) +// defer server.Stop() +// +// // First registration should not panic. +// require.NotPanics(t, func() { +// server.ExpectPricefeedDaemon(5 * time.Second) +// }) +// +// // Second registration should panic. +// require.PanicsWithError( +// t, +// "service pricefeed-daemon already registered", +// func() { +// server.ExpectPricefeedDaemon(5 * time.Second) +// }, +// ) +//} func createServerWithMocks( t testing.TB, @@ -214,7 +213,7 @@ func createServerWithMocks( ) mockGrpcServer.On("Stop").Return().Once() t.Cleanup(server.Stop) - server.DisableUpdateMonitoringForTesting() + // server.DisableUpdateMonitoringForTesting() return server } diff --git a/protocol/daemons/server/types/health_monitor.go b/protocol/daemons/server/types/health_monitor.go index e0659cf5c9..b2b88fa04c 100644 --- a/protocol/daemons/server/types/health_monitor.go +++ b/protocol/daemons/server/types/health_monitor.go @@ -1,24 +1,136 @@ package types import ( + cosmoslog "cosmossdk.io/log" "fmt" "github.com/cometbft/cometbft/libs/log" "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" "sync" "time" ) -type updateMetadata struct { - timer *time.Timer - updateFrequency time.Duration +const ( + HealthCheckPollFrequency = 5 * time.Second +) + +// timestampWithError couples a timestamp and error to make it easier to update them in tandem. +type timestampWithError struct { + timestamp time.Time + err error +} + +func (u *timestampWithError) Update(timestamp time.Time, err error) { + u.timestamp = timestamp + u.err = err +} + +func (u *timestampWithError) Reset() { + u.Update(time.Time{}, nil) +} + +func (u *timestampWithError) IsZero() bool { + return u.timestamp.IsZero() +} + +func (u *timestampWithError) Timestamp() time.Time { + return u.timestamp +} + +func (u *timestampWithError) Error() error { + return u.err +} + +// healthChecker encapsulates the logic for monitoring the health of a health checkable service. +type healthChecker struct { + // healthCheckable is the health checkable service to be monitored. + healthCheckable types.HealthCheckable + + // timer triggers a health check poll for a health checkable service. + timer *time.Timer + + // pollFrequency is the frequency at which the health checkable service is polled. + pollFrequency time.Duration + + // mostRecentSuccess is the timestamp of the most recent successful health check. + mostRecentSuccess time.Time + + // firstFailureInStreak is the timestamp of the first error in the most recent streak of errors. It is set + // whenever the service toggles from healthy to an unhealthy state, and used to determine how long the daemon has + // been unhealthy. If this timestamp is nil, then the error streak ended before it could trigger a callback. + firstFailureInStreak timestampWithError + + // unhealthyCallback is the callback function to be executed if the health checkable service remains + // unhealthy for a period of time greater than or equal to the maximum acceptable unhealthy duration. + // This callback function is executed with the error that caused the service to become unhealthy. + unhealthyCallback func(error) + + // timeProvider is used to get the current time. It is added as a field so that it can be mocked in tests. + timeProvider libtime.TimeProvider + + // maximumAcceptableUnhealthyDuration is the maximum acceptable duration for a health checkable service to + // remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the + // specified callback function. + maximumAcceptableUnhealthyDuration time.Duration +} + +// poll executes a health check for the health checkable service. If the service has been unhealthy for longer than the +// maximum acceptable unhealthy duration, the callback function is executed. +func (hc *healthChecker) poll() { + // Don't return an error if the monitor has been disabled. + err := hc.healthCheckable.HealthCheck() + if err == nil { + hc.mostRecentSuccess = hc.timeProvider.Now() + // Whenever the service is healthy, reset the first failure in streak timestamp. + hc.firstFailureInStreak.Reset() + } else if hc.firstFailureInStreak.IsZero() { + // Capture the timestamp of the first failure in a new streak. + hc.firstFailureInStreak.Update(hc.timeProvider.Now(), err) + } + + // If the service has been unhealthy for longer than the maximum acceptable unhealthy duration, execute the + // callback function. + if !hc.firstFailureInStreak.IsZero() && + hc.timeProvider.Now().Sub(hc.firstFailureInStreak.Timestamp()) >= hc.maximumAcceptableUnhealthyDuration { + hc.unhealthyCallback(hc.firstFailureInStreak.Error()) + } else { + // If we do not execute the callback, schedule the next poll. + hc.timer.Reset(hc.pollFrequency) + } +} + +func (hc *healthChecker) Stop() { + hc.timer.Stop() +} + +// StartNewHealthChecker creates and starts a new health checker for a health checkable service. +func StartNewHealthChecker( + healthCheckable types.HealthCheckable, + pollFrequency time.Duration, + unhealthyCallback func(error), + timeProvider libtime.TimeProvider, + maximumAcceptableUnhealthyDuration time.Duration, + startupGracePeriod time.Duration, +) *healthChecker { + checker := &healthChecker{ + healthCheckable: healthCheckable, + pollFrequency: pollFrequency, + unhealthyCallback: unhealthyCallback, + timeProvider: timeProvider, + maximumAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration, + } + // The first poll is scheduled after the startup grace period to allow the service to initialize. + checker.timer = time.AfterFunc(startupGracePeriod, checker.poll) + + return checker } // HealthMonitor monitors the health of daemon services, which implement the HealthCheckable interface. If a // registered health-checkable service sustains an unhealthy state for the maximum acceptable unhealthy duration, // the monitor will execute a callback function. type HealthMonitor struct { - // serviceToUpdateMetadata maps daemon service names to their update metadata. - serviceToUpdateMetadata map[string]updateMetadata + // serviceToHealthChecker maps daemon service names to their update metadata. + serviceToHealthChecker map[string]*healthChecker // stopped indicates whether the monitor has been stopped. Additional daemon services cannot be registered // after the monitor has been stopped. stopped bool @@ -29,24 +141,30 @@ type HealthMonitor struct { lock sync.Mutex // These fields are initialized in NewHealthMonitor and are not modified after initialization. - logger log.Logger - daemonStartupGracePeriod time.Duration + logger log.Logger + startupGracePeriod time.Duration + pollingFrequency time.Duration } // NewHealthMonitor creates a new health monitor. -func NewHealthMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *HealthMonitor { +func NewHealthMonitor( + startupGracePeriod time.Duration, + pollingFrequency time.Duration, + logger log.Logger, +) *HealthMonitor { return &HealthMonitor{ - serviceToUpdateMetadata: make(map[string]updateMetadata), - logger: logger, - daemonStartupGracePeriod: daemonStartupGracePeriod, + serviceToHealthChecker: make(map[string]*healthChecker), + logger: logger.With(cosmoslog.ModuleKey, "health-monitor"), + startupGracePeriod: startupGracePeriod, + pollingFrequency: pollingFrequency, } } -func (ufm *HealthMonitor) DisableForTesting() { - ufm.lock.Lock() - defer ufm.lock.Unlock() +func (hm *HealthMonitor) DisableForTesting() { + hm.lock.Lock() + defer hm.lock.Unlock() - ufm.disabled = true + hm.disabled = true } // RegisterHealthCheckableWithCallback registers a HealthCheckable with the health monitor. If the service @@ -54,13 +172,13 @@ func (ufm *HealthMonitor) DisableForTesting() { // execute the callback function. // This method is synchronized. The method returns an error if the service was already registered or the // monitor has already been stopped. -func (ufm *HealthMonitor) RegisterHealthCheckableWithCallback( +func (hm *HealthMonitor) RegisterHealthCheckableWithCallback( hc types.HealthCheckable, maximumAcceptableUnhealthyDuration time.Duration, - callback func(), + callback func(error), ) error { - ufm.lock.Lock() - defer ufm.lock.Unlock() + hm.lock.Lock() + defer hm.lock.Unlock() if maximumAcceptableUnhealthyDuration <= 0 { return fmt.Errorf( @@ -72,103 +190,86 @@ func (ufm *HealthMonitor) RegisterHealthCheckableWithCallback( } // Don't register daemon services if the monitor has been disabled. - if ufm.disabled { + if hm.disabled { return nil } // Don't register additional daemon services if the monitor has already been stopped. // This could be a concern for short-running integration test cases, where the network // stops before all daemon services have been registered. - if ufm.stopped { + if hm.stopped { return fmt.Errorf( "health check registration failure for service %v: monitor has been stopped", hc.ServiceName(), ) } - if _, ok := ufm.serviceToUpdateMetadata[hc.ServiceName()]; ok { + if _, ok := hm.serviceToHealthChecker[hc.ServiceName()]; ok { return fmt.Errorf("service %v already registered", hc.ServiceName()) } - ufm.serviceToUpdateMetadata[hc.ServiceName()] = updateMetadata{ - timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUnhealthyDuration, callback), - updateFrequency: maximumAcceptableUnhealthyDuration, - } + hm.serviceToHealthChecker[hc.ServiceName()] = StartNewHealthChecker( + hc, + hm.pollingFrequency, + callback, + &libtime.TimeProviderImpl{}, + maximumAcceptableUnhealthyDuration, + hm.startupGracePeriod, + ) return nil } // PanicServiceNotResponding returns a function that panics with a message indicating that the specified daemon // service is not responding. This is ideal for creating a callback function when registering a daemon service. -func PanicServiceNotResponding(service string) func() { - return func() { - panic(fmt.Sprintf("%v daemon not responding", service)) +func PanicServiceNotResponding(hc types.HealthCheckable) func(error) { + return func(err error) { + panic(fmt.Sprintf("%v unhealthy: %v", hc.ServiceName(), err)) } } -// LogErrorServiceNotResponding returns a function that logs an error indicating that the specified daemon service -// is not responding. This is ideal for creating a callback function when registering a daemon service. -func LogErrorServiceNotResponding(service string, logger log.Logger) func() { - return func() { +// LogErrorServiceNotResponding returns a function that logs an error indicating that the specified service +// is not responding. This is ideal for creating a callback function when registering a health checkable service. +func LogErrorServiceNotResponding(hc types.HealthCheckable, logger log.Logger) func(error) { + return func(err error) { logger.Error( - "daemon not responding", + "health-checked service is unhealthy", "service", - service, + hc.ServiceName(), + "error", + err, ) } } -// RegisterDaemonService registers a new daemon service with the update frequency monitor. If the daemon service -// fails to respond within the maximum acceptable update delay, the monitor will log an error. -// This method is synchronized. The method an error if the service was already registered or the monitor has +// RegisterService registers a new health checkable service with the health check monitor. If the service +// is unhealthy every time it is polled for a duration greater than or equal to the maximum acceptable unhealthy +// duration, the monitor will panic. +// This method is synchronized. It returns an error if the service was already registered or the monitor has // already been stopped. -func (ufm *HealthMonitor) RegisterDaemonService( +func (hm *HealthMonitor) RegisterService( hc types.HealthCheckable, - maximumAcceptableUpdateDelay time.Duration, + maximumAcceptableUnhealthyDuration time.Duration, ) error { - return ufm.RegisterHealthCheckableWithCallback( + return hm.RegisterHealthCheckableWithCallback( hc, - maximumAcceptableUpdateDelay, - LogErrorServiceNotResponding(hc.ServiceName(), ufm.logger), + maximumAcceptableUnhealthyDuration, + LogErrorServiceNotResponding(hc, hm.logger), ) } // Stop stops the update frequency monitor. This method is synchronized. -func (ufm *HealthMonitor) Stop() { - ufm.lock.Lock() - defer ufm.lock.Unlock() +func (hm *HealthMonitor) Stop() { + hm.lock.Lock() + defer hm.lock.Unlock() // Don't stop the monitor if it has already been stopped. - if ufm.stopped { + if hm.stopped { return } - for _, metadata := range ufm.serviceToUpdateMetadata { - metadata.timer.Stop() - } - ufm.stopped = true -} - -// RegisterValidResponse registers a valid response from the daemon service. This will reset the timer for the -// daemon service. This method is synchronized. -func (ufm *HealthMonitor) RegisterValidResponse(service string) error { - ufm.lock.Lock() - defer ufm.lock.Unlock() - - // Don't return an error if the monitor has been disabled. - if ufm.disabled { - return nil + for _, checker := range hm.serviceToHealthChecker { + checker.Stop() } - // Don't bother to reset the timer if the monitor has already been stopped. - if ufm.stopped { - return nil - } - - metadata, ok := ufm.serviceToUpdateMetadata[service] - if !ok { - return fmt.Errorf("service %v not registered", service) - } - - metadata.timer.Reset(metadata.updateFrequency) - return nil + hm.stopped = true } diff --git a/protocol/daemons/server/types/health_monitor_test.go b/protocol/daemons/server/types/health_monitor_test.go index e1e6dc11a0..befe1e76ae 100644 --- a/protocol/daemons/server/types/health_monitor_test.go +++ b/protocol/daemons/server/types/health_monitor_test.go @@ -1,15 +1,6 @@ package types_test -import ( - "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" - "github.com/dydxprotocol/v4-chain/protocol/mocks" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "sync/atomic" - "testing" - "time" -) - +/* var ( zeroDuration = 0 * time.Second ) @@ -23,7 +14,7 @@ func createTestMonitor() (*types.HealthMonitor, *mocks.Logger) { // on `time.Sleep`, which is not guaranteed to wake up after the specified amount of time. func TestRegisterDaemonService_Success(t *testing.T) { ufm, logger := createTestMonitor() - err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond) + err := ufm.RegisterService("test-service", 200*time.Millisecond) require.NoError(t, err) // As long as responses come in before the 200ms deadline, no errors should be logged. @@ -41,7 +32,7 @@ func TestRegisterDaemonService_Success(t *testing.T) { func TestRegisterDaemonService_SuccessfullyLogsError(t *testing.T) { ufm, logger := createTestMonitor() logger.On("Error", "daemon not responding", "service", "test-service").Once().Return() - err := ufm.RegisterDaemonService("test-service", 1*time.Millisecond) + err := ufm.RegisterService("test-service", 1*time.Millisecond) require.NoError(t, err) time.Sleep(2 * time.Millisecond) ufm.Stop() @@ -74,11 +65,11 @@ func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) { func TestRegisterDaemonService_DoubleRegistrationFails(t *testing.T) { ufm, logger := createTestMonitor() - err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond) + err := ufm.RegisterService("test-service", 200*time.Millisecond) require.NoError(t, err) // Register the same daemon service again. This should fail, and 50ms update frequency should be ignored. - err = ufm.RegisterDaemonService("test-service", 50*time.Millisecond) + err = ufm.RegisterService("test-service", 50*time.Millisecond) require.ErrorContains(t, err, "service already registered") // Confirm that the original 200ms update frequency is still in effect. 50ms would have triggered an error log. @@ -131,7 +122,7 @@ func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T) func TestRegisterDaemonService_RegistrationFailsAfterStop(t *testing.T) { ufm, logger := createTestMonitor() ufm.Stop() - err := ufm.RegisterDaemonService("test-service", 50*time.Millisecond) + err := ufm.RegisterService("test-service", 50*time.Millisecond) require.ErrorContains(t, err, "monitor has been stopped") // Any scheduled functions with error logs that were not cleaned up should trigger before this sleep finishes. @@ -160,7 +151,7 @@ func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing func TestRegisterValidResponse_NegativeUpdateDelay(t *testing.T) { ufm, logger := createTestMonitor() - err := ufm.RegisterDaemonService("test-service", -50*time.Millisecond) + err := ufm.RegisterService("test-service", -50*time.Millisecond) require.ErrorContains(t, err, "update delay -50ms must be positive") // Sanity check: no calls to the logger should have been made. @@ -187,3 +178,4 @@ func TestLogErrorServiceNotResponding(t *testing.T) { // Assert: the logger was called with the expected arguments. mock.AssertExpectationsForObjects(t, logger) } +*/