From 7047e94048c1c66c2a0e339a2b3d6ac14bad4d36 Mon Sep 17 00:00:00 2001 From: Pawel Szkamruk Date: Mon, 13 Jan 2025 13:50:17 +0100 Subject: [PATCH 1/4] add resourceAttributes tests --- .../configuration_switching_test.go | 175 +++++++++++++++++- .../values_cluster_receiver_only.yaml.tmpl | 26 +++ 2 files changed, 200 insertions(+), 1 deletion(-) create mode 100644 functional_tests/testdata_configuration_switching/values/values_cluster_receiver_only.yaml.tmpl diff --git a/functional_tests/configuration_switching_test.go b/functional_tests/configuration_switching_test.go index 3cf5c8f6e7..5b297a1bbe 100644 --- a/functional_tests/configuration_switching_test.go +++ b/functional_tests/configuration_switching_test.go @@ -180,6 +180,7 @@ func Test_Functions(t *testing.T) { t.Run("agent logs and metrics enabled or disabled", testAgentLogsAndMetrics) t.Run("logs and metrics index switch", testIndexSwitch) t.Run("cluster receiver enabled or disabled", testClusterReceiverEnabledOrDisabled) + t.Run("resource attributes verification", testVerifyResourceAttributes) } @@ -347,6 +348,103 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) { resetLogsSink(t, logsObjectsConsumer) } +func testVerifyResourceAttributes(t *testing.T) { + attributesList := [4]string{"k8s.node.name", "k8s.pod.name", "k8s.pod.uid", "k8s.namespace.name"} + + hostEp := hostEndpoint(t) + if len(hostEp) == 0 { + require.Fail(t, "Host endpoint not found") + } + + t.Run("verify cluster receiver resource attributes", func(t *testing.T) { + valuesFileName := "values_cluster_receiver_only.yaml.tmpl" + logsObjectsConsumer := setupOnce(t).logsObjectsConsumer + logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort) + + replacements := map[string]interface{}{ + "ClusterReceiverEnabled": true, + "LogObjectsHecEndpoint": logsObjectsHecEndpoint, + } + deployChartsAndApps(t, valuesFileName, replacements) + resetLogsSink(t, logsObjectsConsumer) + waitForLogs(t, 5, logsObjectsConsumer) + t.Logf("===> >>>> Logs: %v", len(logsObjectsConsumer.AllLogs())) + + for _, attr := range attributesList { + t.Log("Checking resourceAttribute: ", attr) + resourceAttrValues, notFoundCounter := getLogsResourceAttribute(logsObjectsConsumer.AllLogs(), attr) + assert.True(t, len(resourceAttrValues) >= 1) + assert.Equal(t, 0, notFoundCounter) + t.Logf("Resource Attributes: %v", resourceAttrValues) + } + }) + + t.Run("verify cluster receiver metrics resource attributes", func(t *testing.T) { + valuesFileName := "values_cluster_receiver_only.yaml.tmpl" + hecMetricsConsumer := setupOnce(t).hecMetricsConsumer + logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort) + + replacements := map[string]interface{}{ + "ClusterReceiverEnabled": true, + "LogObjectsHecEndpoint": logsObjectsHecEndpoint, + } + deployChartsAndApps(t, valuesFileName, replacements) + resetMetricsSink(t, hecMetricsConsumer) + t.Logf("===> >>>> Metrics: %v", len(hecMetricsConsumer.AllMetrics())) + + waitForMetrics(t, 5, hecMetricsConsumer) + for _, attr := range attributesList { + t.Log("Checking resourceAttribute: ", attr) + resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr) + assert.True(t, len(resourceAttrValues) >= 1) + assert.Equal(t, 0, notFoundCounter) + t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues) + } + }) + + t.Run("verify agent logs resource attributes", func(t *testing.T) { + valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl" + agentLogsConsumer := setupOnce(t).logsConsumer + + replacements := map[string]interface{}{ + "MetricsEnabled": true, + "LogsEnabled": true, + } + deployChartsAndApps(t, valuesFileName, replacements) + resetLogsSink(t, agentLogsConsumer) + + waitForLogs(t, 5, agentLogsConsumer) + for _, attr := range attributesList { + t.Log("Checking resourceAttribute: ", attr) + resourceAttrValues, notFoundCounter := getLogsResourceAttribute(agentLogsConsumer.AllLogs(), attr) + assert.True(t, len(resourceAttrValues) >= 1) + assert.Equal(t, 0, notFoundCounter) + t.Logf("Resource Attributes: %v", resourceAttrValues) + } + }) + + t.Run("verify metrics resource attributes", func(t *testing.T) { + valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl" + hecMetricsConsumer := setupOnce(t).hecMetricsConsumer + + replacements := map[string]interface{}{ + "MetricsEnabled": true, + "LogsEnabled": true, + } + deployChartsAndApps(t, valuesFileName, replacements) + resetMetricsSink(t, hecMetricsConsumer) + + waitForMetrics(t, 5, hecMetricsConsumer) + for _, attr := range attributesList { + t.Log("Checking resourceAttribute: ", attr) + resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr) + assert.True(t, len(resourceAttrValues) >= 1) + assert.Equal(t, 0, notFoundCounter) + t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues) + } + }) +} + func checkPodExists(pods *corev1.PodList, podNamePrefix string) bool { for _, pod := range pods.Items { if strings.HasPrefix(pod.Name, podNamePrefix) { @@ -410,7 +508,6 @@ func getMetricsIndex(metrics []pmetric.Metrics) []string { var indices []string for i := 0; i < len(metrics); i++ { m := metrics[i] - fmt.Printf("Metrics: %v", m.ResourceMetrics().At(0).Resource().Attributes()) if value, ok := m.ResourceMetrics().At(0).Resource().Attributes().Get("com.splunk.index"); ok { index := value.AsString() if !contains(indices, index) { @@ -430,6 +527,82 @@ func contains(list []string, newValue string) bool { return false } +func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, int) { + var resourceAttributes []string + var notFoundCounter int = 0 + + for i := 0; i < len(logs); i++ { + l := logs[i] + for j := 0; j < l.ResourceLogs().Len(); j++ { + rl := l.ResourceLogs().At(j) + sl := rl.ScopeLogs().At(j) + for k := 0; k < sl.LogRecords().Len(); k++ { + tmpAttribute, ok := sl.LogRecords().At(k).Attributes().Get(attributeName) + if ok { + if !contains(resourceAttributes, tmpAttribute.AsString()) { + resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) + } + } else { + fmt.Println("== Resource Attribute not found: ", attributeName) + notFoundCounter++ + } + } + } + } + return resourceAttributes, notFoundCounter +} + +func getMetricsResourceAttribute(metrics []pmetric.Metrics, attributeName string) ([]string, int) { + var resourceAttributes []string + var notFoundCounter int = 0 + var foundCounter int = 0 + var skippedCounter int = 0 + prefixesForMetricsToSkip := []string{ + // agent metrics + "system.", "k8s.node.", + // cluster receiver metrics + "k8s.deployment.", "k8s.namespace.", "k8s.replicaset.", "k8s.daemonset.", "k8s.node.", + } + + for i := 0; i < len(metrics); i++ { + m := metrics[i] + for j := 0; j < m.ResourceMetrics().Len(); j++ { + rm := m.ResourceMetrics().At(j) + for k := 0; k < rm.ScopeMetrics().Len(); k++ { + sm := rm.ScopeMetrics().At(k) + for l := 0; l < sm.Metrics().Len(); l++ { + skip := false + for _, prefix := range prefixesForMetricsToSkip { + if strings.HasPrefix(sm.Metrics().At(l).Name(), prefix) { + skip = true + break + } + } + if skip { + skippedCounter++ + continue + } + for m := 0; m < sm.Metrics().At(l).Gauge().DataPoints().Len(); m++ { + tmpAttribute, ok := sm.Metrics().At(l).Gauge().DataPoints().At(m).Attributes().Get(attributeName) + + if ok { + if !contains(resourceAttributes, tmpAttribute.AsString()) { + resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) + } + foundCounter++ + } else { + fmt.Printf("Resource Attribute %s not found for metric: %v \n", attributeName, sm.Metrics().At(l).Name()) + notFoundCounter++ + } + } + } + } + } + } + fmt.Printf("Counters: Found: %d, Skipped: %d, not Found: %d\n", foundCounter, skippedCounter, notFoundCounter) + return resourceAttributes, notFoundCounter +} + func uninstallDeployment(t *testing.T) { testKubeConfig, setKubeConfig := os.LookupEnv("KUBECONFIG") require.True(t, setKubeConfig, "the environment variable KUBECONFIG must be set") diff --git a/functional_tests/testdata_configuration_switching/values/values_cluster_receiver_only.yaml.tmpl b/functional_tests/testdata_configuration_switching/values/values_cluster_receiver_only.yaml.tmpl new file mode 100644 index 0000000000..b5625cb975 --- /dev/null +++ b/functional_tests/testdata_configuration_switching/values/values_cluster_receiver_only.yaml.tmpl @@ -0,0 +1,26 @@ +--- +splunkPlatform: + token: foobar + endpoint: {{ .LogHecEndpoint }} + metricsEnabled: true + metricsIndex: myMetricsIndex + +agent: + enabled: false + +clusterReceiver: + enabled: {{ .ClusterReceiverEnabled }} + config: + exporters: + splunk_hec/platform_logs: + endpoint: {{ .LogObjectsHecEndpoint }} + splunk_hec/platform_metrics: + endpoint: {{ .MetricHecEndpoint }} + k8sObjects: + - name: pods + mode: pull + interval: 5s + - name: events + mode: watch + +clusterName: dev-operator From 2b891bf57a7517a4d65399d3e41370bda81b1fb8 Mon Sep 17 00:00:00 2001 From: Pawel Szkamruk Date: Mon, 13 Jan 2025 14:43:45 +0100 Subject: [PATCH 2/4] fix --- .../configuration_switching_test.go | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/functional_tests/configuration_switching_test.go b/functional_tests/configuration_switching_test.go index 5b297a1bbe..779d039a72 100644 --- a/functional_tests/configuration_switching_test.go +++ b/functional_tests/configuration_switching_test.go @@ -535,16 +535,18 @@ func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, l := logs[i] for j := 0; j < l.ResourceLogs().Len(); j++ { rl := l.ResourceLogs().At(j) - sl := rl.ScopeLogs().At(j) - for k := 0; k < sl.LogRecords().Len(); k++ { - tmpAttribute, ok := sl.LogRecords().At(k).Attributes().Get(attributeName) - if ok { - if !contains(resourceAttributes, tmpAttribute.AsString()) { - resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) + for k := 0; k < rl.ScopeLogs().Len(); k++ { + sl := rl.ScopeLogs().At(k) + for m := 0; m < sl.LogRecords().Len(); m++ { + tmpAttribute, ok := sl.LogRecords().At(m).Attributes().Get(attributeName) + if ok { + if !contains(resourceAttributes, tmpAttribute.AsString()) { + resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) + } + } else { + fmt.Println("== Resource Attribute not found: ", attributeName) + notFoundCounter++ } - } else { - fmt.Println("== Resource Attribute not found: ", attributeName) - notFoundCounter++ } } } From a59d44b3248dbfe59c9fa883f82fa8cd0bed28bf Mon Sep 17 00:00:00 2001 From: Pawel Szkamruk Date: Wed, 15 Jan 2025 15:50:16 +0100 Subject: [PATCH 3/4] minor improvements --- functional_tests/configuration_switching_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/functional_tests/configuration_switching_test.go b/functional_tests/configuration_switching_test.go index 779d039a72..3d48badfa7 100644 --- a/functional_tests/configuration_switching_test.go +++ b/functional_tests/configuration_switching_test.go @@ -530,6 +530,7 @@ func contains(list []string, newValue string) bool { func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, int) { var resourceAttributes []string var notFoundCounter int = 0 + var foundCounter int = 0 for i := 0; i < len(logs); i++ { l := logs[i] @@ -543,14 +544,17 @@ func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, if !contains(resourceAttributes, tmpAttribute.AsString()) { resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) } + foundCounter++ } else { fmt.Println("== Resource Attribute not found: ", attributeName) + fmt.Printf("Log Record Body: %v\n", sl.LogRecords().At(m).Body().AsRaw()) notFoundCounter++ } } } } } + fmt.Printf("Counters: Found: %d | Not Found: %d\n", foundCounter, notFoundCounter) return resourceAttributes, notFoundCounter } @@ -563,7 +567,7 @@ func getMetricsResourceAttribute(metrics []pmetric.Metrics, attributeName string // agent metrics "system.", "k8s.node.", // cluster receiver metrics - "k8s.deployment.", "k8s.namespace.", "k8s.replicaset.", "k8s.daemonset.", "k8s.node.", + "k8s.deployment.", "k8s.namespace.", "k8s.replicaset.", "k8s.daemonset.", } for i := 0; i < len(metrics); i++ { @@ -601,7 +605,7 @@ func getMetricsResourceAttribute(metrics []pmetric.Metrics, attributeName string } } } - fmt.Printf("Counters: Found: %d, Skipped: %d, not Found: %d\n", foundCounter, skippedCounter, notFoundCounter) + fmt.Printf("Counters: Found: %d | Skipped: %d | Not Found: %d\n", foundCounter, skippedCounter, notFoundCounter) return resourceAttributes, notFoundCounter } From 1d580b57df3e7e3dd0cad1326f5570b756079f98 Mon Sep 17 00:00:00 2001 From: Pawel Szkamruk Date: Wed, 15 Jan 2025 16:09:29 +0100 Subject: [PATCH 4/4] changes after review - renaming resourceAttributes -> attributes, etc --- .../configuration_switching_test.go | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/functional_tests/configuration_switching_test.go b/functional_tests/configuration_switching_test.go index 3d48badfa7..908f2ead20 100644 --- a/functional_tests/configuration_switching_test.go +++ b/functional_tests/configuration_switching_test.go @@ -180,7 +180,7 @@ func Test_Functions(t *testing.T) { t.Run("agent logs and metrics enabled or disabled", testAgentLogsAndMetrics) t.Run("logs and metrics index switch", testIndexSwitch) t.Run("cluster receiver enabled or disabled", testClusterReceiverEnabledOrDisabled) - t.Run("resource attributes verification", testVerifyResourceAttributes) + t.Run("logs and metrics attributes verification", testVerifyLogsAndMetricsAttributes) } @@ -348,7 +348,7 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) { resetLogsSink(t, logsObjectsConsumer) } -func testVerifyResourceAttributes(t *testing.T) { +func testVerifyLogsAndMetricsAttributes(t *testing.T) { attributesList := [4]string{"k8s.node.name", "k8s.pod.name", "k8s.pod.uid", "k8s.namespace.name"} hostEp := hostEndpoint(t) @@ -356,7 +356,7 @@ func testVerifyResourceAttributes(t *testing.T) { require.Fail(t, "Host endpoint not found") } - t.Run("verify cluster receiver resource attributes", func(t *testing.T) { + t.Run("verify cluster receiver attributes", func(t *testing.T) { valuesFileName := "values_cluster_receiver_only.yaml.tmpl" logsObjectsConsumer := setupOnce(t).logsObjectsConsumer logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort) @@ -371,15 +371,15 @@ func testVerifyResourceAttributes(t *testing.T) { t.Logf("===> >>>> Logs: %v", len(logsObjectsConsumer.AllLogs())) for _, attr := range attributesList { - t.Log("Checking resourceAttribute: ", attr) - resourceAttrValues, notFoundCounter := getLogsResourceAttribute(logsObjectsConsumer.AllLogs(), attr) - assert.True(t, len(resourceAttrValues) >= 1) + t.Log("Checking attribute: ", attr) + attrValues, notFoundCounter := getLogsAttributes(logsObjectsConsumer.AllLogs(), attr) + assert.True(t, len(attrValues) >= 1) assert.Equal(t, 0, notFoundCounter) - t.Logf("Resource Attributes: %v", resourceAttrValues) + t.Logf("Attributes: %v", attrValues) } }) - t.Run("verify cluster receiver metrics resource attributes", func(t *testing.T) { + t.Run("verify cluster receiver metrics attributes", func(t *testing.T) { valuesFileName := "values_cluster_receiver_only.yaml.tmpl" hecMetricsConsumer := setupOnce(t).hecMetricsConsumer logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort) @@ -390,19 +390,19 @@ func testVerifyResourceAttributes(t *testing.T) { } deployChartsAndApps(t, valuesFileName, replacements) resetMetricsSink(t, hecMetricsConsumer) - t.Logf("===> >>>> Metrics: %v", len(hecMetricsConsumer.AllMetrics())) + t.Logf("===> >>>> Metrics: %d", len(hecMetricsConsumer.AllMetrics())) waitForMetrics(t, 5, hecMetricsConsumer) for _, attr := range attributesList { - t.Log("Checking resourceAttribute: ", attr) - resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr) - assert.True(t, len(resourceAttrValues) >= 1) + t.Log("Checking attributes: ", attr) + attrValues, notFoundCounter := getMetricsAttributes(hecMetricsConsumer.AllMetrics(), attr) + assert.True(t, len(attrValues) >= 1) assert.Equal(t, 0, notFoundCounter) - t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues) + t.Logf("Resource Attributes for %s: %v", attr, attrValues) } }) - t.Run("verify agent logs resource attributes", func(t *testing.T) { + t.Run("verify agent logs attributes", func(t *testing.T) { valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl" agentLogsConsumer := setupOnce(t).logsConsumer @@ -415,15 +415,15 @@ func testVerifyResourceAttributes(t *testing.T) { waitForLogs(t, 5, agentLogsConsumer) for _, attr := range attributesList { - t.Log("Checking resourceAttribute: ", attr) - resourceAttrValues, notFoundCounter := getLogsResourceAttribute(agentLogsConsumer.AllLogs(), attr) - assert.True(t, len(resourceAttrValues) >= 1) + t.Log("Checking attribute: ", attr) + attrValues, notFoundCounter := getLogsAttributes(agentLogsConsumer.AllLogs(), attr) + assert.True(t, len(attrValues) >= 1) assert.Equal(t, 0, notFoundCounter) - t.Logf("Resource Attributes: %v", resourceAttrValues) + t.Logf("Attributes: %v", attrValues) } }) - t.Run("verify metrics resource attributes", func(t *testing.T) { + t.Run("verify metrics attributes", func(t *testing.T) { valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl" hecMetricsConsumer := setupOnce(t).hecMetricsConsumer @@ -436,11 +436,11 @@ func testVerifyResourceAttributes(t *testing.T) { waitForMetrics(t, 5, hecMetricsConsumer) for _, attr := range attributesList { - t.Log("Checking resourceAttribute: ", attr) - resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr) - assert.True(t, len(resourceAttrValues) >= 1) + t.Log("Checking attribute: ", attr) + attrValues, notFoundCounter := getMetricsAttributes(hecMetricsConsumer.AllMetrics(), attr) + assert.True(t, len(attrValues) >= 1) assert.Equal(t, 0, notFoundCounter) - t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues) + t.Logf("Attributes for %s: %v", attr, attrValues) } }) } @@ -527,8 +527,8 @@ func contains(list []string, newValue string) bool { return false } -func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, int) { - var resourceAttributes []string +func getLogsAttributes(logs []plog.Logs, attributeName string) ([]string, int) { + var attributes []string var notFoundCounter int = 0 var foundCounter int = 0 @@ -541,12 +541,12 @@ func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, for m := 0; m < sl.LogRecords().Len(); m++ { tmpAttribute, ok := sl.LogRecords().At(m).Attributes().Get(attributeName) if ok { - if !contains(resourceAttributes, tmpAttribute.AsString()) { - resourceAttributes = append(resourceAttributes, tmpAttribute.AsString()) + if !contains(attributes, tmpAttribute.AsString()) { + attributes = append(attributes, tmpAttribute.AsString()) } foundCounter++ } else { - fmt.Println("== Resource Attribute not found: ", attributeName) + fmt.Println("=== Attribute not found: ", attributeName) fmt.Printf("Log Record Body: %v\n", sl.LogRecords().At(m).Body().AsRaw()) notFoundCounter++ } @@ -555,10 +555,10 @@ func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, } } fmt.Printf("Counters: Found: %d | Not Found: %d\n", foundCounter, notFoundCounter) - return resourceAttributes, notFoundCounter + return attributes, notFoundCounter } -func getMetricsResourceAttribute(metrics []pmetric.Metrics, attributeName string) ([]string, int) { +func getMetricsAttributes(metrics []pmetric.Metrics, attributeName string) ([]string, int) { var resourceAttributes []string var notFoundCounter int = 0 var foundCounter int = 0