Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-668,CORE-669] - Add health monitor flags #802

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ import (
var (
// DefaultNodeHome default home directories for the application daemon
DefaultNodeHome string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant is now defined within the daemon flags, so it is no longer needed here.

// MaximumDaemonUnhealthyDuration is the maximum amount of time that a daemon can be unhealthy before the
// application panics.
MaximumDaemonUnhealthyDuration = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -599,21 +595,23 @@ func New(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
daemonFlags.Shared.PanicOnDaemonFailureEnabled,
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
// daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon
// monitoring service will panic.
app.startDaemons = func() {
maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second
// Start server for handling gRPC messages from daemons.
go app.Server.Start()

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, maxDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -645,17 +643,15 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, maxDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, maxDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1234,17 +1230,17 @@ func New(
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
maxDaemonUnhealthyDuration time.Duration,
) {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maximumAcceptableUpdateDelay",
maximumAcceptableUpdateDelay,
"maxDaemonUnhealthyDuration",
maxDaemonUnhealthyDuration,
)
panic(err)
}
Expand Down
1 change: 0 additions & 1 deletion protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flags

import (
"fmt"

"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
Expand Down
36 changes: 32 additions & 4 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
// List of CLI flags for Server and Client.
const (
// Flag names
FlagUnixSocketAddress = "unix-socket-address"
FlagUnixSocketAddress = "unix-socket-address"
FlagPanicOnDaemonFailureEnabled = "panic-on-daemon-failure-enabled"
FlagMaxDaemonUnhealthySeconds = "max-daemon-unhealthy-seconds"

FlagPriceDaemonEnabled = "price-daemon-enabled"
FlagPriceDaemonLoopDelayMs = "price-daemon-loop-delay-ms"
Expand All @@ -28,6 +30,10 @@ const (
type SharedFlags struct {
// SocketAddress is the location of the unix socket to communicate with the daemon gRPC service.
SocketAddress string
// PanicOnDaemonFailureEnabled toggles how the daemon health monitor reacts when a daemon stays unhealthy
// past the maximum allowable duration: when true the monitor panics, when false it only logs an error.
PanicOnDaemonFailureEnabled bool
// MaxDaemonUnhealthySeconds is the maximum allowable duration, in seconds, for which a daemon can be
// unhealthy. The application converts it to a time.Duration before registering daemons with the monitor.
MaxDaemonUnhealthySeconds uint32
}

// BridgeFlags contains configuration flags for the Bridge Daemon.
Expand Down Expand Up @@ -74,7 +80,9 @@ func GetDefaultDaemonFlags() DaemonFlags {
if defaultDaemonFlags == nil {
defaultDaemonFlags = &DaemonFlags{
Shared: SharedFlags{
SocketAddress: "/tmp/daemons.sock",
SocketAddress: "/tmp/daemons.sock",
PanicOnDaemonFailureEnabled: true,
MaxDaemonUnhealthySeconds: 5 * 60, // 5 minutes.
},
Bridge: BridgeFlags{
Enabled: true,
Expand Down Expand Up @@ -109,8 +117,18 @@ func AddDaemonFlagsToCmd(
cmd.Flags().String(
FlagUnixSocketAddress,
df.Shared.SocketAddress,
"Socket address for the price daemon to send updates to, if not set "+
"will establish default location to ingest price updates from",
"Socket address for the daemons to send updates to, if not set "+
"will establish default location to ingest daemon updates from",
)
cmd.Flags().Bool(
clemire marked this conversation as resolved.
Show resolved Hide resolved
FlagPanicOnDaemonFailureEnabled,
df.Shared.PanicOnDaemonFailureEnabled,
"Enables panicking when a daemon fails.",
)
cmd.Flags().Uint32(
FlagMaxDaemonUnhealthySeconds,
df.Shared.MaxDaemonUnhealthySeconds,
"Maximum allowable duration for which a daemon can be unhealthy.",
)

// Bridge Daemon.
Expand Down Expand Up @@ -178,6 +196,16 @@ func GetDaemonFlagValuesFromOptions(
result.Shared.SocketAddress = v
}
}
if option := appOpts.Get(FlagPanicOnDaemonFailureEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.Shared.PanicOnDaemonFailureEnabled = v
}
}
if option := appOpts.Get(FlagMaxDaemonUnhealthySeconds); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.Shared.MaxDaemonUnhealthySeconds = v
}
}

// Bridge Daemon.
if option := appOpts.Get(FlagBridgeDaemonEnabled); option != nil {
Expand Down
12 changes: 11 additions & 1 deletion protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.AddDaemonFlagsToCmd(&cmd)
tests := []string{
flags.FlagUnixSocketAddress,
flags.FlagPanicOnDaemonFailureEnabled,
flags.FlagMaxDaemonUnhealthySeconds,

flags.FlagBridgeDaemonEnabled,
flags.FlagBridgeDaemonLoopDelayMs,
Expand All @@ -41,6 +43,8 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap := make(map[string]interface{})

optsMap[flags.FlagUnixSocketAddress] = "test-socket-address"
optsMap[flags.FlagPanicOnDaemonFailureEnabled] = false
optsMap[flags.FlagMaxDaemonUnhealthySeconds] = uint32(1234)

optsMap[flags.FlagBridgeDaemonEnabled] = true
optsMap[flags.FlagBridgeDaemonLoopDelayMs] = uint32(1111)
Expand All @@ -64,6 +68,12 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {

// Shared.
require.Equal(t, optsMap[flags.FlagUnixSocketAddress], r.Shared.SocketAddress)
require.Equal(t, optsMap[flags.FlagPanicOnDaemonFailureEnabled], r.Shared.PanicOnDaemonFailureEnabled)
require.Equal(
t,
optsMap[flags.FlagMaxDaemonUnhealthySeconds],
r.Shared.MaxDaemonUnhealthySeconds,
)

// Bridge Daemon.
require.Equal(t, optsMap[flags.FlagBridgeDaemonEnabled], r.Bridge.Enabled)
Expand All @@ -81,7 +91,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagPriceDaemonLoopDelayMs], r.Price.LoopDelayMs)
}

func TestGetDaemonFlagValuesFromOptions_Defaul(t *testing.T) {
func TestGetDaemonFlagValuesFromOptions_Default(t *testing.T) {
mockOpts := mocks.AppOptions{}
mockOpts.On("Get", mock.Anything).
Return(func(key string) interface{} {
Expand Down
7 changes: 5 additions & 2 deletions protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package client_test
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/app"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client"
Expand Down Expand Up @@ -285,6 +284,7 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
flags.GetDefaultDaemonFlags().Shared.PanicOnDaemonFailureEnabled, // Use default behavior for testing
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
Expand Down Expand Up @@ -337,7 +337,10 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration)
err := s.healthMonitor.RegisterService(
s.pricefeedDaemon,
time.Duration(s.daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
)
s.Require().NoError(err)
}

Expand Down
21 changes: 11 additions & 10 deletions protocol/daemons/server/types/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ type healthChecker struct {
// pollFrequency is the frequency at which the health-checkable service is polled.
pollFrequency time.Duration

// maxAcceptableUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
// maxUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
// remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the
// specified callback function.
maxAcceptableUnhealthyDuration time.Duration
maxUnhealthyDuration time.Duration

// unhealthyCallback is the callback function to be executed if the health-checkable service remains
// unhealthy for a period of time greater than or equal to the maximum acceptable unhealthy duration.
Expand All @@ -174,7 +174,7 @@ func (hc *healthChecker) Poll() {
streakDuration := hc.mutableState.ReportFailure(now, err)
// If the service has been unhealthy for longer than the maximum acceptable unhealthy duration, execute the
// callback function.
if streakDuration >= hc.maxAcceptableUnhealthyDuration {
if streakDuration >= hc.maxUnhealthyDuration {
hc.unhealthyCallback(err)
}
}
Expand All @@ -197,17 +197,18 @@ func StartNewHealthChecker(
pollFrequency time.Duration,
unhealthyCallback func(error),
timeProvider libtime.TimeProvider,
maximumAcceptableUnhealthyDuration time.Duration,
maxUnhealthyDuration time.Duration,
startupGracePeriod time.Duration,
logger log.Logger,
) *healthChecker {
checker := &healthChecker{
healthCheckable: healthCheckable,
pollFrequency: pollFrequency,
unhealthyCallback: unhealthyCallback,
timeProvider: timeProvider,
maxAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration,
logger: logger,
healthCheckable: healthCheckable,
pollFrequency: pollFrequency,
unhealthyCallback: unhealthyCallback,
timeProvider: timeProvider,
maxUnhealthyDuration: maxUnhealthyDuration,
logger: logger,
mutableState: &healthCheckerMutableState{},
}

// The first poll is scheduled after the startup grace period to allow the service to initialize.
Expand Down
37 changes: 26 additions & 11 deletions protocol/daemons/server/types/health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,29 @@ type HealthMonitor struct {
mutableState *healthMonitorMutableState

// These fields are initialized in NewHealthMonitor and are not modified after initialization.
logger log.Logger
logger log.Logger
// startupGracePeriod is the grace period before the monitor starts polling the health-checkable services.
startupGracePeriod time.Duration
pollingFrequency time.Duration
// pollingFrequency is the frequency at which the health-checkable services are polled.
pollingFrequency time.Duration
// enablePanics is used to toggle between panics or error logs when a daemon sustains an unhealthy state past the
// maximum allowable duration.
enablePanics bool
}

// NewHealthMonitor builds a HealthMonitor with fresh mutable state.
//
// startupGracePeriod and pollingFrequency are stored as-is on the monitor. The supplied logger is
// decorated with the health monitor's module name so that every log line emitted by the monitor is
// attributable to it. enablePanics selects whether services registered via RegisterService panic
// (true) or log an error (false) once they have been unhealthy for too long.
func NewHealthMonitor(
	startupGracePeriod time.Duration,
	pollingFrequency time.Duration,
	logger log.Logger,
	enablePanics bool,
) *HealthMonitor {
	monitor := new(HealthMonitor)

	// Populate the fields in the same order the values were originally evaluated.
	monitor.mutableState = newHealthMonitorMutableState()
	monitor.logger = logger.With(cosmoslog.ModuleKey, HealthMonitorLogModuleName)
	monitor.startupGracePeriod = startupGracePeriod
	monitor.pollingFrequency = pollingFrequency
	monitor.enablePanics = enablePanics

	return monitor
}

Expand All @@ -153,15 +160,15 @@ func (hm *HealthMonitor) DisableForTesting() {
// health-checkable service before returning.
func (hm *HealthMonitor) RegisterServiceWithCallback(
hc types.HealthCheckable,
maximumAcceptableUnhealthyDuration time.Duration,
maxUnhealthyDuration time.Duration,
callback func(error),
) error {
if maximumAcceptableUnhealthyDuration <= 0 {
if maxUnhealthyDuration <= 0 {
return fmt.Errorf(
"health check registration failure for service %v: "+
"maximum acceptable unhealthy duration %v must be positive",
"maximum unhealthy duration %v must be positive",
hc.ServiceName(),
maximumAcceptableUnhealthyDuration,
maxUnhealthyDuration,
)
}

Expand All @@ -171,7 +178,7 @@ func (hm *HealthMonitor) RegisterServiceWithCallback(
hm.pollingFrequency,
callback,
&libtime.TimeProviderImpl{},
maximumAcceptableUnhealthyDuration,
maxUnhealthyDuration,
hm.startupGracePeriod,
hm.logger,
)
Expand Down Expand Up @@ -202,18 +209,26 @@ func LogErrorServiceNotResponding(hc types.HealthCheckable, logger log.Logger) f

// RegisterService registers a new health-checkable service with the health check monitor. If the service
// is unhealthy every time it is polled for a duration greater than or equal to the maximum acceptable unhealthy
// duration, the monitor will panic.
// duration, the monitor will panic or log an error, depending on the app configuration via the
// `panic-on-daemon-failure-enabled` flag.
// This method is synchronized. It returns an error if the service was already registered or the monitor has
// already been stopped. If the monitor has been stopped, this method will proactively stop the health-checkable
// service before returning.
func (hm *HealthMonitor) RegisterService(
hc types.HealthCheckable,
maximumAcceptableUnhealthyDuration time.Duration,
maxDaemonUnhealthyDuration time.Duration,
) error {
// If the monitor is configured to panic, use the panic callback. Otherwise, use the error log callback.
// This behavior is configured via flag and defaults to panicking on daemon failure.
callback := LogErrorServiceNotResponding(hc, hm.logger)
if hm.enablePanics {
callback = PanicServiceNotResponding(hc)
}

return hm.RegisterServiceWithCallback(
hc,
maximumAcceptableUnhealthyDuration,
PanicServiceNotResponding(hc),
maxDaemonUnhealthyDuration,
callback,
)
}

Expand Down
Loading
Loading