diff --git a/tests/integration/clientv3/metrics_test.go b/tests/integration/clientv3/metrics_test.go index 6f63f6c62ea..67d70c11198 100644 --- a/tests/integration/clientv3/metrics_test.go +++ b/tests/integration/clientv3/metrics_test.go @@ -26,7 +26,8 @@ import ( "testing" "time" - grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" @@ -73,11 +74,14 @@ func TestV3ClientMetrics(t *testing.T) { clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1}) defer clus.Terminate(t) + clientMetrics := grpcprom.NewClientMetrics() + prometheus.Register(clientMetrics) + cfg := clientv3.Config{ Endpoints: []string{clus.Members[0].GRPCURL}, DialOptions: []grpc.DialOption{ - grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), - grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), + grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()), }, } cli, cerr := integration2.NewClient(t, cfg) @@ -168,11 +172,198 @@ func getHTTPBodyAsLines(t *testing.T, url string) []string { if err != nil { if errors.Is(err, io.EOF) { break + } else { + t.Fatalf("error reading: %v", err) } - t.Fatalf("error reading: %v", err) } lines = append(lines, line) } resp.Body.Close() return lines } + +func TestAllMetricsGenerated(t *testing.T) { + integration2.BeforeTest(t) + + var ( + addr = "localhost:27989" + ln net.Listener + ) + + srv := &http.Server{Handler: promhttp.Handler()} + srv.SetKeepAlivesEnabled(false) + + ln, err := transport.NewUnixListener(addr) + if err != nil { + t.Errorf("Error: %v occurred while listening on addr: %v", err, addr) + } + + donec := make(chan struct{}) + defer func() { + ln.Close() + <-donec + }() + + // listen for all Prometheus metrics + go func() { + defer close(donec) + + serr := srv.Serve(ln) + if serr != nil && !transport.IsClosedConnError(serr) { + t.Errorf("Err serving http requests: %v", serr) + } + }() + + url := "unix://" + addr + "/metrics" + + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, Metrics: "extensive"}) + defer clus.Terminate(t) + + clientMetrics := grpcprom.NewClientMetrics() + prometheus.Register(clientMetrics) + + cfg := clientv3.Config{ + Endpoints: []string{clus.Members[0].GRPCURL}, + DialOptions: []grpc.DialOption{ + grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()), + }, + } + cli, cerr := integration2.NewClient(t, cfg) + if cerr != nil { + t.Fatal(cerr) + } + defer cli.Close() + + // Perform some operations to generate metrics + wc := cli.Watch(context.Background(), "foo") + _, err = cli.Put(context.Background(), "foo", "bar") + if err != nil { + t.Errorf("Error putting value in key store") + } + + // consume watch response + select { + case <-wc: + case <-time.After(10 * time.Second): + t.Error("Timeout occurred for getting watch response") + } + + // Define the expected list of metrics + expectedMetrics := []string{ + "etcd_cluster_version", + "etcd_disk_backend_commit_duration_seconds_bucket", + "etcd_disk_backend_commit_duration_seconds_count", + "etcd_disk_backend_commit_duration_seconds_sum", + "etcd_disk_backend_defrag_duration_seconds_bucket", + "etcd_disk_backend_defrag_duration_seconds_count", + "etcd_disk_backend_defrag_duration_seconds_sum", + "etcd_disk_backend_snapshot_duration_seconds_bucket", + "etcd_disk_backend_snapshot_duration_seconds_count", + "etcd_disk_backend_snapshot_duration_seconds_sum", + "etcd_disk_defrag_inflight", + "etcd_disk_wal_fsync_duration_seconds_bucket", + "etcd_disk_wal_fsync_duration_seconds_count", + "etcd_disk_wal_fsync_duration_seconds_sum", + "etcd_disk_wal_write_bytes_total", + "etcd_disk_wal_write_duration_seconds_bucket", + "etcd_disk_wal_write_duration_seconds_count", + "etcd_disk_wal_write_duration_seconds_sum", + "etcd_mvcc_db_open_read_transactions", + "etcd_mvcc_db_total_size_in_bytes", + "etcd_mvcc_db_total_size_in_use_in_bytes", + "etcd_mvcc_delete_total", + "etcd_mvcc_hash_duration_seconds_bucket", + "etcd_mvcc_hash_duration_seconds_count", + "etcd_mvcc_hash_duration_seconds_sum", + "etcd_mvcc_hash_rev_duration_seconds_bucket", + "etcd_mvcc_hash_rev_duration_seconds_count", + "etcd_mvcc_hash_rev_duration_seconds_sum", + "etcd_mvcc_put_total", + "etcd_mvcc_range_total", + "etcd_mvcc_txn_total", + "etcd_network_client_grpc_received_bytes_total", + "etcd_network_client_grpc_sent_bytes_total", + "etcd_network_known_peers", + "etcd_server_apply_duration_seconds_bucket", + "etcd_server_apply_duration_seconds_count", + "etcd_server_apply_duration_seconds_sum", + "etcd_server_client_requests_total", + "etcd_server_go_version", + "etcd_server_has_leader", + "etcd_server_health_failures", + "etcd_server_health_success", + "etcd_server_heartbeat_send_failures_total", + "etcd_server_id", + "etcd_server_is_leader", + "etcd_server_is_learner", + "etcd_server_leader_changes_seen_total", + "etcd_server_learner_promote_successes", + "etcd_server_proposals_applied_total", + "etcd_server_proposals_committed_total", + "etcd_server_proposals_failed_total", + "etcd_server_proposals_pending", + "etcd_server_quota_backend_bytes", + "etcd_server_read_indexes_failed_total", + "etcd_server_slow_apply_total", + "etcd_server_slow_read_indexes_total", + "etcd_server_snapshot_apply_in_progress_total", + "etcd_server_version", + "etcd_snap_db_fsync_duration_seconds_bucket", + "etcd_snap_db_fsync_duration_seconds_count", + "etcd_snap_db_fsync_duration_seconds_sum", + "etcd_snap_db_save_total_duration_seconds_bucket", + "etcd_snap_db_save_total_duration_seconds_count", + "etcd_snap_db_save_total_duration_seconds_sum", + "etcd_snap_fsync_duration_seconds_bucket", + "etcd_snap_fsync_duration_seconds_count", + "etcd_snap_fsync_duration_seconds_sum", + "grpc_client_handled_total", + "grpc_client_msg_received_total", + "grpc_client_msg_sent_total", + "grpc_client_started_total", + "grpc_server_handled_total", + "grpc_server_handling_seconds_bucket", + "grpc_server_handling_seconds_count", + "grpc_server_handling_seconds_sum", + "grpc_server_msg_received_total", + "grpc_server_msg_sent_total", + "grpc_server_started_total", + } + + // Get the list of generated metrics + generatedMetrics := getMetricsList(t, url) + for _, metric := range expectedMetrics { + if !contains(generatedMetrics, metric) { + t.Errorf("Expected metric %s not found in generated metrics", metric) + } + } +} + +func getMetricsList(t *testing.T, url string) []string { + lines := getHTTPBodyAsLines(t, url) + metrics := make(map[string]struct{}) + for _, line := range lines { + if strings.Contains(line, "{") { + metric := line[:strings.Index(line, "{")] + metrics[metric] = struct{}{} + } else { + metric := line[:strings.Index(line, " ")] + metrics[metric] = struct{}{} + } + } + var metricList []string + for metric := range metrics { + metricList = append(metricList, metric) + } + return metricList +} + +func contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +}