diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 412abf6..09e846e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1204,7 +1204,7 @@ public class HRegion implements HeapSize { // , Writable{
    * Do preparation for pending compaction.
    * @throws IOException
    */
-  void doRegionCompactionPrep() throws IOException {
+  protected void doRegionCompactionPrep() throws IOException {
   }
 
   /*
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index c003bd5..44794c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -441,7 +441,7 @@ public class HBaseTestingUtility {
     //file system, the tests should use getBaseTestDir, otherwise, we can use
     //the working directory, and create a unique sub dir there
     FileSystem fs = getTestFileSystem();
-    if (fs.getUri().getScheme().equals(fs.getLocal(conf).getUri().getScheme())) {
+    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
       if (dataTestDir == null) {
         setupDataTestDir();
       }
@@ -1256,6 +1256,15 @@ public class HBaseTestingUtility {
     return rowCount;
   }
 
+  public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
+    for (int i = startRow; i < endRow; i++) {
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Put put = new Put(data);
+      put.add(f, null, data);
+      t.put(put);
+    }
+  }
+
   /**
    * Return the number of rows in the given table.
    */
@@ -2059,6 +2068,7 @@ public class HBaseTestingUtility {
       LOG.info("Found=" + rows);
       Threads.sleep(200);
     }
+    meta.close();
   }
 
   /**
@@ -2359,6 +2369,7 @@ public class HBaseTestingUtility {
     // region servers * regions per region server).
     int numberOfServers = admin.getClusterStatus().getServers().size();
     if (numberOfServers == 0) {
+      admin.close();
       throw new IllegalStateException("No live regionservers");
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 8f7f3aa..c53a133 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HBaseCluster {
     this.hbaseCluster.join();
   }
 
+  public List<HRegion> findRegionsForTable(byte[] tableName) {
+    ArrayList<HRegion> ret = new ArrayList<HRegion>();
+    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
+      HRegionServer hrs = rst.getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
+          ret.add(region);
+        }
+      }
+    }
+    return ret;
+  }
+
+
   protected int getRegionServerIndex(ServerName serverName) {
     //we have a small number of region servers, this should be fine for now.
     List<JVMClusterUtil.RegionServerThread> servers = getRegionServerThreads();
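
The two new test helpers above, loadNumericRows and findRegionsForTable, are
meant to be used together from a test, as the new TestIOFencing below does. A
minimal sketch of the intended usage (the table and family names here are
illustrative only, not part of the patch):

    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster(1);
    byte[] table = Bytes.toBytes("usage");
    byte[] family = Bytes.toBytes("f");
    util.createTable(table, family);
    HTable t = new HTable(util.getConfiguration(), table);
    // Writes rows "0".."99"; each row key doubles as the cell value.
    util.loadNumericRows(t, family, 0, 100);
    // Locate the live HRegion instances serving the table across all
    // regionservers in the mini cluster.
    List<HRegion> regions = util.getMiniHBaseCluster().findRegionsForTable(table);

Note that loadNumericRows keys rows by the decimal string of i, so rows sort
lexicographically ("1", "10", "100", "2", ...), not numerically.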
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
new file mode 100644
index 0000000..c82b3b0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/**
+ * Test for the case where a regionserver going down has enough cycles to do damage to regions
+ * that have actually been assigned elsewhere.
+ *
+ * <p>If we happen to assign a region before it is fully done with its old location -- i.e. it is
+ * on two servers at the same time -- all can work fine until the region on the dying server
+ * decides to compact or otherwise change the region file set.  The region in its new location will
+ * then get a surprise when it tries to do something with a file removed by the region in its old
+ * location on the dying server.
+ *
+ * <p>Making a test for this case is a little tough in that even if a file is deleted up on the
+ * namenode, a client that opened the file before the delete can keep reading until something
+ * invalidates the state of cached blocks in the already-open dfsclient (a block from the deleted
+ * file is cleaned from the datanode by the NN).
+ *
+ * <p>What we will do below is an explicit check for existence on the files listed in the region
+ * that has had some files removed because of a compaction.  This hurries along, and makes certain,
+ * what would otherwise be a chance occurrence.
+ */
+public class TestIOFencing {
+  static final Log LOG = LogFactory.getLog(TestIOFencing.class);
+  static {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * An override of HRegion that allows us to park compactions in a holding pattern and
+   * then, when appropriate for the test, allow them to proceed again.
+   */
+  public static class CompactionBlockerRegion extends HRegion {
+    boolean compactionsBlocked = false;
+    Object compactionsBlockedLock = new Object();
+
+    Object compactionWaitingLock = new Object();
+    boolean compactionWaiting = false;
+
+    volatile int compactCount = 0;
+
+    public CompactionBlockerRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
+        HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rss) {
+      super(tableDir, log, fs, conf, regionInfo, htd, rss);
+    }
+
+    public void stopCompactions() {
+      synchronized (compactionsBlockedLock) {
+        compactionsBlocked = true;
+      }
+    }
+
+    public void allowCompactions() {
+      synchronized (compactionsBlockedLock) {
+        compactionsBlocked = false;
+        compactionsBlockedLock.notifyAll();
+      }
+    }
+
+    public void waitForCompactionToBlock() throws InterruptedException {
+      synchronized (compactionWaitingLock) {
+        while (!compactionWaiting) {
+          compactionWaitingLock.wait();
+        }
+      }
+    }
+
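+    // Handshake with the test driver: the override below first signals that a
+    // compaction has reached the prep step, then parks while compactions are
+    // blocked, and finally signals that it is proceeding before delegating to
+    // the real prep work in HRegion.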
+    @Override
+    protected void doRegionCompactionPrep() throws IOException {
+      synchronized (compactionWaitingLock) {
+        compactionWaiting = true;
+        compactionWaitingLock.notifyAll();
+      }
+      synchronized (compactionsBlockedLock) {
+        while (compactionsBlocked) {
+          try {
+            compactionsBlockedLock.wait();
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        }
+      }
+      synchronized (compactionWaitingLock) {
+        compactionWaiting = false;
+        compactionWaitingLock.notifyAll();
+      }
+      super.doRegionCompactionPrep();
+    }
+
+    @Override
+    public boolean compact(CompactionRequest cr) throws IOException {
+      try {
+        return super.compact(cr);
+      } finally {
+        compactCount++;
+      }
+    }
+
+    public int countStoreFiles() {
+      int count = 0;
+      for (Store store : stores.values()) {
+        count += store.getNumberOfStoreFiles();
+      }
+      return count;
+    }
+  }
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest");
+  private final static byte[] FAMILY = Bytes.toBytes("family");
+  private static final int FIRST_BATCH_COUNT = 4000;
+  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
+
+  /**
+   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
+   * compaction completion until after we have killed the server and the region has come up on
+   * a new regionserver altogether.  This fakes the double assignment case where the region in
+   * one location changes the files out from underneath a region being served elsewhere.
+   */
+  @Test
+  public void testFencingAroundCompaction() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    // Insert our custom region
+    c.setClass(HConstants.REGION_IMPL, CompactionBlockerRegion.class, HRegion.class);
+    c.setBoolean("dfs.support.append", true);
+    // Encourage plenty of flushes
+    c.setLong("hbase.hregion.memstore.flush.size", 200000);
+    // Only run compaction when we tell it to
+    c.setInt("hbase.hstore.compactionThreshold", 1000);
+    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
+    // Compact quickly after we tell it to!
+    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
+    LOG.info("Starting mini cluster");
+    TEST_UTIL.startMiniCluster(1);
+    CompactionBlockerRegion compactingRegion = null;
+    HBaseAdmin admin = null;
+    try {
+      LOG.info("Creating admin");
+      admin = new HBaseAdmin(c);
+      LOG.info("Creating table");
+      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+      HTable table = new HTable(c, TABLE_NAME);
+      LOG.info("Loading test table");
+      // Load some rows
+      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+      // Find the region
+      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
+      assertEquals(1, testRegions.size());
+      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
+      assertTrue(compactingRegion.countStoreFiles() > 1);
+      final byte REGION_NAME[] = compactingRegion.getRegionName();
+      LOG.info("Blocking compactions");
+      compactingRegion.stopCompactions();
+      LOG.info("Asking for compaction");
+      admin.majorCompact(TABLE_NAME);
+      LOG.info("Waiting for compaction to be about to start");
+      compactingRegion.waitForCompactionToBlock();
+      LOG.info("Starting a new server");
+      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+      HRegionServer newServer = newServerThread.getRegionServer();
+      LOG.info("Killing region server ZK lease");
+      TEST_UTIL.expireRegionServerSession(0);
+      CompactionBlockerRegion newRegion = null;
+      long startWaitTime = System.currentTimeMillis();
+      while (newRegion == null) {
+        LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+        Thread.sleep(100);
+        newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+        assertTrue("Timed out waiting for new server to open region",
+            System.currentTimeMillis() - startWaitTime < 60000);
+      }
+      LOG.info("Allowing compaction to proceed");
+      compactingRegion.allowCompactions();
+      while (compactingRegion.compactCount == 0) {
+        Thread.sleep(1000);
+      }
+      // The server we killed stays up until the compaction that was started before it was killed
+      // completes.  In the logs you should see the old regionserver now going down.
+      LOG.info("Compaction finished");
+      // After compaction of the old region finishes on the server that was going down, make sure
+      // that all the files we expect are still working when the region is up in its new location.
+      FileSystem fs = newRegion.getFilesystem();
+      for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
+        assertTrue("After compaction: " + f, fs.exists(new Path(f)));
+      }
+      // If we survive the check above, keep going...
+      // Now we make sure that the region isn't totally confused.  Load up more rows.
+      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+      admin.majorCompact(TABLE_NAME);
+      startWaitTime = System.currentTimeMillis();
+      while (newRegion.compactCount == 0) {
+        Thread.sleep(1000);
+        assertTrue("New region never compacted",
+            System.currentTimeMillis() - startWaitTime < 30000);
+      }
+      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
+    } finally {
+      if (compactingRegion != null) {
+        compactingRegion.allowCompactions();
+      }
+      if (admin != null) admin.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}
\ No newline at end of file
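
The blocking mechanism in CompactionBlockerRegion is a plain wait/notify
handshake. A standalone sketch of the same technique, stripped of the HBase
specifics (the class and method names here are invented for illustration):

    public class Gate {
      private final Object lock = new Object();
      private boolean closed = false;
      private boolean waiterParked = false;

      public void close() {
        synchronized (lock) { closed = true; }
      }

      public void open() {
        synchronized (lock) {
          closed = false;
          lock.notifyAll();
        }
      }

      // Called by the worker thread; announces that it has reached the gate,
      // then parks while the gate is closed.
      public void pass() throws InterruptedException {
        synchronized (lock) {
          waiterParked = true;
          lock.notifyAll();
          while (closed) {
            lock.wait();
          }
          waiterParked = false;
        }
      }

      // Called by the controlling thread to wait until the worker is parked.
      public void awaitParked() throws InterruptedException {
        synchronized (lock) {
          while (!waiterParked) {
            lock.wait();
          }
        }
      }
    }

The test class uses two separate lock objects where this sketch uses one, so
that the "compaction has parked" signal and the "compactions allowed" gate can
be operated independently; the sequence in the test is stopCompactions(),
trigger a compaction, waitForCompactionToBlock(), then allowCompactions().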