diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 142ab63..6bc4967 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -246,15 +246,25 @@ public class FSHLog extends AbstractFSWAL { // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); - int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); - this.ringBufferEventHandler = new RingBufferEventHandler( - conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); + this.ringBufferEventHandler = newRingBufferEventHandler(conf); this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); } + @VisibleForTesting + RingBufferEventHandler newRingBufferEventHandler(Configuration conf) { + return new RingBufferEventHandler( + conf.getInt("hbase.regionserver.hlog.syncer.count", 5), + conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200)); + } + + @VisibleForTesting + RingBufferEventHandler getRingBufferEventHandler() { + return this.ringBufferEventHandler; + } + /** * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the * default behavior (such as setting the maxRecoveryErrorCount value for example (see @@ -1098,6 +1108,11 @@ public class FSHLog extends AbstractFSWAL { } } + @VisibleForTesting + int getSyncFuturesCount() { + return this.syncFuturesCount; + } + SafePointZigZagLatch attainSafePoint() { this.zigzagLatch = new SafePointZigZagLatch(); return this.zigzagLatch; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 19759d1..90a843c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -146,9 +146,22 @@ public abstract class AbstractTestFSWAL { } } + /** + * Append to the WAL and then call sync. + */ protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, MultiVersionConcurrencyControl mvcc, NavigableMap scopes) throws IOException { + append(log, hri, htd, times, mvcc, scopes); + log.sync(); + } + + /** + * Append to the WAL. + */ + protected void append(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, + MultiVersionConcurrencyControl mvcc, NavigableMap scopes) + throws IOException { final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); @@ -159,7 +172,6 @@ public abstract class AbstractTestFSWAL { HConstants.NO_NONCE, mvcc, scopes); log.append(hri, key, cols, true); } - log.sync(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 640e851..3e6c681 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.*; + import java.io.IOException; import java.lang.reflect.Field; import java.util.List; @@ -40,22 +42,25 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog.RingBufferEventHandler; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; +import org.junit.rules.TestName; /** * Provides FSHLog test cases. */ @Category({ RegionServerTests.class, MediumTests.class }) public class TestFSHLog extends AbstractTestFSWAL { + @Rule public final TestName testName = new TestName(); @Override protected AbstractFSWAL newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, @@ -81,6 +86,170 @@ public class TestFSHLog extends AbstractTestFSWAL { }; } + /** + * Hacked up test to reproduce HBASE-16960 RegionServer hang when aborting + * @throws IOException + */ + @Test + public void testStuckAfterAppendException() throws IOException { + final String testName = this.testName.getMethodName(); + // Stop batching. + final AtomicBoolean blockBatch = new AtomicBoolean(true); + final AtomicBoolean throwExceptionOnAppend = new AtomicBoolean(false); + final FSHLog wal = new FSHLog(FS, FSUtils.getRootDir(CONF), + /*logDir=>*/testName, + /*ArchiveDir=>*/HConstants.HREGION_OLDLOGDIR_NAME, + CONF, null, true, /*Prefix=>*/null, /*Suffix=>*/null) { + @Override + RingBufferEventHandler newRingBufferEventHandler(Configuration conf) { + return new BatchManipulatingRingBufferEventHandler(this, + blockBatch, throwExceptionOnAppend, + 1/*syncer*/, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200)); + } + }; + FakedHandlerCallingSync fakeHandlerCallingSync = new FakedHandlerCallingSync(wal); + try { + // Append a few edits. + HTableDescriptor htd = + new HTableDescriptor(TableName.valueOf(testName)).addFamily(new HColumnDescriptor(testName)); + HRegionInfo hri = + new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + append(wal, hri, htd, 3, mvcc, null); + // Now add a sync on the WAL but do it in a separate thread because it will block and we don't + // want the main test thread blocked or we can't make progress. + fakeHandlerCallingSync.start(); + // Assert I have some syncs queued but not handed off to the SyncRunner. Will require the + // above syncer thread to have started and to become blocked on sync call. + RingBufferEventHandler ringBufferEventHandler = wal.getRingBufferEventHandler(); + while (ringBufferEventHandler.getSyncFuturesCount() <= 0) { + Threads.sleep(1); + } + assertTrue(fakeHandlerCallingSync.isAlive()); + assertTrue(ringBufferEventHandler.getSyncFuturesCount() > 0); + assertTrue(wal.isUnflushedEntries()); + // When here, we have a sync future stuck. It has not be assigned to a syncer thread + // because we have stopped the ringbuffer batching. See the blockBatch above. + // Now do an append but have it throw an exception. + throwExceptionOnAppend.set(true); + // Register a listener that will roll the log. + wal.registerWALActionsListener(new WALRoller(wal)); + append(wal, hri, htd, 1, mvcc, null); + wal.requestLogRoll(); + // Wait till append goes across the ringbuffer. It should throw an exception when + // append goes to run in event processor. + Threads.sleep(30000); + // Error if we got here. + fail(); + } catch (IOException ioe) { + LOG.info(ioe); + } finally { + wal.close(); + } + assertTrue(!fakeHandlerCallingSync.isAlive()); + } + + class FakedHandlerCallingSync extends Thread { + final WAL wal; + FakedHandlerCallingSync(final WAL wal) { + super("Calling sync on " + wal.toString()); + setDaemon(true); + this.wal = wal; + } + + @Override + public void run() { + try { + wal.sync(); + } catch (IOException e) { + LOG.info(e); + throw new RuntimeException(e); + } + } + } + + class WALRoller implements WALActionsListener { + private final WAL wal; + WALRoller(WAL wal) { + this.wal = wal; + } + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException {} + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException {} + + @Override + public void preLogArchive(Path oldPath, Path newPath) throws IOException {} + + @Override + public void postLogArchive(Path oldPath, Path newPath) throws IOException {} + + @Override + public void logRollRequested(boolean tooFewReplicas) { + // Run log roll in separate thread so we don't block processing. + Thread t = new Thread("walroller") { + public void run() { + try { + wal.rollWriter(true); + } catch (FailedLogCloseException e) { + LOG.info(e); + } catch (IOException e) { + LOG.info(e); + } + }; + }; + t.start(); + } + + @Override + public void logCloseRequested() {} + + @Override + public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {} + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {} + + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, WALKey logKey, WALEdit logEdit) + throws IOException {} + + @Override + public void postSync(long timeInNanos, int handlerSyncs) {} + } + + /** + * A RingBuffer Handler w/ doctored batching and append event throwing. + */ + class BatchManipulatingRingBufferEventHandler extends RingBufferEventHandler { + final AtomicBoolean blockBatch; + final AtomicBoolean throwExceptionOnAppend; + + BatchManipulatingRingBufferEventHandler(FSHLog fshLog, AtomicBoolean blockBatch, + AtomicBoolean throwExceptionOnAppend, + int syncRunnerCount, int maxHandlersCount) { + fshLog.super(syncRunnerCount, maxHandlersCount); + this.blockBatch = blockBatch; + this.throwExceptionOnAppend = throwExceptionOnAppend; + } + + @Override + public void onEvent(RingBufferTruck truck, long sequence, boolean endOfBatch) throws Exception { + boolean calculatedEndOfBatch = !endOfBatch? endOfBatch: this.blockBatch.get()? false: endOfBatch; + super.onEvent(truck, sequence, calculatedEndOfBatch); + } + + @Override + void append(FSWALEntry entry) throws Exception { + LOG.info("Append " + entry); + if (this.throwExceptionOnAppend.get()) { + throw new java.net.SocketTimeoutException("Faked Append Exception!!!!"); + } + super.append(entry); + } + } + @Test public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {