From 5ffd30a211eb5c6ab524f28903e0f7f17f84d4cf Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Wed, 4 Sep 2024 15:47:58 -0400 Subject: [PATCH] make journal work --- .../vector/filter/openshift/viaq/journal.go | 23 ++----- internal/generator/vector/input/audit.go | 6 +- internal/generator/vector/input/internal.go | 11 ++-- internal/generator/vector/input/journal.go | 1 + .../generator/vector/input/source_test.go | 2 +- .../output/common/template/template_test.go | 2 +- internal/generator/vector/pipeline/adapter.go | 41 ++---------- .../framework/functional/message_templates.go | 20 +++--- .../loglevel/journal_logs_test.go | 4 +- ...test.go => viaq_audit_logs_format_test.go} | 0 ....go => viaq_container_logs_format_test.go} | 0 .../normalization/viaq_journal_logs_test.go | 53 +++++++++++++++ test/helpers/types/type_journal.go | 66 +++++++++++++++++++ test/helpers/types/types.go | 39 +---------- 14 files changed, 158 insertions(+), 110 deletions(-) rename test/functional/normalization/{audit_logs_format_test.go => viaq_audit_logs_format_test.go} (100%) rename test/functional/normalization/{message_format_test.go => viaq_container_logs_format_test.go} (100%) create mode 100644 test/functional/normalization/viaq_journal_logs_test.go create mode 100644 test/helpers/types/type_journal.go diff --git a/internal/generator/vector/filter/openshift/viaq/journal.go b/internal/generator/vector/filter/openshift/viaq/journal.go index 43b5495036..c708d20e9b 100644 --- a/internal/generator/vector/filter/openshift/viaq/journal.go +++ b/internal/generator/vector/filter/openshift/viaq/journal.go @@ -11,9 +11,9 @@ import ( ) const ( - AddHostName = `.hostname = del(.internal.host)` + AddHostName = `.hostname = del(._internal.host)` AddJournalLogTag = `.tag = ".journal.system"` - AddTime = `.time = format_timestamp!(.internal.timestamp, format: "%FT%T%:z")` + AddTime = `.time = ._internal."@timestamp"` FixJournalLogLevel = ` if ._internal.PRIORITY == "8" || ._internal.PRIORITY == 8 { @@ -30,18 +30,8 @@ if ._internal.PRIORITY == "8" || ._internal.PRIORITY == 8 { } } ` - DeleteJournalLogFields = ` -del(.source_type) -del(._CPU_USAGE_NSEC) -del(.__REALTIME_TIMESTAMP) -del(.__MONOTONIC_TIMESTAMP) -del(._SOURCE_REALTIME_TIMESTAMP) -del(.JOB_RESULT) -del(.JOB_TYPE) -del(.TIMESTAMP_BOOTTIME) -del(.TIMESTAMP_MONOTONIC) -` - SystemK = ` + SetJournalMessage = `if exists(._internal.MESSAGE) {._internal.message = del(._internal.MESSAGE)}` + SystemK = ` # systemd’s kernel-specific metadata. # .systemd.k = {} if exists(._internal.KERNEL_DEVICE) { ._internal.systemd.k.KERNEL_DEVICE = del(._internal.KERNEL_DEVICE) } @@ -100,7 +90,7 @@ func NewJournal(id string, inputs ...string) framework.Element { func journalLogs() string { return fmt.Sprintf(` -if .log_source == "%s" { +if ._internal.log_source == "%s" { %s } `, obs.InfrastructureSourceNode, journalLogsVRL()) @@ -113,6 +103,7 @@ func journalLogsVRL() string { AddHostName, AddTime, `.systemd = ._internal.systemd`, + SetMessageOnRoot, }), "\n\n") } @@ -120,6 +111,6 @@ func DropJournalDebugLogs(id string, inputs ...string) framework.Element { return Filter{ ComponentID: id, Inputs: helpers.MakeInputs(inputs...), - Condition: `(.internal.log_source == "node" && .internal.PRIORITY != "7" && .internal.PRIORITY != 7) || .internal.log_source == "container" || .internal.log_type == "audit"`, + Condition: `(._internal.log_source == "node" && ._internal.PRIORITY != "7" && ._internal.PRIORITY != 7) || ._internal.log_source == "container" || ._internal.log_type == "audit"`, } } diff --git a/internal/generator/vector/input/audit.go b/internal/generator/vector/input/audit.go index 880b1f78f5..8279ac5145 100644 --- a/internal/generator/vector/input/audit.go +++ b/internal/generator/vector/input/audit.go @@ -23,7 +23,7 @@ func NewK8sAuditSource(input obs.InputSpec, op generator.Options) ([]generator.E metaID := helpers.MakeID(id, "meta") el := []generator.Element{ sources.NewK8sAuditLog(id), - NewInternalNormalization(metaID, obs.AuditSourceKube, obs.InputTypeAudit, id, ParseStructured), + NewInternalNormalization(metaID, obs.AuditSourceKube, obs.InputTypeAudit, id, parseStructured), } return el, []string{metaID} } @@ -33,7 +33,7 @@ func NewOpenshiftAuditSource(input obs.InputSpec, op generator.Options) ([]gener metaID := helpers.MakeID(id, "meta") el := []generator.Element{ sources.NewOpenshiftAuditLog(id), - NewInternalNormalization(metaID, obs.AuditSourceOpenShift, obs.InputTypeAudit, id, ParseStructured), + NewInternalNormalization(metaID, obs.AuditSourceOpenShift, obs.InputTypeAudit, id, parseStructured), } return el, []string{metaID} } @@ -43,7 +43,7 @@ func NewOVNAuditSource(input obs.InputSpec, op generator.Options) ([]generator.E metaID := helpers.MakeID(id, "meta") el := []generator.Element{ sources.NewOVNAuditLog(id), - NewInternalNormalization(metaID, obs.AuditSourceOVN, obs.InputTypeAudit, id, ParseStructured), + NewInternalNormalization(metaID, obs.AuditSourceOVN, obs.InputTypeAudit, id, parseStructured), } return el, []string{metaID} } diff --git a/internal/generator/vector/input/internal.go b/internal/generator/vector/input/internal.go index 4d327d5bdd..ea0ec42102 100644 --- a/internal/generator/vector/input/internal.go +++ b/internal/generator/vector/input/internal.go @@ -14,14 +14,15 @@ const ( ._internal.log_source = %q ._internal.log_type = %q ` - ParseStructured = `._internal.structured = parse_json!(string!(._internal.message))` - + parseStructured = `._internal.structured = parse_json!(string!(._internal.message))` + + setClusterID = `._internal.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"` setEnvelope = `. = {"_internal": .}` setHostName = `._internal.hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""` - setClusterID = `._internal.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"` setTimestampField = `ts = del(._internal.timestamp); if !exists(._internal."@timestamp") {._internal."@timestamp" = ts}` ) +// NewInternalNormalization returns configuration elements to normalize log entries to an internal, common data model func NewInternalNormalization(id string, logSource, logType interface{}, inputs string, addVRLs ...string) framework.Element { vrls := []string{ setEnvelope, @@ -31,9 +32,7 @@ func NewInternalNormalization(id string, logSource, logType interface{}, inputs setTimestampField, viaq.SetLogLevel, } - for _, vrl := range addVRLs { - vrls = append(vrls, vrl) - } + vrls = append(vrls, addVRLs...) return elements.Remap{ ComponentID: id, Inputs: helpers.MakeInputs(inputs), diff --git a/internal/generator/vector/input/journal.go b/internal/generator/vector/input/journal.go index 9a9b7d447d..786abf6958 100644 --- a/internal/generator/vector/input/journal.go +++ b/internal/generator/vector/input/journal.go @@ -14,6 +14,7 @@ func NewJournalSource(input obs.InputSpec) ([]Element, []string) { el := []Element{ source.NewJournalLog(id), NewInternalNormalization(metaID, string(obs.InfrastructureSourceNode), string(obs.InputTypeInfrastructure), id, + viaq.SetJournalMessage, viaq.SystemK, viaq.SystemT, viaq.SystemU, diff --git a/internal/generator/vector/input/source_test.go b/internal/generator/vector/input/source_test.go index dac7bd194b..439d1442e5 100644 --- a/internal/generator/vector/input/source_test.go +++ b/internal/generator/vector/input/source_test.go @@ -293,7 +293,7 @@ var _ = Describe("inputs", func() { }, "audit_host.toml", ), - FEntry("with an audit input for kube logs should generate kube audit file source", obs.InputSpec{ + Entry("with an audit input for kube logs should generate kube audit file source", obs.InputSpec{ Name: "myaudit", Type: obs.InputTypeAudit, Audit: &obs.Audit{ diff --git a/internal/generator/vector/output/common/template/template_test.go b/internal/generator/vector/output/common/template/template_test.go index 578fd668f4..170e1f6ac5 100644 --- a/internal/generator/vector/output/common/template/template_test.go +++ b/internal/generator/vector/output/common/template/template_test.go @@ -11,7 +11,7 @@ var _ = Describe("Vector Output Template", func() { DescribeTable("transforms template syntax to VRL compatible string", func(expVRL, template string) { Expect(TransformUserTemplateToVRL(template)).To(EqualTrimLines(expVRL)) }, - FEntry("should transform template with static and dynamic values into VRL compatible syntax", + Entry("should transform template with static and dynamic values into VRL compatible syntax", `"foo-" + to_string!(._internal.log_type||"none") + "." + to_string!(._internal.bar.foo.test||"missing) + "_" + to_string!(._internal.log_type||"none")`, `foo-{.log_type||"none"}.{.bar.foo.test||"missing}_{.log_type||"none"}`), diff --git a/internal/generator/vector/pipeline/adapter.go b/internal/generator/vector/pipeline/adapter.go index 18677b1048..f59033215c 100644 --- a/internal/generator/vector/pipeline/adapter.go +++ b/internal/generator/vector/pipeline/adapter.go @@ -49,8 +49,7 @@ func NewPipeline(index int, p obs.PipelineSpec, inputs map[string]helpers.InputC for name, f := range filters { pipeline.filterMap[name] = *f } - //addPrefilters(pipeline) - addPostfilters(pipeline) + addPostFilters(pipeline) for i, filterName := range pipeline.FilterRefs { pipeline.initFilter(i, filterName) @@ -83,37 +82,11 @@ func NewPipeline(index int, p obs.PipelineSpec, inputs map[string]helpers.InputC return pipeline } -// TODO: add migration to treat like any other -func addPrefilters(p *Pipeline) { - //prefilters := []string{} - //if viaq.HasJournalSource(p.inputSpecs) { - // prefilters = append(prefilters, viaq.ViaqJournal) - // p.filterMap[viaq.ViaqJournal] = filter.InternalFilterSpec{ - // FilterSpec: &obs.FilterSpec{Type: viaq.ViaqJournal}, - // SuppliesTransform: true, - // TranformFactory: func(id string, inputs ...string) framework.Element { - // return viaq.NewJournal(id, inputs...) - // }, - // } - //} - // - //prefilters = append(prefilters, viaq.Viaq) - //p.filterMap[viaq.Viaq] = filter.InternalFilterSpec{ - // FilterSpec: &obs.FilterSpec{Type: viaq.Viaq}, - // SuppliesTransform: true, - // TranformFactory: func(id string, inputs ...string) framework.Element { - // // Build all log_source VRL - // return viaq.New(id, inputs, p.inputSpecs) - // }, - //} - //p.FilterRefs = append(prefilters, p.FilterRefs...) -} - -func addPostfilters(p *Pipeline) { +func addPostFilters(p *Pipeline) { - postfilters := []string{} + var postFilters []string if viaq.HasJournalSource(p.inputSpecs) { - postfilters = append(postfilters, viaq.ViaqJournal) + postFilters = append(postFilters, viaq.ViaqJournal) p.filterMap[viaq.ViaqJournal] = filter.InternalFilterSpec{ FilterSpec: &obs.FilterSpec{Type: viaq.ViaqJournal}, SuppliesTransform: true, @@ -123,7 +96,7 @@ func addPostfilters(p *Pipeline) { } } - postfilters = append(postfilters, viaq.Viaq) + postFilters = append(postFilters, viaq.Viaq) p.filterMap[viaq.Viaq] = filter.InternalFilterSpec{ FilterSpec: &obs.FilterSpec{Type: viaq.Viaq}, SuppliesTransform: true, @@ -133,7 +106,7 @@ func addPostfilters(p *Pipeline) { }, } if viaq.HasContainerSource(p.inputSpecs) { - postfilters = append(postfilters, viaq.ViaqDedot) + postFilters = append(postFilters, viaq.ViaqDedot) p.filterMap[viaq.ViaqDedot] = filter.InternalFilterSpec{ FilterSpec: &obs.FilterSpec{Type: viaq.ViaqDedot}, SuppliesTransform: true, @@ -142,7 +115,7 @@ func addPostfilters(p *Pipeline) { }, } } - p.FilterRefs = append(p.FilterRefs, postfilters...) + p.FilterRefs = append(p.FilterRefs, postFilters...) } func (p *Pipeline) Name() string { diff --git a/test/framework/functional/message_templates.go b/test/framework/functional/message_templates.go index 2c4fbd1132..e1cce4cd50 100644 --- a/test/framework/functional/message_templates.go +++ b/test/framework/functional/message_templates.go @@ -105,14 +105,18 @@ func NewJournalInfrastructureLogTemplate() types.JournalLog { return types.JournalLog{ ViaQCommon: types.ViaQCommon{ - Timestamp: time.Time{}, - Message: "*", - LogSource: "node", - LogType: "infrastructure", - Level: "*", - Hostname: "*", - ViaqMsgID: "**optional**", - PipelineMetadata: TemplateForAnyPipelineMetadata, + Timestamp: time.Time{}, + Message: "*", + LogSource: "node", + LogType: "infrastructure", + Level: "*", + Hostname: "*", + Openshift: types.OpenshiftMeta{ + ClusterID: "*", + Sequence: types.NewOptionalInt(">0"), + }, }, + Tag: ".journal.system", + Time: time.Time{}, } } diff --git a/test/functional/normalization/loglevel/journal_logs_test.go b/test/functional/normalization/loglevel/journal_logs_test.go index 127c461759..0cf92abd00 100644 --- a/test/functional/normalization/loglevel/journal_logs_test.go +++ b/test/functional/normalization/loglevel/journal_logs_test.go @@ -25,8 +25,6 @@ var _ = Describe("[functional][normalization][loglevel] tests for message format framework = functional.NewCollectorFunctionalFramework() testruntime.NewClusterLogForwarderBuilder(framework.Forwarder). FromInput(obs.InputTypeInfrastructure). - ToHttpOutput(). - FromInput(obs.InputTypeAudit). ToElasticSearchOutput() Expect(framework.Deploy()).To(BeNil()) }) @@ -57,7 +55,7 @@ var _ = Describe("[functional][normalization][loglevel] tests for message format outputTestLog := logs[0] Expect(outputTestLog.Level).To(Equal(expLevel)) }, - FEntry("should recognize an emerg message", 0, "emerg"), + Entry("should recognize an emerg message", 0, "emerg"), Entry("should recognize an alert message", 1, "alert"), Entry("should recognize a crit message", 2, "crit"), Entry("should recognize an err message", 3, "err"), diff --git a/test/functional/normalization/audit_logs_format_test.go b/test/functional/normalization/viaq_audit_logs_format_test.go similarity index 100% rename from test/functional/normalization/audit_logs_format_test.go rename to test/functional/normalization/viaq_audit_logs_format_test.go diff --git a/test/functional/normalization/message_format_test.go b/test/functional/normalization/viaq_container_logs_format_test.go similarity index 100% rename from test/functional/normalization/message_format_test.go rename to test/functional/normalization/viaq_container_logs_format_test.go diff --git a/test/functional/normalization/viaq_journal_logs_test.go b/test/functional/normalization/viaq_journal_logs_test.go new file mode 100644 index 0000000000..c84411c48b --- /dev/null +++ b/test/functional/normalization/viaq_journal_logs_test.go @@ -0,0 +1,53 @@ +package normalization + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" + "github.com/openshift/cluster-logging-operator/test/framework/functional" + "github.com/openshift/cluster-logging-operator/test/helpers/types" + . "github.com/openshift/cluster-logging-operator/test/matchers" + testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" +) + +var _ = Describe("[functional][normalization] ViaQ message format of journal logs", func() { + var ( + framework *functional.CollectorFunctionalFramework + ) + + BeforeEach(func() { + framework = functional.NewCollectorFunctionalFramework() + testruntime.NewClusterLogForwarderBuilder(framework.Forwarder). + FromInput(obs.InputTypeInfrastructure). + ToElasticSearchOutput() + Expect(framework.Deploy()).To(BeNil()) + }) + AfterEach(func() { + framework.Cleanup() + }) + + It("should format ViaQ journal logs", func() { + + expLog := functional.NewJournalInfrastructureLogTemplate() + + // Write log line as input to collector + logline := functional.NewJournalLog(2, "here is my message", "*") + Expect(framework.WriteMessagesToInfraJournalLog(logline, 1)).To(BeNil()) + + // Read line from Log Forward output + raw, err := framework.ReadInfrastructureLogsFrom(string(obs.OutputTypeElasticsearch)) + Expect(err).To(BeNil(), "Expected no errors reading the logs") + + // Parse log line + var logs []types.JournalLog + err = types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs) + Expect(err).To(BeNil(), "Expected no errors parsing the logs") + + // Compare to expected template + outputTestLog := logs[0] + Expect(outputTestLog.ViaQCommon).To(FitLogFormatTemplate(expLog.ViaQCommon)) + Expect(outputTestLog.Systemd.T).NotTo(Equal(types.T{}), "Exp. to be populated with something") + Expect(outputTestLog.Systemd.U).NotTo(Equal(types.U{}), "Exp. to be populated with something") + }) +}) diff --git a/test/helpers/types/type_journal.go b/test/helpers/types/type_journal.go new file mode 100644 index 0000000000..1000012c41 --- /dev/null +++ b/test/helpers/types/type_journal.go @@ -0,0 +1,66 @@ +package types + +import "time" + +// JournalLog is linux journal logs +type JournalLog struct { + ViaQCommon `json:",inline,omitempty"` + Tag string `json:"tag,omitempty"` + Time time.Time `json:"time,omitempty"` + STREAMID string `json:"_STREAM_ID,omitempty"` + SYSTEMDINVOCATIONID string `json:"_SYSTEMD_INVOCATION_ID,omitempty"` + Systemd Systemd `json:"systemd,omitempty"` +} + +type K struct { + KERNEL_DEVICE string `json:"KERNEL_DEVICE,omitempty"` + KERNEL_SUBSYSTEM string `json:"KERNEL_SUBSYSTEM,omitempty"` + UDEV_DEVLINK string `json:"UDEV_DEVLINK,omitempty"` + UDEV_DEVNODE string `json:"UDEV_DEVNODE,omitempty"` + UDEV_SYSNAME string `json:"UDEV_SYSNAME,omitempty"` +} + +type T struct { + AUDIT_LOGINUID string `json:"AUDIT_LOGINUID,omitempty"` + AUDIT_SESSION string `json:"AUDIT_SESSION,omitempty"` + BOOTID string `json:"BOOT_ID,omitempty"` + CAPEFFECTIVE string `json:"CAP_EFFECTIVE,omitempty"` + CMDLINE string `json:"CMDLINE,omitempty"` + COMM string `json:"COMM,omitempty"` + EXE string `json:"EXE,omitempty"` + GID string `json:"GID,omitempty"` + HOSTNAME string `json:"HOSTNAME,omitempty"` + LINE_BREAK string `json:"LINE_BREAK,omitempty"` + MACHINEID string `json:"MACHINE_ID,omitempty"` + PID string `json:"PID,omitempty"` + SELINUXCONTEXT string `json:"SELINUX_CONTEXT,omitempty"` + STREAMID string `json:"STREAM_ID,omitempty"` + SYSTEMDCGROUP string `json:"SYSTEMD_CGROUP,omitempty"` + SYSTEMDINVOCATIONID string `json:"SYSTEMD_INVOCATION_ID,omitempty"` + SYSTEMD_OWNER_UID string `json:"SYSTEMD_OWNER_UID,omitempty"` + SYSTEMD_SESSION string `json:"SYSTEMD_SESSION,omitempty"` + SYSTEMDSLICE string `json:"SYSTEMD_SLICE,omitempty"` + SYSTEMDUNIT string `json:"SYSTEMD_UNIT,omitempty"` + SYSTEMD_USER_UNIT string `json:"SYSTEMD_USER_UNIT,omitempty"` + TRANSPORT string `json:"TRANSPORT,omitempty"` + UID string `json:"UID,omitempty"` +} + +type U struct { + CODE_FILE string `json:"CODE_FILE,omitempty"` + CODE_FUNCTION string `json:"CODE_FUNCTION,omitempty"` + CODE_LINE string `json:"CODE_LINE,omitempty"` + ERRNO string `json:"ERRNO,omitempty"` + MESSAGE_ID string `json:"MESSAGE_ID,omitempty"` + SYSLOG_FACILITY string `json:"SYSLOG_FACILITY,omitempty"` + SYSLOGIDENTIFIER string `json:"SYSLOG_IDENTIFIER,omitempty"` + SYSLOG_PID string `json:"SYSLOG_PID,omitempty"` + RESULT string `json:"RESULT,omitempty"` + UNIT string `json:"UNIT,omitempty"` +} + +type Systemd struct { + K K `json:"k,omitempty"` + T T `json:"t,omitempty"` + U U `json:"u,omitempty"` +} diff --git a/test/helpers/types/types.go b/test/helpers/types/types.go index 26f9f0dfe2..addac6df5f 100644 --- a/test/helpers/types/types.go +++ b/test/helpers/types/types.go @@ -165,14 +165,13 @@ type OpenshiftMeta struct { Sequence OptionalInt `json:"sequence,omitempty"` } -// Application Logs are container logs from all namespaces except "openshift" and "openshift-*" namespaces +// ApplicationLog are container logs from all namespaces except "openshift" and "openshift-*" namespaces type ApplicationLog ContainerLog // Infrastructure logs are // - Journal logs // - logs from "openshift" and "openshift-*" namespaces -// InfraContainerLog // InfraContainerLog logs are container logs from "openshift" and "openshift-*" namespaces type InfraContainerLog ContainerLog @@ -267,42 +266,6 @@ type ViaQCommon struct { Openshift OpenshiftMeta `json:"openshift,omitempty"` } -// JournalLog is linux journal logs -type JournalLog struct { - ViaQCommon `json:",inline,omitempty"` - STREAMID string `json:"_STREAM_ID,omitempty"` - SYSTEMDINVOCATIONID string `json:"_SYSTEMD_INVOCATION_ID,omitempty"` - Systemd Systemd `json:"systemd,omitempty"` -} - -type T struct { - BOOTID string `json:"BOOT_ID,omitempty"` - CAPEFFECTIVE string `json:"CAP_EFFECTIVE,omitempty"` - CMDLINE string `json:"CMDLINE,omitempty"` - COMM string `json:"COMM,omitempty"` - EXE string `json:"EXE,omitempty"` - GID string `json:"GID,omitempty"` - MACHINEID string `json:"MACHINE_ID,omitempty"` - PID string `json:"PID,omitempty"` - SELINUXCONTEXT string `json:"SELINUX_CONTEXT,omitempty"` - STREAMID string `json:"STREAM_ID,omitempty"` - SYSTEMDCGROUP string `json:"SYSTEMD_CGROUP,omitempty"` - SYSTEMDINVOCATIONID string `json:"SYSTEMD_INVOCATION_ID,omitempty"` - SYSTEMDSLICE string `json:"SYSTEMD_SLICE,omitempty"` - SYSTEMDUNIT string `json:"SYSTEMD_UNIT,omitempty"` - TRANSPORT string `json:"TRANSPORT,omitempty"` - UID string `json:"UID,omitempty"` -} - -type U struct { - SYSLOGIDENTIFIER string `json:"SYSLOG_IDENTIFIER,omitempty"` -} - -type Systemd struct { - T T `json:"t,omitempty"` - U U `json:"u,omitempty"` -} - // InfraLog is union of JournalLog and InfraContainerLog type InfraLog struct { Docker Docker `json:"docker,omitempty"`