Skip to content

Commit

Permalink
Refactor Header Interceptor and modified the server timing header logic
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Jan 3, 2025
1 parent 3b03055 commit 105edeb
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1828,11 +1828,11 @@ public OpenTelemetry getOpenTelemetry() {
/** Returns an instance of OpenTelemetry object for Built-in Client metrics. */
public OpenTelemetry getBuiltInMetricsOpenTelemetry() {
return this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
this.getProjectId(), getCredentials());
this.getProjectId(), getCredentials(), this.getMonitoringHost());
}

/** Returns attributes for an instance of Built-in Client metrics. */
public Map<String, String> getBuiltInMetricsClientAttributes() {
public Map<String, String> getBuiltInMetricsAttributes() {
return builtInOpenTelemetryMetricsProvider.createOrGetClientAttributes(
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
options.getBuiltInMetricsOpenTelemetry(),
options.getBuiltInMetricsClientAttributes(),
options.getBuiltInMetricsAttributes(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ class HeaderInterceptor implements ClientInterceptor {
DatabaseName.of("undefined-project", "undefined-instance", "undefined-database");
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final String SERVER_TIMING_HEADER_PREFIX = "gfet4t7; dur=";
private static final String GFE_TIMING_HEADER = "gfet4t7";
private static final Metadata.Key<String> GOOGLE_CLOUD_RESOURCE_PREFIX_KEY =
Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_PATTERN =
Pattern.compile("(?<metricName>[a-zA-Z0-9_-]+);\\s*dur=(?<duration>\\d+)");
private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN =
Pattern.compile(
".*projects/(?<project>\\p{ASCII}[^/]*)(/instances/(?<instance>\\p{ASCII}[^/]*))?(/databases/(?<database>\\p{ASCII}[^/]*))?");
Expand Down Expand Up @@ -121,25 +123,31 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
Span span = Span.current();
DatabaseName databaseName = extractDatabaseName(headers);
String key = databaseName + method.getFullMethodName();
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> commonBuiltInMetricAttributes =
getCommonBuiltInMetricAttributes(key, databaseName);

TagContext openCensusTagContext =
getOpenCensusTagContext(key, method.getFullMethodName(), databaseName);
Attributes customMetricAttributes =
buildCustomMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricAttributes =
buildBuiltInMetricAttributes(key, databaseName);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata metadata) {
Boolean isDirectPathUsed =
isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
addBuiltInMetricAttributes(
compositeTracer, commonBuiltInMetricAttributes, isDirectPathUsed);
processHeader(
if (compositeTracer != null) {
builtInMetricAttributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(),
Boolean.toString(
isDirectPathUsed(
getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR))));
compositeTracer.addAttributes(builtInMetricAttributes);
}
processServerTimingHeader(
metadata,
tagContext,
attributes,
openCensusTagContext,
customMetricAttributes,
span,
getBuiltInMetricAttributes(commonBuiltInMetricAttributes, isDirectPathUsed));
builtInMetricAttributes);
super.onHeaders(metadata);
}
},
Expand All @@ -152,37 +160,63 @@ public void onHeaders(Metadata metadata) {
};
}

private void processHeader(
private void processServerTimingHeader(
Metadata metadata,
TagContext tagContext,
Attributes attributes,
Span span,
Map<String, String> builtInMetricsAttributes) {
MeasureMap measureMap = STATS_RECORDER.newMeasureMap();
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
if (serverTiming != null && serverTiming.startsWith(SERVER_TIMING_HEADER_PREFIX)) {
try {
long latency = Long.parseLong(serverTiming.substring(SERVER_TIMING_HEADER_PREFIX.length()));
measureMap.put(SPANNER_GFE_LATENCY, latency);
try {
// Previous implementation parsed the GFE latency directly using:
// long latency = Long.parseLong(serverTiming.substring("gfet4t7; dur=".length()));
// This approach assumed the serverTiming header contained exactly one metric "gfet4t7".
// If additional metrics were introduced in the header, older versions of the library
// would fail to parse it correctly. To make the parsing more robust, the logic has been
// updated to handle multiple metrics gracefully.

Map<String, Long> serverTimingMetrics = parseServerTimingHeader(serverTiming);
if (serverTimingMetrics.containsKey(GFE_TIMING_HEADER)) {
long gfeLatency = serverTimingMetrics.get(GFE_TIMING_HEADER);

measureMap.put(SPANNER_GFE_LATENCY, gfeLatency);
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 0L);
measureMap.record(tagContext);

spannerRpcMetrics.recordGfeLatency(latency, attributes);
spannerRpcMetrics.recordGfeLatency(gfeLatency, attributes);
spannerRpcMetrics.recordGfeHeaderMissingCount(0L, attributes);
builtInOpenTelemetryMetricsRecorder.recordGFELatency(latency, builtInMetricsAttributes);

builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, builtInMetricsAttributes);

if (span != null) {
span.setAttribute("gfe_latency", String.valueOf(latency));
span.setAttribute("gfe_latency", String.valueOf(gfeLatency));
}
} catch (NumberFormatException e) {
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
} else {
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext);
spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
}
} else {
spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext);
} catch (NumberFormatException e) {
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
}
}

private Map<String, Long> parseServerTimingHeader(String serverTiming) {
Map<String, Long> serverTimingMetrics = new HashMap<>();
if (serverTiming != null) {
Matcher matcher = SERVER_TIMING_PATTERN.matcher(serverTiming);
while (matcher.find()) {
String metricName = matcher.group("metricName");
String durationStr = matcher.group("duration");

if (metricName != null && durationStr != null) {
serverTimingMetrics.put(metricName, Long.valueOf(durationStr));
}
}
}
return serverTimingMetrics;
}

private DatabaseName extractDatabaseName(Metadata headers) throws ExecutionException {
String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY);
if (googleResourcePrefix != null) {
Expand Down Expand Up @@ -211,7 +245,7 @@ private DatabaseName extractDatabaseName(Metadata headers) throws ExecutionExcep
return UNDEFINED_DATABASE_NAME;
}

private TagContext getTagContext(String key, String method, DatabaseName databaseName)
private TagContext getOpenCensusTagContext(String key, String method, DatabaseName databaseName)
throws ExecutionException {
return tagsCache.get(
key,
Expand All @@ -225,8 +259,8 @@ private TagContext getTagContext(String key, String method, DatabaseName databas
.build());
}

private Attributes getMetricAttributes(String key, String method, DatabaseName databaseName)
throws ExecutionException {
private Attributes buildCustomMetricAttributes(
String key, String method, DatabaseName databaseName) throws ExecutionException {
return attributesCache.get(
key,
() -> {
Expand All @@ -240,8 +274,8 @@ private Attributes getMetricAttributes(String key, String method, DatabaseName d
});
}

private Map<String, String> getCommonBuiltInMetricAttributes(
String key, DatabaseName databaseName) throws ExecutionException {
private Map<String, String> buildBuiltInMetricAttributes(String key, DatabaseName databaseName)
throws ExecutionException {
return builtInAttributesCache.get(
key,
() -> {
Expand All @@ -256,24 +290,6 @@ private Map<String, String> getCommonBuiltInMetricAttributes(
});
}

private Map<String, String> getBuiltInMetricAttributes(
Map<String, String> commonBuiltInMetricsAttributes, Boolean isDirectPathUsed) {
Map<String, String> builtInMetricAttributes = new HashMap<>(commonBuiltInMetricsAttributes);
builtInMetricAttributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
return builtInMetricAttributes;
}

private void addBuiltInMetricAttributes(
CompositeTracer compositeTracer,
Map<String, String> commonBuiltInMetricsAttributes,
Boolean isDirectPathUsed) {
if (compositeTracer != null) {
compositeTracer.addAttributes(
getBuiltInMetricAttributes(commonBuiltInMetricsAttributes, isDirectPathUsed));
}
}

private Boolean isDirectPathUsed(SocketAddress remoteAddr) {
if (remoteAddr instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress();
Expand Down

0 comments on commit 105edeb

Please sign in to comment.