|
30 | 30 | package com.google.api.gax.grpc;
|
31 | 31 |
|
32 | 32 | import com.google.api.client.util.Preconditions;
|
| 33 | +import com.google.api.core.AbstractApiFuture; |
| 34 | +import com.google.api.core.ApiFuture; |
| 35 | +import com.google.api.core.BetaApi; |
33 | 36 | import com.google.api.gax.rpc.ApiCallContext;
|
34 | 37 | import com.google.api.gax.tracing.ApiTracer.Scope;
|
35 | 38 | import io.grpc.CallOptions;
|
|
38 | 41 | import io.grpc.ClientInterceptor;
|
39 | 42 | import io.grpc.ClientInterceptors;
|
40 | 43 | import io.grpc.Deadline;
|
| 44 | +import io.grpc.Metadata; |
41 | 45 | import io.grpc.MethodDescriptor;
|
| 46 | +import io.grpc.Status; |
42 | 47 | import io.grpc.stub.MetadataUtils;
|
43 | 48 | import java.util.concurrent.TimeUnit;
|
| 49 | +import java.util.logging.Level; |
| 50 | +import java.util.logging.Logger; |
44 | 51 |
|
45 | 52 | /**
|
46 | 53 | * {@code GrpcClientCalls} creates a new {@code ClientCall} from the given call context.
|
47 | 54 | *
|
48 | 55 | * <p>Package-private for internal use.
|
49 | 56 | */
|
50 | 57 | class GrpcClientCalls {
|
| 58 | + private static final Logger LOGGER = Logger.getLogger(GrpcDirectCallable.class.getName()); |
| 59 | + |
51 | 60 | private GrpcClientCalls() {};
|
52 | 61 |
|
53 | 62 | public static <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
|
@@ -90,4 +99,101 @@ public static <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
|
90 | 99 | return channel.newCall(descriptor, callOptions);
|
91 | 100 | }
|
92 | 101 | }
|
| 102 | + |
| 103 | + /** |
| 104 | + * A work-alike of {@link io.grpc.stub.ClientCalls#futureUnaryCall(ClientCall, Object)}. |
| 105 | + * |
| 106 | + * <p>The only difference is that unlike grpc-stub's implementation. This implementation doesn't |
| 107 | + * wait for trailers to resolve a unary RPC. This can save milliseconds when the server is |
| 108 | + * overloaded. |
| 109 | + */ |
| 110 | + @BetaApi |
| 111 | + static <RequestT, ResponseT> ApiFuture<ResponseT> eagerFutureUnaryCall( |
| 112 | + ClientCall<RequestT, ResponseT> clientCall, RequestT request) { |
| 113 | + // Start the call |
| 114 | + GrpcFuture<ResponseT> future = new GrpcFuture<>(clientCall); |
| 115 | + clientCall.start(new EagerFutureListener<>(future), new Metadata()); |
| 116 | + |
| 117 | + // Send the request |
| 118 | + try { |
| 119 | + clientCall.sendMessage(request); |
| 120 | + clientCall.halfClose(); |
| 121 | + // Request an extra message to detect misconfigured servers |
| 122 | + clientCall.request(2); |
| 123 | + } catch (Throwable sendError) { |
| 124 | + // Cancel if anything goes wrong |
| 125 | + try { |
| 126 | + clientCall.cancel(null, sendError); |
| 127 | + } catch (Throwable cancelError) { |
| 128 | + LOGGER.log(Level.SEVERE, "Error encountered while closing it", sendError); |
| 129 | + } |
| 130 | + |
| 131 | + throw sendError; |
| 132 | + } |
| 133 | + |
| 134 | + return future; |
| 135 | + } |
| 136 | + |
| 137 | + /** Thin wrapper around an ApiFuture that will cancel the underlying ClientCall. */ |
| 138 | + private static class GrpcFuture<T> extends AbstractApiFuture<T> { |
| 139 | + private final ClientCall<?, T> call; |
| 140 | + |
| 141 | + private GrpcFuture(ClientCall<?, T> call) { |
| 142 | + this.call = call; |
| 143 | + } |
| 144 | + |
| 145 | + @Override |
| 146 | + protected void interruptTask() { |
| 147 | + call.cancel("GrpcFuture was cancelled", null); |
| 148 | + } |
| 149 | + |
| 150 | + @Override |
| 151 | + public boolean set(T value) { |
| 152 | + return super.set(value); |
| 153 | + } |
| 154 | + |
| 155 | + @Override |
| 156 | + public boolean setException(Throwable throwable) { |
| 157 | + return super.setException(throwable); |
| 158 | + } |
| 159 | + } |
| 160 | + |
| 161 | + /** |
| 162 | + * A bridge between gRPC's ClientCall.Listener to an ApiFuture. |
| 163 | + * |
| 164 | + * <p>The Listener will eagerly resolve the future when the first message is received and will not |
| 165 | + * wait for the trailers. This should cut down on the latency at the expense of safety. If the |
| 166 | + * server is misconfigured and sends a second response for a unary call, the error will be logged, |
| 167 | + * but the future will still be successful. |
| 168 | + */ |
| 169 | + private static class EagerFutureListener<T> extends ClientCall.Listener<T> { |
| 170 | + private final GrpcFuture<T> future; |
| 171 | + |
| 172 | + private EagerFutureListener(GrpcFuture<T> future) { |
| 173 | + this.future = future; |
| 174 | + } |
| 175 | + |
| 176 | + @Override |
| 177 | + public void onMessage(T message) { |
| 178 | + if (!future.set(message)) { |
| 179 | + throw Status.INTERNAL |
| 180 | + .withDescription("More than one value received for unary call") |
| 181 | + .asRuntimeException(); |
| 182 | + } |
| 183 | + } |
| 184 | + |
| 185 | + @Override |
| 186 | + public void onClose(Status status, Metadata trailers) { |
| 187 | + if (!future.isDone()) { |
| 188 | + future.setException( |
| 189 | + Status.INTERNAL |
| 190 | + .withDescription("No value received for unary call") |
| 191 | + .asException(trailers)); |
| 192 | + } |
| 193 | + if (!status.isOk()) { |
| 194 | + LOGGER.log( |
| 195 | + Level.WARNING, "Received error for unary call after receiving a successful response"); |
| 196 | + } |
| 197 | + } |
| 198 | + } |
93 | 199 | }
|
0 commit comments