diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f1f20ab556..08c0870f2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -254,7 +254,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   protected volatile long lastReplayedOpenRegionSeqId = -1L;
   protected volatile long lastReplayedCompactionSeqId = -1L;
 
-  // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved
+  // collects Map(s) of Store to sequence Id
   protected List<Map<byte[], Long>> storeSeqIds = new ArrayList<>();
 
   //////////////////////////////////////////////////////////////////////////////
@@ -5817,12 +5817,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
           Store store = stores.get(entry.getKey());
-          KeyValueScanner scanner;
-          try {
-            scanner = store.getScanner(scan, entry.getValue(), this.readPt);
-          } catch (FileNotFoundException e) {
-            throw handleFileNotFound(e);
-          }
+          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
           instantiatedScanners.add(scanner);
           if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
               || this.filter.isFamilyEssential(entry.getKey())) {
@@ -5929,7 +5924,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // scanner is closed
         throw new UnknownScannerException("Scanner was closed");
       }
-      boolean moreValues;
+      boolean moreValues = false;
       if (outResults.isEmpty()) {
         // Usually outResults is empty. This is true when next is called
         // to handle scan or get operation.
@@ -5990,34 +5985,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      try {
-        do {
-          // We want to maintain any progress that is made towards the limits while scanning across
-          // different column families. To do this, we toggle the keep progress flag on during calls
-          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-          scannerContext.setKeepProgress(true);
-          heap.next(results, scannerContext);
-          scannerContext.setKeepProgress(tmpKeepProgress);
-
-          nextKv = heap.peek();
-          moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
-
-          if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
-            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-          } else if (scannerContext.checkSizeLimit(limitScope)) {
-            ScannerContext.NextState state =
-              moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
-            return scannerContext.setScannerState(state).hasMoreValues();
-          } else if (scannerContext.checkTimeLimit(limitScope)) {
-            ScannerContext.NextState state =
-              moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
-            return scannerContext.setScannerState(state).hasMoreValues();
-          }
-        } while (moreCellsInRow);
-      } catch (FileNotFoundException e) {
-        throw handleFileNotFound(e);
-      }
+      do {
+        // We want to maintain any progress that is made towards the limits while scanning across
+        // different column families. To do this, we toggle the keep progress flag on during calls
+        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+        scannerContext.setKeepProgress(true);
+        heap.next(results, scannerContext);
+        scannerContext.setKeepProgress(tmpKeepProgress);
+
+        nextKv = heap.peek();
+        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+        if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+
+        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
+          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+        } else if (scannerContext.checkSizeLimit(limitScope)) {
+          ScannerContext.NextState state =
+            moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
+        } else if (scannerContext.checkTimeLimit(limitScope)) {
+          ScannerContext.NextState state =
+            moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
+        }
+      } while (moreCellsInRow);
 
       return nextKv != null;
     }
@@ -6376,29 +6367,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
-      } catch (FileNotFoundException e) {
-        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
       }
       return result;
     }
 
-    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
-      // tries to refresh the store files, otherwise shutdown the RS.
-      // TODO: add support for abort() of a single region and trigger reassignment.
-      try {
-        region.refreshStoreFiles(true);
-        return new IOException("unable to read store file");
-      } catch (IOException e) {
-        String msg = "a store file got lost: " + fnfe.getMessage();
-        LOG.error(msg);
-        LOG.error("unable to refresh store files", e);
-        abortRegionServer(msg);
-        return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing");
-      }
-    }
-
     private void abortRegionServer(String msg) throws IOException {
       if (rsServices instanceof HRegionServer) {
         ((HRegionServer)rsServices).abort(msg);
@@ -7665,8 +7639,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       this.updatesLock.readLock().unlock();
       // For increment/append, a region scanner for doing a get operation could throw
-      // FileNotFoundException. So we call dropMemstoreContents() in finally block
-      // after releasing read lock
+      // an exception. So we call dropMemstoreContents() in finally block after
+      // releasing read lock
       dropMemstoreContents();
     }
 
@@ -7892,8 +7866,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         } finally {
           this.updatesLock.readLock().unlock();
           // For increment/append, a region scanner for doing a get operation could throw
-          // FileNotFoundException. So we call dropMemstoreContents() in finally block
-          // after releasing read lock
+          // an exception. So we call dropMemstoreContents() in finally block after
+          // releasing read lock
           dropMemstoreContents();
         }
       } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
deleted file mode 100644
index 969ef340fb..0000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FSVisitor;
-import org.apache.hadoop.hbase.util.TestTableName;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@Category(LargeTests.class)
-public class TestCorruptedRegionStoreFile {
-  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
-
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static final String FAMILY_NAME_STR = "f";
-  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
-
-  private static final int NUM_FILES = 10;
-  private static final int ROW_PER_FILE = 2000;
-  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
-
-  @Rule public TestTableName TEST_TABLE = new TestTableName();
-
-  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
-  private Path tableDir;
-  private int rowCount;
-
-  private static void setupConf(Configuration conf) {
-    // Disable compaction so the store file count stays constant
-    conf.setLong("hbase.hstore.compactionThreshold", NUM_FILES + 1);
-    conf.setLong("hbase.hstore.blockingStoreFiles", NUM_FILES * 2);
-  }
-
-  private void setupTable(final TableName tableName) throws IOException {
-    // load the table
-    Table table = UTIL.createTable(tableName, FAMILY_NAME);
-    try {
-      rowCount = 0;
-      byte[] value = new byte[1024];
-      byte[] q = Bytes.toBytes("q");
-      while (rowCount < NUM_ROWS) {
-        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
-        put.setDurability(Durability.SKIP_WAL);
-        put.add(FAMILY_NAME, q, value);
-        table.put(put);
-
-        if ((rowCount++ % ROW_PER_FILE) == 0) {
-          // flush it
-          ((HTable)table).flushCommits();
-          UTIL.getHBaseAdmin().flush(tableName);
-        }
-      }
-    } finally {
-      UTIL.getHBaseAdmin().flush(tableName);
-      table.close();
-    }
-
-    assertEquals(NUM_ROWS, rowCount);
-
-    // get the store file paths
-    storeFiles.clear();
-    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
-    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
-      @Override
-      public void storeFile(final String region, final String family, final String hfile)
-          throws IOException {
-        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
-        storeFiles.add(link.getOriginPath());
-      }
-    });
-    assertTrue("Expected at least " + NUM_FILES + " store files", storeFiles.size() >= NUM_FILES);
-    LOG.info("Store files: " + storeFiles);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    setupConf(UTIL.getConfiguration());
-    UTIL.startMiniCluster(2, 3);
-
-    setupTable(TEST_TABLE.getTableName());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    try {
-      UTIL.shutdownMiniCluster();
-    } catch (Exception e) {
-      LOG.warn("failure shutting down cluster", e);
-    }
-  }
-
-  @Test(timeout=180000)
-  public void testLosingFileDuringScan() throws Exception {
-    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
-
-    final FileSystem fs = getFileSystem();
-    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
-
-    // try to query with the missing file
-    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
-      private boolean hasFile = true;
-
-      @Override
-      public void beforeScanNext(Table table) throws Exception {
-        // move the path away (now the region is corrupted)
-        if (hasFile) {
-          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
-          LOG.info("Move file to local");
-          evictHFileCache(storeFiles.get(0));
-          hasFile = false;
-        }
-      }
-    });
-    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
-        count >= (NUM_ROWS - ROW_PER_FILE));
-  }
-
-  @Test(timeout=180000)
-  public void testLosingFileAfterScannerInit() throws Exception {
-    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
-
-    final FileSystem fs = getFileSystem();
-    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
-
-    // try to query with the missing file
-    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
-      private boolean hasFile = true;
-
-      @Override
-      public void beforeScan(Table table, Scan scan) throws Exception {
-        // move the path away (now the region is corrupted)
-        if (hasFile) {
-          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
-          LOG.info("Move file to local");
-          evictHFileCache(storeFiles.get(0));
-          hasFile = false;
-        }
-      }
-    });
-    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
-        count >= (NUM_ROWS - ROW_PER_FILE));
-  }
-
-  // ==========================================================================
-  //  Helpers
-  // ==========================================================================
-  private FileSystem getFileSystem() {
-    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
-  }
-
-  private Path getRootDir() {
-    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
-  }
-
-  private void evictHFileCache(final Path hfile) throws Exception {
-    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      HRegionServer rs = rst.getRegionServer();
-      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
-    }
-    Thread.sleep(6000);
-  }
-
-  private int fullScanAndCount(final TableName tableName) throws Exception {
-    return fullScanAndCount(tableName, new ScanInjector());
-  }
-
-  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
-      throws Exception {
-    Table table = UTIL.getConnection().getTable(tableName);
-    int count = 0;
-    try {
-      Scan scan = new Scan();
-      scan.setCaching(1);
-      scan.setCacheBlocks(false);
-      injector.beforeScan(table, scan);
-      ResultScanner scanner = table.getScanner(scan);
-      try {
-        while (true) {
-          injector.beforeScanNext(table);
-          Result result = scanner.next();
-          injector.afterScanNext(table, result);
-          if (result == null) break;
-          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
-            LOG.debug("scan next " + count);
-          }
-        }
-      } finally {
-        scanner.close();
-        injector.afterScan(table);
-      }
-    } finally {
-      table.close();
-    }
-    return count;
-  }
-
-  private class ScanInjector {
-    protected void beforeScan(Table table, Scan scan) throws Exception {}
-    protected void beforeScanNext(Table table) throws Exception {}
-    protected void afterScanNext(Table table, Result result) throws Exception {}
-    protected void afterScan(Table table) throws Exception {}
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java
deleted file mode 100644
index 3d66c5ff32..0000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests around regionserver shutdown and abort
- */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestRegionServerAbort {
-  private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");
-
-  private static final Log LOG = LogFactory.getLog(TestRegionServerAbort.class);
-
-  private HBaseTestingUtility testUtil;
-  private Configuration conf;
-  private MiniDFSCluster dfsCluster;
-  private MiniHBaseCluster cluster;
-
-  @Before
-  public void setup() throws Exception {
-    testUtil = new HBaseTestingUtility();
-    conf = testUtil.getConfiguration();
-    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
-        StopBlockingRegionObserver.class.getName());
-    // make sure we have multiple blocks so that the client does not prefetch all block locations
-    conf.set("dfs.blocksize", Long.toString(100 * 1024));
-    // prefetch the first block
-    conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024));
-    conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName());
-
-    testUtil.startMiniZKCluster();
-    dfsCluster = testUtil.startMiniDFSCluster(2);
-    cluster = testUtil.startMiniHBaseCluster(1, 2);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
-      HRegionServer rs = t.getRegionServer();
-      RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
-      StopBlockingRegionObserver cp = (StopBlockingRegionObserver)
-          cpHost.findCoprocessor(StopBlockingRegionObserver.class.getName());
-      cp.setStopAllowed(true);
-    }
-    testUtil.shutdownMiniCluster();
-  }
-
-  /**
-   * Test that a regionserver is able to abort properly, even when a coprocessor
-   * throws an exception in preStopRegionServer().
-   */
-  @Test
-  public void testAbortFromRPC() throws Exception {
-    TableName tableName = TableName.valueOf("testAbortFromRPC");
-    // create a test table
-    HTable table = testUtil.createTable(tableName, FAMILY_BYTES);
-
-    // write some edits
-    testUtil.loadTable(table, FAMILY_BYTES);
-    LOG.info("Wrote data");
-    // force a flush
-    cluster.flushcache(tableName);
-    LOG.info("Flushed table");
-
-    // delete a store file from the table region
-    HRegion firstRegion = cluster.findRegionsForTable(tableName).get(0);
-
-    // aborting from region
-    HRegionFileSystem regionFS = firstRegion.getRegionFileSystem();
-    Collection<StoreFileInfo> storeFileInfos = regionFS.getStoreFiles(FAMILY_BYTES);
-    assertFalse(storeFileInfos.isEmpty());
-    StoreFileInfo firstStoreFile = storeFileInfos.iterator().next();
-
-    // move the store file away
-    // we will still be able to read the first block, since the location was pre-fetched on open
-    // but attempts to read subsequent blocks will fail
-    LOG.info("Moving store file " + firstStoreFile.getPath());
-    FileSystem fs = regionFS.getFileSystem();
-    Path tmpdir = new Path("/tmp");
-    fs.mkdirs(tmpdir);
-    assertTrue(fs.rename(firstStoreFile.getPath(),
-        new Path(tmpdir, firstStoreFile.getPath().getName())));
-
-    // start a scan, this should trigger a regionserver abort
-    ResultScanner scanner = table.getScanner(new Scan());
-    int count = 0;
-    for (Result f : scanner) {
-      count++;
-    }
-    LOG.info("Finished scan with " + count + " results");
-    // should have triggered an abort due to FileNotFoundException
-
-    // verify that the regionserver is stopped
-    assertTrue(firstRegion.getRegionServerServices().isAborted());
-    assertTrue(firstRegion.getRegionServerServices().isStopped());
-  }
-
-  /**
-   * Test that a coprocessor is able to override a normal regionserver stop request.
-   */
-  @Test
-  public void testStopOverrideFromCoprocessor() throws Exception {
-    Admin admin = testUtil.getHBaseAdmin();
-    HRegionServer regionserver = cluster.getRegionServer(0);
-    admin.stopRegionServer(regionserver.getServerName().getHostAndPort());
-
-    // regionserver should have failed to stop due to coprocessor
-    assertFalse(cluster.getRegionServer(0).isAborted());
-    assertFalse(cluster.getRegionServer(0).isStopped());
-  }
-
-  public static class StopBlockingRegionObserver extends BaseRegionServerObserver {
-    private boolean stopAllowed;
-
-    @Override
-    public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
-        throws IOException {
-      if (!stopAllowed) {
-        throw new IOException("Stop not allowed");
-      }
-    }
-
-    public void setStopAllowed(boolean allowed) {
-      this.stopAllowed = allowed;
-    }
-
-    public boolean isStopAllowed() {
-      return stopAllowed;
-    }
-  }
-
-  /**
-   * Throws an exception during store file refresh in order to trigger a regionserver abort.
-   */
-  public static class ErrorThrowingHRegion extends HRegion {
-    public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
-        HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
-      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
-    }
-
-    public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
-        HTableDescriptor htd, RegionServerServices rsServices) {
-      super(fs, wal, confParam, htd, rsServices);
-    }
-
-    @Override
-    protected boolean refreshStoreFiles(boolean force) throws IOException {
-      // forced when called through RegionScannerImpl.handleFileNotFound()
-      if (force) {
-        throw new IOException("Failing file refresh for testing");
-      }
-      return super.refreshStoreFiles(force);
-    }
-  }
-}