Skip to content

Commit

Permalink
[CORE-666] - Robust health check (#715)
Browse files Browse the repository at this point in the history
  • Loading branch information
clemire authored Nov 6, 2023
1 parent 32f713f commit cc9317d
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 5 deletions.
13 changes: 12 additions & 1 deletion protocol/daemons/pricefeed/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
// Note: price fetchers manage their own subtasks by blocking on their completion on every subtask run.
// When the price fetcher is stopped, it will wait for all of its own subtasks to complete before returning.
type Client struct {
// include HealthCheckable to track the health of the daemon.
daemontypes.HealthCheckable

// daemonStartup tracks whether the daemon has finished startup. The daemon
// cannot be stopped until all persistent daemon subtasks have been launched within `Start`.
daemonStartup sync.WaitGroup
Expand All @@ -51,10 +54,17 @@ type Client struct {
stopDaemon sync.Once
}

// Compile-time assertion that *Client satisfies the daemontypes.HealthCheckable interface. The value is
// assigned to the blank identifier, so this declaration exists only for the type check.
var _ daemontypes.HealthCheckable = (*Client)(nil)

func newClient() *Client {
client := &Client{
tickers: []*time.Ticker{},
stops: []chan bool{},
HealthCheckable: daemontypes.NewTimeBoundedHealthCheckable(
constants.PricefeedDaemonModuleName,
&libtime.TimeProviderImpl{},
),
}

// Set the client's daemonStartup state to indicate that the daemon has not finished starting up.
Expand All @@ -66,7 +76,7 @@ func newClient() *Client {
// for any subtask kicked off by the client. The ticker and channel are tracked in order to properly clean up and send
// all needed stop signals when the daemon is stopped.
// Note: this method is not synchronized. It is expected to be called from the client's `StartNewClient` method before
// `client.CompleteStartup`.
// the daemonStartup waitgroup signals.
func (c *Client) newTickerWithStop(intervalMs int) (*time.Ticker, <-chan bool) {
ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
c.tickers = append(c.tickers, ticker)
Expand Down Expand Up @@ -249,6 +259,7 @@ func (c *Client) start(ctx context.Context,

pricefeedClient := api.NewPriceFeedServiceClient(daemonConn)
subTaskRunner.StartPriceUpdater(
c,
ctx,
priceUpdaterTicker,
priceUpdaterStop,
Expand Down
71 changes: 71 additions & 0 deletions protocol/daemons/pricefeed/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions"
daemontestutils "github.com/dydxprotocol/v4-chain/protocol/testutil/daemons"
grpc_util "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc"
pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
"google.golang.org/grpc"
Expand Down Expand Up @@ -51,6 +52,7 @@ type FakeSubTaskRunner struct {

// StartPriceUpdater replaces `client.StartPriceUpdater` and advances `UpdaterCallCount` by one.
func (f *FakeSubTaskRunner) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -247,6 +249,14 @@ func TestStart_InvalidConfig(t *testing.T) {
&faketaskRunner,
)

// Expect daemon is not healthy on startup. Daemon becomes healthy after the first successful market
// update.
require.ErrorContains(
t,
client.HealthCheck(),
"no successful update has occurred",
)

if tc.expectedError == nil {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -619,6 +629,7 @@ func TestPriceUpdater_Mixed(t *testing.T) {
},
"No exchange market prices, does not call `UpdateMarketPrices`": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{},
priceUpdateError: types.ErrEmptyMarketPriceUpdate,
},
"One market for one exchange": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{
Expand Down Expand Up @@ -721,6 +732,66 @@ func TestPriceUpdater_Mixed(t *testing.T) {
}
}

// TestHealthCheck_Mixed runs the price updater for a single tick against a mocked price feed service
// and checks that the client's health check reflects the outcome of the `UpdateMarketPrices` call:
// healthy when the call succeeds, and unhealthy with the wrapped error when it fails.
func TestHealthCheck_Mixed(t *testing.T) {
	type testCase struct {
		// updateMarketPricesError is the error the mocked `UpdateMarketPrices` call returns.
		updateMarketPricesError error
		// expectedError, when non-nil, holds the text the health check error must contain.
		expectedError error
	}

	tests := map[string]testCase{
		"No error - daemon healthy": {
			updateMarketPricesError: nil,
			expectedError:           nil,
		},
		"Error - daemon unhealthy": {
			updateMarketPricesError: fmt.Errorf("failed to update market prices"),
			expectedError: fmt.Errorf(
				"failed to run price updater task loop for price daemon: failed to update market prices",
			),
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			// Setup.
			// Build an `ExchangeToMarketPrices` for a single exchange and record one price in it.
			exchangeToMarketPrices, err := types.NewExchangeToMarketPrices(
				[]types.ExchangeId{constants.ExchangeId1},
			)
			require.NoError(t, err)
			exchangeToMarketPrices.UpdatePrice(constants.ExchangeId1, constants.Market9_TimeT_Price1)

			// Mock the price feed service so that exactly one `UpdateMarketPrices` call returns the
			// error configured for this test case.
			priceFeedClient := generateMockQueryClient()
			priceFeedClient.
				On("UpdateMarketPrices", grpc_util.Ctx, mock.Anything).
				Return(nil, tc.updateMarketPricesError).
				Once()

			ticker, stop := daemontestutils.SingleTickTickerAndStop()
			daemonClient := newClient()

			// Act.
			// Run the price updater for a single tick. The daemon's health state is expected to follow
			// the `UpdateMarketPrices` response.
			subTaskRunnerImpl.StartPriceUpdater(
				daemonClient,
				grpc_util.Ctx,
				ticker,
				stop,
				exchangeToMarketPrices,
				priceFeedClient,
				log.NewNopLogger(),
			)

			// Assert.
			healthErr := daemonClient.HealthCheck()
			if tc.expectedError != nil {
				require.ErrorContains(t, healthErr, tc.expectedError.Error())
			} else {
				require.NoError(t, healthErr)
			}

			// Cleanup.
			close(stop)
		})
	}
}

// TestMarketUpdater_Mixed tests the `RunMarketParamUpdaterTaskLoop` function invokes the grpc
// query to the prices query client and that if the query succeeds, the config is updated.
func TestMarketUpdater_Mixed(t *testing.T) {
Expand Down
20 changes: 16 additions & 4 deletions protocol/daemons/pricefeed/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"cosmossdk.io/errors"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher"
Expand Down Expand Up @@ -35,6 +36,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)
// SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunner interface {
StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -76,6 +78,7 @@ type SubTaskRunner interface {
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand All @@ -87,8 +90,14 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
select {
case <-ticker.C:
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger)
if err != nil {
panic(err)

if err == nil {
// Record update success for the daemon health check.
c.ReportSuccess()
} else {
logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err)
// Record update failure for the daemon health check.
c.ReportFailure(errors.Wrap(err, "failed to run price updater task loop for price daemon"))
}

case <-stop:
Expand Down Expand Up @@ -265,8 +274,10 @@ func RunPriceUpdaterTaskLoop(
metrics.Latency,
)

// On startup the length of request will likely be 0. However, sending a request of length 0
// is a fatal error.
// On startup the length of request will likely be 0. Even so, we return an error here because this
// is unexpected behavior once the daemon reaches a steady state. The daemon health check process should
// be robust enough to ignore temporarily unhealthy daemons.
// Sending a request of length 0, however, causes a panic.
// panic: rpc error: code = Unknown desc = Market price update has length of 0.
if len(request.MarketPriceUpdates) > 0 {
_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
Expand All @@ -291,6 +302,7 @@ func RunPriceUpdaterTaskLoop(
metrics.PriceUpdaterZeroPrices,
metrics.Count,
)
return types.ErrEmptyMarketPriceUpdate
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions protocol/daemons/pricefeed/client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package types

import (
"errors"
)

var (
// ErrEmptyMarketPriceUpdate is the sentinel error for a market price update that contains no prices.
// Prefer comparing against this value (e.g. with errors.Is) over matching on the message text.
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0")
)
132 changes: 132 additions & 0 deletions protocol/daemons/types/health_checkable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package types

import (
"fmt"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
"sync"
"time"
)

const (
// MaxAcceptableUpdateDelay is the longest time that may pass since the last successful update before
// a timeBoundedHealthCheckable reports the service as unhealthy from HealthCheck.
MaxAcceptableUpdateDelay = 5 * time.Minute
)

// HealthCheckable is a common interface for services that can be health checked. A service records the
// outcome of each of its updates through ReportSuccess and ReportFailure, and HealthCheck reports
// whether the service is currently considered healthy.
//
// Instances of this type are thread-safe.
type HealthCheckable interface {
// HealthCheck returns an error if a service is unhealthy. If the service is healthy, this method returns nil.
HealthCheck() (err error)
// ReportFailure records a failed update. The err argument is the error that caused the update to fail.
ReportFailure(err error)
// ReportSuccess records a successful update.
ReportSuccess()
}

// timestampWithError pairs an error with the time at which it was recorded, so that both values are
// always replaced together.
type timestampWithError struct {
	timestamp time.Time
	err       error
}

// Update overwrites the stored timestamp and error in a single assignment.
func (u *timestampWithError) Update(timestamp time.Time, err error) {
	*u = timestampWithError{timestamp: timestamp, err: err}
}

// Timestamp returns the time passed to the most recent Update, or the zero time if Update has never
// been called.
func (u *timestampWithError) Timestamp() time.Time {
	return u.timestamp
}

// Error returns the error passed to the most recent Update, or nil if Update has never been called.
func (u *timestampWithError) Error() error {
	return u.err
}

// timeBoundedHealthCheckable implements the HealthCheckable interface by tracking the timestamps of the last successful
// and failed updates.
// If any of the following occurs, then the service should be considered unhealthy:
// - no successful update has occurred
// - the most recent update failed, or
// - the daemon has not seen a successful update within `MaxAcceptableUpdateDelay`.
//
// This object is thread-safe.
type timeBoundedHealthCheckable struct {
// Mutex guards the fields below. ReportSuccess, ReportFailure and HealthCheck each hold it for the
// duration of the call.
sync.Mutex

// lastSuccessfulUpdate is the timestamp of the last successful update. It stays at the zero time until
// the first call to ReportSuccess.
lastSuccessfulUpdate time.Time
// lastFailedUpdate is the timestamp, error pair of the last failed update.
lastFailedUpdate timestampWithError

// timeProvider is the time provider used to determine the current time. It supplies the timestamps
// recorded for reported successes and failures, and the current time used to check for update
// staleness during HealthCheck.
timeProvider libtime.TimeProvider
}

// NewTimeBoundedHealthCheckable creates a new HealthCheckable instance for the service named
// serviceName. The supplied timeProvider is the source of every timestamp the instance records or
// compares against.
//
// The returned instance starts out unhealthy: an "<serviceName> is initializing" failure is reported
// before it is handed back to the caller.
func NewTimeBoundedHealthCheckable(serviceName string, timeProvider libtime.TimeProvider) HealthCheckable {
	checkable := new(timeBoundedHealthCheckable)
	checkable.timeProvider = timeProvider

	// Seed the timeBoundedHealthCheckable with a failure so that it reports unhealthy until the first
	// success is recorded.
	checkable.ReportFailure(fmt.Errorf("%v is initializing", serviceName))

	return checkable
}

// ReportSuccess records a successful update by stamping lastSuccessfulUpdate with the time provider's
// current time. The timestamp is taken while the lock is held. This method is thread-safe.
func (h *timeBoundedHealthCheckable) ReportSuccess() {
	h.Lock()
	defer h.Unlock()

	succeededAt := h.timeProvider.Now()
	h.lastSuccessfulUpdate = succeededAt
}

// ReportFailure records a failed update by storing err together with the time provider's current
// time, replacing any previously recorded failure. The timestamp is taken while the lock is held.
// This method is thread-safe.
func (h *timeBoundedHealthCheckable) ReportFailure(err error) {
	h.Lock()
	defer h.Unlock()

	failedAt := h.timeProvider.Now()
	h.lastFailedUpdate.Update(failedAt, err)
}

// HealthCheck returns an error if a service is unhealthy, and nil otherwise.
// The conditions are checked in this order, and the first one that holds determines the error:
//   - no successful update has occurred,
//   - the last failed update is strictly more recent than the last successful update, or
//   - the last successful update is more than MaxAcceptableUpdateDelay in the past.
//
// Each returned error wraps the most recently reported failure.
// Note: since the timeBoundedHealthCheckable is not exposed and can only be created via
// NewTimeBoundedHealthCheckable, we expect that the lastFailedUpdate is never a zero value.
// This method is thread-safe.
func (h *timeBoundedHealthCheckable) HealthCheck() error {
	h.Lock()
	defer h.Unlock()

	succeededAt := h.lastSuccessfulUpdate
	failedAt, failure := h.lastFailedUpdate.Timestamp(), h.lastFailedUpdate.Error()

	switch {
	case succeededAt.IsZero():
		return fmt.Errorf(
			"no successful update has occurred; last failed update occurred at %v with error '%w'",
			failedAt,
			failure,
		)

	case failedAt.After(succeededAt):
		return fmt.Errorf(
			"last update failed at %v with error: '%w', most recent successful update occurred at %v",
			failedAt,
			failure,
			succeededAt,
		)

	// The current time is only read once the two checks above have passed.
	case h.timeProvider.Now().Sub(succeededAt) > MaxAcceptableUpdateDelay:
		return fmt.Errorf(
			"last successful update occurred at %v, which is more than %v ago. Last failure occurred at %v with error '%w'",
			succeededAt,
			MaxAcceptableUpdateDelay,
			failedAt,
			failure,
		)

	default:
		return nil
	}
}
Loading

0 comments on commit cc9317d

Please sign in to comment.