From d6dd92cd686b6a7e160c27f94cec7b96b8ae7f13 Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Thu, 27 Aug 2015 16:14:02 -0700
Subject: [PATCH] HBASE-14317 Stuck FSHLog: bad disk (HDFS-8960) and can't
 roll WAL

Append only test.
---
 .../org/apache/hadoop/hbase/io/hfile/HFile.java    |   3 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  32 +++++-
 .../hadoop/hbase/regionserver/TestHRegion.java     | 117 ++++++++++++++++++++-
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |   3 +-
 4 files changed, 146 insertions(+), 9 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 7dbad6c..93eac88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -305,7 +305,8 @@ public class HFile {
       try {
         ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
       } catch (UnsupportedOperationException uoe) {
-        LOG.debug("Unable to set drop behind on " + path, uoe);
+        if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe);
+        else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path);
       }
     }
     return createWriter(fs, path, ostream,
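
The HFile.java hunk above demotes the noisy UnsupportedOperationException log: the full stack trace now shows only at TRACE, with a one-line message at DEBUG. Since TRACE implies DEBUG in the commons-logging level hierarchy, the TRACE guard has to be checked first or its branch is unreachable. A minimal stand-alone sketch of the idiom, assuming commons-logging; the class and method names here are illustrative, not part of the patch:

// Sketch only: level-guarded logging as used in the HFile hunk above.
// GuardedLoggingExample and logDropBehind are illustrative names.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class GuardedLoggingExample {
  private static final Log LOG = LogFactory.getLog(GuardedLoggingExample.class);

  static void logDropBehind(String path, Exception e) {
    // TRACE is the most verbose level; check it before DEBUG, otherwise the
    // TRACE branch can never run because TRACE-enabled implies DEBUG-enabled.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Unable to set drop behind on " + path, e); // with stack trace
    } else if (LOG.isDebugEnabled()) {
      LOG.debug("Unable to set drop behind on " + path);    // message only
    }
  }
}
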
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index fa69d63..c7d7fac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -271,6 +272,9 @@ public class FSHLog implements WAL {
   private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
   private final int slowSyncNs;
 
+  /** The max number of ms that we'll wait on a safe point before aborting. */
+  private final int maxSafePointWait;
+
   private final static Object [] NO_ARGS = new Object []{};
 
   // If live datanode count is lower than the default replicas value,
@@ -519,6 +523,8 @@ public class FSHLog implements WAL {
     this.slowSyncNs =
         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
           DEFAULT_SLOW_SYNC_TIME_MS);
+    this.maxSafePointWait = conf.getInt("hbase.regionserver.hlog.max.safepoint.wait", 60 * 1000);
+
     // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
     // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
@@ -1696,7 +1702,16 @@ public class FSHLog implements WAL {
   }
 
   private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
-    for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
+    for (int i = 0; i < syncFuturesCount; i++) {
+      // Make sure that all sync future slots are filled in,
+      // as we are going to cancel all of them;
+      // nothing that gets added after this point
+      // may be left hanging on an empty slot.
+      if (syncFutures[i] == null) {
+        this.syncFutures[i] = getSyncFuture(sequence, null);
+      }
+      this.syncFutures[i].done(sequence, e);
+    }
     this.syncFuturesCount = 0;
   }
 
@@ -1754,11 +1769,11 @@ public class FSHLog implements WAL {
         int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
         try {
           this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
+          attainSafePoint(sequence);
         } catch (Exception e) {
           cleanupOutstandingSyncsOnException(sequence, e);
           throw e;
         }
-        attainSafePoint(sequence);
         this.syncFuturesCount = 0;
       } catch (Throwable t) {
         LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
@@ -1774,14 +1789,23 @@ public class FSHLog implements WAL {
      * Check if we should attain safe point.  If so, go there and then wait till signalled before
      * we proceeding.
      */
-    private void attainSafePoint(final long currentSequence) {
+    private void attainSafePoint(final long currentSequence) throws TimeoutIOException {
       if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
       // If here, another thread is waiting on us to get to safe point.  Don't leave it hanging.
       try {
+        long start = EnvironmentEdgeManager.currentTime();
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted).
         while (!this.shutdown && this.zigzagLatch.isCocked() &&
             highestSyncedSequence.get() < currentSequence) {
+          long currentTime = EnvironmentEdgeManager.currentTime();
+          if (currentTime - start > maxSafePointWait) {
+            LOG.fatal("Waited too long in attainSafePoint. Wanted to get to seqId="
+                + currentSequence + " but are only at seqId=" + highestSyncedSequence.get()
+                + " after waiting " + maxSafePointWait + "ms");
+            throw new TimeoutIOException("Waited too long in attainSafePoint");
+          }
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }
@@ -1839,7 +1863,7 @@ public class FSHLog implements WAL {
         assert highestUnsyncedSequence < entry.getSequence();
         highestUnsyncedSequence = entry.getSequence();
         sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
-          entry.isInMemstore());
+            entry.isInMemstore());
         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
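
The FSHLog.java changes above are the heart of the fix: attainSafePoint() is now bounded by the new hbase.regionserver.hlog.max.safepoint.wait setting (default 60000 ms) and throws TimeoutIOException instead of napping forever when a wedged sync pipeline never advances highestSyncedSequence, and the failed sync futures are then cancelled via cleanupOutstandingSyncsOnException(), which back-fills empty slots so no waiter is left hanging. A minimal stand-alone sketch of the same bounded-wait shape, using only JDK classes; BoundedWaitExample, synced, target, and maxWaitMs are illustrative stand-ins for FSHLog's highestSyncedSequence, currentSequence, and maxSafePointWait:

// Sketch only: the bounded-wait shape the patch adds to attainSafePoint().
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedWaitExample {
  private final Object waiter = new Object();

  void waitForSequence(AtomicLong synced, long target, long maxWaitMs)
      throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    while (synced.get() < target) {
      if (System.currentTimeMillis() - start > maxWaitMs) {
        // Fail the wait instead of hanging the ring buffer consumer forever.
        throw new IOException("Waited " + maxWaitMs + "ms for seqId=" + target
            + "; only at seqId=" + synced.get());
      }
      synchronized (waiter) {
        waiter.wait(0, 1); // brief nap, same as FSHLog's safePointWaiter.wait(0, 1)
      }
    }
  }
}

A test or deployment that wants to give up sooner could lower the bound through the usual Configuration API, for example conf.setInt("hbase.regionserver.hlog.max.safepoint.wait", 10000).
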
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 826c9b3..45fd2d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -46,6 +46,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,8 +56,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +75,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
@@ -90,12 +95,14 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -106,6 +113,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -117,20 +125,25 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
@@ -156,6 +169,10 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -171,6 +188,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Service;
 
 /**
  * Basic stand-alone testing of HRegion. No clusters!
@@ -233,6 +251,102 @@ public class TestHRegion {
   }
 
   /**
+   * Reproduce the lock-up that happens when we get an IOE out of an append. See HBASE-14317.
+   * First I need to set up some fat mocks for Server and RegionServerServices. I also need to
+   * set up a WAL that will throw an exception when we go to append to it.
+   */
+  @Test // FIX (timeout=60000)
+  public void testLockedUpWALSystem() throws IOException {
+    /**
+     * A WAL that we can have throw exceptions when a flag is set.
+     */
+    class DodgyFSLog extends FSHLog {
+      volatile boolean throwSyncException = false;
+      volatile boolean throwAppendException = false;
+
+      public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+          throws IOException {
+        super(fs, root, logDir, conf);
+      }
+
+      @Override
+      protected Writer createWriterInstance(Path path) throws IOException {
+        final Writer w = super.createWriterInstance(path);
+        return new Writer() {
+          @Override
+          public void close() throws IOException {
+            w.close();
+          }
+
+          @Override
+          public void sync() throws IOException {
+            if (throwSyncException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
+            }
+            w.sync();
+          }
+
+          @Override
+          public void append(Entry entry) throws IOException {
+            if (throwAppendException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
+            }
+            w.append(entry);
+          }
+
+          @Override
+          public long getLength() throws IOException {
+            return w.getLength();
+          }
+        };
+      }
+    }
+
+    /**
+     * Mocked Server.
+     */
+    Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(CONF);
+    when(server.isStopped()).thenReturn(false);
+    when(server.isAborted()).thenReturn(false);
+    /**
+     * Set up the RegionServerServices mock.
+     */
+    RegionServerServices services = mock(RegionServerServices.class);
+    // OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL; go ahead
+    // with the test.
+    FileSystem fs = FileSystem.get(CONF);
+    Path rootDir = new Path(dir + "testLockedUpWALSystem");
+    DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, "testLockedUpWALSystem", CONF);
+    LogRoller logRoller = new LogRoller(server, services);
+    logRoller.addWAL(dodgyWAL);
+    logRoller.start();
+    HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
+        CONF, false, Durability.SYNC_WAL, dodgyWAL, COLUMN_FAMILY_BYTES);
+    try {
+      // Use the test name for some value bytes.
+      byte [] value = Bytes.toBytes(name.getMethodName());
+      boolean threwIOE = false;
+      try {
+        // First get something into the memstore.
+        Put put = new Put(value);
+        put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), value);
+        region.put(put);
+        // Now try another put, but have its WAL append throw an IOE.
+        dodgyWAL.throwAppendException = true;
+        put = new Put(value);
+        put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("2"), value);
+        region.put(put);
+      } catch (IOException ioe) {
+        threwIOE = true;
+      } finally {
+        assertTrue("The regionserver should have thrown an exception", threwIOE);
+      }
+    } finally {
+      //region.close();
+      if (logRoller != null) Threads.shutdown(logRoller.getThread());
+    }
+  }
+
+  /**
    * Test that I can use the max flushed sequence id after the close.
    * @throws IOException
    */
@@ -916,7 +1030,7 @@ public class TestHRegion {
 
     // now verify that the flush markers are written
     wal.shutdown();
-    WAL.Reader reader = wals.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
+    WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
       TEST_UTIL.getConfiguration());
     try {
       List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();
@@ -5702,7 +5816,6 @@ public class TestHRegion {
     putData(startRow, numRows, qualifier, families);
     int splitRow = startRow + numRows;
     putData(splitRow, numRows, qualifier, families);
-    int endRow = splitRow + numRows;
     region.flush(true);
 
     HRegion [] regions = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 28ae46a..1631bbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -416,5 +416,4 @@ public class TestFSHLog {
       wal.close();
     }
   }
-
-}
+}
\ No newline at end of file
-- 
2.5.0
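
For reference, the DodgyFSLog test above injects faults by decorating the real WALProvider.Writer that FSHLog creates and failing a method when a flag is flipped. The same wrap-and-flag shape generalizes to other WAL fault-injection tests; a minimal sketch against the four Writer methods the patch itself overrides (FaultyWriter and the flag names are illustrative, not part of the patch):

// Sketch only: a delegating Writer that can be told to fail on demand,
// the same decorator shape DodgyFSLog#createWriterInstance returns.
import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;

class FaultyWriter implements Writer {
  private final Writer delegate;
  volatile boolean failAppends = false;
  volatile boolean failSyncs = false;

  FaultyWriter(Writer delegate) { this.delegate = delegate; }

  @Override
  public void append(Entry entry) throws IOException {
    if (failAppends) throw new IOException("FAKE! injected append failure");
    delegate.append(entry);
  }

  @Override
  public void sync() throws IOException {
    if (failSyncs) throw new IOException("FAKE! injected sync failure");
    delegate.sync();
  }

  @Override
  public long getLength() throws IOException { return delegate.getLength(); }

  @Override
  public void close() throws IOException { delegate.close(); }
}
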