diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index fbf151a..8018aa4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -251,6 +251,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     REPLAY_BATCH_MUTATE, COMPACT_REGION
   }
 
+  private final Map<Thread, Store> currentCompactions = Maps.newConcurrentMap();
+
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -778,6 +780,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // A region can be reopened if failed a split; reset flags
     this.closing.set(false);
     this.closed.set(false);
+    this.writestate.writesEnabled = true;
 
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
@@ -1181,7 +1184,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // region.
       writestate.writesEnabled = false;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
-      waitForFlushesAndCompactions();
+      // give compactions 30s to finish before we start to interrupt
+      waitForFlushesAndCompactions(30000);
     }
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
@@ -1308,6 +1312,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * Exposed for TESTING.
    */
   public void waitForFlushesAndCompactions() {
+    waitForFlushesAndCompactions(0);
+  }
+
+  /**
+   * Wait for all current flushes and compactions of the region to complete,
+   * interrupting the compaction threads after waiting millis ms (0 means wait forever).
+   * Exposed for TESTING.
+   */
+  public void waitForFlushesAndCompactions(long millis) {
     synchronized (writestate) {
       boolean interrupted = false;
       try {
@@ -1315,7 +1328,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           LOG.debug("waiting for " + writestate.compacting + " compactions"
             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
           try {
-            writestate.wait();
+            long start = EnvironmentEdgeManager.currentTime();
+            writestate.wait(millis);
+            if (millis > 0 && EnvironmentEdgeManager.currentTime() - start >= millis) {
+              // if we waited once for compactions to finish, interrupt them, and try again
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Waited for " + millis
+                  + " ms for compactions to finish on close. Interrupting "
+                  + currentCompactions.size() + " compactions.");
+              }
+              for (Thread t : currentCompactions.keySet()) {
+                // interrupt any current IO in the currently running compactions.
+                t.interrupt();
+              }
+              millis = 0;
+            }
           } catch (InterruptedException iex) {
             // essentially ignore and propagate the interrupt back up
             LOG.warn("Interrupted while waiting");
@@ -6334,6 +6361,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
   }
 
+  public void reportCompactionStart(Store store) {
+    currentCompactions.put(Thread.currentThread(), store);
+  }
+
+  public void reportCompactionEnd(Store store) {
+    currentCompactions.remove(Thread.currentThread());
+  }
+
   public void reportCompactionRequestStart(boolean isMajor){
     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8b41401..803a3c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2243,4 +2243,14 @@ public class HStore implements Store {
   public void deregisterChildren(ConfigurationManager manager) {
     // No children to deregister
   }
+
+  @Override
+  public void reportCompactionStart() {
+    getHRegion().reportCompactionStart(this);
+  }
+
+  @Override
+  public void reportCompactionEnd() {
+    getHRegion().reportCompactionEnd(this);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 6e306a9..1f8d046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -693,7 +693,6 @@ public class SplitTransaction {
         break;
 
       case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
         this.parent.getRegionFileSystem().cleanupSplitsDir();
         break;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 70faff1..cb73810 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -398,4 +398,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
    * @throws IOException
    */
   void refreshStoreFiles() throws IOException;
+
+  /**
+   * Report the beginning of a compaction.
+   * This must be called from the thread performing the compaction.
+   */
+  void reportCompactionStart();
+
+  /**
+   * Report the completion of a compaction.
+   * This must be called from the thread performing the compaction.
+   */
+  void reportCompactionEnd();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index aae3968..a7dc28d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -57,6 +58,7 @@ public class DefaultCompactor extends Compactor {
     boolean cleanSeqId = false;
     IOException e = null;
     try {
+      store.reportCompactionStart();
       InternalScanner scanner = null;
       try {
         /* Include deletes, unless we are doing a compaction of all files */
@@ -108,6 +110,7 @@ public class DefaultCompactor extends Compactor {
           newFiles.add(writer.getPath());
         }
       }
+      store.reportCompactionEnd();
     }
     return newFiles;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
new file mode 100644
index 0000000..efcffaf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test compaction IO cancellation.
+ */
+@Category(MediumTests.class)
+public class TestCompactionIO {
+  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
+  private static final CountDownLatch latch = new CountDownLatch(1);
+
+  /**
+   * Verify that a compaction stuck in IO is aborted when we attempt to close a region.
+   * @throws Exception
+   */
+  @Test
+  public void testInterruptCompactionIO() throws Exception {
+    byte [] STARTROW = Bytes.toBytes(START_KEY);
+    byte [] COLUMN_FAMILY = fam1;
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
+    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, BlockedCompactor.class.getName());
+    int compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    final HRegion r = UTIL.createLocalHRegion(UTIL.createTableDescriptor("TestCompactionIO"), null, null);
+
+    // create a couple of store files w/ 15KB (over 10KB interval)
+    int jmax = (int) Math.ceil(15.0/compactionThreshold);
+    byte [] pad = new byte[1000]; // 1 KB chunk
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+      p.setDurability(Durability.SKIP_WAL);
+      for (int j = 0; j < jmax; j++) {
+        p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+      }
+      UTIL.loadRegion(r, COLUMN_FAMILY);
+      r.put(p);
+      r.flushcache();
+    }
+
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+          r.close();
+        } catch (Exception x) {
+          throw new RuntimeException(x);
+        }
+      }
+    }).start();
+
+    // blocks in the compactor until the close() above interrupts it
+    r.compactStores();
+
+    // ensure that the compaction stopped and all old files are intact,
+    Store s = r.stores.get(COLUMN_FAMILY);
+    assertEquals(compactionThreshold, s.getStorefilesCount());
+    assertTrue(s.getStorefilesSize() > 15*1000);
+    // and that no new store files persisted past compactStores()
+    FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+    // one tmp file is left over: it was created after the compaction started, and
+    // the DefaultCompactor does not clean up tmp files when it encounters an IOException. Should it?
+    assertEquals(1, ls.length);
+  }
+
+  public static class BlockedCompactor extends DefaultCompactor {
+    public BlockedCompactor(final Configuration conf, final Store store) {
+      super(conf, store);
+    }
+    @Override
+    protected boolean performCompaction(InternalScanner scanner,
+        CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
+      CellSink myWriter = new CellSink() {
+        @Override
+        public void append(Cell cell) throws IOException {
+          try {
+            // block the compaction IO until the region close interrupts this thread
+            Thread.sleep(100000);
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(ie.getMessage());
+          }
+        }
+      };
+      latch.countDown();
+      return super.performCompaction(scanner, myWriter, smallestReadPoint, cleanSeqId);
+    }
+  }
+}