Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-666] - Migrate daemon monitoring to use health checks #783

Merged
merged 15 commits into from
Dec 8, 2023
55 changes: 42 additions & 13 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"math"
"math/big"
clemire marked this conversation as resolved.
Show resolved Hide resolved
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
Expand Down Expand Up @@ -290,6 +290,8 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client

DaemonHealthMonitor *daemonservertypes.HealthMonitor
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -589,22 +591,27 @@ func New(
bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider)
app.Server.WithBridgeEventManager(bridgeEventManager)

app.DaemonHealthMonitor = daemonservertypes.NewHealthMonitor(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
daemonFlags.Shared.PanicOnDaemonFailureEnabled,
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
// daemons will not be able to connect to the cosmos gRPC query service and finish initialization, and the daemon
// monitoring service will panic.
app.startDaemons = func() {
maxDaemonUnhealthyDuration := time.Duration(daemonFlags.Shared.MaxDaemonUnhealthySeconds) * time.Second
// Start server for handling gRPC messages from daemons.
go app.Server.Start()

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.Server.ExpectLiquidationsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs),
)
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, maxDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -621,7 +628,6 @@ func New(
// Non-validating full-nodes have no need to run the price daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled {
exchangeQueryConfig := constants.StaticExchangeQueryConfig
app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs))
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
// are retrieved via third-party APIs like Binance and then are encoded in-memory and
// periodically sent via gRPC to a shared socket with the server.
Expand All @@ -637,16 +643,15 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, maxDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
// app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs))
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.BridgeClient = bridgeclient.NewClient(logger)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, maxDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -663,6 +668,9 @@ func New(
// Start the Metrics Daemon.
// The metrics daemon is purely used for observability. It should never bring the app down.
// TODO(CLOB-960) Don't start this goroutine if telemetry is disabled
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -675,10 +683,6 @@ func New(
)
}
}()
// Don't panic if metrics daemon loops are delayed. Use maximum value.
clemire marked this conversation as resolved.
Show resolved Hide resolved
app.Server.ExpectMetricsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32),
)
metricsclient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
clemire marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1222,6 +1226,31 @@ func New(
return app
}

// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maxDaemonUnhealthyDuration time.Duration,
) {
if err := app.DaemonHealthMonitor.RegisterService(healthCheckableDaemon, maxDaemonUnhealthyDuration); err != nil {
app.Logger().Error(
"Failed to register daemon service with update monitor",
"error",
err,
"service",
healthCheckableDaemon.ServiceName(),
"maxDaemonUnhealthyDuration",
maxDaemonUnhealthyDuration,
)
panic(err)
}
}

// DisableHealthMonitorForTesting disables the health monitor for testing.
func (app *App) DisableHealthMonitorForTesting() {
app.DaemonHealthMonitor.DisableForTesting()
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
clemire marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
17 changes: 17 additions & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package app_test

import (
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"gopkg.in/typ.v4/slices"
"reflect"
"strings"
"testing"
"time"

delaymsgmodule "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg"

Expand Down Expand Up @@ -222,3 +224,18 @@ func TestModuleBasics(t *testing.T) {
actualFieldTypes := getMapFieldsAndTypes(reflect.ValueOf(basic_manager.ModuleBasics))
require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected")
}

func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) {
app := testapp.DefaultTestApp(nil)
hc := &mocks.HealthCheckable{}
hc.On("ServiceName").Return("test-service")
hc.On("HealthCheck").Return(nil)

app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute)
// The second registration should fail, causing a panic.
require.PanicsWithError(
t,
"service test-service already registered",
func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) },
)
}
1 change: 0 additions & 1 deletion protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flags

import (
"fmt"

"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
Expand Down
36 changes: 32 additions & 4 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
// List of CLI flags for Server and Client.
const (
// Flag names
FlagUnixSocketAddress = "unix-socket-address"
FlagUnixSocketAddress = "unix-socket-address"
FlagPanicOnDaemonFailureEnabled = "panic-on-daemon-failure-enabled"
FlagMaxDaemonUnhealthySeconds = "max-daemon-unhealthy-seconds"

FlagPriceDaemonEnabled = "price-daemon-enabled"
FlagPriceDaemonLoopDelayMs = "price-daemon-loop-delay-ms"
Expand All @@ -28,6 +30,10 @@ const (
type SharedFlags struct {
// SocketAddress is the location of the unix socket to communicate with the daemon gRPC service.
SocketAddress string
// PanicOnDaemonFailureEnabled toggles whether the daemon should panic on failure.
PanicOnDaemonFailureEnabled bool
// MaxDaemonUnhealthySeconds is the maximum allowable duration for which a daemon can be unhealthy.
MaxDaemonUnhealthySeconds uint32
}

// BridgeFlags contains configuration flags for the Bridge Daemon.
Expand Down Expand Up @@ -74,7 +80,9 @@ func GetDefaultDaemonFlags() DaemonFlags {
if defaultDaemonFlags == nil {
defaultDaemonFlags = &DaemonFlags{
Shared: SharedFlags{
SocketAddress: "/tmp/daemons.sock",
SocketAddress: "/tmp/daemons.sock",
PanicOnDaemonFailureEnabled: true,
MaxDaemonUnhealthySeconds: 5 * 60, // 5 minutes.
},
Bridge: BridgeFlags{
Enabled: true,
Expand Down Expand Up @@ -109,8 +117,18 @@ func AddDaemonFlagsToCmd(
cmd.Flags().String(
FlagUnixSocketAddress,
df.Shared.SocketAddress,
"Socket address for the price daemon to send updates to, if not set "+
"will establish default location to ingest price updates from",
"Socket address for the daemons to send updates to, if not set "+
"will establish default location to ingest daemon updates from",
)
cmd.Flags().Bool(
FlagPanicOnDaemonFailureEnabled,
df.Shared.PanicOnDaemonFailureEnabled,
"Enables panicking when a daemon fails.",
)
cmd.Flags().Uint32(
FlagMaxDaemonUnhealthySeconds,
df.Shared.MaxDaemonUnhealthySeconds,
"Maximum allowable duration for which a daemon can be unhealthy.",
)

// Bridge Daemon.
Expand Down Expand Up @@ -178,6 +196,16 @@ func GetDaemonFlagValuesFromOptions(
result.Shared.SocketAddress = v
}
}
if option := appOpts.Get(FlagPanicOnDaemonFailureEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.Shared.PanicOnDaemonFailureEnabled = v
}
}
if option := appOpts.Get(FlagMaxDaemonUnhealthySeconds); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.Shared.MaxDaemonUnhealthySeconds = v
}
}

// Bridge Daemon.
if option := appOpts.Get(FlagBridgeDaemonEnabled); option != nil {
Expand Down
12 changes: 11 additions & 1 deletion protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.AddDaemonFlagsToCmd(&cmd)
tests := []string{
flags.FlagUnixSocketAddress,
flags.FlagPanicOnDaemonFailureEnabled,
flags.FlagMaxDaemonUnhealthySeconds,

flags.FlagBridgeDaemonEnabled,
flags.FlagBridgeDaemonLoopDelayMs,
Expand All @@ -41,6 +43,8 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap := make(map[string]interface{})

optsMap[flags.FlagUnixSocketAddress] = "test-socket-address"
optsMap[flags.FlagPanicOnDaemonFailureEnabled] = false
optsMap[flags.FlagMaxDaemonUnhealthySeconds] = uint32(1234)

optsMap[flags.FlagBridgeDaemonEnabled] = true
optsMap[flags.FlagBridgeDaemonLoopDelayMs] = uint32(1111)
Expand All @@ -64,6 +68,12 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {

// Shared.
require.Equal(t, optsMap[flags.FlagUnixSocketAddress], r.Shared.SocketAddress)
require.Equal(t, optsMap[flags.FlagPanicOnDaemonFailureEnabled], r.Shared.PanicOnDaemonFailureEnabled)
require.Equal(
t,
optsMap[flags.FlagMaxDaemonUnhealthySeconds],
r.Shared.MaxDaemonUnhealthySeconds,
)

// Bridge Daemon.
require.Equal(t, optsMap[flags.FlagBridgeDaemonEnabled], r.Bridge.Enabled)
Expand All @@ -81,7 +91,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagPriceDaemonLoopDelayMs], r.Price.LoopDelayMs)
}

func TestGetDaemonFlagValuesFromOptions_Defaul(t *testing.T) {
func TestGetDaemonFlagValuesFromOptions_Default(t *testing.T) {
mockOpts := mocks.AppOptions{}
mockOpts.On("Get", mock.Anything).
Return(func(key string) interface{} {
Expand Down
3 changes: 3 additions & 0 deletions protocol/daemons/metrics/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (

// Start begins a job that periodically:
// 1) Emits metrics about app version and git commit.
// Note: the metrics daemon is such a simple go-routine that we don't bother implementing a health-check
// for this service. The task loop does not produce any errors because the telemetry calls themselves are
// not error-returning, so in effect this daemon would never become unhealthy.
func Start(
ctx context.Context,
logger log.Logger,
Expand Down
15 changes: 14 additions & 1 deletion protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ type PriceDaemonIntegrationTestSuite struct {
exchangeServer *pricefeed.ExchangeServer
daemonServer *daemonserver.Server
exchangePriceCache *pricefeedserver_types.MarketToExchangePrices
healthMonitor *servertypes.HealthMonitor

pricesMockQueryServer *mocks.QueryServer
pricesGrpcServer *grpc.Server
Expand Down Expand Up @@ -278,7 +279,14 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
&daemontypes.FileHandlerImpl{},
s.daemonFlags.Shared.SocketAddress,
)
s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs))

s.healthMonitor = servertypes.NewHealthMonitor(
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
flags.GetDefaultDaemonFlags().Shared.PanicOnDaemonFailureEnabled, // Use default behavior for testing
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache)

Expand Down Expand Up @@ -329,6 +337,11 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(
s.pricefeedDaemon,
time.Duration(s.daemonFlags.Shared.MaxDaemonUnhealthySeconds)*time.Second,
)
s.Require().NoError(err)
}

// expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout.
Expand Down
28 changes: 8 additions & 20 deletions protocol/daemons/server/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge"
)

Expand All @@ -23,31 +21,21 @@ func (server *Server) WithBridgeEventManager(
return server
}

// ExpectBridgeDaemon registers the bridge daemon with the server. This is required
// in order to ensure that the daemon service is called at least once during every
// maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does not
// respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay)
}

// AddBridgeEvents stores any bridge events recognized by the daemon
// in a go-routine safe slice.
func (s *Server) AddBridgeEvents(
ctx context.Context,
req *api.AddBridgeEventsRequest,
) (
*api.AddBridgeEventsResponse,
error,
response *api.AddBridgeEventsResponse,
err error,
) {
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
// TODO(CORE-582): Re-enable this check once the bridge daemon is fixed in local / CI environments.
//if err := s.reportResponse(types.BridgeDaemonServiceName); err != nil {
// panic(err)
//}
if err := s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
return nil, err
}

// Capture valid responses in metrics.
s.reportValidResponse(types.BridgeDaemonServiceName)

return &api.AddBridgeEventsResponse{}, nil
}
Loading
Loading