diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml
new file mode 100644
index 00000000000..2e04a62793f
--- /dev/null
+++ b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml
@@ -0,0 +1,35 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: feature
+
+# Change summary; an 80ish-character description of the change.
+summary: Support flattened data_stream.* fields
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodates a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+description: >-
+  An input configuration supports flattened fields, however the
+  'data_stream' field was not being correctly decoded when
+  flattened. This commit fixes this issue.
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: elastic-agent
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/3465
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/3191
diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go
index 424f3a93147..00c4d1c63cb 100644
--- a/pkg/component/component_test.go
+++ b/pkg/component/component_test.go
@@ -2361,3 +2361,77 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {
 
 	return gatheredPaths
 }
+
+// TestFlattenedDataStream ensures the data_stream settings of an input are
+// decoded into the input unit's DataStream when they are written as a mix of
+// flattened (dotted) keys and a nested map.
+func TestFlattenedDataStream(t *testing.T) {
+	expectedNamespace := "test-namespace"
+	expectedType := "test-type"
+	expectedDataset := "test-dataset"
+
+	policy := map[string]any{
+		"outputs": map[string]any{
+			"default": map[string]any{
+				"type":    "elasticsearch",
+				"enabled": true,
+			},
+		},
+		"inputs": []any{
+			map[string]any{
+				"type":                "filestream",
+				"id":                  "filestream-0",
+				"enabled":             true,
+				"data_stream.type":    expectedType,
+				"data_stream.dataset": expectedDataset,
+				"data_stream": map[string]any{
+					"namespace": expectedNamespace,
+				},
+			},
+		},
+	}
+	runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
+	if err != nil {
+		t.Fatalf("cannot load runtime specs: %s", err)
+	}
+
+	result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
+	if err != nil {
+		t.Fatalf("cannot convert policy to component: %s", err)
+	}
+
+	if len(result) != 1 {
+		t.Fatalf("expecting result to have one element, got %d", len(result))
+	}
+
+	if len(result[0].Units) != 2 {
+		t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result[0].Units))
+	}
+
+	// We do not make assumptions about ordering.
+	// Get the input Unit
+	var dataStream *proto.DataStream
+	for _, unit := range result[0].Units {
+		if unit.Err != nil {
+			t.Fatalf("unit.Err: %s", unit.Err)
+		}
+		if unit.Type == client.UnitTypeInput {
+			dataStream = unit.Config.DataStream
+			break
+		}
+	}
+
+	if dataStream == nil {
+		t.Fatal("DataStream cannot be nil")
+	}
+
+	if dataStream.Dataset != expectedDataset {
+		t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
+	}
+	if dataStream.Type != expectedType {
+		t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
+	}
+	if dataStream.Namespace != expectedNamespace {
+		t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
+	}
+}
diff --git a/pkg/component/config.go b/pkg/component/config.go
index a0c75d00e32..50b5e590e6b 100644
--- a/pkg/component/config.go
+++ b/pkg/component/config.go
@@ -15,6 +15,7 @@ import (
 	"google.golang.org/protobuf/types/known/structpb"
 
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
+	"github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent/pkg/limits"
 )
 
@@ -100,9 +101,99 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
 		return nil, err
 	}
 
+	if err := updateDataStreamsFromSource(result); err != nil {
+		return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
+	}
+
 	return result, nil
 }
 
+// dataStreamAndSource is a generic way to represent proto messages
+// that contain a source field and a datastream field.
+type dataStreamAndSource interface {
+	GetDataStream() *proto.DataStream
+	GetSource() *structpb.Struct
+}
+
+// deDotDataStream builds a proto.DataStream for raw by combining the values
+// already present in raw's data stream with the 'data_stream' settings
+// unpacked from raw's source field. It returns an error if a field is set
+// to two different non-empty values; otherwise each field takes whichever
+// value is non-empty. The returned DataStream is never nil on success.
+func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
+	ds := raw.GetDataStream()
+	if ds == nil {
+		ds = &proto.DataStream{}
+	}
+
+	tmp := struct {
+		DataStream struct {
+			Dataset   string `config:"dataset" yaml:"dataset"`
+			Type      string `config:"type" yaml:"type"`
+			Namespace string `config:"namespace" yaml:"namespace"`
+		} `config:"data_stream" yaml:"data_stream"`
+	}{}
+
+	cfg, err := config.NewConfigFrom(raw.GetSource().AsMap())
+	if err != nil {
+		return nil, fmt.Errorf("cannot generate config from source field: %w", err)
+	}
+
+	if err := cfg.Unpack(&tmp); err != nil {
+		return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
+	}
+
+	if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") {
+		return nil, errors.New("duplicated key 'datastream.dataset'")
+	}
+
+	if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") {
+		return nil, errors.New("duplicated key 'datastream.type'")
+	}
+
+	if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") {
+		return nil, errors.New("duplicated key 'datastream.namespace'")
+	}
+
+	ret := &proto.DataStream{
+		Dataset:   merge(tmp.DataStream.Dataset, ds.Dataset),
+		Type:      merge(tmp.DataStream.Type, ds.Type),
+		Namespace: merge(tmp.DataStream.Namespace, ds.Namespace),
+		Source:    raw.GetDataStream().GetSource(),
+	}
+
+	return ret, nil
+}
+
+// merge returns b if a is an empty string
+func merge(a, b string) string {
+	if a == "" {
+		return b
+	}
+	return a
+}
+
+// updateDataStreamsFromSource replaces the DataStream of unitConfig and of
+// each of its streams with the result of deDotDataStream, so data stream
+// settings found in the source fields are reflected in the typed fields.
+func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
+	var err error
+	unitConfig.DataStream, err = deDotDataStream(unitConfig)
+	if err != nil {
+		return fmt.Errorf("could not parse data_stream from input: %w", err)
+	}
+
+	for i, stream := range unitConfig.Streams {
+		stream.DataStream, err = deDotDataStream(stream)
+		if err != nil {
+			return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
+				i, err)
+		}
+	}
+
+	return nil
+}
+
 func setSource(val interface{}, cfg map[string]interface{}) error {
 	// find the source field on the val
 	resVal := reflect.ValueOf(val).Elem()
diff --git a/pkg/component/config_test.go b/pkg/component/config_test.go
index 64dcfe3a697..7cdef177829 100644
--- a/pkg/component/config_test.go
+++ b/pkg/component/config_test.go
@@ -8,8 +8,10 @@ import (
 	"errors"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/testing/protocmp"
 	"google.golang.org/protobuf/types/known/structpb"
 
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
@@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) {
 				assert.Equal(t, err.Error(), scenario.Err.Error())
 			} else {
 				require.NoError(t, err)
-				assert.EqualValues(t, scenario.Expected, observed)
+				// protocmp.Transform ensures we do not compare any internal
+				// protobuf fields
+				if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) {
+					t.Errorf("mismatch (-want +got) \n%s",
+						cmp.Diff(scenario.Expected, observed, protocmp.Transform()))
+				}
 			}
 		})
 	}
diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go
index f713a581f80..99035fc8d50 100644
--- a/pkg/testing/fixture.go
+++ b/pkg/testing/fixture.go
@@ -242,6 +242,13 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error {
 // with `WithAllowErrors()` then `Run` will exit early and return the logged error.
 //
 // If no `states` are provided then the Elastic Agent runs until the context is cancelled.
+//
+// The Elastic-Agent is started in test mode (--testing-mode); this mode
+// expects the initial configuration (full YAML config) via gRPC.
+// This configuration should be passed in the State.Configure field.
+//
+// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
+// when `Run` is called.
 func (f *Fixture) Run(ctx context.Context, states ...State) error {
 	if f.installed {
 		return errors.New("fixture is installed; cannot be run")
diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go
index 8cd6e126597..ea78373e1b7 100644
--- a/pkg/testing/tools/estools/elasticsearch.go
+++ b/pkg/testing/tools/estools/elasticsearch.go
@@ -230,9 +230,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
 	return handleDocsResponse(res)
 }
 
-// GetLogsForDatastream returns any logs associated with the datastream
-func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
-	return GetLogsForDatastreamWithContext(context.Background(), client, index)
+// GetLogsForDataset returns any logs associated with the dataset
+func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
+	return GetLogsForDatasetWithContext(context.Background(), client, index)
 }
 
 // GetLogsForAgentID returns any logs associated with the agent ID
@@ -270,8 +270,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
 	return handleDocsResponse(res)
 }
 
-// GetLogsForDatastreamWithContext returns any logs associated with the datastream
-func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
+// GetLogsForDatasetWithContext returns any logs associated with the dataset
+func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
 	indexQuery := map[string]interface{}{
 		"query": map[string]interface{}{
 			"match": map[string]interface{}{
@@ -302,7 +302,64 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor
 	return handleDocsResponse(res)
 }
 
+// GetLogsForDatastream returns any logs associated with the datastream
+// identified by dsType, dataset and namespace. Only the "message" field
+// of each matching document is requested.
+func GetLogsForDatastream(
+	ctx context.Context,
+	client elastictransport.Interface,
+	dsType, dataset, namespace string) (Documents, error) {
+
+	query := map[string]any{
+		"_source": []string{"message"},
+		"query": map[string]any{
+			"bool": map[string]any{
+				"must": []any{
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.dataset": dataset,
+						},
+					},
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.namespace": namespace,
+						},
+					},
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.type": dsType,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(query); err != nil {
+		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
+	}
+
+	es := esapi.New(client)
+	res, err := es.Search(
+		es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
+		es.Search.WithExpandWildcards("all"),
+		es.Search.WithBody(&buf),
+		es.Search.WithTrackTotalHits(true),
+		es.Search.WithPretty(),
+		es.Search.WithContext(ctx),
+	)
+	if err != nil {
+		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
+	}
+
+	return handleDocsResponse(res)
+}
+
+// handleDocsResponse converts the esapi.Response into Documents,
+// it closes the response.Body after reading
 func handleDocsResponse(res *esapi.Response) (Documents, error) {
+	defer res.Body.Close()
 	if res.StatusCode >= 300 || res.StatusCode < 200 {
 		return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String())
 	}
diff --git a/testing/integration/datastreams_test.go b/testing/integration/datastreams_test.go
new file mode 100644
index 00000000000..712380bb8e3
--- /dev/null
+++ b/testing/integration/datastreams_test.go
@@ -0,0 +1,364 @@
+//go:build integration
+
+package integration
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"net/http/httputil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"text/template"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/kibana"
+	"github.com/elastic/elastic-agent/pkg/control/v2/client"
+	atesting "github.com/elastic/elastic-agent/pkg/testing"
+	"github.com/elastic/elastic-agent/pkg/testing/define"
+	"github.com/elastic/elastic-agent/pkg/testing/tools"
+	"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
+	"github.com/stretchr/testify/require"
+)
+
+// TestFlattenedDatastreamFleetPolicy installs an Elastic-Agent enrolled in a
+// Fleet policy whose log package sets a flattened 'data_stream.dataset' and
+// waits for all generated log lines to show up in the expected data stream.
+func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
+	dsType := "logs"
+	dsNamespace := strings.ToLower(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
+	dsDataset := strings.ToLower(fmt.Sprintf("%s-dataset", t.Name()))
+	numEvents := uint64(60)
+
+	tempDir := t.TempDir()
+	logFilePath := filepath.Join(tempDir, "log.log")
+	generateLogFile(t, logFilePath, time.Second, numEvents)
+
+	info := define.Require(t, define.Requirements{
+		Local: false,
+		Stack: &define.Stack{},
+		Sudo:  true,
+	})
+
+	agentFixture, err := define.NewFixture(t, define.Version())
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	createPolicyReq := kibana.AgentPolicy{
+		Name:        t.Name() + "--" + time.Now().Format(time.RFC3339Nano),
+		Namespace:   info.Namespace,
+		Description: "Test policy for " + t.Name(),
+		MonitoringEnabled: []kibana.MonitoringEnabledOption{
+			kibana.MonitoringEnabledLogs,
+			kibana.MonitoringEnabledMetrics,
+		},
+		IsProtected: false,
+	}
+	installOpts := atesting.InstallOpts{
+		NonInteractive: true,
+		Force:          true,
+	}
+
+	policy, err := tools.InstallAgentWithPolicy(ctx,
+		t,
+		installOpts,
+		agentFixture,
+		info.KibanaClient,
+		createPolicyReq)
+	if err != nil {
+		t.Fatalf("could not install Elastic-Agent with Policy: %s", err)
+	}
+
+	tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON)
+	if err != nil {
+		t.Fatalf("cannot parse template: %s", err)
+	}
+
+	agentPolicyBuffer := bytes.Buffer{}
+	err = tmpl.Execute(&agentPolicyBuffer, plolicyVars{
+		Name:        "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339),
+		PolicyID:    policy.ID,
+		LogFilePath: logFilePath,
+		Namespace:   dsNamespace,
+		Dataset:     dsDataset,
+	})
+	if err != nil {
+		t.Fatalf("could not render template: %s", err)
+	}
+
+	// POST /api/fleet/package_policies
+	resp, err := info.KibanaClient.Connection.Send(
+		http.MethodPost,
+		"/api/fleet/package_policies",
+		nil,
+		nil,
+		&agentPolicyBuffer)
+	if err != nil {
+		t.Fatalf("could not execute request to Kibana/Fleet: %s", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		t.Errorf("received a non 200-OK when adding package to policy. "+
+			"Status code: %d", resp.StatusCode)
+		respDump, err := httputil.DumpResponse(resp, true)
+		if err != nil {
+			t.Fatalf("could not dump error response from Kibana: %s", err)
+		}
+		t.Log("Kibana error response")
+		t.Log(string(respDump))
+		t.FailNow()
+	}
+
+	ensureDocumentsInES := func() bool {
+		docs, err := estools.GetLogsForDatastream(
+			ctx, info.ESClient, dsType, dsDataset, dsNamespace)
+		if err != nil {
+			t.Logf("error querying ES, will retry later: %s", err)
+			return false
+		}
+
+		return docs.Hits.Total.Value == int(numEvents)
+	}
+
+	require.Eventually(t, ensureDocumentsInES, 120*time.Second, time.Second, "could not get all expected documents from ES")
+}
+
+// TestFlattenedDatastreamStandalone runs a standalone Elastic-Agent with a
+// policy mixing nested and flattened 'data_stream' keys and waits for all
+// generated log lines to show up in the expected data stream.
+func TestFlattenedDatastreamStandalone(t *testing.T) {
+	dsType := "logs"
+	dsNamespace := fmt.Sprintf("%s-namespace-%d", t.Name(), rand.Uint64())
+	dsDataset := fmt.Sprintf("%s-dataset", t.Name())
+	numEvents := uint64(60)
+
+	tempDir := t.TempDir()
+	logFilePath := filepath.Join(tempDir, "log.log")
+	generateLogFile(t, logFilePath, 10*time.Microsecond, numEvents)
+
+	info := define.Require(t, define.Requirements{
+		Local: false,
+		Stack: &define.Stack{},
+		Sudo:  true,
+	})
+
+	agentFixture, err := define.NewFixture(t,
+		define.Version(), atesting.WithAllowErrors())
+	if err != nil {
+		t.Fatalf("could not create test fixture: %s", err)
+	}
+
+	tmpl, err := template.New("standalone-policy").Parse(standalonePolicy)
+	if err != nil {
+		t.Fatalf("cannot parse template: %s", err)
+	}
+
+	// The environment variables are set by the test runner.
+	// If you're manually running the tests (go test) then you
+	// will have to manually set them
+	renderedPolicy := bytes.Buffer{}
+	err = tmpl.Execute(&renderedPolicy, plolicyVars{
+		LogFilePath: logFilePath,
+		Dataset:     dsDataset,
+		Namespace:   dsNamespace,
+		Type:        dsType,
+
+		ESHost:     os.Getenv("ELASTICSEARCH_HOST"),
+		ESUsername: os.Getenv("ELASTICSEARCH_USERNAME"),
+		ESPassword: os.Getenv("ELASTICSEARCH_PASSWORD"),
+	})
+	if err != nil {
+		t.Fatalf("could not render template: %s", err)
+	}
+
+	if err := agentFixture.Prepare(context.TODO()); err != nil {
+		t.Fatalf("cannot prepare Elastic-Agent: %s", err)
+	}
+
+	runCtx, cancelAgentRunCtx := context.WithCancel(context.Background())
+	defer cancelAgentRunCtx()
+	go func() {
+		// make sure the test does not hang forever, but stay quiet if
+		// the run context has already been cancelled
+		select {
+		case <-runCtx.Done():
+		case <-time.After(30 * time.Second):
+			t.Error("cancelling run context, the Elastic-Agent will exit")
+			cancelAgentRunCtx()
+		}
+	}()
+
+	state := atesting.State{
+		Configure:  renderedPolicy.String(),
+		AgentState: atesting.NewClientState(client.Healthy),
+		Components: map[string]atesting.ComponentState{
+			"filestream-default": {
+				State: atesting.NewClientState(client.Healthy),
+				Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{
+					{
+						UnitType: client.UnitTypeInput,
+						UnitID:   "filestream-default-elastic-agent-input-id",
+					}: {
+						State: atesting.NewClientState(client.Healthy),
+					},
+
+					{
+						UnitType: client.UnitTypeOutput,
+						UnitID:   "filestream-default",
+					}: {
+						State: atesting.NewClientState(client.Healthy),
+					},
+				},
+			},
+		},
+		After: func() error {
+			ensureDocumentsInES := func() bool {
+				docs, err := estools.GetLogsForDatastream(context.TODO(), info.ESClient, dsType, dsDataset, dsNamespace)
+				if err != nil {
+					t.Logf("error querying ES, will retry later: %s", err)
+					return false
+				}
+
+				return docs.Hits.Total.Value == int(numEvents)
+			}
+
+			require.Eventually(
+				t,
+				ensureDocumentsInES,
+				2*time.Minute, time.Second,
+				"did not find all expected documents")
+			cancelAgentRunCtx()
+			return nil
+		},
+	}
+
+	if err := agentFixture.Run(runCtx, state); err != nil {
+		if !errors.Is(err, context.Canceled) {
+			t.Errorf("error running Elastic-Agent: %s", err)
+		}
+	}
+}
+
+// generateLogFile generates a log file by appending new lines every tick
+// the lines are composed by the test name and the current time in RFC3339Nano
+// This function spawns a new goroutine and does not block
+func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events uint64) {
+	t.Helper()
+	f, err := os.Create(fullPath)
+	if err != nil {
+		t.Fatalf("could not create file '%s': %s", fullPath, err)
+	}
+
+	// Register the cleanups from the test goroutine so they are in place
+	// before the test can finish.
+	ticker := time.NewTicker(tick)
+	t.Cleanup(ticker.Stop)
+
+	done := make(chan struct{})
+	t.Cleanup(func() { close(done) })
+
+	go func() {
+		defer func() {
+			if err := f.Close(); err != nil {
+				t.Errorf("could not close log file '%s': %s", fullPath, err)
+			}
+		}()
+
+		i := uint64(0)
+		for {
+			select {
+			case <-done:
+				return
+			case now := <-ticker.C:
+				i++
+				_, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano))
+				if err != nil {
+					// t.Fatalf must not be called from a non-test
+					// goroutine, t.Errorf is our only option
+					t.Errorf("could not write data to log file '%s': %s", fullPath, err)
+					return
+				}
+				// make sure log lines are synced as quickly as possible
+				if err := f.Sync(); err != nil {
+					t.Errorf("could not sync file '%s': %s", fullPath, err)
+				}
+				if i == events {
+					return
+				}
+			}
+		}
+	}()
+}
+
+type plolicyVars struct {
+	Name        string
+	PolicyID    string
+	LogFilePath string
+	ESHost      string
+	ESPassword  string
+	ESUsername  string
+	Namespace   string
+	Dataset     string
+	Type        string
+}
+
+var policyJSON = `
+{
+  "policy_id": "{{.PolicyID}}",
+  "package": {
+    "name": "log",
+    "version": "2.3.0"
+  },
+  "name": "{{.Name}}",
+  "namespace": "{{.Namespace}}",
+  "inputs": {
+    "logs-logfile": {
+      "enabled": true,
+      "streams": {
+        "log.logs": {
+          "enabled": true,
+          "vars": {
+            "paths": [
+              "{{.LogFilePath}}"
+            ],
+            "data_stream.dataset": "{{.Dataset}}"
+          }
+        }
+      }
+    }
+  }
+}`
+
+var standalonePolicy = `
+outputs:
+  default:
+    type: elasticsearch
+    hosts:
+      - "{{.ESHost}}:443"
+    username: "{{.ESUsername}}"
+    password: "{{.ESPassword}}"
+
+inputs:
+  - type: filestream
+    id: elastic-agent-input-id
+    streams:
+      - id: filestream-input-id-1
+        data_stream:
+          dataset: "{{.Dataset}}"
+        data_stream.namespace: "{{.Namespace}}"
+        data_stream.type: "{{.Type}}"
+        paths:
+          - {{.LogFilePath}}
+
+agent.monitoring:
+  enabled: true
+  logs: true
+  metrics: true
+`
diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go
index 97836c7ff3f..9e9ee99e41e 100644
--- a/testing/integration/monitoring_logs_test.go
+++ b/testing/integration/monitoring_logs_test.go
@@ -85,7 +85,7 @@ func TestMonitoringLogsShipped(t *testing.T) {
 	// Stage 3: Make sure metricbeat logs are populated
 	t.Log("Making sure metricbeat logs are populated")
 	docs := findESDocs(t, func() (estools.Documents, error) {
-		return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat")
+		return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat")
 	})
 	require.NotZero(t, len(docs.Hits.Hits))
 	t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))