commit 3b44b4e185f0d74ae45c6c3b207dc2c6a313cc3d
Author: Bharath Vissapragada
Date:   Wed Jul 7 13:03:48 2021 -0700

    HBASE-26042: Race repro

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 457b7c1650..6e756313e5 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -397,6 +397,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     headerBuf.writerIndex(headerLen);
     Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
     waitingAckQueue.addLast(c);
+    // Wait here until the unit test restarts the DN.
+    // What happens here is that the netty channel failed() callback is invoked and cleans up
+    // waitingAckQueue. That queue is empty, however, because we are looping here, so failed()
+    // finishes without cleaning up anything.
+    // The callbacks that correspond to flush() in the unit test are still not enqueued.
+    // So the sequence of actions is:
+    //   -- failed() -> cleans up the waitingAckQueue (which is empty anyway)
+    //   -- flush 1  -> cleans it up itself because (waitingAckQueue.peekFirst() == c)
+    //   -- flush 2  -> hung (enqueued and waiting to be cleaned up, but it never will be)
+    // flush 2 is never cleaned up because the channel failure callback has already executed.
+    while (state == State.STREAMING) {}
     // recheck again after we pushed the callback to queue
     if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
       future.completeExceptionally(new IOException("stream already broken"));
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 03ff1ee775..67518aa8b7 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -33,8 +33,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -146,9 +148,28 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
-    out.flush(false).get();
+    CountDownLatch waitForFlushes = new CountDownLatch(2);
+    CompletableFuture.runAsync(() -> {
+      try {
+        out.flush(false).get();
+      } catch (Exception e) {
+        LOG.error("Flush 1 failed: " + e);
+      }
+      waitForFlushes.countDown();
+    });
+    out.write(b, 0, b.length);
+    CompletableFuture.runAsync(() -> {
+      try {
+        out.flush(false).get();
+      } catch (Exception e) {
+        LOG.error("Flush 2 failed: " + e);
+      }
+      waitForFlushes.countDown();
+    });
     // restart one datanode which causes one connection broken
     CLUSTER.restartDataNode(0);
+    // Set a break point here and inspect the state of 'out' -> waitingAckQueue
+    waitForFlushes.await();
     out.write(b, 0, b.length);
     try {
       out.flush(false).get();
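For reference, the interleaving described in the patch comments can be replayed outside HBase. The sketch below is not HBase code: the class name AckQueueRaceSketch and the enqueue()/recheck() helpers are invented for illustration and only loosely mirror flushBuffer() and the channel failed() handler. It shows why flush 2's callback is never completed: failed() drains an empty queue, flush 1 later removes its own callback because it sits at the head of waitingAckQueue, and flush 2's callback stays queued with nothing left to complete it.

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of the race described above; not HBase code.
public class AckQueueRaceSketch {

  enum State { STREAMING, BROKEN }

  private volatile State state = State.STREAMING;
  private final ConcurrentLinkedDeque<CompletableFuture<Void>> waitingAckQueue =
    new ConcurrentLinkedDeque<>();

  // Simplified stand-in for the netty failed() callback: mark the stream broken,
  // then fail everything currently sitting in the ack queue.
  void failed() {
    state = State.BROKEN;
    CompletableFuture<Void> f;
    while ((f = waitingAckQueue.pollFirst()) != null) {
      f.completeExceptionally(new IOException("stream already broken"));
    }
  }

  // First half of a flush: push the callback onto the queue (addLast in the real code).
  CompletableFuture<Void> enqueue() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    waitingAckQueue.addLast(future);
    return future;
  }

  // Second half of a flush: the "recheck after push" guard. It only cleans up
  // its own callback when that callback is at the head of the queue.
  void recheck(CompletableFuture<Void> future) {
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == future) {
      future.completeExceptionally(new IOException("stream already broken"));
      waitingAckQueue.removeFirst();
    }
  }

  public static void main(String[] args) throws Exception {
    AckQueueRaceSketch out = new AckQueueRaceSketch();

    // Replay the interleaving from the patch comments deterministically.
    out.failed();                                   // failed(): the queue is still empty, nothing to clean
    CompletableFuture<Void> flush1 = out.enqueue(); // flush 1 pushes its callback
    CompletableFuture<Void> flush2 = out.enqueue(); // flush 2 pushes its callback
    out.recheck(flush2);                            // flush 2: head is flush 1's callback -> no-op
    out.recheck(flush1);                            // flush 1: head of queue -> cleans itself up

    System.out.println("flush 1 completed: " + flush1.isDone()); // true (exceptionally)
    try {
      flush2.get(1, TimeUnit.SECONDS); // nobody is left to complete this future
    } catch (TimeoutException e) {
      System.out.println("flush 2 is stuck in waitingAckQueue: " + (!flush2.isDone()));
    }
  }
}

Running main prints that flush 1 completed while flush 2 times out, which matches the hang in the unit test: flush 2's countDown() never runs, so waitForFlushes.await() blocks forever.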