Commit

PR comments.
Crystal Lemire committed Nov 30, 2023
1 parent 7cc4020 commit 6bedc57
Showing 5 changed files with 31 additions and 39 deletions.
24 changes: 8 additions & 16 deletions protocol/app/app.go
@@ -603,17 +603,15 @@ func New(
 	// daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon
 	// monitoring service will panic.
 	app.startDaemons = func() {
+		maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second
 		// Start server for handling gRPC messages from daemons.
 		go app.Server.Start()
 
 		// Start liquidations client for sending potentially liquidatable subaccounts to the application.
 		if daemonFlags.Liquidation.Enabled {
 			app.LiquidationsClient = liquidationclient.NewClient(logger)
 			go func() {
-				app.MonitorDaemon(
-					app.LiquidationsClient,
-					time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
-				)
+				app.MonitorDaemon(app.LiquidationsClient, maxDaemonUnhealthyDuration)
 				if err := app.LiquidationsClient.Start(
 					// The client will use `context.Background` so that it can have a different context from
 					// the main application.
@@ -645,21 +643,15 @@ func New(
 				constants.StaticExchangeDetails,
 				&pricefeedclient.SubTaskRunnerImpl{},
 			)
-			app.MonitorDaemon(
-				app.PriceFeedClient,
-				time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
-			)
+			app.MonitorDaemon(app.PriceFeedClient, maxDaemonUnhealthyDuration)
 		}
 
 		// Start Bridge Daemon.
 		// Non-validating full-nodes have no need to run the bridge daemon.
 		if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
 			app.BridgeClient = bridgeclient.NewClient(logger)
 			go func() {
-				app.MonitorDaemon(
-					app.BridgeClient,
-					time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
-				)
+				app.MonitorDaemon(app.BridgeClient, maxDaemonUnhealthyDuration)
 				if err := app.BridgeClient.Start(
 					// The client will use `context.Background` so that it can have a different context from
 					// the main application.
@@ -1238,17 +1230,17 @@ func New(
 // panic.
 func (app *App) MonitorDaemon(
 	healthCheckableDaemon daemontypes.HealthCheckable,
-	maximumAcceptableUpdateDelay time.Duration,
+	maxDaemonUnhealthyDuration time.Duration,
 ) {
-	if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay); err != nil {
+	if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil {
 		app.Logger().Error(
 			"Failed to register daemon service with update monitor",
 			"error",
 			err,
 			"service",
 			healthCheckableDaemon.ServiceName(),
-			"maximumAcceptableUpdateDelay",
-			maximumAcceptableUpdateDelay,
+			"maxDaemonUnhealthyDuration",
+			maxDaemonUnhealthyDuration,
 		)
 		panic(err)
 	}
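Taken together, the app.go hunks replace three inline `time.Duration(...)` conversions with a single `maxDaemonUnhealthyDuration` computed at the top of `app.startDaemons` and reused for every registration. Below is a self-contained sketch of that compute-once-and-reuse shape; the `monitorDaemon` helper, `namedDaemon` type, and daemon names are stand-ins for illustration, not the real App wiring.

package main

import (
	"fmt"
	"time"
)

// healthCheckable stands in for the daemontypes.HealthCheckable interface.
type healthCheckable interface {
	ServiceName() string
}

// namedDaemon is a stand-in daemon that only knows its service name.
type namedDaemon string

func (d namedDaemon) ServiceName() string { return string(d) }

// monitorDaemon mirrors the shape of app.MonitorDaemon: register a daemon with
// the health monitor using the shared unhealthy-duration limit.
func monitorDaemon(d healthCheckable, maxDaemonUnhealthyDuration time.Duration) {
	fmt.Printf("monitoring %s (max unhealthy: %s)\n", d.ServiceName(), maxDaemonUnhealthyDuration)
}

func main() {
	// Convert the shared flag (expressed in seconds) to a time.Duration once,
	// as the commit now does at the top of app.startDaemons, then reuse it.
	maxDaemonUnhealthySeconds := uint32(5 * 60)
	maxDaemonUnhealthyDuration := time.Duration(maxDaemonUnhealthySeconds) * time.Second

	for _, d := range []namedDaemon{"liquidations-daemon", "pricefeed-daemon", "bridge-daemon"} {
		monitorDaemon(d, maxDaemonUnhealthyDuration)
	}
}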
2 changes: 1 addition & 1 deletion protocol/daemons/flags/flags.go
@@ -82,7 +82,7 @@ func GetDefaultDaemonFlags() DaemonFlags {
 		Shared: SharedFlags{
 			SocketAddress:               "/tmp/daemons.sock",
 			PanicOnDaemonFailureEnabled: true,
-			MaxDaemonUnhealthySeconds:   5 * 60,
+			MaxDaemonUnhealthySeconds:   5 * 60, // 5 minutes.
 		},
 		Bridge: BridgeFlags{
 			Enabled: true,
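The new comment only spells out that the default, expressed in seconds, works out to five minutes once converted the way app.go now converts it. A standalone snippet to sanity-check that arithmetic (not part of the package):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Matches the default in GetDefaultDaemonFlags: MaxDaemonUnhealthySeconds = 5 * 60.
	const maxDaemonUnhealthySeconds = 5 * 60
	fmt.Println(time.Duration(maxDaemonUnhealthySeconds) * time.Second) // prints 5m0s
}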
24 changes: 12 additions & 12 deletions protocol/daemons/server/types/health_checker.go
@@ -54,7 +54,7 @@ type healthCheckerMutableState struct {
 
 	// mostRecentFailureStreakError tracks the timestamp of the first error in the most recent streak of errors, as well
 	// as the most recent error. It is updated on every error and reset every time the service sees a healthy response.
-	// This field is used to determine how long the daemon has been unhealthy. If this timestamp is nil, then either
+	// This field is used to determine how long the service has been unhealthy. If this timestamp is nil, then either
 	// the service has never been unhealthy, or the most recent error streak ended before it could trigger a callback.
 	// Access to mostRecentFailureStreakError is synchronized.
 	mostRecentFailureStreakError timestampWithError
@@ -97,10 +97,10 @@ type healthChecker struct {
 	// timer triggers a health check poll for a health-checkable service.
 	timer *time.Timer
 
-	// maxAcceptableUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
+	// maxUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
 	// remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the
 	// specified callback function.
-	maxAcceptableUnhealthyDuration time.Duration
+	maxUnhealthyDuration time.Duration
 
 	// unhealthyCallback is the callback function to be executed if the health-checkable service remains
 	// unhealthy for a period of time greater than or equal to the maximum acceptable unhealthy duration.
@@ -128,7 +128,7 @@ func (hc *healthChecker) Poll() {
 		streakDuration := hc.mutableState.ReportFailure(now, err)
 		// If the service has been unhealthy for longer than the maximum acceptable unhealthy duration, execute the
 		// callback function.
-		if streakDuration >= hc.maxAcceptableUnhealthyDuration {
+		if streakDuration >= hc.maxUnhealthyDuration {
 			hc.unhealthyCallback(err)
 		}
 	}
@@ -148,18 +148,18 @@ func StartNewHealthChecker(
 	pollFrequency time.Duration,
 	unhealthyCallback func(error),
 	timeProvider libtime.TimeProvider,
-	maximumAcceptableUnhealthyDuration time.Duration,
+	maxUnhealthyDuration time.Duration,
 	startupGracePeriod time.Duration,
 	logger log.Logger,
 ) *healthChecker {
 	checker := &healthChecker{
-		healthCheckable:                healthCheckable,
-		pollFrequency:                  pollFrequency,
-		unhealthyCallback:              unhealthyCallback,
-		timeProvider:                   timeProvider,
-		maxAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration,
-		logger:                         logger,
-		mutableState:                   &healthCheckerMutableState{},
+		healthCheckable:      healthCheckable,
+		pollFrequency:        pollFrequency,
+		unhealthyCallback:    unhealthyCallback,
+		timeProvider:         timeProvider,
+		maxUnhealthyDuration: maxUnhealthyDuration,
+		logger:               logger,
+		mutableState:         &healthCheckerMutableState{},
 	}
 
 	// The first poll is scheduled after the startup grace period to allow the service to initialize.
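For context on what `maxUnhealthyDuration` gates in `Poll`: `ReportFailure` returns how long the current streak of consecutive errors has lasted, and the unhealthy callback fires once that streak reaches the limit. A simplified, self-contained sketch of that streak bookkeeping follows; the `failureStreak` type and its methods are stand-ins (the real `healthChecker` also handles timers, the startup grace period, and locking).

package main

import (
	"errors"
	"fmt"
	"time"
)

// failureStreak tracks when the current run of consecutive errors began,
// mirroring the role of mostRecentFailureStreakError.
type failureStreak struct {
	firstErrorAt time.Time
	lastError    error
}

// reportFailure records an error and returns how long the streak has lasted.
func (f *failureStreak) reportFailure(now time.Time, err error) time.Duration {
	if f.firstErrorAt.IsZero() {
		f.firstErrorAt = now
	}
	f.lastError = err
	return now.Sub(f.firstErrorAt)
}

// reportSuccess resets the streak on any healthy response.
func (f *failureStreak) reportSuccess() {
	*f = failureStreak{}
}

func main() {
	maxUnhealthyDuration := 5 * time.Minute
	unhealthyCallback := func(err error) { fmt.Println("unhealthy too long:", err) }

	streak := &failureStreak{}
	start := time.Now()

	// Simulate two failing polls, one at t=0 and one at t=6m.
	_ = streak.reportFailure(start, errors.New("daemon not responding"))
	if d := streak.reportFailure(start.Add(6*time.Minute), errors.New("daemon not responding")); d >= maxUnhealthyDuration {
		unhealthyCallback(streak.lastError)
	}
}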
18 changes: 9 additions & 9 deletions protocol/daemons/server/types/health_monitor.go
@@ -120,9 +120,9 @@ type HealthMonitor struct {
 
 	// These fields are initialized in NewHealthMonitor and are not modified after initialization.
 	logger log.Logger
-	// startupGracePeriod is the grace period before the monitor starts polling the health checkable services.
+	// startupGracePeriod is the grace period before the monitor starts polling the health-checkable services.
 	startupGracePeriod time.Duration
-	// pollingFrequency is the frequency at which the health checkable services are polled.
+	// pollingFrequency is the frequency at which the health-checkable services are polled.
 	pollingFrequency time.Duration
 	// enablePanics is used to toggle between panics or error logs when a daemon sustains an unhealthy state past the
 	// maximum allowable duration.
@@ -157,15 +157,15 @@ func (hm *HealthMonitor) DisableForTesting() {
 // health-checkable service before returning.
 func (hm *HealthMonitor) RegisterServiceWithCallback(
 	hc types.HealthCheckable,
-	maximumAcceptableUnhealthyDuration time.Duration,
+	maxUnhealthyDuration time.Duration,
 	callback func(error),
 ) error {
-	if maximumAcceptableUnhealthyDuration <= 0 {
+	if maxUnhealthyDuration <= 0 {
 		return fmt.Errorf(
 			"health check registration failure for service %v: "+
-				"maximum acceptable unhealthy duration %v must be positive",
+				"maximum unhealthy duration %v must be positive",
 			hc.ServiceName(),
-			maximumAcceptableUnhealthyDuration,
+			maxUnhealthyDuration,
 		)
 	}
 
@@ -175,7 +175,7 @@ func (hm *HealthMonitor) RegisterServiceWithCallback(
 		hm.pollingFrequency,
 		callback,
 		&libtime.TimeProviderImpl{},
-		maximumAcceptableUnhealthyDuration,
+		maxUnhealthyDuration,
 		hm.startupGracePeriod,
 		hm.logger,
 	)
@@ -213,7 +213,7 @@ func LogErrorServiceNotResponding(hc types.HealthCheckable, logger log.Logger) f
 // service before returning.
 func (hm *HealthMonitor) RegisterService(
 	hc types.HealthCheckable,
-	maximumAcceptableUnhealthyDuration time.Duration,
+	maxDaemonUnhealthyDuration time.Duration,
 ) error {
 	// If the monitor is configured to panic, use the panic callback. Otherwise, use the error log callback.
 	// This behavior is configured via flag and defaults to panicking on daemon failure.
@@ -224,7 +224,7 @@
 
 	return hm.RegisterServiceWithCallback(
 		hc,
-		maximumAcceptableUnhealthyDuration,
+		maxDaemonUnhealthyDuration,
 		callback,
 	)
 }
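The renamed parameters flow through `RegisterService`, which picks a panic or error-log callback and delegates to `RegisterServiceWithCallback`, where the positive-duration validation lives. A reduced sketch of that path using stand-in types (`monitor`, `bridgeDaemon`, lowercase method names), not the actual `HealthMonitor` API:

package main

import (
	"fmt"
	"log"
	"time"
)

type healthCheckable interface {
	ServiceName() string
}

type monitor struct {
	enablePanics bool
}

// registerServiceWithCallback mirrors the validation in RegisterServiceWithCallback.
func (m *monitor) registerServiceWithCallback(
	hc healthCheckable,
	maxUnhealthyDuration time.Duration,
	callback func(error),
) error {
	if maxUnhealthyDuration <= 0 {
		return fmt.Errorf(
			"health check registration failure for service %v: "+
				"maximum unhealthy duration %v must be positive",
			hc.ServiceName(),
			maxUnhealthyDuration,
		)
	}
	// The real monitor starts a health checker here; this sketch only records the callback.
	_ = callback
	return nil
}

// registerService picks the callback based on whether panics are enabled.
func (m *monitor) registerService(hc healthCheckable, maxDaemonUnhealthyDuration time.Duration) error {
	callback := func(err error) { log.Printf("%s not responding: %v", hc.ServiceName(), err) }
	if m.enablePanics {
		callback = func(err error) { panic(err) }
	}
	return m.registerServiceWithCallback(hc, maxDaemonUnhealthyDuration, callback)
}

type bridgeDaemon struct{}

func (bridgeDaemon) ServiceName() string { return "bridge-daemon" }

func main() {
	m := &monitor{enablePanics: false}
	// A non-positive duration is rejected, matching the error string asserted in the test file below.
	fmt.Println(m.registerService(bridgeDaemon{}, -50*time.Millisecond))
}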
2 changes: 1 addition & 1 deletion protocol/daemons/server/types/health_monitor_test.go
@@ -242,7 +242,7 @@ func TestRegisterValidResponseWithCallback_NegativeUnhealthyDuration(t *testing.
 	ufm, _ := createTestMonitor()
 	hc := mockFailingHealthCheckerWithError("test-service", TestError1)
 	err := ufm.RegisterServiceWithCallback(hc, -50*time.Millisecond, func(error) {})
-	require.ErrorContains(t, err, "maximum acceptable unhealthy duration -50ms must be positive")
+	require.ErrorContains(t, err, "maximum unhealthy duration -50ms must be positive")
 }
 
 func TestPanicServiceNotResponding(t *testing.T) {