Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit c6308c9

Browse files
fix: Fix race condition in BatcherImpl flush (#1200)
* fix: Fix race condition in BatcherImpl flush

  Currently the following race condition exists:
  T1 - awaitAllOutstandingBatches checks that numOfOutstandingBatches > 0
  T2 - onBatchCompletion decrements numOfOutstandingBatches
  T2 - flushLock.notifyAll()
  T1 - flushLock.wait()
  so T1 will wait indefinitely.

  The fix is quite simple: make sure that there are still batches to wait for after acquiring the lock.

* add test
1 parent 0976e20 commit c6308c9

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

‎gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ private void onBatchCompletion() {
198198
private void awaitAllOutstandingBatches() throws InterruptedException {
199199
while (numOfOutstandingBatches.get() > 0) {
200200
synchronized (flushLock) {
201+
// Check again under lock to avoid racing with onBatchCompletion
202+
if (numOfOutstandingBatches.get() == 0) {
203+
break;
204+
}
201205
flushLock.wait();
202206
}
203207
}

‎gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636

3737
import com.google.api.core.ApiFuture;
3838
import com.google.api.core.ApiFutures;
39+
import com.google.api.core.SettableApiFuture;
3940
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
4041
import com.google.api.gax.rpc.ApiCallContext;
4142
import com.google.api.gax.rpc.UnaryCallable;
4243
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
4344
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
4445
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
46+
import com.google.common.collect.ImmutableList;
4547
import com.google.common.collect.Queues;
4648
import java.util.ArrayList;
4749
import java.util.List;
@@ -56,6 +58,7 @@
5658
import java.util.concurrent.ScheduledExecutorService;
5759
import java.util.concurrent.ScheduledFuture;
5860
import java.util.concurrent.TimeUnit;
61+
import java.util.concurrent.TimeoutException;
5962
import java.util.concurrent.atomic.AtomicBoolean;
6063
import java.util.concurrent.atomic.AtomicInteger;
6164
import java.util.logging.Filter;
@@ -644,6 +647,70 @@ public boolean isLoggable(LogRecord record) {
644647
}
645648
}
646649

650+
@Test
651+
public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException {
652+
int iterations = 1_000_000;
653+
654+
ExecutorService executor = Executors.newFixedThreadPool(100);
655+
656+
try {
657+
List<Future<?>> closeFutures = new ArrayList<>();
658+
659+
for (int i = 0; i < iterations; i++) {
660+
final SettableApiFuture<List<Integer>> result = SettableApiFuture.create();
661+
662+
UnaryCallable<LabeledIntList, List<Integer>> callable =
663+
new UnaryCallable<LabeledIntList, List<Integer>>() {
664+
@Override
665+
public ApiFuture<List<Integer>> futureCall(
666+
LabeledIntList request, ApiCallContext context) {
667+
return result;
668+
}
669+
};
670+
final Batcher<Integer, Integer> batcher =
671+
new BatcherImpl<>(
672+
SQUARER_BATCHING_DESC_V2, callable, labeledIntList, batchingSettings, EXECUTOR);
673+
674+
batcher.add(1);
675+
676+
executor.execute(
677+
new Runnable() {
678+
@Override
679+
public void run() {
680+
result.set(ImmutableList.of(1));
681+
}
682+
});
683+
Future<?> f =
684+
executor.submit(
685+
new Runnable() {
686+
@Override
687+
public void run() {
688+
try {
689+
batcher.close();
690+
} catch (InterruptedException e) {
691+
Thread.currentThread().interrupt();
692+
throw new RuntimeException(e);
693+
}
694+
}
695+
});
696+
697+
closeFutures.add(f);
698+
}
699+
700+
// Make sure that none hang
701+
for (Future<?> f : closeFutures) {
702+
try {
703+
// Should never take this long, but padded just in case this runs on a limited machine
704+
f.get(1, TimeUnit.MINUTES);
705+
} catch (TimeoutException e) {
706+
assertWithMessage("BatcherImpl.close() is deadlocked").fail();
707+
}
708+
}
709+
} finally {
710+
executor.shutdownNow();
711+
}
712+
}
713+
647714
private void testElementTriggers(BatchingSettings settings) throws Exception {
648715
underTest =
649716
new BatcherImpl<>(

0 commit comments

Comments
 (0)