diff --git a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml b/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml new file mode 100644 index 00000000000..141c18715bf --- /dev/null +++ b/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml @@ -0,0 +1,31 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Start/stop monitoring server based on monitoring config + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: elastic-agent + +# PR number; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 3492 + +# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: 2735 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 0da3546ffa8..778232b91d8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -160,6 +160,10 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa // CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components var CoordinatorShutdownTimeout = time.Second * 5 +type configReloader interface { + Reload(*config.Config) error +} + // Coordinator manages the entire state of the Elastic Agent. // // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator. @@ -175,6 +179,8 @@ type Coordinator struct { upgradeMgr UpgradeManager monitorMgr MonitorManager + monitoringServerReloader configReloader + runtimeMgr RuntimeManager configMgr ConfigManager varsMgr VarsManager @@ -376,6 +382,10 @@ func (c *Coordinator) State() State { return c.stateBroadcaster.Get() } +func (c *Coordinator) RegisterMonitoringServer(s configReloader) { + c.monitoringServerReloader = s +} + // StateSubscribe returns a channel that reports changes in Coordinator state. // // bufferLen specifies how many state changes should be queued in addition to @@ -1038,6 +1048,12 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { } } + if c.monitoringServerReloader != nil { + if err := c.monitoringServerReloader.Reload(cfg); err != nil { + return fmt.Errorf("failed to reload monitor manager configuration: %w", err) + } + } + c.ast = rawAst return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 5752c100a41..fd4034f24c4 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -24,11 +24,13 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -521,6 +523,136 @@ inputs: } } +func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { + // Send a test policy to the Coordinator as a Config Manager update, + // verify it generates the right component model and sends it to the + // runtime manager, then send an empty policy and verify it calls + // another runtime manager update with an empty component model. + + // Set a one-second timeout -- nothing here should block, but if it + // does let's report a failure instead of timing out the test runner. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + logger := logp.NewLogger("testing") + + configChan := make(chan ConfigChange, 1) + + // Create a mocked runtime manager that will report the update call + runtimeManager := &fakeRuntimeManager{ + updateCallback: func(comp []component.Component) error { + return nil + }, + } + + monitoringServer := &fakeMonitoringServer{} + newServerFn := func() (reload.ServerController, error) { + return monitoringServer, nil + } + monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig()) + + coord := &Coordinator{ + logger: logger, + agentInfo: &info.AgentInfo{}, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + managerChans: managerChans{ + configManagerUpdate: configChan, + }, + runtimeMgr: runtimeManager, + vars: emptyVars(t), + } + coord.RegisterMonitoringServer(monitoringReloader) + + // Create a policy with one input and one output + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch +inputs: + - id: test-input + type: filestream + use_output: default +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // server is started by default + assert.True(t, monitoringServer.startTriggered) + assert.True(t, monitoringServer.isRunning) + + // disable monitoring + cfgDisableMonitoring := config.MustNewConfigFrom(` +agent.monitoring.enabled: false +outputs: + default: + type: elasticsearch +inputs: + - id: test-input + type: filestream + use_output: default +`) + + // Send the policy change and make sure it was acknowledged. + monitoringServer.Reset() + cfgChange = &configChange{cfg: cfgDisableMonitoring} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // server is stopped: monitoring is disabled + assert.True(t, monitoringServer.stopTriggered) + assert.False(t, monitoringServer.isRunning) + + // enable monitoring + cfgEnabledMonitoring := config.MustNewConfigFrom(` +agent.monitoring.enabled: true +outputs: + default: + type: elasticsearch +inputs: + - id: test-input + type: filestream + use_output: default +`) + + // Send the policy change and make sure it was acknowledged. + monitoringServer.Reset() + cfgChange = &configChange{cfg: cfgEnabledMonitoring} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // server is started again + assert.True(t, monitoringServer.startTriggered) + assert.True(t, monitoringServer.isRunning) + + // enable monitoring and disable metrics + cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(` +agent.monitoring.enabled: true +agent.monitoring.metrics: false +outputs: + default: + type: elasticsearch +inputs: + - id: test-input + type: filestream + use_output: default +`) + + // Send the policy change and make sure it was acknowledged. + monitoringServer.Reset() + cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // server is running: monitoring.metrics is disabled does not have an effect + assert.True(t, monitoringServer.isRunning) +} + func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { // Send a test policy to the Coordinator as a Config Manager update, // verify it generates the right component model and sends it to the @@ -867,3 +999,25 @@ func emptyAST(t *testing.T) *transpiler.AST { require.NoError(t, err, "AST creation must succeed") return ast } + +type fakeMonitoringServer struct { + startTriggered bool + stopTriggered bool + isRunning bool +} + +func (fs *fakeMonitoringServer) Start() { + fs.startTriggered = true + fs.isRunning = true +} + +func (fs *fakeMonitoringServer) Stop() error { + fs.stopTriggered = true + fs.isRunning = false + return nil +} + +func (fs *fakeMonitoringServer) Reset() { + fs.stopTriggered = false + fs.startTriggered = false +} diff --git a/internal/pkg/agent/application/monitoring/reload/reload.go b/internal/pkg/agent/application/monitoring/reload/reload.go new file mode 100644 index 00000000000..430b425d6c6 --- /dev/null +++ b/internal/pkg/agent/application/monitoring/reload/reload.go @@ -0,0 +1,102 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package reload + +import ( + "sync/atomic" + + "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + aConfig "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +// ServerController controls the server runtime +type ServerController interface { + Start() + Stop() error +} +type serverConstructor func() (ServerController, error) + +type ServerReloader struct { + s ServerController + log *logger.Logger + newServerFn serverConstructor + + config *monitoringCfg.MonitoringConfig + isServerRunning atomic.Bool +} + +func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader { + sr := &ServerReloader{ + log: log, + config: mcfg, + newServerFn: newServerFn, + } + + return sr +} + +func (sr *ServerReloader) Start() { + if sr.s != nil && sr.isServerRunning.Load() { + // server is already running + return + } + + sr.log.Info("Starting server") + var err error + sr.s, err = sr.newServerFn() + if err != nil { + sr.log.Errorf("Failed creating a server: %v", err) + return + } + + sr.s.Start() + sr.log.Debugf("Server started") + sr.isServerRunning.Store(true) +} + +func (sr *ServerReloader) Stop() error { + if sr.s == nil { + // stopping not started server + sr.isServerRunning.Store(false) + return nil + } + sr.log.Info("Stopping server") + + sr.isServerRunning.Store(false) + if err := sr.s.Stop(); err != nil { + return err + } + + sr.log.Debugf("Server stopped") + sr.s = nil + return nil +} + +func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { + newConfig := configuration.DefaultConfiguration() + if err := rawConfig.Unpack(&newConfig); err != nil { + return errors.New(err, "failed to unpack monitoring config during reload") + } + + sr.config = newConfig.Settings.MonitoringConfig + + shouldRunMetrics := sr.config.Enabled + if shouldRunMetrics && !sr.isServerRunning.Load() { + sr.Start() + + sr.isServerRunning.Store(true) + return nil + } + + if !shouldRunMetrics && sr.isServerRunning.Load() { + sr.isServerRunning.Store(false) + return sr.Stop() + } + + return nil +} diff --git a/internal/pkg/agent/application/monitoring/reload/reload_test.go b/internal/pkg/agent/application/monitoring/reload/reload_test.go new file mode 100644 index 00000000000..e45eae4d006 --- /dev/null +++ b/internal/pkg/agent/application/monitoring/reload/reload_test.go @@ -0,0 +1,130 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package reload + +import ( + "testing" + + "github.com/stretchr/testify/require" + + aConfig "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func TestReload(t *testing.T) { + tcs := []struct { + name string + currEnabled bool + currMetrics bool + currRunning bool + + newConfig string + expectedRunning bool + expectedStart bool + expectedStop bool + }{ + { + "start with default config", + false, false, false, + ``, + true, true, false, + }, + { + "start when not running, monitoring enabled", + false, false, false, + ` +agent.monitoring.enabled: true +`, + true, true, false, + }, + { + "do not start when not running, only metrics enabled", + false, false, false, + ` +agent.monitoring.enabled: false +agent.monitoring.metrics: true +`, + false, false, false, + }, + + { + "stop when running, monitoring disabled", + true, true, true, + ` +agent.monitoring.enabled: false +`, + false, false, true, + }, + { + "do not stop when running, monitoring.metrics disabled", + true, true, true, + ` +agent.monitoring.metrics: false +`, + true, false, false, + }, + { + "stop stopped server", + false, false, false, + ` +agent.monitoring.enabled: false +`, + false, false, false, + }, + { + "start started server", + true, true, true, + ` +agent.monitoring.enabled: true +`, + true, false, false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + fsc := &fakeServerController{} + log, _ := logger.NewTesting(tc.name) + cfg := &monitoringCfg.MonitoringConfig{ + Enabled: tc.currEnabled, + MonitorMetrics: tc.currMetrics, + } + r := NewServerReloader( + func() (ServerController, error) { + return fsc, nil + }, + log, + cfg, + ) + r.isServerRunning.Store(tc.currRunning) + if tc.currRunning { + r.s = fsc + } + + newCfg := aConfig.MustNewConfigFrom(tc.newConfig) + require.NoError(t, r.Reload(newCfg)) + + require.Equal(t, tc.expectedRunning, r.isServerRunning.Load()) + require.Equal(t, tc.expectedStart, fsc.startTriggered) + require.Equal(t, tc.expectedStop, fsc.stopTriggered) + }) + } +} + +type fakeServerController struct { + startTriggered bool + stopTriggered bool +} + +func (fsc *fakeServerController) Start() { fsc.startTriggered = true } +func (fsc *fakeServerController) Stop() error { + fsc.stopTriggered = true + return nil +} +func (fsc *fakeServerController) Reset() { + fsc.startTriggered = false + fsc.stopTriggered = false +} diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index 94b28807700..bd809e83a3f 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -20,6 +20,8 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -33,7 +35,8 @@ func NewServer( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, -) (*api.Server, error) { + mcfg *monitoringCfg.MonitoringConfig, +) (*reload.ServerReloader, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore log.Warnf("failed to create monitoring drop: %v", err) @@ -48,7 +51,7 @@ func NewServer( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem) + return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem, mcfg) } func exposeMetricsEndpoint( @@ -59,7 +62,8 @@ func exposeMetricsEndpoint( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, -) (*api.Server, error) { + mcfg *monitoringCfg.MonitoringConfig, +) (*reload.ServerReloader, error) { r := mux.NewRouter() if tracer != nil { r.Use(apmgorilla.Middleware(apmgorilla.WithTracer(tracer))) @@ -77,7 +81,15 @@ func exposeMetricsEndpoint( mux := http.NewServeMux() mux.Handle("/", r) - return api.New(log, mux, config) + newServerFn := func() (reload.ServerController, error) { + apiServer, err := api.New(log, mux, config) + if err != nil { + return nil, errors.New(err, "failed to create api server") + } + return apiServer, nil + } + + return reload.NewServerReloader(newServerFn, log, mcfg), nil } func createAgentMonitoringDrop(drop string) error { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 319c0954b55..4a12bdc8540 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" @@ -244,12 +245,15 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration } defer composable.Close() - serverStopFn, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) + monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) if err != nil { return err } + coord.RegisterMonitoringServer(monitoringServer) defer func() { - _ = serverStopFn() + if monitoringServer != nil { + _ = monitoringServer.Stop() + } }() diagHooks := diagnostics.GlobalHooks() @@ -543,7 +547,7 @@ func setupMetrics( cfg *monitoringCfg.MonitoringConfig, tracer *apm.Tracer, coord *coordinator.Coordinator, -) (func() error, error) { +) (*reload.ServerReloader, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err } @@ -554,14 +558,12 @@ func setupMetrics( Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg), } - s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem) + s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem, cfg) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } - s.Start() - // return server stopper - return s.Stop, nil + return s, nil } func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool {