Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Test for etcd metrics #19242

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 195 additions & 4 deletions tests/integration/clientv3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"testing"
"time"

grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"

Expand Down Expand Up @@ -73,11 +74,14 @@ func TestV3ClientMetrics(t *testing.T) {
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
defer clus.Terminate(t)

clientMetrics := grpcprom.NewClientMetrics()
prometheus.Register(clientMetrics)

cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCURL},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()),
},
}
cli, cerr := integration2.NewClient(t, cfg)
Expand Down Expand Up @@ -168,11 +172,198 @@ func getHTTPBodyAsLines(t *testing.T, url string) []string {
if err != nil {
if errors.Is(err, io.EOF) {
break
} else {
t.Fatalf("error reading: %v", err)
}
t.Fatalf("error reading: %v", err)
}
lines = append(lines, line)
}
resp.Body.Close()
return lines
}

func TestAllMetricsGenerated(t *testing.T) {
integration2.BeforeTest(t)

var (
addr = "localhost:27989"
ln net.Listener
)

srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)

ln, err := transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}

donec := make(chan struct{})
defer func() {
ln.Close()
<-donec
}()

// listen for all Prometheus metrics
go func() {
defer close(donec)

serr := srv.Serve(ln)
if serr != nil && !transport.IsClosedConnError(serr) {
t.Errorf("Err serving http requests: %v", serr)
}
}()

url := "unix://" + addr + "/metrics"

clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, Metrics: "extensive"})
defer clus.Terminate(t)

clientMetrics := grpcprom.NewClientMetrics()
prometheus.Register(clientMetrics)

cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCURL},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()),
},
}
cli, cerr := integration2.NewClient(t, cfg)
if cerr != nil {
t.Fatal(cerr)
}
defer cli.Close()

// Perform some operations to generate metrics
wc := cli.Watch(context.Background(), "foo")
_, err = cli.Put(context.Background(), "foo", "bar")
if err != nil {
t.Errorf("Error putting value in key store")
}

// consume watch response
select {
case <-wc:
case <-time.After(10 * time.Second):
t.Error("Timeout occurred for getting watch response")
}

// Define the expected list of metrics
expectedMetrics := []string{
"etcd_cluster_version",
"etcd_disk_backend_commit_duration_seconds_bucket",
"etcd_disk_backend_commit_duration_seconds_count",
"etcd_disk_backend_commit_duration_seconds_sum",
"etcd_disk_backend_defrag_duration_seconds_bucket",
"etcd_disk_backend_defrag_duration_seconds_count",
"etcd_disk_backend_defrag_duration_seconds_sum",
"etcd_disk_backend_snapshot_duration_seconds_bucket",
"etcd_disk_backend_snapshot_duration_seconds_count",
"etcd_disk_backend_snapshot_duration_seconds_sum",
"etcd_disk_defrag_inflight",
"etcd_disk_wal_fsync_duration_seconds_bucket",
"etcd_disk_wal_fsync_duration_seconds_count",
"etcd_disk_wal_fsync_duration_seconds_sum",
"etcd_disk_wal_write_bytes_total",
"etcd_disk_wal_write_duration_seconds_bucket",
"etcd_disk_wal_write_duration_seconds_count",
"etcd_disk_wal_write_duration_seconds_sum",
"etcd_mvcc_db_open_read_transactions",
"etcd_mvcc_db_total_size_in_bytes",
"etcd_mvcc_db_total_size_in_use_in_bytes",
"etcd_mvcc_delete_total",
"etcd_mvcc_hash_duration_seconds_bucket",
"etcd_mvcc_hash_duration_seconds_count",
"etcd_mvcc_hash_duration_seconds_sum",
"etcd_mvcc_hash_rev_duration_seconds_bucket",
"etcd_mvcc_hash_rev_duration_seconds_count",
"etcd_mvcc_hash_rev_duration_seconds_sum",
"etcd_mvcc_put_total",
"etcd_mvcc_range_total",
"etcd_mvcc_txn_total",
"etcd_network_client_grpc_received_bytes_total",
"etcd_network_client_grpc_sent_bytes_total",
"etcd_network_known_peers",
"etcd_server_apply_duration_seconds_bucket",
"etcd_server_apply_duration_seconds_count",
"etcd_server_apply_duration_seconds_sum",
"etcd_server_client_requests_total",
"etcd_server_go_version",
"etcd_server_has_leader",
"etcd_server_health_failures",
"etcd_server_health_success",
"etcd_server_heartbeat_send_failures_total",
"etcd_server_id",
"etcd_server_is_leader",
"etcd_server_is_learner",
"etcd_server_leader_changes_seen_total",
"etcd_server_learner_promote_successes",
"etcd_server_proposals_applied_total",
"etcd_server_proposals_committed_total",
"etcd_server_proposals_failed_total",
"etcd_server_proposals_pending",
"etcd_server_quota_backend_bytes",
"etcd_server_read_indexes_failed_total",
"etcd_server_slow_apply_total",
"etcd_server_slow_read_indexes_total",
"etcd_server_snapshot_apply_in_progress_total",
"etcd_server_version",
"etcd_snap_db_fsync_duration_seconds_bucket",
"etcd_snap_db_fsync_duration_seconds_count",
"etcd_snap_db_fsync_duration_seconds_sum",
"etcd_snap_db_save_total_duration_seconds_bucket",
"etcd_snap_db_save_total_duration_seconds_count",
"etcd_snap_db_save_total_duration_seconds_sum",
"etcd_snap_fsync_duration_seconds_bucket",
"etcd_snap_fsync_duration_seconds_count",
"etcd_snap_fsync_duration_seconds_sum",
"grpc_client_handled_total",
"grpc_client_msg_received_total",
"grpc_client_msg_sent_total",
"grpc_client_started_total",
"grpc_server_handled_total",
"grpc_server_handling_seconds_bucket",
"grpc_server_handling_seconds_count",
"grpc_server_handling_seconds_sum",
"grpc_server_msg_received_total",
"grpc_server_msg_sent_total",
"grpc_server_started_total",
}

// Get the list of generated metrics
generatedMetrics := getMetricsList(t, url)
for _, metric := range expectedMetrics {
if !contains(generatedMetrics, metric) {
t.Errorf("Expected metric %s not found in generated metrics", metric)
}
}
}

func getMetricsList(t *testing.T, url string) []string {
lines := getHTTPBodyAsLines(t, url)
metrics := make(map[string]struct{})
for _, line := range lines {
if strings.Contains(line, "{") {
metric := line[:strings.Index(line, "{")]
metrics[metric] = struct{}{}
} else {
metric := line[:strings.Index(line, " ")]
metrics[metric] = struct{}{}
}
}
var metricList []string
for metric := range metrics {
metricList = append(metricList, metric)
}
return metricList
}

func contains(slice []string, item string) bool {
for _, s := range slice {
if s == item {
return true
}
}
return false
}
Loading