From fd50d480c1b79cfb0c993e22874b4c50b0ed9e71 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 27 Aug 2015 16:14:02 -0700 Subject: [PATCH] HBASE-14317 Stuck FSHLog: bad disk (HDFS-8960) and can't roll WAL --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index fa69d63..de7a4fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -271,6 +272,9 @@ public class FSHLog implements WAL { private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine private final int slowSyncNs; + /** The max number of ms that we'll wait on a safe point before aborting */ + private final int maxSafePointWait; + private final static Object [] NO_ARGS = new Object []{}; // If live datanode count is lower than the default replicas value, @@ -519,6 +523,8 @@ public class FSHLog implements WAL { this.slowSyncNs = 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); + this.maxSafePointWait = conf.getInt("hbase.regionserver.hlog.max.safepoint.wait", 60 * 1000); + // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); @@ -1754,11 +1760,11 @@ public class FSHLog implements WAL { int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; try { this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + attainSafePoint(sequence); } catch (Exception e) { cleanupOutstandingSyncsOnException(sequence, e); throw e; } - attainSafePoint(sequence); this.syncFuturesCount = 0; } catch (Throwable t) { LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); @@ -1774,14 +1780,23 @@ public class FSHLog implements WAL { * Check if we should attain safe point. If so, go there and then wait till signalled before * we proceeding. */ - private void attainSafePoint(final long currentSequence) { + private void attainSafePoint(final long currentSequence) throws TimeoutIOException { if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. try { + long start = EnvironmentEdgeManager.currentTime(); // Wait on outstanding syncers; wait for them to finish syncing (unless we've been // shutdown or unless our latch has been thrown because we have been aborted). while (!this.shutdown && this.zigzagLatch.isCocked() && highestSyncedSequence.get() < currentSequence) { + long currentTime = EnvironmentEdgeManager.currentTime(); + if (currentTime - start > maxSafePointWait) { + LOG.fatal("Waited too long in attainSafePoint. Waiting to get to seqId=" + + currentSequence + + " However we are only at seqId=" + highestSyncedSequence.get() + + " after waiting " + maxSafePointWait); + throw new TimeoutIOException("Waited too long in attainSafePoint"); + } synchronized (this.safePointWaiter) { this.safePointWaiter.wait(0, 1); } @@ -1839,7 +1854,7 @@ public class FSHLog implements WAL { assert highestUnsyncedSequence < entry.getSequence(); highestUnsyncedSequence = entry.getSequence(); sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, - entry.isInMemstore()); + entry.isInMemstore()); coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTime() - start); -- 2.5.0