commit 39db0af61ba2d49de639fa0a3c31cf80cc7bfebc Author: Todd Lipcon Date: Wed May 5 22:36:57 2010 -0700 HBASE-2231 test case - shows dataloss if ZK session lost while compacting diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5ef638f..db7dbd1 100644 --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -751,7 +751,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * Clean out any vestiges of previous failed compactions. + * Protected (not private) so that subclasses, such as test regions, can override it. * @throws IOException */ - private void doRegionCompactionPrep() throws IOException { + protected void doRegionCompactionPrep() throws IOException { doRegionCompactionCleanup(); } diff --git src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index cd441b4..b659c7d 100644 --- src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -73,6 +73,7 @@ class StoreFileScanner implements KeyValueScanner { hfs.next(); return true; } catch(IOException ioe) { + LOG.error("Could not seek in store file scanner " + this, ioe); close(); return false; } diff --git src/test/hbase-site.xml src/test/hbase-site.xml index 486ea28..2dbd20a 100644 --- src/test/hbase-site.xml +++ src/test/hbase-site.xml @@ -32,7 +32,7 @@ hbase.client.pause - 5000 + 500 General client pause value. Used mostly as value to wait before running a retry of a failed get, region lookup, etc. 
diff --git src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java index 7c3bb1a..d4addca 100644 --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -409,6 +409,39 @@ public class HBaseTestingUtility { return rowCount; } + /** + * Load rows startRow (inclusive) through endRow (exclusive) into table t. + * Each row key, and the value stored in family f under a null qualifier, + * is the row number rendered as a decimal string. + */ + public void loadNumericRows(final HTable t, final byte[] f, + int startRow, int endRow) throws IOException { + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Put put = new Put(data); + put.add(f, null, data); + t.put(put); + } + } + + /** + * Count the rows returned by a full scan of the given table. + */ + public int countRows(final HTable table) throws IOException { + Scan scan = new Scan(); + ResultScanner results = table.getScanner(scan); + try { + int count = 0; + for (@SuppressWarnings("unused") Result res : results) { + count++; + } + return count; + } finally { + // Release the scanner even if iterating over it throws. + results.close(); + } + } + /** * Creates many regions names "aaa" to "zzz". * 
diff --git src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java index a10fa0e..0fa9874 100644 --- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; /** @@ -290,7 +291,26 @@ public class MiniHBaseCluster implements HConstants { } return index; } - + + /** + * Find the regions of the given table that are online on any of this + * cluster's region servers. + * @param tableName name of the table to look for + * @return the matching regions; empty if there are none + */ + public List<HRegion> findRegionsForTable(byte[] tableName) { + List<HRegion> ret = new ArrayList<HRegion>(); + for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { + HRegionServer hrs = rst.getRegionServer(); + for (HRegion region : hrs.getOnlineRegions()) { + if (Bytes.equals(region.getTableDesc().getName(), tableName)) { + ret.add(region); + } + } + } + return ret; + } + /** * Add a message to include in the responses send a regionserver when it * checks back in.
diff --git src/test/org/apache/hadoop/hbase/TestFullLogReconstruction.java src/test/org/apache/hadoop/hbase/TestFullLogReconstruction.java index 5e2ddda..f50b0ea 100644 --- src/test/org/apache/hadoop/hbase/TestFullLogReconstruction.java +++ src/test/org/apache/hadoop/hbase/TestFullLogReconstruction.java @@ -99,13 +99,7 @@ public class TestFullLogReconstruction { // Load up the table with simple rows and count them int initialCount = TEST_UTIL.loadTable(table, FAMILY); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - int count = 0; - for (@SuppressWarnings("unused") Result res : results) { - count++; - } - results.close(); + int count = TEST_UTIL.countRows(table); assertEquals(initialCount, count); @@ -116,13 +110,7 @@ public class TestFullLogReconstruction { // Expire the first RS. Don't have it run its shutdown hdfs thread. TEST_UTIL.expireRegionServerSession(0, false); - scan = new Scan(); - results = table.getScanner(scan); - int newCount = 0; - for (@SuppressWarnings("unused") Result res : results) { - newCount++; - } - results.close(); + int newCount = TEST_UTIL.countRows(table); assertEquals(count, newCount); } }
diff --git src/test/org/apache/hadoop/hbase/TestIOFencing.java src/test/org/apache/hadoop/hbase/TestIOFencing.java new file mode 100644 index 0000000..d5dab96 --- /dev/null +++ src/test/org/apache/hadoop/hbase/TestIOFencing.java @@ -0,0 +1,250 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.log4j.Level; +import org.junit.Test; +
+/** + * Verifies that no rows are lost when the region server hosting a region + * loses its ZooKeeper session in the middle of a compaction and the region + * is reopened on another server (HBASE-2231). + */ +public class TestIOFencing { + static final Log LOG = LogFactory.getLog(TestIOFencing.class); + static { + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); + } +
+ /** + * HRegion that can hold compactions inside doRegionCompactionPrep() until + * the test releases them, and that counts calls to compactStores(). + */ + public static class CompactionBlockerRegion extends HRegion { + boolean compactionsBlocked = false; + final Object compactionsBlockedLock = new Object(); + + final Object compactionWaitingLock = new Object(); + boolean compactionWaiting = false; + + volatile int compactCount = 0; + + public CompactionBlockerRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, + HRegionInfo regionInfo, FlushRequester flushListener) { + super(basedir, log, fs, conf, regionInfo, flushListener); + } + + public void stopCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = true; + } + } + + public void allowCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = false; + compactionsBlockedLock.notifyAll(); + } + } + + public void waitForCompactionToBlock() throws InterruptedException { + synchronized (compactionWaitingLock) { + while (!compactionWaiting) { + compactionWaitingLock.wait(); + } + } + } +
+ @Override + protected void doRegionCompactionPrep() throws IOException { + synchronized (compactionWaitingLock) { + compactionWaiting = true; + compactionWaitingLock.notifyAll(); + } + synchronized (compactionsBlockedLock) { + while (compactionsBlocked) { + try { + compactionsBlockedLock.wait(); + } catch (InterruptedException e) { + // Restore the interrupt flag before converting to an IOException. + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + } + synchronized (compactionWaitingLock) { + compactionWaiting = false; + compactionWaitingLock.notifyAll(); + } + super.doRegionCompactionPrep(); + } + + @Override + public byte [] compactStores() throws IOException { + try { + return super.compactStores(); + } finally { + compactCount++; + } + } + + public int countStoreFiles() { + int count = 0; + for (Store store : stores.values()) { + count += store.getNumberOfstorefiles(); + } + return count; + } + } +
+ private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest"); + private final static byte[] FAMILY = Bytes.toBytes("family"); + + private static final int FIRST_BATCH_COUNT = 4000; + private static final int SECOND_BATCH_COUNT = 4000; +
+ @Test + public void testFencingAroundCompaction() throws Exception { + HBaseConfiguration c = TEST_UTIL.getConfiguration(); + c.setClass(HConstants.REGION_IMPL, + CompactionBlockerRegion.class, + HRegion.class); + c.setBoolean("dfs.support.append", true); + // encourage plenty of flushes + c.setLong("hbase.hregion.memstore.flush.size", 200000); + // Only run compaction when we tell it to + c.setInt("hbase.hstore.compactionThreshold", 1000); + c.setLong("hbase.hstore.blockingStoreFiles", 1000); + // Compact quickly after we tell it to! + c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000); + + LOG.info("Starting mini cluster"); + TEST_UTIL.startMiniCluster(1); + CompactionBlockerRegion compactingRegion = null; +
+ try { + LOG.info("Creating admin"); + HBaseAdmin admin = new HBaseAdmin(c); + LOG.info("Creating table"); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + // Use the test configuration, as the admin above does. + HTable table = new HTable(c, TABLE_NAME); + + LOG.info("Loading test table"); + // Load some rows + TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); + + // Find the region + + List<HRegion> testRegions = + TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME); + assertEquals(1, testRegions.size()); + compactingRegion = (CompactionBlockerRegion)testRegions.get(0); + assertTrue(compactingRegion.countStoreFiles() > 1); + byte[] regionName = compactingRegion.getRegionName(); +
+ LOG.info("Blocking compactions"); + compactingRegion.stopCompactions(); + + LOG.info("Asking for compaction"); + admin.majorCompact(TABLE_NAME); + + LOG.info("Waiting for compaction to be about to start"); + compactingRegion.waitForCompactionToBlock(); + + LOG.info("Starting a new server"); + RegionServerThread newServerThread = + TEST_UTIL.getMiniHBaseCluster().startRegionServer(); + HRegionServer newServer = newServerThread.getRegionServer(); + + LOG.info("Killing region server ZK lease"); + TEST_UTIL.expireRegionServerSession(0, false); + + +
+ CompactionBlockerRegion newRegion = null; + long startWaitTime = System.currentTimeMillis(); + while (newRegion == null) { + LOG.info("Waiting for the new server to pick up the region"); + Thread.sleep(1000); + newRegion = + (CompactionBlockerRegion)newServer.getOnlineRegion(regionName); + assertTrue("Timed out waiting for new server to open region", + System.currentTimeMillis() - startWaitTime < 60000); + } + + LOG.info("Allowing compaction to proceed"); + compactingRegion.allowCompactions(); + + startWaitTime = System.currentTimeMillis(); + while (compactingRegion.compactCount == 0) { + Thread.sleep(1000); + assertTrue("Timed out waiting for the blocked compaction to finish", + System.currentTimeMillis() - startWaitTime < 30000); + } + + LOG.info("Compaction finished, loading more data"); +
+ // Now we make sure that the region isn't totally confused + TEST_UTIL.loadNumericRows( + table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); + + admin.majorCompact(TABLE_NAME); + startWaitTime = System.currentTimeMillis(); + while (newRegion.compactCount == 0) { + Thread.sleep(1000); + assertTrue("New region never compacted", + System.currentTimeMillis() - startWaitTime < 30000); + } + + assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, + TEST_UTIL.countRows(table)); + } finally { + if (compactingRegion != null) { + compactingRegion.allowCompactions(); + } + TEST_UTIL.shutdownMiniCluster(); + } + } + +}