Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 461ff84

Browse files
authored
fix: retain context timeouts in ServerStreamingAttemptCallable (#1155)
1 parent 20bb200 commit 461ff84

File tree

2 files changed

+93
-10
lines changed

2 files changed

+93
-10
lines changed

‎gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,12 @@ public void cancel() {
181181
}
182182
isStarted = true;
183183

184-
// Propagate the totalTimeout as the overall stream deadline.
184+
// Propagate the totalTimeout as the overall stream deadline, so long as the user
185+
// has not provided a timeout via the ApiCallContext. If they have, retain it.
185186
Duration totalTimeout =
186187
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();
187188

188-
if (totalTimeout != null && context != null) {
189+
if (totalTimeout != null && context != null && context.getTimeout() == null) {
189190
context = context.withTimeout(totalTimeout);
190191
}
191192

@@ -217,7 +218,10 @@ public Void call() {
217218

218219
ApiCallContext attemptContext = context;
219220

220-
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()) {
221+
// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
222+
// does not already have a timeout set by a user via withStreamWaitTimeout.
223+
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
224+
&& attemptContext.getStreamWaitTimeout() == null) {
221225
attemptContext =
222226
attemptContext.withStreamWaitTimeout(
223227
outerRetryingFuture.getAttemptSettings().getRpcTimeout());

‎gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.google.api.gax.rpc.testing.FakeCallContext;
4242
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall;
4343
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable;
44+
import com.google.api.gax.tracing.NoopApiTracer;
4445
import com.google.common.collect.Queues;
4546
import com.google.common.truth.Truth;
4647
import java.util.concurrent.BlockingDeque;
@@ -51,6 +52,7 @@
5152
import org.junit.Test;
5253
import org.junit.runner.RunWith;
5354
import org.junit.runners.JUnit4;
55+
import org.mockito.Mockito;
5456
import org.threeten.bp.Duration;
5557

5658
@RunWith(JUnit4.class)
@@ -59,29 +61,107 @@ public class ServerStreamingAttemptCallableTest {
5961
private AccumulatingObserver observer;
6062
private FakeRetryingFuture fakeRetryingFuture;
6163
private StreamResumptionStrategy<String, String> resumptionStrategy;
64+
private static Duration totalTimeout = Duration.ofHours(1);
65+
private FakeCallContext mockedCallContext;
6266

6367
@Before
6468
public void setUp() {
6569
innerCallable = new MockServerStreamingCallable<>();
6670
observer = new AccumulatingObserver(true);
6771
resumptionStrategy = new MyStreamResumptionStrategy();
72+
mockedCallContext = Mockito.mock(FakeCallContext.class);
6873
}
6974

7075
private ServerStreamingAttemptCallable<String, String> createCallable() {
76+
return createCallable(FakeCallContext.createDefault());
77+
}
78+
79+
private ServerStreamingAttemptCallable<String, String> createCallable(ApiCallContext context) {
7180
ServerStreamingAttemptCallable<String, String> callable =
7281
new ServerStreamingAttemptCallable<>(
73-
innerCallable,
74-
resumptionStrategy,
75-
"request",
76-
FakeCallContext.createDefault(),
77-
observer);
82+
innerCallable, resumptionStrategy, "request", context, observer);
7883

7984
fakeRetryingFuture = new FakeRetryingFuture(callable);
8085
callable.setExternalFuture(fakeRetryingFuture);
8186

8287
return callable;
8388
}
8489

90+
@Test
91+
public void testUserProvidedContextTimeout() {
92+
// Mock up the ApiCallContext as if the user provided a timeout and streamWaitTimeout.
93+
Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer();
94+
Mockito.doReturn(Duration.ofHours(5)).when(mockedCallContext).getTimeout();
95+
Mockito.doReturn(Duration.ofHours(5)).when(mockedCallContext).getStreamWaitTimeout();
96+
97+
ServerStreamingAttemptCallable<String, String> callable = createCallable(mockedCallContext);
98+
callable.start();
99+
100+
// Ensure that the callable did not overwrite the user provided timeouts
101+
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
102+
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
103+
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
104+
Mockito.verify(mockedCallContext, Mockito.never())
105+
.withStreamWaitTimeout(Mockito.any(Duration.class));
106+
107+
// Should notify outer observer
108+
Truth.assertThat(observer.controller).isNotNull();
109+
110+
// Should configure the inner controller correctly.
111+
MockServerStreamingCall<String, String> call = innerCallable.popLastCall();
112+
Truth.assertThat(call.getController().isAutoFlowControlEnabled()).isTrue();
113+
Truth.assertThat(call.getRequest()).isEqualTo("request");
114+
115+
// Send a response in auto flow mode.
116+
call.getController().getObserver().onResponse("response1");
117+
call.getController().getObserver().onResponse("response2");
118+
call.getController().getObserver().onComplete();
119+
120+
// Make sure the responses are received
121+
Truth.assertThat(observer.responses).containsExactly("response1", "response2").inOrder();
122+
fakeRetryingFuture.assertSuccess();
123+
}
124+
125+
@Test
126+
public void testNoUserProvidedContextTimeout() {
127+
// Mock up the ApiCallContext as if the user did not provide custom timeouts.
128+
Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer();
129+
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
130+
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
131+
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
132+
Mockito.doReturn(mockedCallContext)
133+
.when(mockedCallContext)
134+
.withStreamWaitTimeout(Mockito.any(Duration.class));
135+
136+
ServerStreamingAttemptCallable<String, String> callable = createCallable(mockedCallContext);
137+
callable.start();
138+
139+
// Ensure that the callable configured the timeouts via the Settings in the
140+
// absence of user-defined timeouts.
141+
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
142+
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
143+
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
144+
Mockito.verify(mockedCallContext, Mockito.times(1))
145+
.withStreamWaitTimeout(Mockito.any(Duration.class));
146+
147+
// Should notify outer observer
148+
Truth.assertThat(observer.controller).isNotNull();
149+
150+
// Should configure the inner controller correctly.
151+
MockServerStreamingCall<String, String> call = innerCallable.popLastCall();
152+
Truth.assertThat(call.getController().isAutoFlowControlEnabled()).isTrue();
153+
Truth.assertThat(call.getRequest()).isEqualTo("request");
154+
155+
// Send a response in auto flow mode.
156+
call.getController().getObserver().onResponse("response1");
157+
call.getController().getObserver().onResponse("response2");
158+
call.getController().getObserver().onComplete();
159+
160+
// Make sure the responses are received
161+
Truth.assertThat(observer.responses).containsExactly("response1", "response2").inOrder();
162+
fakeRetryingFuture.assertSuccess();
163+
}
164+
85165
@Test
86166
public void testNoErrorsAutoFlow() {
87167
ServerStreamingAttemptCallable<String, String> callable = createCallable();
@@ -396,8 +476,7 @@ private static class FakeRetryingFuture extends AbstractApiFuture<Void>
396476
this.attemptCallable = attemptCallable;
397477
attemptSettings =
398478
TimedAttemptSettings.newBuilder()
399-
.setGlobalSettings(
400-
RetrySettings.newBuilder().setTotalTimeout(Duration.ofHours(1)).build())
479+
.setGlobalSettings(RetrySettings.newBuilder().setTotalTimeout(totalTimeout).build())
401480
.setFirstAttemptStartTimeNanos(0)
402481
.setAttemptCount(0)
403482
.setOverallAttemptCount(0)

0 commit comments

Comments
 (0)