diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index c551a94..5f9e3cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -279,8 +279,6 @@ public class FSHLog implements WAL {
 
   private final int slowSyncNs;
 
-  private final static Object [] NO_ARGS = new Object []{};
-
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -821,8 +819,7 @@ public class FSHLog implements WAL {
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
         if (isUnflushedEntries()) throw e;
-        // Else, let is pass through to the close.
-        LOG.warn("Failed sync but no outstanding unsync'd edits so falling through to close; " +
+        LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
           e.getMessage());
       }
 
@@ -1332,8 +1329,8 @@ public class FSHLog implements WAL {
         }
       }
     } catch (Exception e) {
-      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-        " still proceeding ahead...");
+      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
+        ", continuing...");
     }
     return logRollNeeded;
   }
@@ -1725,7 +1722,9 @@ public class FSHLog implements WAL {
       // add appends to dfsclient as they come in. Batching appends doesn't give any significant
       // benefit on measurement. Handler sync calls we will batch up. If we get an exception
       // appending an edit, we fail all subsequent appends and syncs with the same exception until
-      // the WAL is reset.
+      // the WAL is reset. It is important that we not short-circuit and exit early this method.
+      // It is important that we always go through the attainSafePoint on the end. Another thread,
+      // the log roller may be waiting on a signal from us here and will just hang without it.
 
       try {
         if (truck.hasSyncFuturePayload()) {
@@ -1736,15 +1735,20 @@ public class FSHLog implements WAL {
          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
          try {
            FSWALEntry entry = truck.unloadFSWALEntryPayload();
-            // If already an exception, do not try to append. Throw.
-            if (this.exception != null) throw this.exception;
+            if (this.exception != null) {
+              // We got an exception on an earlier attempt at append. Do not let this append
+              // go through. Fail it but stamp the sequenceid into this append though failed.
+              // We need to do this to close the latch held down deep in WALKey...that is waiting
+              // on sequenceid assignment otherwise it will just hang out (The #append method
+              // called below does this also internally).
+              entry.stampRegionSequenceId();
+              // Return to keep processing events coming off the ringbuffer
+              return;
+            }
            append(entry);
          } catch (Exception e) {
-            // Failed append. Record the exception. Throw it from here on out til new WAL in place
-            this.exception = new DamagedWALException(e);
-            // If append fails, presume any pending syncs will fail too; let all waiting handlers
-            // know of the exception.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
+            // Failed append. Record the exception.
+            this.exception = e;
            // Return to keep processing events coming off the ringbuffer
            return;
          } finally {
@@ -1752,7 +1756,7 @@ public class FSHLog implements WAL {
            scope.close(); // append scope is complete
          }
        } else {
-          // They can't both be null. Fail all up to this!!!
+          // What is this if not an append or sync. Fail all up to this!!!
          cleanupOutstandingSyncsOnException(sequence,
            new IllegalStateException("Neither append nor sync"));
          // Return to keep processing.
@@ -1771,23 +1775,22 @@ public class FSHLog implements WAL {
          LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
        }
 
-        // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
-        // syncRunner. We should never get an exception in here.
-        int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
-        try {
-          if (this.exception != null) {
-            // Do not try to sync. If a this.exception, then we failed an append. Do not try to
-            // sync a failed append. Fall through to the attainSafePoint below. It is part of the
-            // means by which we put in place a new WAL. A new WAL is how we clean up.
-            // Don't throw because then we'll not get to attainSafePoint.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
-          } else {
+        if (this.exception == null) {
+          // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
+          // the syncRunner. We should never get an exception in here.
+          int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
+          try {
            this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
+          } catch (Exception e) {
+            // Should NEVER get here.
+            requestLogRoll();
+            this.exception = new DamagedWALException("Failed offering sync", e);
          }
-        } catch (Exception e) {
-          // Should NEVER get here.
-          cleanupOutstandingSyncsOnException(sequence, e);
-          throw e;
+        }
+        // We may have picked up an exception above trying to offer sync
+        if (this.exception != null) {
+          cleanupOutstandingSyncsOnException(sequence,
+            new DamagedWALException("On sync", this.exception));
        }
        attainSafePoint(sequence);
        this.syncFuturesCount = 0;
@@ -1883,9 +1886,11 @@ public class FSHLog implements WAL {
        // Update metrics.
        postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
      } catch (Exception e) {
-        LOG.warn("Could not append. Requesting close of WAL", e);
+        String msg = "Append sequenceId=" + regionSequenceId +
+          ", requesting roll of WAL";
+        LOG.warn(msg, e);
        requestLogRoll();
-        throw e;
+        throw new DamagedWALException(msg, e);
      }
      numEntries.incrementAndGet();
    }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 9b3dede..74284e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -315,8 +315,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   public long getSequenceId(final long maxWaitForSeqId) throws IOException {
     // TODO: This implementation waiting on a latch is problematic because if a higher level
-    // determines we should stop or abort, there is not global list of all these blocked WALKeys
-    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
+    // determines we should stop or abort, there is no global list of all these blocked WALKeys
+    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
+    //
+    // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
+    // even those that have failed (previously we were not... so they would just hang out...).
+    // St.Ack 20150910
     try {
       if (maxWaitForSeqId < 0) {
         this.seqNumAssignedLatch.await();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index b15792e..e9ff8ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -78,6 +81,8 @@ public class TestFailedAppendAndSync {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -101,6 +106,7 @@ public class TestFailedAppendAndSync {
    */
   @Test (timeout=300000)
   public void testLockupAroundBadAssignSync() throws IOException {
+    final AtomicLong rolls = new AtomicLong(0);
     // Dodgy WAL. Will throw exceptions when flags set.
     class DodgyFSLog extends FSHLog {
       volatile boolean throwSyncException = false;
@@ -112,6 +118,13 @@ public class TestFailedAppendAndSync {
       }
 
       @Override
+      public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+        byte [][] regions = super.rollWriter(force);
+        rolls.getAndIncrement();
+        return regions;
+      }
+
+      @Override
       protected Writer createWriterInstance(Path path) throws IOException {
         final Writer w = super.createWriterInstance(path);
         return new Writer() {
@@ -175,7 +188,7 @@ public class TestFailedAppendAndSync {
     } catch (IOException ioe) {
       fail();
     }
-
+    long rollsCount = rolls.get();
     try {
       dodgyWAL.throwAppendException = true;
       dodgyWAL.throwSyncException = false;
@@ -185,6 +198,9 @@ public class TestFailedAppendAndSync {
     } catch (IOException ioe) {
       threwOnAppend = true;
     }
+    while (rollsCount == rolls.get()) Threads.sleep(100);
+    rollsCount = rolls.get();
+
     // When we get to here.. we should be ok. A new WAL has been put in place. There were no
     // appends to sync. We should be able to continue.
 
@@ -197,6 +213,8 @@ public class TestFailedAppendAndSync {
     } catch (IOException ioe) {
       threwOnBoth = true;
     }
+    while (rollsCount == rolls.get()) Threads.sleep(100);
+
     // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
     // to just continue.
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a620951..ccf2b15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -81,6 +82,8 @@ public class TestWALLockup {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -139,10 +142,10 @@ public class TestWALLockup {
       protected void beforeWaitOnSafePoint() {
         if (throwException) {
           LOG.info("COUNTDOWN");
-          // Don't countdown latch until someone waiting on it.
-          while (this.latch.getCount() <= 0) {
-            Threads.sleep(10);
-          }
+          // Don't countdown latch until someone waiting on it otherwise, the above
+          // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
+          // be stuck; test won't go down
+          while (this.latch.getCount() <= 0) Threads.sleep(1);
           this.latch.countDown();
         }
       }
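The hang this patch guards against is a handler blocked in WALKey#getSequenceId waiting on a latch that is only released once the ring buffer consumer stamps a sequence id into the key; if a failed append skipped the stamp, the waiter would sit there until timeout. Below is a minimal standalone sketch of that interaction, not part of the patch: FakeWALKey, stampSequenceId, and the timeout value are hypothetical stand-ins for WALKey's seqNumAssignedLatch mechanics, illustrating why even a failed append must still stamp the key.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SequenceIdLatchSketch {
  static class FakeWALKey {
    private final CountDownLatch assigned = new CountDownLatch(1);
    private volatile long sequenceId = -1;

    // Called by the WAL consumer thread once a sequence id is known, even for failed appends.
    void stampSequenceId(long id) {
      this.sequenceId = id;
      this.assigned.countDown(); // releases any handler blocked in getSequenceId below
    }

    // Called by a handler thread; blocks until the consumer stamps this key.
    long getSequenceId(long maxWaitMillis) throws InterruptedException {
      if (!this.assigned.await(maxWaitMillis, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("Timed out waiting for sequence id assignment");
      }
      return this.sequenceId;
    }
  }

  public static void main(String[] args) throws Exception {
    FakeWALKey key = new FakeWALKey();
    Thread handler = new Thread(() -> {
      try {
        System.out.println("handler got sequence id " + key.getSequenceId(5000));
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    handler.start();
    // Simulate the ring buffer consumer hitting an append failure: it still stamps the key,
    // so the handler above is released; without this call the handler would block until timeout.
    key.stampSequenceId(42L);
    handler.join();
  }
}

In the same spirit, the consumer in FSHLog above calls entry.stampRegionSequenceId() before returning when the WAL is already damaged: the stamp doubles as the release signal for any handler parked on the key.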