Skip to content
9 changes: 8 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,13 @@
<method>boolean isEnableGRPCBuiltInMetrics()</method>
</difference>

<!-- Added AFE Server Timing option -->
<!-- NOTE(review): 7012 is presumably Clirr's "method added to interface" code; confirm against the Clirr difference-type list. -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/SpannerOptions$SpannerEnvironment</className>
<method>boolean isEnableAFEServerTiming()</method>
</difference>

<!-- Added Monitoring host option -->
<difference>
<differenceType>7012</differenceType>
Expand Down Expand Up @@ -899,7 +906,7 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getDefaultSequenceKind()</method>
</difference>

<!-- Default isolation level -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -39,6 +40,9 @@ public class BuiltInMetricsConstant {
static final String SPANNER_METER_NAME = "spanner-java";
static final String GRPC_METER_NAME = "grpc-java";
static final String GFE_LATENCIES_NAME = "gfe_latencies";
static final String AFE_LATENCIES_NAME = "afe_latencies";
static final String GFE_CONNECTIVITY_ERROR_NAME = "gfe_connectivity_error_count";
static final String AFE_CONNECTIVITY_ERROR_NAME = "afe_connectivity_error_count";
static final String OPERATION_LATENCIES_NAME = "operation_latencies";
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
static final String OPERATION_LATENCY_NAME = "operation_latency";
Expand All @@ -52,7 +56,10 @@ public class BuiltInMetricsConstant {
ATTEMPT_LATENCIES_NAME,
OPERATION_COUNT_NAME,
ATTEMPT_COUNT_NAME,
GFE_LATENCIES_NAME)
GFE_LATENCIES_NAME,
AFE_LATENCIES_NAME,
GFE_CONNECTIVITY_ERROR_NAME,
AFE_CONNECTIVITY_ERROR_NAME)
.stream()
.map(m -> METER_NAME + '/' + m)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -110,14 +117,14 @@ public class BuiltInMetricsConstant {
static final Set<String> GRPC_LB_RLS_ATTRIBUTES =
ImmutableSet.of("grpc.lb.rls.data_plane_target", "grpc.lb.pick_result");

/**
 * Explicit histogram bucket boundaries shared by the latency histograms. Declared {@code final}
 * so the single list handed to both the view aggregation and the instrument bucket advice cannot
 * be reassigned after class initialization.
 */
static final List<Double> BUCKET_BOUNDARIES =
    ImmutableList.of(
        0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0,
        16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0,
        200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
        50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0);
static Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
Aggregation.explicitBucketHistogram(
ImmutableList.of(
0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0,
15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0,
160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0,
10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0,
3200000.0));
Aggregation.explicitBucketHistogram(BUCKET_BOUNDARIES);

static final Collection<String> GRPC_METRICS_ENABLED_BY_DEFAULT =
ImmutableList.of(
Expand Down Expand Up @@ -145,14 +152,6 @@ static Map<InstrumentSelector, View> getAllViews() {
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.SPANNER_METER_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.util.Map;

Expand All @@ -35,6 +36,9 @@
class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {

private final DoubleHistogram gfeLatencyRecorder;
private final DoubleHistogram afeLatencyRecorder;
private final LongCounter gfeHeaderMissingCountRecorder;
private final LongCounter afeHeaderMissingCountRecorder;

/**
* Creates the following instruments for the following metrics:
Expand All @@ -59,6 +63,27 @@ class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {
.setDescription(
"Latency between Google's network receiving an RPC and reading back the first byte of the response")
.setUnit("ms")
.setExplicitBucketBoundariesAdvice(BuiltInMetricsConstant.BUCKET_BOUNDARIES)
.build();
this.afeLatencyRecorder =
meter
.histogramBuilder(serviceName + '/' + BuiltInMetricsConstant.AFE_LATENCIES_NAME)
.setDescription(
"Latency between Spanner API Frontend receiving an RPC and starting to write back the response.")
.setExplicitBucketBoundariesAdvice(BuiltInMetricsConstant.BUCKET_BOUNDARIES)
.setUnit("ms")
.build();
this.gfeHeaderMissingCountRecorder =
meter
.counterBuilder(serviceName + '/' + BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)
.setDescription("Number of requests that failed to reach the Google network.")
.setUnit("1")
.build();
this.afeHeaderMissingCountRecorder =
meter
.counterBuilder(serviceName + '/' + BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)
.setDescription("Number of requests that failed to reach the Spanner API Frontend.")
.setUnit("1")
.build();
}

Expand All @@ -69,8 +94,25 @@ class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {
* @param gfeLatency Attempt Latency in ms
* @param attributes Map of the attributes to store
*/
void recordGFELatency(double gfeLatency, Map<String, String> attributes) {
gfeLatencyRecorder.record(gfeLatency, toOtelAttributes(attributes));
void recordServerTimingHeaderMetrics(
    Long gfeLatency,
    Long afeLatency,
    Long gfeHeaderMissingCount,
    Long afeHeaderMissingCount,
    Map<String, String> attributes) {
  // Convert once; the same attribute set is attached to every measurement below.
  Attributes otelAttributes = toOtelAttributes(attributes);

  // A null latency means no value was supplied for this call, so nothing is recorded.
  if (gfeLatency != null) {
    gfeLatencyRecorder.record(gfeLatency, otelAttributes);
  }
  // The counts are boxed: check for null before the comparison, which would otherwise
  // unbox a null reference and throw a NullPointerException.
  if (gfeHeaderMissingCount != null && gfeHeaderMissingCount > 0) {
    gfeHeaderMissingCountRecorder.add(gfeHeaderMissingCount, otelAttributes);
  }

  if (afeLatency != null) {
    afeLatencyRecorder.record(afeLatency, otelAttributes);
  }
  if (afeHeaderMissingCount != null && afeHeaderMissingCount > 0) {
    afeHeaderMissingCountRecorder.add(afeHeaderMissingCount, otelAttributes);
  }
}

Attributes toOtelAttributes(Map<String, String> attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
private final BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder;
// These are RPC specific attributes and pertain to a specific API Trace
private final Map<String, String> attributes = new HashMap<>();

private Long gfeLatency = null;
private Long afeLatency = null;
private long gfeHeaderMissingCount = 0;
private long afeHeaderMissingCount = 0;

BuiltInMetricsTracer(
MethodName methodName, BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder) {
Expand All @@ -54,10 +56,9 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
@Override
public void attemptSucceeded() {
super.attemptSucceeded();
if (gfeLatency != null) {
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, attributes);
}
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -67,10 +68,9 @@ public void attemptSucceeded() {
@Override
public void attemptCancelled() {
super.attemptCancelled();
if (gfeLatency != null) {
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, attributes);
}
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -84,10 +84,9 @@ public void attemptCancelled() {
@Override
public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
super.attemptFailedDuration(error, delay);
if (gfeLatency != null) {
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, attributes);
}
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -100,10 +99,9 @@ public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
@Override
public void attemptFailedRetriesExhausted(Throwable error) {
super.attemptFailedRetriesExhausted(error);
if (gfeLatency != null) {
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, attributes);
}
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -116,16 +114,27 @@ public void attemptFailedRetriesExhausted(Throwable error) {
@Override
public void attemptPermanentFailure(Throwable error) {
super.attemptPermanentFailure(error);
if (gfeLatency != null) {
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordGFELatency(gfeLatency, attributes);
}
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
 * Stores the GFE latency so it can be passed to the metrics recorder when the attempt completes.
 *
 * @param gfeLatency latency value to remember; may be {@code null}, in which case the recorder
 *     skips the GFE latency measurement
 */
void recordGFELatency(Long gfeLatency) {
  this.gfeLatency = gfeLatency;
}

/**
 * Stores the AFE latency so it can be passed to the metrics recorder when the attempt completes.
 *
 * @param afeLatency latency value to remember; may be {@code null}, in which case the recorder
 *     skips the AFE latency measurement
 */
void recordAFELatency(Long afeLatency) {
  this.afeLatency = afeLatency;
}

/**
 * Stores the GFE header-missing count that is handed to the metrics recorder when the attempt
 * completes.
 *
 * @param value count to remember; {@code null} is treated as zero
 */
void recordGfeHeaderMissingCount(Long value) {
  // The backing field is a primitive long; assigning a null Long directly would throw a
  // NullPointerException during unboxing.
  this.gfeHeaderMissingCount = value == null ? 0L : value;
}

/**
 * Stores the AFE header-missing count that is handed to the metrics recorder when the attempt
 * completes.
 *
 * @param value count to remember; {@code null} is treated as zero
 */
void recordAfeHeaderMissingCount(Long value) {
  // The backing field is a primitive long; assigning a null Long directly would throw a
  // NullPointerException during unboxing.
  this.afeHeaderMissingCount = value == null ? 0L : value;
}

@Override
public void addAttributes(Map<String, String> attributes) {
super.addAttributes(attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,28 @@ public void recordGFELatency(Long gfeLatency) {
}
}
}

/**
 * Passes the GFE header-missing count to every child tracer that is a {@link
 * BuiltInMetricsTracer}; other children are skipped.
 *
 * @param value count forwarded unchanged to each matching child
 */
public void recordGfeHeaderMissingCount(Long value) {
  for (ApiTracer tracer : children) {
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordGfeHeaderMissingCount(value);
  }
}

/**
 * Passes the AFE latency to every child tracer that is a {@link BuiltInMetricsTracer}; other
 * children are skipped.
 *
 * @param afeLatency latency forwarded unchanged to each matching child
 */
public void recordAFELatency(Long afeLatency) {
  for (ApiTracer tracer : children) {
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordAFELatency(afeLatency);
  }
}

/**
 * Passes the AFE header-missing count to every child tracer that is a {@link
 * BuiltInMetricsTracer}; other children are skipped.
 *
 * @param value count forwarded unchanged to each matching child
 */
public void recordAfeHeaderMissingCount(Long value) {
  for (ApiTracer tracer : children) {
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordAfeHeaderMissingCount(value);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ private static boolean isEmulatorEnabled(SpannerOptions options, String emulator
&& options.getHost().endsWith(emulatorHost);
}

public static boolean isEnableAFEServerTiming() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why are we not using SpannerOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature can only be disabled through an environment variable.

return "false".equalsIgnoreCase(System.getenv("SPANNER_DISABLE_AFE_SERVER_TIMING"));
}

private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelayDuration(Duration.ofSeconds(5L))
Expand Down Expand Up @@ -1993,6 +1997,9 @@ private GrpcCallContext createBaseCallContext() {
if (endToEndTracingEnabled) {
context = context.withExtraHeaders(metadataProvider.newEndToEndTracingHeader());
}
if (isEnableAFEServerTiming()) {
context = context.withExtraHeaders(metadataProvider.newAfeServerTimingHeader());
}
return context
.withStreamWaitTimeoutDuration(waitTimeout)
.withStreamIdleTimeoutDuration(idleTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final String GFE_TIMING_HEADER = "gfet4t7";
private static final String AFE_TIMING_HEADER = "afe";
private static final Metadata.Key<String> GOOGLE_CLOUD_RESOURCE_PREFIX_KEY =
Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_PATTERN =
Expand Down Expand Up @@ -174,13 +175,25 @@ private void processHeader(
if (compositeTracer != null) {
compositeTracer.recordGFELatency(gfeLatency);
}

if (span != null) {
span.setAttribute("gfe_latency", String.valueOf(gfeLatency));
}
} else {
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext);
spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
if (compositeTracer != null) {
compositeTracer.recordGfeHeaderMissingCount(1L);
}
}

// Record AFE metrics
if (compositeTracer != null && GapicSpannerRpc.isEnableAFEServerTiming()) {
if (serverTimingMetrics.containsKey(AFE_TIMING_HEADER)) {
long afeLatency = serverTimingMetrics.get(AFE_TIMING_HEADER);
compositeTracer.recordAFELatency(afeLatency);
} else {
compositeTracer.recordAfeHeaderMissingCount(1L);
}
}
} catch (NumberFormatException e) {
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SpannerMetadataProvider {
private final String resourceHeaderKey;
private static final String ROUTE_TO_LEADER_HEADER_KEY = "x-goog-spanner-route-to-leader";
private static final String END_TO_END_TRACING_HEADER_KEY = "x-goog-spanner-end-to-end-tracing";
private static final String AFE_SERVER_TIMING_HEADER_KEY =
"x-goog-spanner-enable-afe-server-timing";
private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
Expand All @@ -47,6 +49,8 @@ class SpannerMetadataProvider {
ImmutableMap.of(ROUTE_TO_LEADER_HEADER_KEY, Collections.singletonList("true"));
private static final Map<String, List<String>> END_TO_END_TRACING_HEADER_MAP =
ImmutableMap.of(END_TO_END_TRACING_HEADER_KEY, Collections.singletonList("true"));
private static final Map<String, List<String>> AFE_SERVER_TIMING_HEADER_MAP =
ImmutableMap.of(AFE_SERVER_TIMING_HEADER_KEY, Collections.singletonList("true"));

private SpannerMetadataProvider(Map<String, String> headers, String resourceHeaderKey) {
this.resourceHeaderKey = resourceHeaderKey;
Expand Down Expand Up @@ -96,6 +100,10 @@ Map<String, List<String>> newEndToEndTracingHeader() {
return END_TO_END_TRACING_HEADER_MAP;
}

/**
 * Returns the pre-built header map for the AFE server-timing header. The same shared map
 * instance is returned on every call.
 */
Map<String, List<String>> newAfeServerTimingHeader() {
  return AFE_SERVER_TIMING_HEADER_MAP;
}

private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
Map<String, String> headers) {
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ abstract class AbstractNettyMockServerTest {
protected static AtomicInteger fakeServerTiming =
new AtomicInteger(new Random().nextInt(1000) + 1);

protected static AtomicInteger fakeAFEServerTiming =
new AtomicInteger(new Random().nextInt(500) + 1);

protected Spanner spanner;

@BeforeClass
Expand All @@ -72,7 +75,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
public void sendHeaders(Metadata headers) {
headers.put(
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
String.format(
"afe; dur=%d, gfet4t7; dur=%d",
fakeAFEServerTiming.get(), fakeServerTiming.get()));
super.sendHeaders(headers);
}
},
Expand Down
Loading
Loading