From 02d0f27441bdcc3c0d74705efb30b8a88beda6d5 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 4 Mar 2017 20:55:50 +0800 Subject: [PATCH] HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound --- .../apache/hadoop/hbase/regionserver/HRegion.java | 139 ++++------------- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 12 +- .../TestCompactionInDeadRegionServer.java | 168 +++++++++++++++++++++ 3 files changed, 211 insertions(+), 108 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index cc32179..f0f7a36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -238,9 +238,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected volatile long lastReplayedOpenRegionSeqId = -1L; protected volatile long lastReplayedCompactionSeqId = -1L; - // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved - protected List storeSeqIds = new ArrayList<>(); - ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -5080,15 +5077,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Notify is about post replay. Intentional") @Override public boolean refreshStoreFiles() throws IOException { - return refreshStoreFiles(false); - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="Notify is about post replay. Intentional") - protected boolean refreshStoreFiles(boolean force) throws IOException { - if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -5120,7 +5113,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // see whether we can drop the memstore or the snapshot if (storeSeqId > maxSeqIdBefore) { - if (writestate.flushing) { // only drop memstore snapshots if they are smaller than last flush for the store if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) { @@ -5160,17 +5152,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } if (!map.isEmpty()) { - if (!force) { - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()) - .getDataSize(); - } - } else { - synchronized (storeSeqIds) { - // don't try to acquire write lock of updatesLock now - storeSeqIds.add(map); - } + for (Map.Entry entry : map.entrySet()) { + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()) + .getDataSize(); } } // C. Finally notify anyone waiting on memstore to clear: @@ -5855,12 +5840,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner; - try { - scanner = store.getScanner(scan, entry.getValue(), this.readPt); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); instantiatedScanners.add(scanner); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { @@ -6025,33 +6005,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean tmpKeepProgress = scannerContext.getKeepProgress(); // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; - try { - do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away. - scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); - - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); - if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); - if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } else if (scannerContext.checkTimeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } - } while (moreCellsInRow); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + do { + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); + + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); + if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { + return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); + } else if (scannerContext.checkSizeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } + } while (moreCellsInRow); return nextKv != null; } @@ -6400,36 +6376,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.joinedHeap != null) { result = this.joinedHeap.requestSeek(kv, true, true) || result; } - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); } finally { closeRegionOperation(); } return result; } - private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException { - // tries to refresh the store files, otherwise shutdown the RS. - // TODO: add support for abort() of a single region and trigger reassignment. - try { - region.refreshStoreFiles(true); - return new IOException("unable to read store file"); - } catch (IOException e) { - String msg = "a store file got lost: " + fnfe.getMessage(); - LOG.error("unable to refresh store files", e); - abortRegionServer(msg); - return new NotServingRegionException( - getRegionInfo().getRegionNameAsString() + " is closing"); - } - } - - private void abortRegionServer(String msg) throws IOException { - if (rsServices instanceof HRegionServer) { - ((HRegionServer)rsServices).abort(msg); - } - throw new UnsupportedOperationException("not able to abort RS after: " + msg); - } - @Override public void shipped() throws IOException { if (storeHeap != null) { @@ -7233,29 +7185,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } - // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock - // We perform this operation outside of the read lock of updatesLock to avoid dead lock - // See HBASE-16304 - @SuppressWarnings("unchecked") - private void dropMemstoreContents() throws IOException { - MemstoreSize totalFreedSize = new MemstoreSize(); - while (!storeSeqIds.isEmpty()) { - Map map = null; - synchronized (storeSeqIds) { - if (storeSeqIds.isEmpty()) break; - map = storeSeqIds.remove(storeSeqIds.size()-1); - } - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedSize - .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())); - } - } - if (totalFreedSize.getDataSize() > 0) { - LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore"); - } - } - @Override public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException { @@ -7320,10 +7249,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi writeEntry = null; } finally { this.updatesLock.readLock().unlock(); - // For increment/append, a region scanner for doing a get operation could throw - // FileNotFoundException. So we call dropMemstoreContents() in finally block - // after releasing read lock - dropMemstoreContents(); } // If results is null, then client asked that we not return the calculated results. return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT; @@ -7671,7 +7596,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 6 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 83d93fe..23668bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -615,7 +615,15 @@ public class AsyncFSWAL extends AbstractFSWAL { break; } } else { - throw e.unwrapRemoteException(); + IOException ioe = e.unwrapRemoteException(); + // this usually means master already think we are dead so let's fail all the pending + // syncs. The shutdown process of RS will wait for all regions to be closed before calling + // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead + // lock. + if (e.getMessage().contains("Parent directory doesn't exist:")) { + syncFutures.forEach(f -> f.done(f.getTxid(), ioe)); + } + throw ioe; } } catch (NameNodeException e) { throw e; @@ -696,6 +704,8 @@ public class AsyncFSWAL extends AbstractFSWAL { this.writer.close(); this.writer = null; closeExecutor.shutdown(); + IOException error = new IOException("WAL has been closed"); + syncFutures.forEach(f -> f.done(f.getTxid(), error)); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java new file mode 100644 index 0000000..6b242fb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * In the old time, when we hit a long STW GC when doing compaction, the RS will be marked as dead + * and region will be reassigned to another RS. But when the GC is finished, the RS may complete the + * compaction thus remove some store files and add a new one before crashing. This will cause + * FileNotFoundException in the new RS. This testcase is used to ensure that this will not happen + * now as we will write out a compaction marker to WAL before removing storefiles. And a dead RS can + * not write out any WAL edits and the compaction will fail. + */ +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestCompactionInDeadRegionServer { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + public static final class IgnoreYouAreDeadRS extends HRegionServer { + + public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException { + super(conf, csm); + } + + @Override + protected void tryRegionServerReport(long reportStartTime, long reportEndTime) + throws IOException { + try { + super.tryRegionServerReport(reportStartTime, reportEndTime); + } catch (YouAreDeadException e) { + // ignore, do not abort + } + } + } + + @Parameter + public Class walProvider; + + @Parameters(name = "{index}: wal={0}") + public static List params() { + return Arrays.asList(new Object[] { FSHLogProvider.class }, + new Object[] { AsyncFSWALProvider.class }); + } + + @Before + public void setUp() throws Exception { + UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class); + UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000); + UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class, + HRegionServer.class); + UTIL.startMiniCluster(2); + Table table = UTIL.createTable(TABLE_NAME, CF); + for (int i = 0; i < 10; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(TABLE_NAME); + for (int i = 10; i < 20; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(TABLE_NAME); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0); + ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(); + watcher.getRecoverableZooKeeper().delete( + ZKUtil.joinZNode(watcher.getZNodePaths().rsZNode, rsToSuspend.getServerName().toString()), + -1); + UTIL.waitFor(60000, 1000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) { + HRegionServer rs = thread.getRegionServer(); + if (rs != rsToSuspend) { + return !rs.getOnlineRegions(TABLE_NAME).isEmpty(); + } + } + return false; + } + + @Override + public String explainFailure() throws Exception { + return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName(); + } + }); + try { + region.compact(true); + fail("Should fail as our wal file has already been closed, " + + "and walDir has also been renamed"); + } catch (Exception e) { + // expected + } + Table table = UTIL.getConnection().getTable(TABLE_NAME); + // should not hit FNFE + for (int i = 0; i < 20; i++) { + assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); + } + } +} -- 1.9.1