Skip to content

Commit

Permalink
Don't run any agent service when running otel subcommand (#5748) (#…
Browse files Browse the repository at this point in the history
…5797)

* Remove need to run agent while running otel subcommand.

* Cleanup.

* Fix tests.

* Fix tests.

* Few more cleanups.

* Mage check.

(cherry picked from commit 3f1e4da)

Co-authored-by: Blake Rouse <blake.rouse@elastic.co>
  • Loading branch information
mergify[bot] and blakerouse authored Oct 15, 2024
1 parent be1c1ea commit 05e0f08
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 279 deletions.
28 changes: 6 additions & 22 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/otel"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -47,13 +46,11 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
runAsOtel bool,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

err := version.InitVersionError()
if err != nil && !runAsOtel {
// ignore this error when running in otel mode
if err != nil {
// non-fatal error, log a warning and move on
log.With("error.message", err).Warnf("Error initializing version information: falling back to %s", release.Version())
}
Expand Down Expand Up @@ -92,13 +89,7 @@ func New(
log.Infof("Loading baseline config from %v", pathConfigFile)
rawConfig, err = config.LoadFile(pathConfigFile)
if err != nil {
if !runAsOtel {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}

// initialize with empty config, configuration file is not necessary in otel mode,
// best effort is fine
rawConfig = config.New()
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
}
if err := info.InjectAgentConfig(rawConfig); err != nil {
Expand All @@ -124,7 +115,6 @@ func New(
tracer,
monitor,
cfg.Settings.GRPC,
runAsOtel,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
Expand All @@ -141,9 +131,6 @@ func New(

// testing mode uses a config manager that takes configuration from over the control protocol
configMgr = newTestingModeConfigManager(log)
} else if runAsOtel {
// ignoring configuration in elastic-agent.yml
configMgr = otel.NewOtelModeConfigManager()
} else if configuration.IsStandalone(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is managed locally")

Expand Down Expand Up @@ -189,13 +176,10 @@ func New(
}
}

var varsManager composable.Controller
if !runAsOtel {
// no need for vars in otel mode
varsManager, err = composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}
// no need for vars in otel mode
varsManager, err := composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, compModifiers...)
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
false, // not otel mode
)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
monitoringMgr := newTestMonitoringMgr()
cfg := configuration.DefaultGRPCConfig()
cfg.Port = 0
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg, false)
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
require.NoError(t, err)

caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
Expand Down
42 changes: 1 addition & 41 deletions internal/pkg/agent/cmd/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ package cmd

import (
"context"
goerrors "errors"
"sync"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/elastic/elastic-agent-libs/service"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/internal/pkg/otel"
)
Expand Down Expand Up @@ -79,42 +76,5 @@ func runCollector(cmdCtx context.Context, configFiles []string) error {
defer cancel()
go service.ProcessWindowsControlEvents(stopCollector)

var otelStartWg sync.WaitGroup
var errs []error
var awaiters awaiters

otelAwaiter := make(chan struct{})
awaiters = append(awaiters, otelAwaiter)

otelStartWg.Add(1)
go func() {
otelStartWg.Done()
if err := otel.Run(ctx, stop, configFiles); err != nil {
errs = append(errs, err)
// otel collector finished with an error, exit run loop
cancel()
}

// close awaiter handled in run loop
close(otelAwaiter)
}()

// wait for otel to start
otelStartWg.Wait()

if err := runElasticAgent(
ctx,
cancel,
nil, // no config overrides
stop, // service hook
false, // not in testing mode
0, // no fleet config
true, // is otel mode
awaiters, // wait for otel to finish
); err != nil && !errors.Is(err, context.Canceled) {
errs = append(errs, err)
}

return goerrors.Join(errs...)

return otel.Run(ctx, stop, configFiles)
}
27 changes: 7 additions & 20 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const (

type (
cfgOverrider func(cfg *configuration.Configuration)
awaiters []<-chan struct{}
)

func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -155,7 +154,7 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
defer cancel()
go service.ProcessWindowsControlEvents(stopBeat)

return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, false, nil, modifiers...)
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
}

func logReturn(l *logger.Logger, err error) error {
Expand All @@ -165,8 +164,8 @@ func logReturn(l *logger.Logger, err error) error {
return err
}

func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override, runAsOtel)
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override)
if err != nil {
return err
}
Expand Down Expand Up @@ -205,7 +204,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
pathConfigFile := paths.AgentConfigFile()

// agent ID needs to stay empty in bootstrap mode
createAgentID := !runAsOtel
createAgentID := true
if cfg.Fleet != nil && cfg.Fleet.Server != nil && cfg.Fleet.Server.Bootstrap {
createAgentID = false
}
Expand Down Expand Up @@ -285,7 +284,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return logReturn(l, err)
}
Expand Down Expand Up @@ -397,9 +396,6 @@ LOOP:
}
cancel()
err = <-appErr
for _, a := range awaiters {
<-a // wait for awaiter to be done
}

if logShutdown {
l.Info("Shutting down completed.")
Expand All @@ -410,16 +406,7 @@ LOOP:
return logReturn(l, err)
}

func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
if runAsOtel {
defaultCfg := configuration.DefaultConfiguration()
// disable monitoring to avoid injection of monitoring components
// in case inputs are not empty
defaultCfg.Settings.MonitoringConfig.Enabled = false
defaultCfg.Settings.V1MonitoringEnabled = false
return defaultCfg, nil
}

func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
Expand Down Expand Up @@ -592,7 +579,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
errors.M("path", enrollPath)))
}
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
return loadConfig(ctx, override, false)
return loadConfig(ctx, override)
}

func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {
Expand Down
107 changes: 50 additions & 57 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ type Manager struct {

// doneChan is closed when Manager is shutting down to signal that any
// pending requests should be canceled.
doneChan chan struct{}
runAsOtel bool
doneChan chan struct{}
}

// NewManager creates a new manager.
Expand All @@ -157,7 +156,6 @@ func NewManager(
tracer *apm.Tracer,
monitor MonitoringManager,
grpcConfig *configuration.GRPCConfig,
runAsOtel bool,
) (*Manager, error) {
ca, err := authority.NewCA()
if err != nil {
Expand Down Expand Up @@ -193,7 +191,6 @@ func NewManager(
grpcConfig: grpcConfig,
serverReady: make(chan struct{}),
doneChan: make(chan struct{}),
runAsOtel: runAsOtel,
}
return m, nil
}
Expand All @@ -212,56 +209,54 @@ func (m *Manager) Run(ctx context.Context) error {
server *grpc.Server
wgServer sync.WaitGroup
)
if !m.runAsOtel {
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}

if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}

if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}

certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)
if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}

// start serving GRPC connections
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()
if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}

certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)

// start serving GRPC connections
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()

// Start the run loop, which continues on the main goroutine
// until the context is canceled.
Expand All @@ -271,13 +266,11 @@ func (m *Manager) Run(ctx context.Context) error {
m.shutdown()

// Close the rpc listener and wait for serverLoop to return
if !m.runAsOtel {
listener.Close()
wgServer.Wait()
listener.Close()
wgServer.Wait()

// Cancel any remaining connections
server.Stop()
}
// Cancel any remaining connections
server.Stop()
return ctx.Err()
}

Expand Down
Loading

0 comments on commit 05e0f08

Please sign in to comment.