
[CORE-666] - Migrate daemon monitoring to use health checks #783

Merged · 15 commits · Dec 8, 2023
55 changes: 44 additions & 11 deletions protocol/app/app.go
@@ -5,13 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"math"
"math/big"
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
@@ -183,6 +183,10 @@ import (
var (
// DefaultNodeHome default home directories for the application daemon
DefaultNodeHome string

// MaximumDaemonUnhealthyDuration is the maximum amount of time that a daemon can be unhealthy before the
// application panics.
MaximumDaemonUnhealthyDuration = 5 * time.Minute
)

var (
@@ -290,6 +294,8 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client

DaemonHealthMonitor *daemonservertypes.HealthMonitor
}

// assertAppPreconditions assert invariants required for an application to start.
@@ -589,6 +595,11 @@ func New(
bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider)
app.Server.WithBridgeEventManager(bridgeEventManager)

app.DaemonHealthMonitor = daemonservertypes.NewHealthMonitor(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
@@ -600,11 +611,9 @@

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.Server.ExpectLiquidationsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs),
)
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
@@ -621,7 +630,6 @@
// Non-validating full-nodes have no need to run the price daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled {
exchangeQueryConfig := constants.StaticExchangeQueryConfig
app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs))
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
// are retrieved via third-party APIs like Binance and then are encoded in-memory and
// periodically sent via gRPC to a shared socket with the server.
@@ -637,16 +645,17 @@
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
// app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs))
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.BridgeClient = bridgeclient.NewClient(logger)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, MaximumDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
@@ -663,6 +672,9 @@
// Start the Metrics Daemon.
// The metrics daemon is purely used for observability. It should never bring the app down.
// TODO(CLOB-960) Don't start this goroutine if telemetry is disabled
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
go func() {
defer func() {
if r := recover(); r != nil {
@@ -675,10 +687,6 @@
)
}
}()
// Don't panic if metrics daemon loops are delayed. Use maximum value.
app.Server.ExpectMetricsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32),
)
metricsclient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
@@ -1222,6 +1230,31 @@ func New(
return app
}

// RegisterDaemonWithHealthMonitor registers a daemon service with the health monitor, which will then begin
// monitoring the health of the daemon. If registration fails, the method panics.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
) {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maximumAcceptableUpdateDelay",
maximumAcceptableUpdateDelay,
)
panic(err)
}
}

// DisableHealthMonitorForTesting disables the health monitor for testing.
func (app *App) DisableHealthMonitorForTesting() {
app.DaemonHealthMonitor.DisableForTesting()
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
17 changes: 17 additions & 0 deletions protocol/app/app_test.go
@@ -1,10 +1,12 @@
package app_test

import (
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"gopkg.in/typ.v4/slices"
"reflect"
"strings"
"testing"
"time"

delaymsgmodule "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg"

@@ -222,3 +224,18 @@ func TestModuleBasics(t *testing.T) {
actualFieldTypes := getMapFieldsAndTypes(reflect.ValueOf(basic_manager.ModuleBasics))
require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected")
}

func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) {
app := testapp.DefaultTestApp(nil)
hc := &mocks.HealthCheckable{}
hc.On("ServiceName").Return("test-service")
hc.On("HealthCheck").Return(nil)

app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute)
// The second registration should fail, causing a panic.
require.PanicsWithError(
t,
"service test-service already registered",
func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) },
)
}
3 changes: 3 additions & 0 deletions protocol/daemons/metrics/client/client.go
@@ -19,6 +19,9 @@ var (

// Start begins a job that periodically:
// 1) Emits metrics about app version and git commit.
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
func Start(
ctx context.Context,
logger log.Logger,
12 changes: 11 additions & 1 deletion protocol/daemons/pricefeed/client/client_integration_test.go
@@ -5,6 +5,7 @@ package client_test
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/app"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client"
@@ -219,6 +220,7 @@ type PriceDaemonIntegrationTestSuite struct {
exchangeServer *pricefeed.ExchangeServer
daemonServer *daemonserver.Server
exchangePriceCache *pricefeedserver_types.MarketToExchangePrices
healthMonitor *servertypes.HealthMonitor

pricesMockQueryServer *mocks.QueryServer
pricesGrpcServer *grpc.Server
@@ -278,7 +280,13 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
&daemontypes.FileHandlerImpl{},
s.daemonFlags.Shared.SocketAddress,
)
s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs))

s.healthMonitor = servertypes.NewHealthMonitor(
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache)

@@ -329,6 +337,8 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration)
s.Require().NoError(err)
}

// expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout.
28 changes: 8 additions & 20 deletions protocol/daemons/server/bridge.go
@@ -2,10 +2,8 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge"
)

@@ -23,31 +21,21 @@ func (server *Server) WithBridgeEventManager(
return server
}

// ExpectBridgeDaemon registers the bridge daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay)
}

// AddBridgeEvents stores any bridge events recognized by the daemon
// in a go-routine safe slice.
func (s *Server) AddBridgeEvents(
ctx context.Context,
req *api.AddBridgeEventsRequest,
) (
*api.AddBridgeEventsResponse,
error,
response *api.AddBridgeEventsResponse,
err error,
) {
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
// TODO(CORE-582): Re-enable this check once the bridge daemon is fixed in local / CI environments.
//if err := s.reportResponse(types.BridgeDaemonServiceName); err != nil {
// panic(err)
//}
if err := s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
return nil, err
}

// Capture valid responses in metrics.
s.reportValidResponse(types.BridgeDaemonServiceName)

return &api.AddBridgeEventsResponse{}, nil
}
26 changes: 9 additions & 17 deletions protocol/daemons/server/liquidation.go
@@ -2,11 +2,9 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
)
@@ -26,33 +24,27 @@ func (server *Server) WithLiquidatableSubaccountIds(
return server
}

// ExpectLiquidationsDaemon registers the liquidations daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectLiquidationsDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.LiquidationsDaemonServiceName, maximumAcceptableUpdateDelay)
}

// LiquidateSubaccounts stores the list of potentially liquidatable subaccount ids
// in a go-routine safe slice.
func (s *Server) LiquidateSubaccounts(
ctx context.Context,
req *api.LiquidateSubaccountsRequest,
) (*api.LiquidateSubaccountsResponse, error) {
) (
response *api.LiquidateSubaccountsResponse,
err error,
) {
telemetry.ModuleSetGauge(
metrics.LiquidationDaemon,
float32(len(req.SubaccountIds)),
metrics.LiquidatableSubaccountIds,
metrics.Received,
metrics.Count,
)
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
if err := s.reportResponse(types.LiquidationsDaemonServiceName); err != nil {
s.logger.Error("Failed to report liquidations response to update monitor", "error", err)
}

[Resolved review thread on the removed lines above]
Reviewer: For my own understanding, why are we removing this error logging?
Author: The error here was emitted by the daemon monitor (which lived on the daemon server) if it failed to register the response. We don't report daemon responses to the monitor anymore; instead, we query the daemon for its health directly.
Reviewer: Would it be useful to still log these errors for debugging purposes?
Reviewer: It doesn't seem to make sense to log this, as we never expect the report-response call to fail.
Author: Yes. Since we've removed the health monitoring code from this method, there's nothing here but telemetry, and this call will never generate an error.

s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds)

// Capture valid responses in metrics.
s.reportValidResponse(types.LiquidationsDaemonServiceName)

return &api.LiquidateSubaccountsResponse{}, nil
}
15 changes: 0 additions & 15 deletions protocol/daemons/server/metrics.go

This file was deleted.
