Skip to content

Commit

Permalink
[CORE-666] - Migrate daemon monitoring to use health checks (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire authored Dec 8, 2023
1 parent fb70453 commit f57fd40
Show file tree
Hide file tree
Showing 30 changed files with 1,125 additions and 565 deletions.
55 changes: 42 additions & 13 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"math"
"math/big"
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
Expand Down Expand Up @@ -290,6 +290,8 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client

DaemonHealthMonitor *daemonservertypes.HealthMonitor
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -589,22 +591,27 @@ func New(
bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider)
app.Server.WithBridgeEventManager(bridgeEventManager)

app.DaemonHealthMonitor = daemonservertypes.NewHealthMonitor(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
daemonFlags.Shared.PanicOnDaemonFailureEnabled,
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
// daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon
// monitoring service will panic.
app.startDaemons = func() {
maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second
// Start server for handling gRPC messages from daemons.
go app.Server.Start()

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.Server.ExpectLiquidationsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs),
)
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, maxDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -621,7 +628,6 @@ func New(
// Non-validating full-nodes have no need to run the price daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled {
exchangeQueryConfig := constants.StaticExchangeQueryConfig
app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs))
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
// are retrieved via third-party APIs like Binance and then are encoded in-memory and
// periodically sent via gRPC to a shared socket with the server.
Expand All @@ -637,16 +643,15 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, maxDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
// app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs))
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.BridgeClient = bridgeclient.NewClient(logger)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, maxDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -663,6 +668,9 @@ func New(
// Start the Metrics Daemon.
// The metrics daemon is purely used for observability. It should never bring the app down.
// TODO(CLOB-960) Don't start this goroutine if telemetry is disabled
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -675,10 +683,6 @@ func New(
)
}
}()
// Don't panic if metrics daemon loops are delayed. Use maximum value.
app.Server.ExpectMetricsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32),
)
metricsclient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1222,6 +1226,31 @@ func New(
return app
}

// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maxDaemonUnhealthyDuration time.Duration,
) {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maxDaemonUnhealthyDuration",
maxDaemonUnhealthyDuration,
)
panic(err)
}
}

// DisableHealthMonitorForTesting disables the health monitor for testing.
func (app *App) DisableHealthMonitorForTesting() {
app.DaemonHealthMonitor.DisableForTesting()
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
Expand Down
17 changes: 17 additions & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package app_test

import (
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"gopkg.in/typ.v4/slices"
"reflect"
"strings"
"testing"
"time"

delaymsgmodule "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg"

Expand Down Expand Up @@ -222,3 +224,18 @@ func TestModuleBasics(t *testing.T) {
actualFieldTypes := getMapFieldsAndTypes(reflect.ValueOf(basic_manager.ModuleBasics))
require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected")
}

func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) {
app := testapp.DefaultTestApp(nil)
hc := &mocks.HealthCheckable{}
hc.On("ServiceName").Return("test-service")
hc.On("HealthCheck").Return(nil)

app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute)
// The second registration should fail, causing a panic.
require.PanicsWithError(
t,
"service test-service already registered",
func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) },
)
}
1 change: 0 additions & 1 deletion protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flags

import (
"fmt"

"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
Expand Down
36 changes: 32 additions & 4 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
// List of CLI flags for Server and Client.
const (
// Flag names
FlagUnixSocketAddress = "unix-socket-address"
FlagUnixSocketAddress = "unix-socket-address"
FlagPanicOnDaemonFailureEnabled = "panic-on-daemon-failure-enabled"
FlagMaxDaemonUnhealthySeconds = "max-daemon-unhealthy-seconds"

FlagPriceDaemonEnabled = "price-daemon-enabled"
FlagPriceDaemonLoopDelayMs = "price-daemon-loop-delay-ms"
Expand All @@ -28,6 +30,10 @@ const (
type SharedFlags struct {
// SocketAddress is the location of the unix socket to communicate with the daemon gRPC service.
SocketAddress string
// PanicOnDaemonFailureEnabled toggles whether the daemon should panic on failure.
PanicOnDaemonFailureEnabled bool
// MaxDaemonUnhealthySeconds is the maximum allowable duration for which a daemon can be unhealthy.
MaxDaemonUnhealthySeconds uint32
}

// BridgeFlags contains configuration flags for the Bridge Daemon.
Expand Down Expand Up @@ -74,7 +80,9 @@ func GetDefaultDaemonFlags() DaemonFlags {
if defaultDaemonFlags == nil {
defaultDaemonFlags = &DaemonFlags{
Shared: SharedFlags{
SocketAddress: "/tmp/daemons.sock",
SocketAddress: "/tmp/daemons.sock",
PanicOnDaemonFailureEnabled: true,
MaxDaemonUnhealthySeconds: 5 * 60, // 5 minutes.
},
Bridge: BridgeFlags{
Enabled: true,
Expand Down Expand Up @@ -109,8 +117,18 @@ func AddDaemonFlagsToCmd(
cmd.Flags().String(
FlagUnixSocketAddress,
df.Shared.SocketAddress,
"Socket address for the price daemon to send updates to, if not set "+
"will establish default location to ingest price updates from",
"Socket address for the daemons to send updates to, if not set "+
"will establish default location to ingest daemon updates from",
)
cmd.Flags().Bool(
FlagPanicOnDaemonFailureEnabled,
df.Shared.PanicOnDaemonFailureEnabled,
"Enables panicking when a daemon fails.",
)
cmd.Flags().Uint32(
FlagMaxDaemonUnhealthySeconds,
df.Shared.MaxDaemonUnhealthySeconds,
"Maximum allowable duration for which a daemon can be unhealthy.",
)

// Bridge Daemon.
Expand Down Expand Up @@ -178,6 +196,16 @@ func GetDaemonFlagValuesFromOptions(
result.Shared.SocketAddress = v
}
}
if option := appOpts.Get(FlagPanicOnDaemonFailureEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.Shared.PanicOnDaemonFailureEnabled = v
}
}
if option := appOpts.Get(FlagMaxDaemonUnhealthySeconds); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.Shared.MaxDaemonUnhealthySeconds = v
}
}

// Bridge Daemon.
if option := appOpts.Get(FlagBridgeDaemonEnabled); option != nil {
Expand Down
12 changes: 11 additions & 1 deletion protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.AddDaemonFlagsToCmd(&cmd)
tests := []string{
flags.FlagUnixSocketAddress,
flags.FlagPanicOnDaemonFailureEnabled,
flags.FlagMaxDaemonUnhealthySeconds,

flags.FlagBridgeDaemonEnabled,
flags.FlagBridgeDaemonLoopDelayMs,
Expand All @@ -41,6 +43,8 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap := make(map[string]interface{})

optsMap[flags.FlagUnixSocketAddress] = "test-socket-address"
optsMap[flags.FlagPanicOnDaemonFailureEnabled] = false
optsMap[flags.FlagMaxDaemonUnhealthySeconds] = uint32(1234)

optsMap[flags.FlagBridgeDaemonEnabled] = true
optsMap[flags.FlagBridgeDaemonLoopDelayMs] = uint32(1111)
Expand All @@ -64,6 +68,12 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {

// Shared.
require.Equal(t, optsMap[flags.FlagUnixSocketAddress], r.Shared.SocketAddress)
require.Equal(t, optsMap[flags.FlagPanicOnDaemonFailureEnabled], r.Shared.PanicOnDaemonFailureEnabled)
require.Equal(
t,
optsMap[flags.FlagMaxDaemonUnhealthySeconds],
r.Shared.MaxDaemonUnhealthySeconds,
)

// Bridge Daemon.
require.Equal(t, optsMap[flags.FlagBridgeDaemonEnabled], r.Bridge.Enabled)
Expand All @@ -81,7 +91,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagPriceDaemonLoopDelayMs], r.Price.LoopDelayMs)
}

func TestGetDaemonFlagValuesFromOptions_Defaul(t *testing.T) {
func TestGetDaemonFlagValuesFromOptions_Default(t *testing.T) {
mockOpts := mocks.AppOptions{}
mockOpts.On("Get", mock.Anything).
Return(func(key string) interface{} {
Expand Down
3 changes: 3 additions & 0 deletions protocol/daemons/metrics/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (

// Start begins a job that periodically:
// 1) Emits metrics about app version and git commit.
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
func Start(
ctx context.Context,
logger log.Logger,
Expand Down
15 changes: 14 additions & 1 deletion protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ type PriceDaemonIntegrationTestSuite struct {
exchangeServer *pricefeed.ExchangeServer
daemonServer *daemonserver.Server
exchangePriceCache *pricefeedserver_types.MarketToExchangePrices
healthMonitor *servertypes.HealthMonitor

pricesMockQueryServer *mocks.QueryServer
pricesGrpcServer *grpc.Server
Expand Down Expand Up @@ -278,7 +279,14 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
&daemontypes.FileHandlerImpl{},
s.daemonFlags.Shared.SocketAddress,
)
s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs))

s.healthMonitor = servertypes.NewHealthMonitor(
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
flags.GetDefaultDaemonFlags().Shared.PanicOnDaemonFailureEnabled, // Use default behavior for testing
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache)

Expand Down Expand Up @@ -329,6 +337,11 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(
s.pricefeedDaemon,
time.Duration(s.daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
)
s.Require().NoError(err)
}

// expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout.
Expand Down
28 changes: 8 additions & 20 deletions protocol/daemons/server/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge"
)

Expand All @@ -23,31 +21,21 @@ func (server *Server) WithBridgeEventManager(
return server
}

// ExpectBridgeDaemon registers the bridge daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay)
}

// AddBridgeEvents stores any bridge events recognized by the daemon
// in a go-routine safe slice.
func (s *Server) AddBridgeEvents(
ctx context.Context,
req *api.AddBridgeEventsRequest,
) (
*api.AddBridgeEventsResponse,
error,
response *api.AddBridgeEventsResponse,
err error,
) {
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
// TODO(CORE-582): Re-enable this check once the bridge daemon is fixed in local / CI environments.
//if err := s.reportResponse(types.BridgeDaemonServiceName); err != nil {
// panic(err)
//}
if err := s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
return nil, err
}

// Capture valid responses in metrics.
s.reportValidResponse(types.BridgeDaemonServiceName)

return &api.AddBridgeEventsResponse{}, nil
}
Loading

0 comments on commit f57fd40

Please sign in to comment.