From 7a667daf4fcd6721c4c94133176de9b21ac7e24d Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Fri, 6 Oct 2023 20:36:48 +0300 Subject: [PATCH] Support Hints per container input (#3416) * Adding updates for code to introduces hints per container * Splitting functions for hints and processors * Adding test for specific container hints --- .../1694692246-hintspercontainer.yaml | 32 +++++ .../composable/providers/kubernetes/hints.go | 70 +++++++++- .../providers/kubernetes/hints_test.go | 132 +++++++++++++++++- 3 files changed, 221 insertions(+), 13 deletions(-) create mode 100644 changelog/fragments/1694692246-hintspercontainer.yaml diff --git a/changelog/fragments/1694692246-hintspercontainer.yaml b/changelog/fragments/1694692246-hintspercontainer.yaml new file mode 100644 index 00000000000..7defa8adea3 --- /dev/null +++ b/changelog/fragments/1694692246-hintspercontainer.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Hints Autodiscovery for Elastic Agent - Define configuration through annotations for specific containers inside a pod + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 751bab078be..0a439afd5da 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -256,19 +256,77 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre composableMapping: mapstr.M{}, processors: []mapstr.M{}, } + var hints mapstr.M + var containerProcessors []mapstr.M if ann, ok := k8sMapping["annotations"]; ok { annotations, _ := ann.(mapstr.M) - hints := utils.GenerateHints(annotations, "", prefix) + + if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil { + entries, ok := containerEntries.(mapstr.M) + if ok && len(entries) > 0 { + for key := range entries { + parts := strings.Split(key, "/") + + if len(parts) > 1 { + if con, ok := k8sMapping["container"]; ok { + containers, ok := con.(mapstr.M) + if ok { + if cname, err := containers.GetValue("name"); err == nil { + if parts[0] == cname { + // If there are hints like co.elastic.hints./ then add the values after the / to the corresponding container + // Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx + hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix) + } + } + } + } + } + } + } + } else { + // If there are top level hints like co.elastic.hints/ then just add the values after the / + // Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod + hints = utils.GenerateHints(annotations, "", prefix) + } + logger.Debugf("Extracted hints are :%v", hints) + if len(hints) > 0 { - logger.Debugf("Extracted hints are :%v", hints) - hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID) - logger.Debugf("Generated hints mappings are :%v", hintData.composableMapping) + hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID) - hintData.processors = utils.GetConfigs(annotations, prefix, processorhints) - logger.Debugf("Generated Processors are :%v", hintData.processors) + // Only if there are processors defined in a specific container we append them to the processors of the top level + if len(containerProcessors) > 0 { + hintData.processors = append(hintData.processors, containerProcessors...) + } + logger.Debugf("Generated Processors mapping :%v", hintData.processors) } + } + + return hintData +} + +// Generates hints and processors list for specific containers +func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) { + hints := utils.GenerateHints(annotations, parts, prefix) + // Processors for specific container + // We need to make an extra check if we have processors added only to the specific containers + containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors") + return hints, containerProcessors +} + +// Generates the final hintData (hints and processors) struct that will be emitted in pods. +func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData { + hintData := hintsData{ + composableMapping: mapstr.M{}, + processors: []mapstr.M{}, } + + hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID) + logger.Debugf("Generated hints mappings :%v", hintData.composableMapping) + + // Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod + hintData.processors = utils.GetConfigs(annotations, prefix, processorhints) + return hintData } diff --git a/internal/pkg/composable/providers/kubernetes/hints_test.go b/internal/pkg/composable/providers/kubernetes/hints_test.go index 912b01b123a..17e36b0d7c7 100644 --- a/internal/pkg/composable/providers/kubernetes/hints_test.go +++ b/internal/pkg/composable/providers/kubernetes/hints_test.go @@ -433,11 +433,10 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) { }, }, }, - }, - }, + }}, } - expected_hints := mapstr.M{ + expectedhints := mapstr.M{ "container_id": "asdfghjkl", "apache": mapstr.M{ "container_logs": mapstr.M{ @@ -447,7 +446,7 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) { }, } - expected_procesors := []mapstr.M{ + expectedprocesors := []mapstr.M{ 0: { "rename": mapstr.M{ "fail_on_error": "false", @@ -467,9 +466,128 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) { hintData := GetHintsMapping(mapping, logger, "co.elastic", "asdfghjkl") - assert.Equal(t, expected_hints, hintData.composableMapping) + assert.Equal(t, expectedhints, hintData.composableMapping) //assert.Equal(t, expected_procesors, hintData.processors). We replace this assertion with assert.Contains in order to avoid flakiness in tests because map keys are not sorted - assert.Contains(t, expected_procesors, hintData.processors[0]) - assert.Contains(t, expected_procesors, hintData.processors[1]) + if len(hintData.processors) > 0 { + assert.Contains(t, expectedprocesors, hintData.processors[0]) + assert.Contains(t, expectedprocesors, hintData.processors[1]) + } +} + +// This test evaluates the hints Generation when you define specific container nginx +// Following will need to include all annotations after top level "co.elastic.hints/" plus those that defined for nginx with prefix "co.elastic.hints.nginx" +// mappings.container.name = nginx defines the container we want to emmit the new configuration. Annotations for other containers like co.elastic.hints.webapp should be excluded +func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) { + logger := getLogger() + // pod := &kubernetes.Pod{ + // ObjectMeta: metav1.ObjectMeta{ + // Name: "testpod", + // UID: types.UID(uid), + // Namespace: "testns", + // Labels: map[string]string{ + // "foo": "bar", + // "with-dash": "dash-value", + // "with/slash": "some/path", + // }, + // Annotations: map[string]string{ + // "app": "production", + // "co.elastic.hints/package": "apache", + // "co.elastic.hints/processors.decode_json_fields.fields": "message", + // "co.elastic.hints/processors.decode_json_fields.add_error_key": "true", + // "co.elastic.hints/processors.decode_json_fields.overwrite_keys": "true", + // "co.elastic.hints/processors.decode_json_fields.target": "team", + // "co.elastic.hints.nginx/stream": "stderr", + // "co.elastic.hints.nginx/processors.add_fields.fields.name": "myproject", + // "co.elastic.hints.webapp/processors.add_fields.fields.name": "myproject2", + // }, + // }, + // TypeMeta: metav1.TypeMeta{ + // Kind: "Pod", + // APIVersion: "v1", + // }, + // Spec: kubernetes.PodSpec{ + // NodeName: "testnode", + // }, + // Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + // } + + mapping := map[string]interface{}{ + "namespace": "testns", + "pod": mapstr.M{ + "uid": string(types.UID(uid)), + "name": "testpod", + "ip": "127.0.0.5", + }, + "namespace_annotations": mapstr.M{ + "nsa": "nsb", + }, + "labels": mapstr.M{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + "container": mapstr.M{ + "name": "nginx", + "id": "8863418215f5d6b1919db9b3b710615878f88b0773e2b098e714c8d696c3261f", + }, + "annotations": mapstr.M{ + "app": "production", + "co": mapstr.M{ + "elastic": mapstr.M{ + "hints/package": "apache", + "hints/processors": mapstr.M{ + "decode_json_fields": mapstr.M{ + "fields": "message", + "add_error_key": "true", + "overwrite_keys": "true", + "target": "team", + }}, + "hints": mapstr.M{ + "nginx/processors": mapstr.M{ + "add_fields": mapstr.M{ + "name": "myproject", + }, + }, + "nginx/stream": "stderr", + }, + }, + }, + }, + } + + expectedhints := mapstr.M{ + "container_id": "asdfghjkl", + "apache": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "stream": "stderr", + "enabled": true, + }, + } + expectedprocesors := []mapstr.M{ + 0: { + "decode_json_fields": mapstr.M{ + "fields": "message", + "add_error_key": "true", + "overwrite_keys": "true", + "target": "team", + }, + }, + 1: { + "add_fields": mapstr.M{ + "name": "myproject", + }, + }, + } + + hintData := GetHintsMapping(mapping, logger, "co.elastic", "asdfghjkl") + + assert.Equal(t, expectedhints, hintData.composableMapping) + //assert.Equal(t, expected_procesors, hintData.processors). We replace this assertion with assert.Contains in order to avoid flakiness in tests because map keys are not sorted + if len(hintData.processors) > 0 { + assert.Contains(t, expectedprocesors, hintData.processors[0]) + assert.Contains(t, expectedprocesors, hintData.processors[1]) + } }