
Commit

make journal work
jcantrill committed Sep 4, 2024
1 parent 6fa1045 commit 5ffd30a
Showing 14 changed files with 158 additions and 110 deletions.
23 changes: 7 additions & 16 deletions internal/generator/vector/filter/openshift/viaq/journal.go
@@ -11,9 +11,9 @@ import (
)

const (
AddHostName = `.hostname = del(.internal.host)`
AddHostName = `.hostname = del(._internal.host)`
AddJournalLogTag = `.tag = ".journal.system"`
AddTime = `.time = format_timestamp!(.internal.timestamp, format: "%FT%T%:z")`
AddTime = `.time = ._internal."@timestamp"`

FixJournalLogLevel = `
if ._internal.PRIORITY == "8" || ._internal.PRIORITY == 8 {
@@ -30,18 +30,8 @@ if ._internal.PRIORITY == "8" || ._internal.PRIORITY == 8 {
}
}
`
DeleteJournalLogFields = `
del(.source_type)
del(._CPU_USAGE_NSEC)
del(.__REALTIME_TIMESTAMP)
del(.__MONOTONIC_TIMESTAMP)
del(._SOURCE_REALTIME_TIMESTAMP)
del(.JOB_RESULT)
del(.JOB_TYPE)
del(.TIMESTAMP_BOOTTIME)
del(.TIMESTAMP_MONOTONIC)
`
SystemK = `
SetJournalMessage = `if exists(._internal.MESSAGE) {._internal.message = del(._internal.MESSAGE)}`
SystemK = `
# systemd’s kernel-specific metadata.
# .systemd.k = {}
if exists(._internal.KERNEL_DEVICE) { ._internal.systemd.k.KERNEL_DEVICE = del(._internal.KERNEL_DEVICE) }
@@ -100,7 +90,7 @@ func NewJournal(id string, inputs ...string) framework.Element {

func journalLogs() string {
return fmt.Sprintf(`
if .log_source == "%s" {
if ._internal.log_source == "%s" {
%s
}
`, obs.InfrastructureSourceNode, journalLogsVRL())
@@ -113,13 +103,14 @@ func journalLogsVRL() string {
AddHostName,
AddTime,
`.systemd = ._internal.systemd`,
SetMessageOnRoot,
}), "\n\n")
}

func DropJournalDebugLogs(id string, inputs ...string) framework.Element {
return Filter{
ComponentID: id,
Inputs: helpers.MakeInputs(inputs...),
Condition: `(.internal.log_source == "node" && .internal.PRIORITY != "7" && .internal.PRIORITY != 7) || .internal.log_source == "container" || .internal.log_type == "audit"`,
Condition: `(._internal.log_source == "node" && ._internal.PRIORITY != "7" && ._internal.PRIORITY != 7) || ._internal.log_source == "container" || ._internal.log_type == "audit"`,
}
}
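For orientation, here is a minimal, self-contained Go sketch of how the journal VRL fragments above appear to be assembled into a single remap program: the diff shows the snippets being joined with blank-line separators. The `joinVRL` helper and the lower-case constant names are illustrative only, not the repository's actual helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// VRL fragments copied from the new constants in journal.go.
const (
	setJournalMessage = `if exists(._internal.MESSAGE) {._internal.message = del(._internal.MESSAGE)}`
	addHostName       = `.hostname = del(._internal.host)`
	addTime           = `.time = ._internal."@timestamp"`
)

// joinVRL mimics the composition visible in journalLogsVRL: individual
// snippets separated by blank lines into one VRL program.
func joinVRL(snippets ...string) string {
	return strings.Join(snippets, "\n\n")
}

func main() {
	fmt.Println(joinVRL(setJournalMessage, addHostName, addTime, `.systemd = ._internal.systemd`))
}
```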
6 changes: 3 additions & 3 deletions internal/generator/vector/input/audit.go
@@ -23,7 +23,7 @@ func NewK8sAuditSource(input obs.InputSpec, op generator.Options) ([]generator.E
metaID := helpers.MakeID(id, "meta")
el := []generator.Element{
sources.NewK8sAuditLog(id),
NewInternalNormalization(metaID, obs.AuditSourceKube, obs.InputTypeAudit, id, ParseStructured),
NewInternalNormalization(metaID, obs.AuditSourceKube, obs.InputTypeAudit, id, parseStructured),
}
return el, []string{metaID}
}
@@ -33,7 +33,7 @@ func NewOpenshiftAuditSource(input obs.InputSpec, op generator.Options) ([]gener
metaID := helpers.MakeID(id, "meta")
el := []generator.Element{
sources.NewOpenshiftAuditLog(id),
NewInternalNormalization(metaID, obs.AuditSourceOpenShift, obs.InputTypeAudit, id, ParseStructured),
NewInternalNormalization(metaID, obs.AuditSourceOpenShift, obs.InputTypeAudit, id, parseStructured),
}
return el, []string{metaID}
}
@@ -43,7 +43,7 @@ func NewOVNAuditSource(input obs.InputSpec, op generator.Options) ([]generator.E
metaID := helpers.MakeID(id, "meta")
el := []generator.Element{
sources.NewOVNAuditLog(id),
NewInternalNormalization(metaID, obs.AuditSourceOVN, obs.InputTypeAudit, id, ParseStructured),
NewInternalNormalization(metaID, obs.AuditSourceOVN, obs.InputTypeAudit, id, parseStructured),
}
return el, []string{metaID}
}
11 changes: 5 additions & 6 deletions internal/generator/vector/input/internal.go
@@ -14,14 +14,15 @@ const (
._internal.log_source = %q
._internal.log_type = %q
`
ParseStructured = `._internal.structured = parse_json!(string!(._internal.message))`

parseStructured = `._internal.structured = parse_json!(string!(._internal.message))`

setClusterID = `._internal.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"`
setEnvelope = `. = {"_internal": .}`
setHostName = `._internal.hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""`
setClusterID = `._internal.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"`
setTimestampField = `ts = del(._internal.timestamp); if !exists(._internal."@timestamp") {._internal."@timestamp" = ts}`
)

// NewInternalNormalization returns configuration elements to normalize log entries to an internal, common data model
func NewInternalNormalization(id string, logSource, logType interface{}, inputs string, addVRLs ...string) framework.Element {
vrls := []string{
setEnvelope,
@@ -31,9 +32,7 @@ func NewInternalNormalization(id string, logSource, logType interface{}, inputs
setTimestampField,
viaq.SetLogLevel,
}
for _, vrl := range addVRLs {
vrls = append(vrls, vrl)
}
vrls = append(vrls, addVRLs...)
return elements.Remap{
ComponentID: id,
Inputs: helpers.MakeInputs(inputs),
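As a rough illustration of what the normalization constants above do to an event, here is a plain-Go analogy (not VRL, and not the generated Vector config): the original record is moved under `_internal` (the `setEnvelope` step), then source metadata is stamped onto it. The hostname value and the sample payload are invented for the example; cluster-id stamping is omitted for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalize approximates the envelope + metadata steps of NewInternalNormalization.
func normalize(event map[string]interface{}, logSource, logType string) map[string]interface{} {
	internal := map[string]interface{}{}
	for k, v := range event {
		internal[k] = v
	}
	internal["log_source"] = logSource
	internal["log_type"] = logType
	internal["hostname"] = "node-1.example.com" // stand-in for VECTOR_SELF_NODE_NAME
	return map[string]interface{}{"_internal": internal}
}

func main() {
	out := normalize(map[string]interface{}{"MESSAGE": "kernel: oops"}, "node", "infrastructure")
	b, _ := json.MarshalIndent(out, "", "  ")
	fmt.Println(string(b))
}
```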
1 change: 1 addition & 0 deletions internal/generator/vector/input/journal.go
@@ -14,6 +14,7 @@ func NewJournalSource(input obs.InputSpec) ([]Element, []string) {
el := []Element{
source.NewJournalLog(id),
NewInternalNormalization(metaID, string(obs.InfrastructureSourceNode), string(obs.InputTypeInfrastructure), id,
viaq.SetJournalMessage,
viaq.SystemK,
viaq.SystemT,
viaq.SystemU,
2 changes: 1 addition & 1 deletion internal/generator/vector/input/source_test.go
@@ -293,7 +293,7 @@ var _ = Describe("inputs", func() {
},
"audit_host.toml",
),
FEntry("with an audit input for kube logs should generate kube audit file source", obs.InputSpec{
Entry("with an audit input for kube logs should generate kube audit file source", obs.InputSpec{
Name: "myaudit",
Type: obs.InputTypeAudit,
Audit: &obs.Audit{
@@ -11,7 +11,7 @@ var _ = Describe("Vector Output Template", func() {
DescribeTable("transforms template syntax to VRL compatible string", func(expVRL, template string) {
Expect(TransformUserTemplateToVRL(template)).To(EqualTrimLines(expVRL))
},
FEntry("should transform template with static and dynamic values into VRL compatible syntax",
Entry("should transform template with static and dynamic values into VRL compatible syntax",
`"foo-" + to_string!(._internal.log_type||"none") + "." + to_string!(._internal.bar.foo.test||"missing) + "_" + to_string!(._internal.log_type||"none")`,
`foo-{.log_type||"none"}.{.bar.foo.test||"missing}_{.log_type||"none"}`),

41 changes: 7 additions & 34 deletions internal/generator/vector/pipeline/adapter.go
@@ -49,8 +49,7 @@ func NewPipeline(index int, p obs.PipelineSpec, inputs map[string]helpers.InputC
for name, f := range filters {
pipeline.filterMap[name] = *f
}
//addPrefilters(pipeline)
addPostfilters(pipeline)
addPostFilters(pipeline)

for i, filterName := range pipeline.FilterRefs {
pipeline.initFilter(i, filterName)
@@ -83,37 +82,11 @@
return pipeline
}

// TODO: add migration to treat like any other
func addPrefilters(p *Pipeline) {
//prefilters := []string{}
//if viaq.HasJournalSource(p.inputSpecs) {
// prefilters = append(prefilters, viaq.ViaqJournal)
// p.filterMap[viaq.ViaqJournal] = filter.InternalFilterSpec{
// FilterSpec: &obs.FilterSpec{Type: viaq.ViaqJournal},
// SuppliesTransform: true,
// TranformFactory: func(id string, inputs ...string) framework.Element {
// return viaq.NewJournal(id, inputs...)
// },
// }
//}
//
//prefilters = append(prefilters, viaq.Viaq)
//p.filterMap[viaq.Viaq] = filter.InternalFilterSpec{
// FilterSpec: &obs.FilterSpec{Type: viaq.Viaq},
// SuppliesTransform: true,
// TranformFactory: func(id string, inputs ...string) framework.Element {
// // Build all log_source VRL
// return viaq.New(id, inputs, p.inputSpecs)
// },
//}
//p.FilterRefs = append(prefilters, p.FilterRefs...)
}

func addPostfilters(p *Pipeline) {
func addPostFilters(p *Pipeline) {

postfilters := []string{}
var postFilters []string
if viaq.HasJournalSource(p.inputSpecs) {
postfilters = append(postfilters, viaq.ViaqJournal)
postFilters = append(postFilters, viaq.ViaqJournal)
p.filterMap[viaq.ViaqJournal] = filter.InternalFilterSpec{
FilterSpec: &obs.FilterSpec{Type: viaq.ViaqJournal},
SuppliesTransform: true,
@@ -123,7 +96,7 @@ func addPostfilters(p *Pipeline) {
}
}

postfilters = append(postfilters, viaq.Viaq)
postFilters = append(postFilters, viaq.Viaq)
p.filterMap[viaq.Viaq] = filter.InternalFilterSpec{
FilterSpec: &obs.FilterSpec{Type: viaq.Viaq},
SuppliesTransform: true,
Expand All @@ -133,7 +106,7 @@ func addPostfilters(p *Pipeline) {
},
}
if viaq.HasContainerSource(p.inputSpecs) {
postfilters = append(postfilters, viaq.ViaqDedot)
postFilters = append(postFilters, viaq.ViaqDedot)
p.filterMap[viaq.ViaqDedot] = filter.InternalFilterSpec{
FilterSpec: &obs.FilterSpec{Type: viaq.ViaqDedot},
SuppliesTransform: true,
@@ -142,7 +115,7 @@
},
}
}
p.FilterRefs = append(p.FilterRefs, postfilters...)
p.FilterRefs = append(p.FilterRefs, postFilters...)
}

func (p *Pipeline) Name() string {
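The ordering that addPostFilters establishes can be summarized with a small sketch: journal normalization first (only when a journal source is present), then the general ViaQ transform, then dedot (only for container sources). The string values below are placeholders; the real code uses the viaq.ViaqJournal, viaq.Viaq and viaq.ViaqDedot constants.

```go
package main

import "fmt"

// postFilterRefs mirrors the conditional append logic of addPostFilters.
func postFilterRefs(hasJournal, hasContainer bool) []string {
	var refs []string
	if hasJournal {
		refs = append(refs, "viaq-journal")
	}
	refs = append(refs, "viaq")
	if hasContainer {
		refs = append(refs, "viaq-dedot")
	}
	return refs
}

func main() {
	fmt.Println(postFilterRefs(true, true))  // [viaq-journal viaq viaq-dedot]
	fmt.Println(postFilterRefs(false, true)) // [viaq viaq-dedot]
}
```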
20 changes: 12 additions & 8 deletions test/framework/functional/message_templates.go
@@ -105,14 +105,18 @@ func NewJournalInfrastructureLogTemplate() types.JournalLog {
return types.JournalLog{
ViaQCommon: types.ViaQCommon{

Timestamp: time.Time{},
Message: "*",
LogSource: "node",
LogType: "infrastructure",
Level: "*",
Hostname: "*",
ViaqMsgID: "**optional**",
PipelineMetadata: TemplateForAnyPipelineMetadata,
Timestamp: time.Time{},
Message: "*",
LogSource: "node",
LogType: "infrastructure",
Level: "*",
Hostname: "*",
Openshift: types.OpenshiftMeta{
ClusterID: "*",
Sequence: types.NewOptionalInt(">0"),
},
},
Tag: ".journal.system",
Time: time.Time{},
}
}
4 changes: 1 addition & 3 deletions test/functional/normalization/loglevel/journal_logs_test.go
@@ -25,8 +25,6 @@ var _ = Describe("[functional][normalization][loglevel] tests for message format
framework = functional.NewCollectorFunctionalFramework()
testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeInfrastructure).
ToHttpOutput().
FromInput(obs.InputTypeAudit).
ToElasticSearchOutput()
Expect(framework.Deploy()).To(BeNil())
})
@@ -57,7 +55,7 @@ var _ = Describe("[functional][normalization][loglevel] tests for message format
outputTestLog := logs[0]
Expect(outputTestLog.Level).To(Equal(expLevel))
},
FEntry("should recognize an emerg message", 0, "emerg"),
Entry("should recognize an emerg message", 0, "emerg"),
Entry("should recognize an alert message", 1, "alert"),
Entry("should recognize a crit message", 2, "crit"),
Entry("should recognize an err message", 3, "err"),
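The table entries above exercise the journald PRIORITY values, which follow the standard syslog severities. Only priorities 0-3 are visible in this excerpt of the test; the mapping for 4-7 below is the conventional syslog naming and is an assumption about the truncated entries.

```go
package main

import "fmt"

// priorityToLevel maps journald PRIORITY to the level name the tests expect.
var priorityToLevel = map[int]string{
	0: "emerg",
	1: "alert",
	2: "crit",
	3: "err",
	4: "warning", // assumed, not shown in the truncated diff
	5: "notice",  // assumed
	6: "info",    // assumed
	7: "debug",   // assumed
}

func main() {
	fmt.Println(priorityToLevel[2]) // crit
}
```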
53 changes: 53 additions & 0 deletions test/functional/normalization/viaq_journal_logs_test.go
@@ -0,0 +1,53 @@
package normalization

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
"github.com/openshift/cluster-logging-operator/internal/utils"
"github.com/openshift/cluster-logging-operator/test/framework/functional"
"github.com/openshift/cluster-logging-operator/test/helpers/types"
. "github.com/openshift/cluster-logging-operator/test/matchers"
testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
)

var _ = Describe("[functional][normalization] ViaQ message format of journal logs", func() {
var (
framework *functional.CollectorFunctionalFramework
)

BeforeEach(func() {
framework = functional.NewCollectorFunctionalFramework()
testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeInfrastructure).
ToElasticSearchOutput()
Expect(framework.Deploy()).To(BeNil())
})
AfterEach(func() {
framework.Cleanup()
})

It("should format ViaQ journal logs", func() {

expLog := functional.NewJournalInfrastructureLogTemplate()

// Write log line as input to collector
logline := functional.NewJournalLog(2, "here is my message", "*")
Expect(framework.WriteMessagesToInfraJournalLog(logline, 1)).To(BeNil())

// Read line from Log Forward output
raw, err := framework.ReadInfrastructureLogsFrom(string(obs.OutputTypeElasticsearch))
Expect(err).To(BeNil(), "Expected no errors reading the logs")

// Parse log line
var logs []types.JournalLog
err = types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs)
Expect(err).To(BeNil(), "Expected no errors parsing the logs")

// Compare to expected template
outputTestLog := logs[0]
Expect(outputTestLog.ViaQCommon).To(FitLogFormatTemplate(expLog.ViaQCommon))
Expect(outputTestLog.Systemd.T).NotTo(Equal(types.T{}), "Exp. to be populated with something")
Expect(outputTestLog.Systemd.U).NotTo(Equal(types.U{}), "Exp. to be populated with something")
})
})
66 changes: 66 additions & 0 deletions test/helpers/types/type_journal.go
@@ -0,0 +1,66 @@
package types

import "time"

// JournalLog is linux journal logs
type JournalLog struct {
ViaQCommon `json:",inline,omitempty"`
Tag string `json:"tag,omitempty"`
Time time.Time `json:"time,omitempty"`
STREAMID string `json:"_STREAM_ID,omitempty"`
SYSTEMDINVOCATIONID string `json:"_SYSTEMD_INVOCATION_ID,omitempty"`
Systemd Systemd `json:"systemd,omitempty"`
}

type K struct {
KERNEL_DEVICE string `json:"KERNEL_DEVICE,omitempty"`
KERNEL_SUBSYSTEM string `json:"KERNEL_SUBSYSTEM,omitempty"`
UDEV_DEVLINK string `json:"UDEV_DEVLINK,omitempty"`
UDEV_DEVNODE string `json:"UDEV_DEVNODE,omitempty"`
UDEV_SYSNAME string `json:"UDEV_SYSNAME,omitempty"`
}

type T struct {
AUDIT_LOGINUID string `json:"AUDIT_LOGINUID,omitempty"`
AUDIT_SESSION string `json:"AUDIT_SESSION,omitempty"`
BOOTID string `json:"BOOT_ID,omitempty"`
CAPEFFECTIVE string `json:"CAP_EFFECTIVE,omitempty"`
CMDLINE string `json:"CMDLINE,omitempty"`
COMM string `json:"COMM,omitempty"`
EXE string `json:"EXE,omitempty"`
GID string `json:"GID,omitempty"`
HOSTNAME string `json:"HOSTNAME,omitempty"`
LINE_BREAK string `json:"LINE_BREAK,omitempty"`
MACHINEID string `json:"MACHINE_ID,omitempty"`
PID string `json:"PID,omitempty"`
SELINUXCONTEXT string `json:"SELINUX_CONTEXT,omitempty"`
STREAMID string `json:"STREAM_ID,omitempty"`
SYSTEMDCGROUP string `json:"SYSTEMD_CGROUP,omitempty"`
SYSTEMDINVOCATIONID string `json:"SYSTEMD_INVOCATION_ID,omitempty"`
SYSTEMD_OWNER_UID string `json:"SYSTEMD_OWNER_UID,omitempty"`
SYSTEMD_SESSION string `json:"SYSTEMD_SESSION,omitempty"`
SYSTEMDSLICE string `json:"SYSTEMD_SLICE,omitempty"`
SYSTEMDUNIT string `json:"SYSTEMD_UNIT,omitempty"`
SYSTEMD_USER_UNIT string `json:"SYSTEMD_USER_UNIT,omitempty"`
TRANSPORT string `json:"TRANSPORT,omitempty"`
UID string `json:"UID,omitempty"`
}

type U struct {
CODE_FILE string `json:"CODE_FILE,omitempty"`
CODE_FUNCTION string `json:"CODE_FUNCTION,omitempty"`
CODE_LINE string `json:"CODE_LINE,omitempty"`
ERRNO string `json:"ERRNO,omitempty"`
MESSAGE_ID string `json:"MESSAGE_ID,omitempty"`
SYSLOG_FACILITY string `json:"SYSLOG_FACILITY,omitempty"`
SYSLOGIDENTIFIER string `json:"SYSLOG_IDENTIFIER,omitempty"`
SYSLOG_PID string `json:"SYSLOG_PID,omitempty"`
RESULT string `json:"RESULT,omitempty"`
UNIT string `json:"UNIT,omitempty"`
}

type Systemd struct {
K K `json:"k,omitempty"`
T T `json:"t,omitempty"`
U U `json:"u,omitempty"`
}
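To show how a collected journal record round-trips through these types, here is a trimmed-down sketch using encoding/json. The struct shapes and tags echo the JournalLog/Systemd types above, but the `message` field (from the embedded ViaQCommon) and the sample payload are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copies of the new journal types, enough for a parsing demo.
type systemdT struct {
	Comm        string `json:"COMM,omitempty"`
	SystemdUnit string `json:"SYSTEMD_UNIT,omitempty"`
}

type systemdMeta struct {
	T systemdT `json:"t,omitempty"`
}

type journalLog struct {
	Tag     string      `json:"tag,omitempty"`
	Message string      `json:"message,omitempty"` // assumed ViaQCommon tag
	Systemd systemdMeta `json:"systemd,omitempty"`
}

func main() {
	raw := `{"tag":".journal.system","message":"here is my message","systemd":{"t":{"COMM":"systemd","SYSTEMD_UNIT":"kubelet.service"}}}`
	var log journalLog
	if err := json.Unmarshal([]byte(raw), &log); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", log)
}
```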