[chore] Add logs and metrics k8s attributes verification tests #1610

Open · wants to merge 6 commits into main

181 changes: 180 additions & 1 deletion functional_tests/configuration_switching_test.go
@@ -180,6 +180,7 @@ func Test_Functions(t *testing.T) {
t.Run("agent logs and metrics enabled or disabled", testAgentLogsAndMetrics)
t.Run("logs and metrics index switch", testIndexSwitch)
t.Run("cluster receiver enabled or disabled", testClusterReceiverEnabledOrDisabled)
t.Run("logs and metrics attributes verification", testVerifyLogsAndMetricsAttributes)

}

@@ -347,6 +348,103 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) {
resetLogsSink(t, logsObjectsConsumer)
}

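// testVerifyLogsAndMetricsAttributes deploys the chart from different values
// templates and verifies that exported log records and metric data points
// carry the expected Kubernetes attributes (node, pod, pod UID, namespace).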
func testVerifyLogsAndMetricsAttributes(t *testing.T) {
attributesList := []string{"k8s.node.name", "k8s.pod.name", "k8s.pod.uid", "k8s.namespace.name"}

hostEp := hostEndpoint(t)
if len(hostEp) == 0 {
require.Fail(t, "Host endpoint not found")
}

t.Run("verify cluster receiver attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
"LogObjectsHecEndpoint": logsObjectsHecEndpoint,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetLogsSink(t, logsObjectsConsumer)
waitForLogs(t, 5, logsObjectsConsumer)
t.Logf("===> >>>> Logs: %v", len(logsObjectsConsumer.AllLogs()))

for _, attr := range attributesList {
t.Log("Checking attribute: ", attr)
attrValues, notFoundCounter := getLogsAttributes(logsObjectsConsumer.AllLogs(), attr)
assert.True(t, len(attrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Attributes: %v", attrValues)
}
})

t.Run("verify cluster receiver metrics attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
"LogObjectsHecEndpoint": logsObjectsHecEndpoint,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetMetricsSink(t, hecMetricsConsumer)
t.Logf("===> >>>> Metrics: %d", len(hecMetricsConsumer.AllMetrics()))

waitForMetrics(t, 5, hecMetricsConsumer)
for _, attr := range attributesList {
t.Log("Checking attributes: ", attr)
attrValues, notFoundCounter := getMetricsAttributes(hecMetricsConsumer.AllMetrics(), attr)
assert.True(t, len(attrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Resource Attributes for %s: %v", attr, attrValues)
}
})

t.Run("verify agent logs attributes", func(t *testing.T) {
valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl"
agentLogsConsumer := setupOnce(t).logsConsumer

replacements := map[string]interface{}{
"MetricsEnabled": true,
"LogsEnabled": true,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetLogsSink(t, agentLogsConsumer)

waitForLogs(t, 5, agentLogsConsumer)
for _, attr := range attributesList {
t.Log("Checking attribute: ", attr)
attrValues, notFoundCounter := getLogsAttributes(agentLogsConsumer.AllLogs(), attr)
assert.True(t, len(attrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Attributes: %v", attrValues)
}
})

t.Run("verify metrics attributes", func(t *testing.T) {
valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl"
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer

replacements := map[string]interface{}{
"MetricsEnabled": true,
"LogsEnabled": true,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetMetricsSink(t, hecMetricsConsumer)

waitForMetrics(t, 5, hecMetricsConsumer)
for _, attr := range attributesList {
t.Log("Checking attribute: ", attr)
attrValues, notFoundCounter := getMetricsAttributes(hecMetricsConsumer.AllMetrics(), attr)
assert.True(t, len(attrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Attributes for %s: %v", attr, attrValues)
}
})
}

func checkPodExists(pods *corev1.PodList, podNamePrefix string) bool {
for _, pod := range pods.Items {
if strings.HasPrefix(pod.Name, podNamePrefix) {
@@ -410,7 +508,6 @@ func getMetricsIndex(metrics []pmetric.Metrics) []string {
var indices []string
for i := 0; i < len(metrics); i++ {
m := metrics[i]
fmt.Printf("Metrics: %v", m.ResourceMetrics().At(0).Resource().Attributes())
if value, ok := m.ResourceMetrics().At(0).Resource().Attributes().Get("com.splunk.index"); ok {
index := value.AsString()
if !contains(indices, index) {
@@ -430,6 +527,88 @@ func contains(list []string, newValue string) bool {
return false
}

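// getLogsAttributes returns the distinct values of attributeName found across
// all log records, along with the number of records missing the attribute.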
func getLogsAttributes(logs []plog.Logs, attributeName string) ([]string, int) {
var attributes []string
notFoundCounter := 0
foundCounter := 0

for i := 0; i < len(logs); i++ {
l := logs[i]
for j := 0; j < l.ResourceLogs().Len(); j++ {
rl := l.ResourceLogs().At(j)
for k := 0; k < rl.ScopeLogs().Len(); k++ {
sl := rl.ScopeLogs().At(k)
for m := 0; m < sl.LogRecords().Len(); m++ {
tmpAttribute, ok := sl.LogRecords().At(m).Attributes().Get(attributeName)
if ok {
if !contains(attributes, tmpAttribute.AsString()) {
attributes = append(attributes, tmpAttribute.AsString())
}
foundCounter++
} else {
fmt.Println("=== Attribute not found: ", attributeName)
fmt.Printf("Log Record Body: %v\n", sl.LogRecords().At(m).Body().AsRaw())
notFoundCounter++
}
}
}
}
}
fmt.Printf("Counters: Found: %d | Not Found: %d\n", foundCounter, notFoundCounter)
return attributes, notFoundCounter
}

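// getMetricsAttributes returns the distinct values of attributeName found on
// gauge data points, along with the number of data points missing the
// attribute. Metrics whose names match the node- and cluster-level prefixes
// listed below are skipped.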
func getMetricsAttributes(metrics []pmetric.Metrics, attributeName string) ([]string, int) {
var resourceAttributes []string
notFoundCounter := 0
foundCounter := 0
skippedCounter := 0
prefixesForMetricsToSkip := []string{
// agent metrics
"system.", "k8s.node.",
// cluster receiver metrics
"k8s.deployment.", "k8s.namespace.", "k8s.replicaset.", "k8s.daemonset.",
}

for i := 0; i < len(metrics); i++ {
m := metrics[i]
for j := 0; j < m.ResourceMetrics().Len(); j++ {
rm := m.ResourceMetrics().At(j)
for k := 0; k < rm.ScopeMetrics().Len(); k++ {
sm := rm.ScopeMetrics().At(k)
for l := 0; l < sm.Metrics().Len(); l++ {
skip := false
for _, prefix := range prefixesForMetricsToSkip {
if strings.HasPrefix(sm.Metrics().At(l).Name(), prefix) {
skip = true
break
}
}
if skip {
skippedCounter++
continue
}
for dp := 0; dp < sm.Metrics().At(l).Gauge().DataPoints().Len(); dp++ {
tmpAttribute, ok := sm.Metrics().At(l).Gauge().DataPoints().At(dp).Attributes().Get(attributeName)

if ok {
if !contains(resourceAttributes, tmpAttribute.AsString()) {
resourceAttributes = append(resourceAttributes, tmpAttribute.AsString())
}
foundCounter++
} else {
fmt.Printf("Resource Attribute %s not found for metric: %v \n", attributeName, sm.Metrics().At(l).Name())
notFoundCounter++
}
}
}
}
}
}
fmt.Printf("Counters: Found: %d | Skipped: %d | Not Found: %d\n", foundCounter, skippedCounter, notFoundCounter)
return resourceAttributes, notFoundCounter
}

func uninstallDeployment(t *testing.T) {
testKubeConfig, setKubeConfig := os.LookupEnv("KUBECONFIG")
require.True(t, setKubeConfig, "the environment variable KUBECONFIG must be set")
@@ -0,0 +1,26 @@
---
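# Cluster-receiver-only values template: the agent is disabled, and the
# ClusterReceiverEnabled / HEC endpoint placeholders are filled in by
# deployChartsAndApps in configuration_switching_test.go.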
splunkPlatform:
token: foobar
endpoint: {{ .LogHecEndpoint }}
metricsEnabled: true
metricsIndex: myMetricsIndex

agent:
enabled: false

clusterReceiver:
enabled: {{ .ClusterReceiverEnabled }}
config:
exporters:
splunk_hec/platform_logs:
endpoint: {{ .LogObjectsHecEndpoint }}
splunk_hec/platform_metrics:
endpoint: {{ .MetricHecEndpoint }}
k8sObjects:
- name: pods
mode: pull
interval: 5s
- name: events
mode: watch

clusterName: dev-operator