diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 3efadc1d47..25e9c646e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -891,6 +891,15 @@ public class FSHLog extends AbstractFSWAL {
       return syncFuture;
     }
 
+    /**
+     * Reports whether the safe point has been attained.
+     * @return {@code true} once {@code safePointAttainedLatch} reports a count of zero.
+     */
+    @InterfaceAudience.Private
+    boolean isSafePointAttained() {
+      return this.safePointAttainedLatch.getCount() == 0;
+    }
+
     /**
      * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
      * A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index e763896d8d..49fa1dffd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -27,6 +29,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -49,8 +53,10 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestFSHLog.class);
 
+  private static final long TEST_TIMEOUT_MS = 10000;
+
   @Rule
   public TestName name = new TestName();
 
@@ -131,6 +139,98 @@ public class TestFSHLog extends AbstractTestFSWAL {
     }
   }
 
+  /**
+   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
+   * <p>
+   * Forces a safe point while every sync fails, publishes two syncs from the same
+   * thread, and verifies that the safe point is still attained within the timeout.
+   */
+  @Test
+  public void testDeadlockWithSyncOverwrites() throws Exception {
+    // Keeps beforeWaitOnSafePoint() blocked until the test counts it down.
+    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
+
+    // Writer whose sync always fails; every other operation is a no-op.
+    class FailingWriter implements WALProvider.Writer {
+      @Override public void sync(boolean forceSync) throws IOException {
+        throw new IOException("Injected failure..");
+      }
+
+      @Override public void append(WAL.Entry entry) throws IOException {
+      }
+
+      @Override public long getLength() {
+        return 0;
+      }
+
+      @Override
+      public long getSyncedLength() {
+        return 0;
+      }
+
+      @Override public void close() throws IOException {
+      }
+    }
+
+    /*
+     * Custom FSHLog implementation with a conditional wait before attaining safe point.
+     */
+    class CustomFSHLog extends FSHLog {
+      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+        String prefix, String suffix) throws IOException {
+        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+      }
+
+      @Override
+      protected void beforeWaitOnSafePoint() {
+        try {
+          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+          // Restore the interrupt status before wrapping, so it is not silently lost.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+
+      // No-arg convenience wrapper so the test can publish a sync directly.
+      public SyncFuture publishSyncOnRingBuffer() {
+        long sequence = getSequenceOnRingBuffer();
+        return publishSyncOnRingBuffer(sequence, false);
+      }
+    }
+
+    final String name = this.name.getMethodName();
+    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
+      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
+      log.setWriter(new FailingWriter());
+      // Read the ringBufferEventHandler field reflectively (setAccessible is needed to reach it).
+      Field ringBufferEventHandlerField =
+        FSHLog.class.getDeclaredField("ringBufferEventHandler");
+      ringBufferEventHandlerField.setAccessible(true);
+      FSHLog.RingBufferEventHandler ringBufferEventHandler =
+        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
+      // Force a safe point
+      FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
+      try {
+        SyncFuture future0 = log.publishSyncOnRingBuffer();
+        // Wait for the sync to be done.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
+        // Publish another sync from the same thread, this should not overwrite the done sync.
+        SyncFuture future1 = log.publishSyncOnRingBuffer();
+        assertFalse(future1.isDone());
+        // Unblock the safe point trigger..
+        blockBeforeSafePoint.countDown();
+        // Wait for the safe point to be reached.
+        // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
+      } finally {
+        // Force release the safe point, for the clean up.
+        latch.releaseSafePoint();
+      }
+    }
+  }
+
   /**
    * Test case for https://issues.apache.org/jira/browse/HBASE-16721
    */