Skip to content

Commit

Permalink
[CORE-668,CORE-669] - Add health monitor flags (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire authored Dec 7, 2023
1 parent 21585a0 commit 4336388
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 50 deletions.
22 changes: 9 additions & 13 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ import (
var (
// DefaultNodeHome default home directories for the application daemon
DefaultNodeHome string

// MaximumDaemonUnhealthyDuration is the maximum amount of time that a daemon can be unhealthy before the
// application panics.
MaximumDaemonUnhealthyDuration = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -599,21 +595,23 @@ func New(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
daemonFlags.Shared.PanicOnDaemonFailureEnabled,
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
// daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon
// monitoring service will panic.
app.startDaemons = func() {
maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second
// Start server for handling gRPC messages from daemons.
go app.Server.Start()

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, maxDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -645,17 +643,15 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, maxDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, maxDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1234,17 +1230,17 @@ func New(
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
maxDaemonUnhealthyDuration time.Duration,
) {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maximumAcceptableUpdateDelay",
maximumAcceptableUpdateDelay,
"maxDaemonUnhealthyDuration",
maxDaemonUnhealthyDuration,
)
panic(err)
}
Expand Down
1 change: 0 additions & 1 deletion protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flags

import (
"fmt"

"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
Expand Down
36 changes: 32 additions & 4 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
// List of CLI flags for Server and Client.
const (
// Flag names
FlagUnixSocketAddress = "unix-socket-address"
FlagUnixSocketAddress = "unix-socket-address"
FlagPanicOnDaemonFailureEnabled = "panic-on-daemon-failure-enabled"
FlagMaxDaemonUnhealthySeconds = "max-daemon-unhealthy-seconds"

FlagPriceDaemonEnabled = "price-daemon-enabled"
FlagPriceDaemonLoopDelayMs = "price-daemon-loop-delay-ms"
Expand All @@ -28,6 +30,10 @@ const (
type SharedFlags struct {
// SocketAddress is the location of the unix socket to communicate with the daemon gRPC service.
SocketAddress string
// PanicOnDaemonFailureEnabled toggles whether the daemon should panic on failure.
PanicOnDaemonFailureEnabled bool
// MaxDaemonUnhealthySeconds is the maximum allowable duration for which a daemon can be unhealthy.
MaxDaemonUnhealthySeconds uint32
}

// BridgeFlags contains configuration flags for the Bridge Daemon.
Expand Down Expand Up @@ -74,7 +80,9 @@ func GetDefaultDaemonFlags() DaemonFlags {
if defaultDaemonFlags == nil {
defaultDaemonFlags = &DaemonFlags{
Shared: SharedFlags{
SocketAddress: "/tmp/daemons.sock",
SocketAddress: "/tmp/daemons.sock",
PanicOnDaemonFailureEnabled: true,
MaxDaemonUnhealthySeconds: 5 * 60, // 5 minutes.
},
Bridge: BridgeFlags{
Enabled: true,
Expand Down Expand Up @@ -109,8 +117,18 @@ func AddDaemonFlagsToCmd(
cmd.Flags().String(
FlagUnixSocketAddress,
df.Shared.SocketAddress,
"Socket address for the price daemon to send updates to, if not set "+
"will establish default location to ingest price updates from",
"Socket address for the daemons to send updates to, if not set "+
"will establish default location to ingest daemon updates from",
)
cmd.Flags().Bool(
FlagPanicOnDaemonFailureEnabled,
df.Shared.PanicOnDaemonFailureEnabled,
"Enables panicking when a daemon fails.",
)
cmd.Flags().Uint32(
FlagMaxDaemonUnhealthySeconds,
df.Shared.MaxDaemonUnhealthySeconds,
"Maximum allowable duration for which a daemon can be unhealthy.",
)

// Bridge Daemon.
Expand Down Expand Up @@ -178,6 +196,16 @@ func GetDaemonFlagValuesFromOptions(
result.Shared.SocketAddress = v
}
}
if option := appOpts.Get(FlagPanicOnDaemonFailureEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.Shared.PanicOnDaemonFailureEnabled = v
}
}
if option := appOpts.Get(FlagMaxDaemonUnhealthySeconds); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.Shared.MaxDaemonUnhealthySeconds = v
}
}

// Bridge Daemon.
if option := appOpts.Get(FlagBridgeDaemonEnabled); option != nil {
Expand Down
12 changes: 11 additions & 1 deletion protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.AddDaemonFlagsToCmd(&cmd)
tests := []string{
flags.FlagUnixSocketAddress,
flags.FlagPanicOnDaemonFailureEnabled,
flags.FlagMaxDaemonUnhealthySeconds,

flags.FlagBridgeDaemonEnabled,
flags.FlagBridgeDaemonLoopDelayMs,
Expand All @@ -41,6 +43,8 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap := make(map[string]interface{})

optsMap[flags.FlagUnixSocketAddress] = "test-socket-address"
optsMap[flags.FlagPanicOnDaemonFailureEnabled] = false
optsMap[flags.FlagMaxDaemonUnhealthySeconds] = uint32(1234)

optsMap[flags.FlagBridgeDaemonEnabled] = true
optsMap[flags.FlagBridgeDaemonLoopDelayMs] = uint32(1111)
Expand All @@ -64,6 +68,12 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {

// Shared.
require.Equal(t, optsMap[flags.FlagUnixSocketAddress], r.Shared.SocketAddress)
require.Equal(t, optsMap[flags.FlagPanicOnDaemonFailureEnabled], r.Shared.PanicOnDaemonFailureEnabled)
require.Equal(
t,
optsMap[flags.FlagMaxDaemonUnhealthySeconds],
r.Shared.MaxDaemonUnhealthySeconds,
)

// Bridge Daemon.
require.Equal(t, optsMap[flags.FlagBridgeDaemonEnabled], r.Bridge.Enabled)
Expand All @@ -81,7 +91,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagPriceDaemonLoopDelayMs], r.Price.LoopDelayMs)
}

func TestGetDaemonFlagValuesFromOptions_Defaul(t *testing.T) {
func TestGetDaemonFlagValuesFromOptions_Default(t *testing.T) {
mockOpts := mocks.AppOptions{}
mockOpts.On("Get", mock.Anything).
Return(func(key string) interface{} {
Expand Down
7 changes: 5 additions & 2 deletions protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package client_test
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/app"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client"
Expand Down Expand Up @@ -285,6 +284,7 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
flags.GetDefaultDaemonFlags().Shared.PanicOnDaemonFailureEnabled, // Use default behavior for testing
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
Expand Down Expand Up @@ -337,7 +337,10 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration)
err := s.healthMonitor.RegisterService(
s.pricefeedDaemon,
time.Duration(s.daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
)
s.Require().NoError(err)
}

Expand Down
21 changes: 11 additions & 10 deletions protocol/daemons/server/types/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ type healthChecker struct {
// pollFrequency is the frequency at which the health-checkable service is polled.
pollFrequency time.Duration

// maxAcceptableUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
// maxUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
// remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the
// specified callback function.
maxAcceptableUnhealthyDuration time.Duration
maxUnhealthyDuration time.Duration

// unhealthyCallback is the callback function to be executed if the health-checkable service remains
// unhealthy for a period of time greater than or equal to the maximum acceptable unhealthy duration.
Expand All @@ -174,7 +174,7 @@ func (hc *healthChecker) Poll() {
streakDuration := hc.mutableState.ReportFailure(now, err)
// If the service has been unhealthy for longer than the maximum acceptable unhealthy duration, execute the
// callback function.
if streakDuration >= hc.maxAcceptableUnhealthyDuration {
if streakDuration >= hc.maxUnhealthyDuration {
hc.unhealthyCallback(err)
}
}
Expand All @@ -197,17 +197,18 @@ func StartNewHealthChecker(
pollFrequency time.Duration,
unhealthyCallback func(error),
timeProvider libtime.TimeProvider,
maximumAcceptableUnhealthyDuration time.Duration,
maxUnhealthyDuration time.Duration,
startupGracePeriod time.Duration,
logger log.Logger,
) *healthChecker {
checker := &healthChecker{
healthCheckable: healthCheckable,
pollFrequency: pollFrequency,
unhealthyCallback: unhealthyCallback,
timeProvider: timeProvider,
maxAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration,
logger: logger,
healthCheckable: healthCheckable,
pollFrequency: pollFrequency,
unhealthyCallback: unhealthyCallback,
timeProvider: timeProvider,
maxUnhealthyDuration: maxUnhealthyDuration,
logger: logger,
mutableState: &healthCheckerMutableState{},
}

// The first poll is scheduled after the startup grace period to allow the service to initialize.
Expand Down
37 changes: 26 additions & 11 deletions protocol/daemons/server/types/health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,29 @@ type HealthMonitor struct {
mutableState *healthMonitorMutableState

// These fields are initialized in NewHealthMonitor and are not modified after initialization.
logger log.Logger
logger log.Logger
// startupGracePeriod is the grace period before the monitor starts polling the health-checkable services.
startupGracePeriod time.Duration
pollingFrequency time.Duration
// pollingFrequency is the frequency at which the health-checkable services are polled.
pollingFrequency time.Duration
// enablePanics is used to toggle between panics or error logs when a daemon sustains an unhealthy state past the
// maximum allowable duration.
enablePanics bool
}

// NewHealthMonitor creates a new health monitor.
func NewHealthMonitor(
startupGracePeriod time.Duration,
pollingFrequency time.Duration,
logger log.Logger,
enablePanics bool,
) *HealthMonitor {
return &HealthMonitor{
mutableState: newHealthMonitorMutableState(),
logger: logger.With(cosmoslog.ModuleKey, HealthMonitorLogModuleName),
startupGracePeriod: startupGracePeriod,
pollingFrequency: pollingFrequency,
enablePanics: enablePanics,
}
}

Expand All @@ -153,15 +160,15 @@ func (hm *HealthMonitor) DisableForTesting() {
// health-checkable service before returning.
func (hm *HealthMonitor) RegisterServiceWithCallback(
hc types.HealthCheckable,
maximumAcceptableUnhealthyDuration time.Duration,
maxUnhealthyDuration time.Duration,
callback func(error),
) error {
if maximumAcceptableUnhealthyDuration <= 0 {
if maxUnhealthyDuration <= 0 {
return fmt.Errorf(
"health check registration failure for service %v: "+
"maximum acceptable unhealthy duration %v must be positive",
"maximum unhealthy duration %v must be positive",
hc.ServiceName(),
maximumAcceptableUnhealthyDuration,
maxUnhealthyDuration,
)
}

Expand All @@ -171,7 +178,7 @@ func (hm *HealthMonitor) RegisterServiceWithCallback(
hm.pollingFrequency,
callback,
&libtime.TimeProviderImpl{},
maximumAcceptableUnhealthyDuration,
maxUnhealthyDuration,
hm.startupGracePeriod,
hm.logger,
)
Expand Down Expand Up @@ -202,18 +209,26 @@ func LogErrorServiceNotResponding(hc types.HealthCheckable, logger log.Logger) f

// RegisterService registers a new health-checkable service with the health check monitor. If the service
// is unhealthy every time it is polled for a duration greater than or equal to the maximum acceptable unhealthy
// duration, the monitor will panic.
// duration, the monitor will panic or log an error, depending on the app configuration via the
// `panic-on-daemon-failure-enabled` flag.
// This method is synchronized. It returns an error if the service was already registered or the monitor has
// already been stopped. If the monitor has been stopped, this method will proactively stop the health-checkable
// service before returning.
func (hm *HealthMonitor) RegisterService(
hc types.HealthCheckable,
maximumAcceptableUnhealthyDuration time.Duration,
maxDaemonUnhealthyDuration time.Duration,
) error {
// If the monitor is configured to panic, use the panic callback. Otherwise, use the error log callback.
// This behavior is configured via flag and defaults to panicking on daemon failure.
callback := LogErrorServiceNotResponding(hc, hm.logger)
if hm.enablePanics {
callback = PanicServiceNotResponding(hc)
}

return hm.RegisterServiceWithCallback(
hc,
maximumAcceptableUnhealthyDuration,
PanicServiceNotResponding(hc),
maxDaemonUnhealthyDuration,
callback,
)
}

Expand Down
Loading

0 comments on commit 4336388

Please sign in to comment.