Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit aab5288

Browse files
feat: introduce closeAsync to Batcher (#1423)
* feat: introduce closeAsync to Batcher This should allow callers to signal that they are done using a batcher without blocking their thread. * improve comments * add tests * format
1 parent 87636a5 commit aab5288

File tree

3 files changed

+200
-13
lines changed

3 files changed

+200
-13
lines changed

‎gax/src/main/java/com/google/api/gax/batching/Batcher.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import com.google.api.core.ApiFuture;
3333
import com.google.api.core.BetaApi;
34+
import com.google.api.core.InternalExtensionOnly;
3435

3536
/**
3637
* Represents a batching context where individual elements will be accumulated and flushed in a
@@ -45,6 +46,7 @@
4546
* @param <ElementResultT> The type of the result for each individual element.
4647
*/
4748
@BetaApi("The surface for batching is not stable yet and may change in the future.")
49+
@InternalExtensionOnly
4850
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
4951

5052
/**
@@ -74,9 +76,15 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
7476
void sendOutstanding();
7577

7678
/**
77-
* Closes this Batcher by preventing new elements from being added and flushing the existing
79+
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
7880
* elements.
7981
*/
8082
@Override
8183
void close() throws InterruptedException;
84+
85+
/**
86+
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
87+
* elements. The returned future will be resolved when the last element completes
88+
*/
89+
ApiFuture<Void> closeAsync();
8290
}

‎gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.List;
5353
import java.util.concurrent.ConcurrentHashMap;
5454
import java.util.concurrent.ConcurrentMap;
55+
import java.util.concurrent.ExecutionException;
5556
import java.util.concurrent.Future;
5657
import java.util.concurrent.ScheduledExecutorService;
5758
import java.util.concurrent.TimeUnit;
@@ -89,7 +90,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
8990
private final Object flushLock = new Object();
9091
private final Object elementLock = new Object();
9192
private final Future<?> scheduledFuture;
92-
private volatile boolean isClosed = false;
93+
private SettableApiFuture<Void> closeFuture;
9394
private final BatcherStats batcherStats = new BatcherStats();
9495
private final FlowController flowController;
9596

@@ -172,7 +173,10 @@ public BatcherImpl(
172173
/** {@inheritDoc} */
173174
@Override
174175
public ApiFuture<ElementResultT> add(ElementT element) {
175-
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
176+
// Note: there is no need to synchronize over closeFuture. The write & read of the variable
177+
// will only be done from a single calling thread.
178+
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");
179+
176180
// This is not the optimal way of throttling. It does not send out partial batches, which
177181
// means that the Batcher might not use up all the resources allowed by FlowController.
178182
// The more efficient implementation should look like:
@@ -257,9 +261,20 @@ public void onFailure(Throwable throwable) {
257261
}
258262

259263
private void onBatchCompletion() {
260-
if (numOfOutstandingBatches.decrementAndGet() == 0) {
261-
synchronized (flushLock) {
264+
boolean shouldClose = false;
265+
266+
synchronized (flushLock) {
267+
if (numOfOutstandingBatches.decrementAndGet() == 0) {
262268
flushLock.notifyAll();
269+
shouldClose = closeFuture != null;
270+
}
271+
}
272+
if (shouldClose) {
273+
BatchingException overallError = batcherStats.asException();
274+
if (overallError != null) {
275+
closeFuture.setException(overallError);
276+
} else {
277+
closeFuture.set(null);
263278
}
264279
}
265280
}
@@ -279,17 +294,57 @@ private void awaitAllOutstandingBatches() throws InterruptedException {
279294
/** {@inheritDoc} */
280295
@Override
281296
public void close() throws InterruptedException {
282-
if (isClosed) {
283-
return;
297+
try {
298+
closeAsync().get();
299+
} catch (ExecutionException e) {
300+
// Original stacktrace of a batching exception is not useful, so rethrow the error with
301+
// the caller stacktrace
302+
if (e.getCause() instanceof BatchingException) {
303+
BatchingException cause = (BatchingException) e.getCause();
304+
throw new BatchingException(cause.getMessage());
305+
} else {
306+
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
307+
}
308+
}
309+
}
310+
311+
@Override
312+
public ApiFuture<Void> closeAsync() {
313+
if (closeFuture != null) {
314+
return closeFuture;
315+
}
316+
317+
// Send any buffered elements
318+
// Must come before numOfOutstandingBatches check below
319+
sendOutstanding();
320+
321+
boolean closeImmediately;
322+
323+
synchronized (flushLock) {
324+
// prevent admission of new elements
325+
closeFuture = SettableApiFuture.create();
326+
// check if we can close immediately
327+
closeImmediately = numOfOutstandingBatches.get() == 0;
284328
}
285-
flush();
329+
330+
// Clean up accounting
286331
scheduledFuture.cancel(true);
287-
isClosed = true;
288332
currentBatcherReference.closed = true;
289333
currentBatcherReference.clear();
290-
BatchingException exception = batcherStats.asException();
291-
if (exception != null) {
292-
throw exception;
334+
335+
// notify futures
336+
if (closeImmediately) {
337+
finishClose();
338+
}
339+
return closeFuture;
340+
}
341+
342+
private void finishClose() {
343+
BatchingException batchingException = batcherStats.asException();
344+
if (batchingException != null) {
345+
closeFuture.setException(batchingException);
346+
} else {
347+
closeFuture.set(null);
293348
}
294349
}
295350

‎gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
4646
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
4747
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
48+
import com.google.common.base.Stopwatch;
4849
import com.google.common.collect.ImmutableList;
4950
import com.google.common.collect.Queues;
5051
import java.util.ArrayList;
@@ -69,7 +70,9 @@
6970
import java.util.logging.Logger;
7071
import org.junit.After;
7172
import org.junit.AfterClass;
73+
import org.junit.Assert;
7274
import org.junit.Test;
75+
import org.junit.function.ThrowingRunnable;
7376
import org.junit.runner.RunWith;
7477
import org.junit.runners.JUnit4;
7578
import org.threeten.bp.Duration;
@@ -92,7 +95,12 @@ public class BatcherImplTest {
9295
@After
9396
public void tearDown() throws InterruptedException {
9497
if (underTest != null) {
95-
underTest.close();
98+
try {
99+
// Close the batcher to avoid warnings of orphaned batchers
100+
underTest.close();
101+
} catch (BatchingException ignored) {
102+
// Some tests intentionally inject failures into mutations
103+
}
96104
}
97105
}
98106

@@ -172,6 +180,55 @@ public void testNoElementAdditionAfterClose() throws Exception {
172180
.matches("Cannot add elements on a closed batcher");
173181
}
174182

183+
/** Validates exception when batch is called after {@link Batcher#close()}. */
184+
@Test
185+
public void testNoElementAdditionAfterCloseAsync() throws Exception {
186+
underTest = createDefaultBatcherImpl(batchingSettings, null);
187+
underTest.add(1);
188+
underTest.closeAsync();
189+
190+
IllegalStateException e =
191+
Assert.assertThrows(
192+
IllegalStateException.class,
193+
new ThrowingRunnable() {
194+
@Override
195+
public void run() throws Throwable {
196+
underTest.add(1);
197+
}
198+
});
199+
200+
assertThat(e).hasMessageThat().matches("Cannot add elements on a closed batcher");
201+
}
202+
203+
@Test
204+
public void testCloseAsyncNonblocking() throws ExecutionException, InterruptedException {
205+
final SettableApiFuture<List<Integer>> innerFuture = SettableApiFuture.create();
206+
207+
UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
208+
new UnaryCallable<LabeledIntList, List<Integer>>() {
209+
@Override
210+
public ApiFuture<List<Integer>> futureCall(
211+
LabeledIntList request, ApiCallContext context) {
212+
return innerFuture;
213+
}
214+
};
215+
underTest =
216+
new BatcherImpl<>(
217+
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);
218+
219+
ApiFuture<Integer> elementFuture = underTest.add(1);
220+
221+
Stopwatch stopwatch = Stopwatch.createStarted();
222+
ApiFuture<Void> closeFuture = underTest.closeAsync();
223+
assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isAtMost(100);
224+
225+
assertThat(closeFuture.isDone()).isFalse();
226+
assertThat(elementFuture.isDone()).isFalse();
227+
228+
innerFuture.set(ImmutableList.of(1));
229+
closeFuture.get();
230+
}
231+
175232
/** Verifies exception occurred at RPC is propagated to element results */
176233
@Test
177234
public void testResultFailureAfterRPCFailure() throws Exception {
@@ -614,6 +671,73 @@ public boolean isLoggable(LogRecord record) {
614671
}
615672
}
616673

674+
/**
675+
* Validates the absence of warning in case {@link BatcherImpl} is garbage collected after being
676+
* closed.
677+
*
678+
* <p>Note:This test cannot run concurrently with other tests that use Batchers.
679+
*/
680+
@Test
681+
public void testClosedBatchersAreNotLogged() throws Exception {
682+
// Clean out the existing instances
683+
final long DELAY_TIME = 30L;
684+
int actualRemaining = 0;
685+
for (int retry = 0; retry < 3; retry++) {
686+
System.gc();
687+
System.runFinalization();
688+
actualRemaining = BatcherReference.cleanQueue();
689+
if (actualRemaining == 0) {
690+
break;
691+
}
692+
Thread.sleep(DELAY_TIME * (1L << retry));
693+
}
694+
assertThat(actualRemaining).isAtMost(0);
695+
696+
// Capture logs
697+
final List<LogRecord> records = new ArrayList<>(1);
698+
Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName());
699+
Filter oldFilter = batcherLogger.getFilter();
700+
batcherLogger.setFilter(
701+
new Filter() {
702+
@Override
703+
public boolean isLoggable(LogRecord record) {
704+
synchronized (records) {
705+
records.add(record);
706+
}
707+
return false;
708+
}
709+
});
710+
711+
try {
712+
// Create a bunch of batchers that will garbage collected after being closed
713+
for (int i = 0; i < 1_000; i++) {
714+
BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> batcher =
715+
createDefaultBatcherImpl(batchingSettings, null);
716+
batcher.add(1);
717+
718+
if (i % 2 == 0) {
719+
batcher.close();
720+
} else {
721+
batcher.closeAsync();
722+
}
723+
}
724+
// Run GC a few times to give the batchers a chance to be collected
725+
for (int retry = 0; retry < 100; retry++) {
726+
System.gc();
727+
System.runFinalization();
728+
BatcherReference.cleanQueue();
729+
Thread.sleep(10);
730+
}
731+
732+
synchronized (records) {
733+
assertThat(records).isEmpty();
734+
}
735+
} finally {
736+
// reset logging
737+
batcherLogger.setFilter(oldFilter);
738+
}
739+
}
740+
617741
@Test
618742
public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException {
619743
int iterations = 1_000_000;

0 commit comments

Comments
 (0)