diff --git a/protocol/daemons/pricefeed/client/client.go b/protocol/daemons/pricefeed/client/client.go index 28a76f5103..96bcd4e30a 100644 --- a/protocol/daemons/pricefeed/client/client.go +++ b/protocol/daemons/pricefeed/client/client.go @@ -31,6 +31,9 @@ import ( // Note: price fetchers manage their own subtasks by blocking on their completion on every subtask run. // When the price fetcher is stopped, it will wait for all of its own subtasks to complete before returning. type Client struct { + // include HealthCheckable to track the health of the daemon. + daemontypes.HealthCheckable + // daemonStartup tracks whether the daemon has finished startup. The daemon // cannot be stopped until all persistent daemon subtasks have been launched within `Start`. daemonStartup sync.WaitGroup @@ -51,10 +54,17 @@ type Client struct { stopDaemon sync.Once } +// Ensure Client implements the HealthCheckable interface. +var _ daemontypes.HealthCheckable = (*Client)(nil) + func newClient() *Client { client := &Client{ tickers: []*time.Ticker{}, stops: []chan bool{}, + HealthCheckable: daemontypes.NewTimeBoundedHealthCheckable( + constants.PricefeedDaemonModuleName, + &libtime.TimeProviderImpl{}, + ), } // Set the client's daemonStartup state to indicate that the daemon has not finished starting up. @@ -66,7 +76,7 @@ func newClient() *Client { // for any subtask kicked off by the client. The ticker and channel are tracked in order to properly clean up and send // all needed stop signals when the daemon is stopped. // Note: this method is not synchronized. It is expected to be called from the client's `StartNewClient` method before -// `client.CompleteStartup`. +// the daemonStartup waitgroup signals. 
func (c *Client) newTickerWithStop(intervalMs int) (*time.Ticker, <-chan bool) { ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond) c.tickers = append(c.tickers, ticker) @@ -249,6 +259,7 @@ func (c *Client) start(ctx context.Context, pricefeedClient := api.NewPriceFeedServiceClient(daemonConn) subTaskRunner.StartPriceUpdater( + c, ctx, priceUpdaterTicker, priceUpdaterStop, diff --git a/protocol/daemons/pricefeed/client/client_test.go b/protocol/daemons/pricefeed/client/client_test.go index 1d95d77fef..2ca7f46a72 100644 --- a/protocol/daemons/pricefeed/client/client_test.go +++ b/protocol/daemons/pricefeed/client/client_test.go @@ -12,6 +12,7 @@ import ( pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" "github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions" + daemontestutils "github.com/dydxprotocol/v4-chain/protocol/testutil/daemons" grpc_util "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc" pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" "google.golang.org/grpc" @@ -51,6 +52,7 @@ type FakeSubTaskRunner struct { // StartPriceUpdater replaces `client.StartPriceUpdater` and advances `UpdaterCallCount` by one. func (f *FakeSubTaskRunner) StartPriceUpdater( + c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, @@ -247,6 +249,14 @@ func TestStart_InvalidConfig(t *testing.T) { &faketaskRunner, ) + // Expect daemon is not healthy on startup. Daemon becomes healthy after the first successful market + // update. 
+ require.ErrorContains( + t, + client.HealthCheck(), + "no successful update has occurred", + ) + if tc.expectedError == nil { require.NoError(t, err) } else { @@ -619,6 +629,7 @@ func TestPriceUpdater_Mixed(t *testing.T) { }, "No exchange market prices, does not call `UpdateMarketPrices`": { exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{}, + priceUpdateError: types.ErrEmptyMarketPriceUpdate, }, "One market for one exchange": { exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{ @@ -721,6 +732,66 @@ func TestPriceUpdater_Mixed(t *testing.T) { } } +func TestHealthCheck_Mixed(t *testing.T) { + tests := map[string]struct { + updateMarketPricesError error + expectedError error + }{ + "No error - daemon healthy": { + updateMarketPricesError: nil, + expectedError: nil, + }, + "Error - daemon unhealthy": { + updateMarketPricesError: fmt.Errorf("failed to update market prices"), + expectedError: fmt.Errorf( + "failed to run price updater task loop for price daemon: failed to update market prices", + ), + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // Setup. + // Create an `ExchangeToMarketPrices` and populate it with a single market-price update. + etmp, err := types.NewExchangeToMarketPrices([]types.ExchangeId{constants.ExchangeId1}) + require.NoError(t, err) + etmp.UpdatePrice(constants.ExchangeId1, constants.Market9_TimeT_Price1) + + // Create a mock `PriceFeedServiceClient`. + mockPriceFeedClient := generateMockQueryClient() + + // Mock the `UpdateMarketPrices` call to return an error if specified. + mockPriceFeedClient.On("UpdateMarketPrices", grpc_util.Ctx, mock.Anything). + Return(nil, tc.updateMarketPricesError).Once() + + ticker, stop := daemontestutils.SingleTickTickerAndStop() + client := newClient() + + // Act. + // Run the price updater for a single tick. Expect the daemon to toggle health state based on + // `UpdateMarketPrices` error response. 
+ subTaskRunnerImpl.StartPriceUpdater( + client, + grpc_util.Ctx, + ticker, + stop, + etmp, + mockPriceFeedClient, + log.NewNopLogger(), + ) + + // Assert. + if tc.expectedError == nil { + require.NoError(t, client.HealthCheck()) + } else { + require.ErrorContains(t, client.HealthCheck(), tc.expectedError.Error()) + } + + // Cleanup. + close(stop) + }) + } +} + // TestMarketUpdater_Mixed tests the `RunMarketParamUpdaterTaskLoop` function invokes the grpc // query to the prices query client and that if the query succeeds, the config is updated. func TestMarketUpdater_Mixed(t *testing.T) { diff --git a/protocol/daemons/pricefeed/client/sub_task_runner.go b/protocol/daemons/pricefeed/client/sub_task_runner.go index b137bfcdc3..7d8d3078e0 100644 --- a/protocol/daemons/pricefeed/client/sub_task_runner.go +++ b/protocol/daemons/pricefeed/client/sub_task_runner.go @@ -2,6 +2,7 @@ package client import ( "context" + "cosmossdk.io/errors" "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants" "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder" "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher" @@ -35,6 +36,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil) // SubTaskRunner is the interface for running pricefeed client task functions. type SubTaskRunner interface { StartPriceUpdater( + c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, @@ -76,6 +78,7 @@ type SubTaskRunner interface { // StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group // to signal task completion. 
func (s *SubTaskRunnerImpl) StartPriceUpdater( + c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, @@ -87,8 +90,14 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater( select { case <-ticker.C: err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger) - if err != nil { - panic(err) + + if err == nil { + // Record update success for the daemon health check. + c.ReportSuccess() + } else { + logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err) + // Record update failure for the daemon health check. + c.ReportFailure(errors.Wrap(err, "failed to run price updater task loop for price daemon")) } case <-stop: @@ -265,8 +274,10 @@ func RunPriceUpdaterTaskLoop( metrics.Latency, ) - // On startup the length of request will likely be 0. However, sending a request of length 0 - // is a fatal error. + // On startup the length of request will likely be 0. Even so, we return an error here because this + // is unexpected behavior once the daemon reaches a steady state. The daemon health check process should + // be robust enough to ignore temporarily unhealthy daemons. + // Sending a request of length 0, however, causes a panic. // panic: rpc error: code = Unknown desc = Market price update has length of 0. 
if len(request.MarketPriceUpdates) > 0 { _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request) @@ -291,6 +302,7 @@ func RunPriceUpdaterTaskLoop( metrics.PriceUpdaterZeroPrices, metrics.Count, ) + return types.ErrEmptyMarketPriceUpdate } return nil diff --git a/protocol/daemons/pricefeed/client/types/errors.go b/protocol/daemons/pricefeed/client/types/errors.go new file mode 100644 index 0000000000..c03ce34d6e --- /dev/null +++ b/protocol/daemons/pricefeed/client/types/errors.go @@ -0,0 +1,9 @@ +package types + +import ( + "errors" +) + +var ( + ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0") +) diff --git a/protocol/daemons/types/health_checkable.go b/protocol/daemons/types/health_checkable.go new file mode 100644 index 0000000000..188f1d30af --- /dev/null +++ b/protocol/daemons/types/health_checkable.go @@ -0,0 +1,132 @@ +package types + +import ( + "fmt" + libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" + "sync" + "time" +) + +const ( + MaxAcceptableUpdateDelay = 5 * time.Minute +) + +// HealthCheckable is a common interface for services that can be health checked. +// +// Instances of this type are thread-safe. +type HealthCheckable interface { + // HealthCheck returns an error if a service is unhealthy. If the service is healthy, this method returns nil. + HealthCheck() (err error) + // ReportFailure records a failed update. + ReportFailure(err error) + // ReportSuccess records a successful update. + ReportSuccess() +} + +// timestampWithError couples a timestamp and error to make it easier to update them in tandem. 
+type timestampWithError struct { + timestamp time.Time + err error +} + +func (u *timestampWithError) Update(timestamp time.Time, err error) { + u.timestamp = timestamp + u.err = err +} + +func (u *timestampWithError) Timestamp() time.Time { + return u.timestamp +} + +func (u *timestampWithError) Error() error { + return u.err +} + +// timeBoundedHealthCheckable implements the HealthCheckable interface by tracking the timestamps of the last successful +// and failed updates. +// If any of the following occurs, then the service should be considered unhealthy: +// - no successful update has occurred, +// - the most recent update failed, or +// - the daemon has not seen a successful update within `MaxAcceptableUpdateDelay`. +// +// This object is thread-safe. +type timeBoundedHealthCheckable struct { + sync.Mutex + + // lastSuccessfulUpdate is the timestamp of the last successful update. + lastSuccessfulUpdate time.Time + // lastFailedUpdate is the timestamp, error pair of the last failed update. + lastFailedUpdate timestampWithError + + // timeProvider is the time provider used to determine the current time. This is used for timestamping + // creation and checking for update staleness during HealthCheck. + timeProvider libtime.TimeProvider +} + +// NewTimeBoundedHealthCheckable creates a new HealthCheckable instance that starts in an unhealthy state. +func NewTimeBoundedHealthCheckable(serviceName string, timeProvider libtime.TimeProvider) HealthCheckable { + hc := &timeBoundedHealthCheckable{ + timeProvider: timeProvider, + } + // Initialize the timeBoundedHealthCheckable to an unhealthy state by reporting an error. + hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName)) + return hc +} + +// ReportSuccess records a successful update. This method is thread-safe. +func (h *timeBoundedHealthCheckable) ReportSuccess() { + h.Lock() + defer h.Unlock() + + h.lastSuccessfulUpdate = h.timeProvider.Now() +} + +// ReportFailure records a failed update. This method is thread-safe. 
+func (h *timeBoundedHealthCheckable) ReportFailure(err error) { + h.Lock() + defer h.Unlock() + h.lastFailedUpdate.Update(h.timeProvider.Now(), err) +} + +// HealthCheck returns an error if a service is unhealthy. +// The service is unhealthy if any of the following are true: +// - no successful update has occurred, +// - the most recent update failed, or +// - the last successful update is more than `MaxAcceptableUpdateDelay` (5 minutes) old. +// Note: since the timeBoundedHealthCheckable is not exposed and can only be created via +// NewTimeBoundedHealthCheckable, we expect that the lastFailedUpdate is never a zero value. +// This method is thread-safe. +func (h *timeBoundedHealthCheckable) HealthCheck() error { + h.Lock() + defer h.Unlock() + + if h.lastSuccessfulUpdate.IsZero() { + return fmt.Errorf( + "no successful update has occurred; last failed update occurred at %v with error '%w'", + h.lastFailedUpdate.Timestamp(), + h.lastFailedUpdate.Error(), + ) + } + + if h.lastFailedUpdate.Timestamp().After(h.lastSuccessfulUpdate) { + return fmt.Errorf( + "last update failed at %v with error: '%w', most recent successful update occurred at %v", + h.lastFailedUpdate.Timestamp(), + h.lastFailedUpdate.Error(), + h.lastSuccessfulUpdate, + ) + } + + // If the last successful update is older than `MaxAcceptableUpdateDelay`, report the specific error. + if h.timeProvider.Now().Sub(h.lastSuccessfulUpdate) > MaxAcceptableUpdateDelay { + return fmt.Errorf( + "last successful update occurred at %v, which is more than %v ago. 
Last failure occurred at %v with error '%w'", + h.lastSuccessfulUpdate, + MaxAcceptableUpdateDelay, + h.lastFailedUpdate.Timestamp(), + h.lastFailedUpdate.Error(), + ) + } + + return nil +} diff --git a/protocol/daemons/types/health_checkable_test.go b/protocol/daemons/types/health_checkable_test.go new file mode 100644 index 0000000000..4ec5ca9949 --- /dev/null +++ b/protocol/daemons/types/health_checkable_test.go @@ -0,0 +1,163 @@ +package types_test + +import ( + "fmt" + "github.com/dydxprotocol/v4-chain/protocol/daemons/types" + libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var ( + Time0 = time.Unix(0, 0) + Time1 = Time0.Add(time.Second) + Time2 = Time0.Add(2 * time.Second) + Time3 = Time0.Add(3 * time.Second) + Time4 = Time0.Add(4 * time.Second) + Time_5Minutes_And_2Seconds = Time0.Add(5*time.Minute + 2*time.Second) + + TestError = fmt.Errorf("test error") + InitializingStatus = fmt.Errorf("test is initializing") +) + +// mockTimeProviderWithTimestamps returns a TimeProvider that returns the given timestamps in order. 
+func mockTimeProviderWithTimestamps(times []time.Time) libtime.TimeProvider { + m := mocks.TimeProvider{} + for _, timestamp := range times { + m.On("Now").Return(timestamp).Once() + } + return &m +} + +func TestHealthCheckableImpl_Mixed(t *testing.T) { + tests := map[string]struct { + updates []struct { + timestamp time.Time + // leave error nil for a successful update + err error + } + healthCheckTime time.Time + expectedHealthStatus error + }{ + "unhealthy: no updates, returns initializing error": { + healthCheckTime: Time1, + expectedHealthStatus: fmt.Errorf( + "no successful update has occurred; last failed update occurred at %v with error '%w'", + Time0, + InitializingStatus, + ), + }, + "unhealthy: no successful updates": { + updates: []struct { + timestamp time.Time + err error + }{ + {Time1, TestError}, // failed update + }, + healthCheckTime: Time2, + expectedHealthStatus: fmt.Errorf( + "no successful update has occurred; last failed update occurred at %v with error '%w'", + Time1, + TestError, + ), + }, + "healthy: one recent successful update": { + updates: []struct { + timestamp time.Time + err error + }{ + {Time1, nil}, // successful update + }, + healthCheckTime: Time2, + expectedHealthStatus: nil, // expect healthy + }, + "unhealthy: one recent successful update, followed by a failed update": { + updates: []struct { + timestamp time.Time + err error + }{ + {Time1, nil}, // successful update + {Time2, TestError}, // failed update + }, + healthCheckTime: Time3, + expectedHealthStatus: fmt.Errorf( + "last update failed at %v with error: '%w', most recent successful update occurred at %v", + Time2, + TestError, + Time1, + ), + }, + "healthy: one recent failed update followed by a successful update": { + updates: []struct { + timestamp time.Time + err error + }{ + {Time1, TestError}, // failed update + {Time2, nil}, // successful update + }, + healthCheckTime: Time3, + expectedHealthStatus: nil, // expect healthy + }, + "unhealthy: last successful 
update was more than max delay": { + updates: []struct { + timestamp time.Time + err error + }{ + {Time1, nil}, // successful update + }, + healthCheckTime: Time_5Minutes_And_2Seconds, + expectedHealthStatus: fmt.Errorf( + "last successful update occurred at %v, which is more than %v ago. "+ + "Last failure occurred at %v with error '%w'", + Time1, + types.MaxAcceptableUpdateDelay, + Time0, + InitializingStatus, + ), + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // Setup. + + // Construct list of timestamps to provide for the timeProvider stored within the health checkable instance. + timestamps := make([]time.Time, 0, len(tc.updates)+2) + + // The first timestamp is used during HealthCheckable initialization. + timestamps = append(timestamps, Time0) + // One timestamp used for each update. + for _, update := range tc.updates { + timestamps = append(timestamps, update.timestamp) + } + // A final timestamp is queued for the HealthCheck call, which reads the clock only in its staleness check. + timestamps = append(timestamps, tc.healthCheckTime) + + // Create a new time-bounded health checkable instance. + hci := types.NewTimeBoundedHealthCheckable( + "test", + mockTimeProviderWithTimestamps(timestamps), + ) + + // Act. + // Report the test sequence of successful / failed updates. + for _, update := range tc.updates { + if update.err == nil { + hci.ReportSuccess() + } else { + hci.ReportFailure(update.err) + } + } + + // Assert. + // Check the health status after all updates have been reported. 
+ err := hci.HealthCheck() + if tc.expectedHealthStatus == nil { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tc.expectedHealthStatus.Error()) + } + }) + } +} diff --git a/protocol/testutil/daemons/common.go b/protocol/testutil/daemons/common.go new file mode 100644 index 0000000000..2039b4e2f7 --- /dev/null +++ b/protocol/testutil/daemons/common.go @@ -0,0 +1,31 @@ +package daemons + +import "time" + +// SingleTickTickerAndStop creates a ticker that ticks once before the stop channel is signaled. +func SingleTickTickerAndStop() (*time.Ticker, chan bool) { + // Create a ticker with a duration long enough that we do not expect to see a tick within the timeframe + // of a normal unit test. + ticker := time.NewTicker(10 * time.Minute) + // Override the ticker's channel with a new channel we can insert into directly, and add a single tick. + newChan := make(chan time.Time, 1) + newChan <- time.Now() + ticker.C = newChan + + stop := make(chan bool, 1) + + // Start a go-routine that will signal the stop channel once the single tick is consumed. + go func() { + for { + // Once the single tick is consumed, stop the ticker and signal the stop channel. + if len(ticker.C) == 0 { + stop <- true + ticker.Stop() + return + } + time.Sleep(10 * time.Millisecond) + } + }() + + return ticker, stop +}