diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 681dd3f..f4eaac7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -233,6 +233,9 @@ public class HRegion implements HeapSize { // , Writable{
     REPLAY_BATCH_MUTATE,
     COMPACT_REGION
   }
+  // Threads currently running a compaction on a store of this region, registered by
+  // DefaultCompactor.compact() and interrupted by close(). ConcurrentMap: weakly-consistent
+  // keySet() iteration is acceptable here — see doClose() below.
+  public final Map<Thread, Store> currentCompactions = Maps.newConcurrentMap();
+
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -1109,6 +1112,10 @@
       // region.
       writestate.writesEnabled = false;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
+      for (Thread t : currentCompactions.keySet()) {
+        // Interrupt any IO currently in progress in the running compactions.
+        t.interrupt();
+      }
       waitForFlushesAndCompactions();
     }
     // If we were not just flushing, is it worth doing a preflush...one
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index e04a715..e30427e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -56,6 +57,8 @@
     List newFiles = new ArrayList();
     IOException e = null;
     try {
+      // Register this thread so HRegion.close() can interrupt a stuck compaction.
+      ((HStore)store).getHRegion().currentCompactions.put(Thread.currentThread(), store);
       InternalScanner scanner = null;
       try {
         /* Include deletes, unless we are doing a major compaction */
@@ -102,6 +105,8 @@
           newFiles.add(writer.getPath());
         }
       }
+      // Deregister in the enclosing finally so the entry is removed on both success and failure.
+      ((HStore)store).getHRegion().currentCompactions.remove(Thread.currentThread());
     }
     return newFiles;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
new file mode 100644
index 0000000..e75e7fd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+public class TestCompactionIO {
+  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
+  private static final CountDownLatch latch = new CountDownLatch(1);
+  /**
+   * Verifies that a compaction stuck in IO is aborted when we attempt to close a region.
+   * @throws Exception
+   */
+  @Test
+  public void testInterruptCompactionIO() throws Exception {
+    byte [] STARTROW = Bytes.toBytes(START_KEY);
+    byte [] COLUMN_FAMILY = fam1;
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
+    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, BlockedCompactor.class.getName());
+    int compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    final HRegion r = UTIL.createLocalHRegion(UTIL.createTableDescriptor("TestCompactionIO"), null, null);
+
+    // Create a couple store files w/ 15KB (over 10KB interval)
+    int jmax = (int) Math.ceil(15.0/compactionThreshold);
+    byte [] pad = new byte[1000]; // 1 KB chunk
+    for (int i = 0; i < compactionThreshold; i++) {
+      HRegionIncommon loader = new HRegionIncommon(r);
+      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+      p.setDurability(Durability.SKIP_WAL);
+      for (int j = 0; j < jmax; j++) {
+        p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+      }
+      HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
+      loader.put(p);
+      loader.flushcache();
+    }
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+          r.close();
+        } catch (Exception x) {
+          throw new RuntimeException(x);
+        }
+      }
+    }).start();
+    // hangs until close() interrupts the blocked compactor
+    r.compactStores();
+
+    // ensure that the compaction stopped, all old files are intact,
+    Store s = r.stores.get(COLUMN_FAMILY);
+    assertEquals(compactionThreshold, s.getStorefilesCount());
+    assertTrue(s.getStorefilesSize() > 15*1000);
+    // and no new store files persisted past compactStores()
+    FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+
+    // this is happening after the compaction start, the DefaultCompactor does not
+    // clean tmp files when it encounters an IOException. Should it?
+    //assertEquals(0, ls.length);
+  }
+
+  public static class BlockedCompactor extends DefaultCompactor {
+    public BlockedCompactor(final Configuration conf, final Store store) {
+      super(conf, store);
+    }
+    @Override
+    protected boolean performCompaction(InternalScanner scanner,
+        CellSink writer, long smallestReadPoint) throws IOException {
+      CellSink myWriter = new CellSink() {
+        @Override
+        public void append(KeyValue kv) throws IOException {
+          try {
+            Thread.sleep(100000);
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(ie.getMessage());
+          }
+        }
+      };
+      latch.countDown();
+      return super.performCompaction(scanner, myWriter, smallestReadPoint);
+    }
+  }
+}