From cd77cee3fda4259e1b90a69310246273b9c305e7 Mon Sep 17 00:00:00 2001 From: Crystal Lemire Date: Fri, 1 Dec 2023 17:51:26 -0800 Subject: [PATCH] PR comments. --- protocol/app/app.go | 12 ++-- protocol/app/app_test.go | 6 +- protocol/daemons/server/bridge.go | 12 ++-- protocol/daemons/server/liquidation.go | 11 ++- protocol/daemons/server/pricefeed.go | 12 ++-- .../daemons/server/types/health_checker.go | 71 ++++++++++++++++--- .../daemons/server/types/health_monitor.go | 5 +- 7 files changed, 86 insertions(+), 43 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index ee947db019b..a13d66b9dc5 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -613,7 +613,7 @@ func New( if daemonFlags.Liquidation.Enabled { app.LiquidationsClient = liquidationclient.NewClient(logger) go func() { - app.MonitorDaemon(app.LiquidationsClient, MaximumDaemonUnhealthyDuration) + app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, MaximumDaemonUnhealthyDuration) if err := app.LiquidationsClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -645,7 +645,7 @@ func New( constants.StaticExchangeDetails, &pricefeedclient.SubTaskRunnerImpl{}, ) - app.MonitorDaemon(app.PriceFeedClient, MaximumDaemonUnhealthyDuration) + app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, MaximumDaemonUnhealthyDuration) } // Start Bridge Daemon. @@ -655,7 +655,7 @@ func New( // environments. app.BridgeClient = bridgeclient.NewClient(logger) go func() { - app.MonitorDaemon(app.BridgeClient, MaximumDaemonUnhealthyDuration) + app.RegisterDaemonWithHealthMonitor(app.BridgeClient, MaximumDaemonUnhealthyDuration) if err := app.BridgeClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. @@ -1230,9 +1230,9 @@ func New( return app } -// MonitorDaemon registers a daemon service with the update monitor. If the daemon does not register, the method will -// panic. -func (app *App) MonitorDaemon( +// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring +// the health of the daemon. If the daemon does not register, the method will panic. +func (app *App) RegisterDaemonWithHealthMonitor( healthCheckableDaemon daemontypes.HealthCheckable, maximumAcceptableUpdateDelay time.Duration, ) { diff --git a/protocol/app/app_test.go b/protocol/app/app_test.go index 6503d9c4190..ded9ee8b8d8 100644 --- a/protocol/app/app_test.go +++ b/protocol/app/app_test.go @@ -225,17 +225,17 @@ func TestModuleBasics(t *testing.T) { require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected") } -func TestMonitorDaemon_Panics(t *testing.T) { +func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) { app := testapp.DefaultTestApp(nil) hc := &mocks.HealthCheckable{} hc.On("ServiceName").Return("test-service") hc.On("HealthCheck").Return(nil) - app.MonitorDaemon(hc, 5*time.Minute) + app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) // The second registration should fail, causing a panic. require.PanicsWithError( t, "service test-service already registered", - func() { app.MonitorDaemon(hc, 5*time.Minute) }, + func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) }, ) } diff --git a/protocol/daemons/server/bridge.go b/protocol/daemons/server/bridge.go index ad76af6066e..f794e0bb86a 100644 --- a/protocol/daemons/server/bridge.go +++ b/protocol/daemons/server/bridge.go @@ -30,16 +30,12 @@ func (s *Server) AddBridgeEvents( response *api.AddBridgeEventsResponse, err error, ) { - // Capture valid responses in metrics. - defer func() { - if err == nil { - s.reportValidResponse(types.PricefeedDaemonServiceName) - } - }() - - s.reportValidResponse(types.BridgeDaemonServiceName) if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil { return nil, err } + + // Capture valid responses in metrics. + s.reportValidResponse(types.BridgeDaemonServiceName) + return &api.AddBridgeEventsResponse{}, nil } diff --git a/protocol/daemons/server/liquidation.go b/protocol/daemons/server/liquidation.go index f418b6c251f..4cb44acfa1c 100644 --- a/protocol/daemons/server/liquidation.go +++ b/protocol/daemons/server/liquidation.go @@ -33,13 +33,6 @@ func (s *Server) LiquidateSubaccounts( response *api.LiquidateSubaccountsResponse, err error, ) { - // Capture valid responses in metrics. - defer func() { - if err == nil { - s.reportValidResponse(types.PricefeedDaemonServiceName) - } - }() - telemetry.ModuleSetGauge( metrics.LiquidationDaemon, float32(len(req.SubaccountIds)), @@ -49,5 +42,9 @@ func (s *Server) LiquidateSubaccounts( ) s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds) + + // Capture valid responses in metrics. + s.reportValidResponse(types.LiquidationsDaemonServiceName) + return &api.LiquidateSubaccountsResponse{}, nil } diff --git a/protocol/daemons/server/pricefeed.go b/protocol/daemons/server/pricefeed.go index b4ef56df4cf..d95af3dfb1a 100644 --- a/protocol/daemons/server/pricefeed.go +++ b/protocol/daemons/server/pricefeed.go @@ -39,13 +39,6 @@ func (s *Server) UpdateMarketPrices( response *api.UpdateMarketPricesResponse, err error, ) { - // Capture valid responses in metrics. - defer func() { - if err == nil { - s.reportValidResponse(types.PricefeedDaemonServiceName) - } - }() - // Measure latency in ingesting and handling gRPC price update. defer telemetry.ModuleMeasureSince( metrics.PricefeedServer, @@ -54,6 +47,8 @@ func (s *Server) UpdateMarketPrices( metrics.Latency, ) + // This panic is an unexpected condition because we initialize the market price cache in app initialization before + // starting the server or daemons. if s.marketToExchange == nil { panic( errorsmod.Wrapf( @@ -71,6 +66,9 @@ func (s *Server) UpdateMarketPrices( s.marketToExchange.UpdatePrices(req.MarketPriceUpdates) + // Capture valid responses in metrics. + s.reportValidResponse(types.PricefeedDaemonServiceName) + return &api.UpdateMarketPricesResponse{}, nil } diff --git a/protocol/daemons/server/types/health_checker.go b/protocol/daemons/server/types/health_checker.go index 2f42c95cdda..835b0a5ad05 100644 --- a/protocol/daemons/server/types/health_checker.go +++ b/protocol/daemons/server/types/health_checker.go @@ -17,27 +17,36 @@ type timestampWithError struct { err error } +// Update updates the timeStampWithError to reflect the current error. If the timestamp is zero, this is the first +// update, so set the timestamp. func (u *timestampWithError) Update(timestamp time.Time, err error) { // If the timestamp is zero, this is the first update, so set the timestamp. if u.timestamp.IsZero() { u.timestamp = timestamp } + u.err = err } +// Reset resets the timestampWithError to its zero value, indicating that the service is healthy. func (u *timestampWithError) Reset() { u.timestamp = time.Time{} u.err = nil } +// IsZero returns true if the timestampWithError is zero, indicating that the service is healthy. func (u *timestampWithError) IsZero() bool { return u.timestamp.IsZero() && u.err == nil } +// Timestamp returns the timestamp associated with the timestampWithError, which is the timestamp of the first error +// in the current error streak. func (u *timestampWithError) Timestamp() time.Time { return u.timestamp } +// Error returns the error associated with the timestampWithError, which is the most recent error in the current error +// streak. func (u *timestampWithError) Error() error { return u.err } @@ -48,9 +57,9 @@ type healthCheckerMutableState struct { // lock is used to synchronize access to mutable state fields. lock sync.Mutex - // mostRecentSuccess is the timestamp of the most recent successful health check. - // Access to mostRecentSuccess is synchronized. - mostRecentSuccess time.Time + // lastSuccessTimestamp is the timestamp of the most recent successful health check. + // Access to lastSuccessTimestamp is synchronized. + lastSuccessTimestamp time.Time // mostRecentFailureStreakError tracks the timestamp of the first error in the most recent streak of errors, as well // as the most recent error. It is updated on every error and reset every time the service sees a healthy response. @@ -58,6 +67,21 @@ type healthCheckerMutableState struct { // the service has never been unhealthy, or the most recent error streak ended before it could trigger a callback. // Access to mostRecentFailureStreakError is synchronized. mostRecentFailureStreakError timestampWithError + + // timer triggers a health check poll for a health-checkable service. + timer *time.Timer + + // stopped indicates whether the health checker has been stopped. Additional health checks cannot be scheduled + // after the health checker has been stopped. + stopped bool +} + +// newHealthCheckerMutableState creates a new health checker mutable state scheduled to trigger a poll after the +// initial poll delay. +func newHealthCheckerMutableState(initialPollDelay time.Duration, pollFunc func()) *healthCheckerMutableState { + return &healthCheckerMutableState{ + timer: time.AfterFunc(initialPollDelay, pollFunc), + } } // ReportSuccess updates the health checker's mutable state to reflect a successful health check and schedules the next @@ -66,7 +90,7 @@ func (u *healthCheckerMutableState) ReportSuccess(now time.Time) { u.lock.Lock() defer u.lock.Unlock() - u.mostRecentSuccess = now + u.lastSuccessTimestamp = now // Whenever the service is healthy, reset the first failure in streak timestamp. u.mostRecentFailureStreakError.Reset() @@ -83,6 +107,35 @@ func (u *healthCheckerMutableState) ReportFailure(now time.Time, err error) time return now.Sub(u.mostRecentFailureStreakError.Timestamp()) } +// SchedulePoll schedules the next poll for the health-checkable service. If the service is stopped, the next poll +// will not be scheduled. This method is synchronized. +func (u *healthCheckerMutableState) SchedulePoll(nextPollDelay time.Duration) { + u.lock.Lock() + defer u.lock.Unlock() + + // Don't schedule a poll if the health checker has been stopped. + if u.stopped { + return + } + + // Schedule the next poll. + u.timer.Reset(nextPollDelay) +} + +// Stop stops the health checker. This method is synchronized. +func (u *healthCheckerMutableState) Stop() { + u.lock.Lock() + defer u.lock.Unlock() + + // Don't stop the health checker if it has already been stopped. + if u.stopped { + return + } + + u.timer.Stop() + u.stopped = true +} + // healthChecker encapsulates the logic for monitoring the health of a health-checkable service. type healthChecker struct { // mutableState is the mutable state of the health checker. Access to these fields is synchronized. @@ -94,9 +147,6 @@ type healthChecker struct { // pollFrequency is the frequency at which the health-checkable service is polled. pollFrequency time.Duration - // timer triggers a health check poll for a health-checkable service. - timer *time.Timer - // maxAcceptableUnhealthyDuration is the maximum acceptable duration for a health-checkable service to // remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the // specified callback function. @@ -134,12 +184,12 @@ func (hc *healthChecker) Poll() { } // Schedule next poll. - hc.timer.Reset(hc.pollFrequency) + hc.mutableState.SchedulePoll(hc.pollFrequency) } // Stop stops the health checker. This method is not synchronized, as the timer does not need synchronization. func (hc *healthChecker) Stop() { - hc.timer.Stop() + hc.mutableState.Stop() } // StartNewHealthChecker creates and starts a new health checker for a health-checkable service. @@ -159,11 +209,10 @@ func StartNewHealthChecker( timeProvider: timeProvider, maxAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration, logger: logger, - mutableState: &healthCheckerMutableState{}, } // The first poll is scheduled after the startup grace period to allow the service to initialize. - checker.timer = time.AfterFunc(startupGracePeriod, checker.Poll) + checker.mutableState = newHealthCheckerMutableState(startupGracePeriod, checker.Poll) return checker } diff --git a/protocol/daemons/server/types/health_monitor.go b/protocol/daemons/server/types/health_monitor.go index 1cff5f5b0af..130398d18d7 100644 --- a/protocol/daemons/server/types/health_monitor.go +++ b/protocol/daemons/server/types/health_monitor.go @@ -13,6 +13,9 @@ import ( const ( // HealthCheckPollFrequency is the frequency at which the health-checkable service is polled. HealthCheckPollFrequency = 5 * time.Second + + // HealthMonitorLogModuleName is the module name used for logging within the health monitor. + HealthMonitorLogModuleName = "daemon-health-monitor" ) // healthMonitorMutableState tracks all mutable state associated with the health monitor. This state is gathered into @@ -132,7 +135,7 @@ func NewHealthMonitor( ) *HealthMonitor { return &HealthMonitor{ mutableState: newHealthMonitorMutableState(), - logger: logger.With(cosmoslog.ModuleKey, "health-monitor"), + logger: logger.With(cosmoslog.ModuleKey, HealthMonitorLogModuleName), startupGracePeriod: startupGracePeriod, pollingFrequency: pollingFrequency, }