diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 681dd3f..8992b73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -233,6 +233,8 @@ public class HRegion implements HeapSize { // , Writable{
     REPLAY_BATCH_MUTATE,
     COMPACT_REGION
   }
+  private final Map<Thread, Store> currentCompactions = Maps.newConcurrentMap();
+
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -745,6 +747,7 @@ public class HRegion implements HeapSize { // , Writable{
     // A region can be reopened if failed a split; reset flags
     this.closing.set(false);
     this.closed.set(false);
+    this.writestate.writesEnabled = true;
 
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
@@ -1109,7 +1112,8 @@ public class HRegion implements HeapSize { // , Writable{
         // region.
         writestate.writesEnabled = false;
         LOG.debug("Closing " + this + ": disabling compactions & flushes");
-        waitForFlushesAndCompactions();
+        // give compactions 30s to finish before we start interrupting them
+        waitForFlushesAndCompactions(30000);
       }
       // If we were not just flushing, is it worth doing a preflush...one
       // that will clear out of the bulk of the memstore before we put up
@@ -1230,12 +1234,34 @@ public class HRegion implements HeapSize { // , Writable{
    * Exposed for TESTING.
    */
   public void waitForFlushesAndCompactions() {
+    waitForFlushesAndCompactions(0);
+  }
+  /**
+   * Wait for all current flushes and compactions of the region to complete.
+   * Compactions still running after {@code millis} ms (0 = wait forever) are interrupted.
+   * Exposed for TESTING.
+   */
+  public void waitForFlushesAndCompactions(long millis) {
     synchronized (writestate) {
       while (writestate.compacting > 0 || writestate.flushing) {
         LOG.debug("waiting for " + writestate.compacting + " compactions"
             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
         try {
-          writestate.wait();
+          long start = EnvironmentEdgeManager.currentTimeMillis();
+          writestate.wait(millis);
+          if (millis > 0 && EnvironmentEdgeManager.currentTimeMillis() - start >= millis) {
+            // we already waited once for compactions to finish; interrupt them and try again
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Waited for " + millis
+                  + " ms for compactions to finish on close. Interrupting "
+                  + currentCompactions.size() + " compactions.");
+            }
+            for (Thread t : currentCompactions.keySet()) {
+              // interrupt any current IO in the running compactions
+              t.interrupt();
+            }
+            millis = 0;
+          }
         } catch (InterruptedException iex) {
           // essentially ignore and propagate the interrupt back up
           Thread.currentThread().interrupt();
@@ -6157,6 +6183,14 @@ public class HRegion implements HeapSize { // , Writable{
         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
   }
 
+  public void reportCompactionStart(Store store) {
+    currentCompactions.put(Thread.currentThread(), store);
+  }
+
+  public void reportCompactionEnd(Store store) {
+    currentCompactions.remove(Thread.currentThread());
+  }
+
   public void reportCompactionRequestStart(boolean isMajor){
     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c6b62d4..ee87862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2124,4 +2124,14 @@ public class HStore implements Store {
   public long getMajorCompactedCellsSize() {
     return majorCompactedCellsSize;
   }
+
+  @Override
+  public void reportCompactionStart() {
+    getHRegion().reportCompactionStart(this);
+  }
+
+  @Override
+  public void reportCompactionEnd() {
+    getHRegion().reportCompactionEnd(this);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 0db40c3..1f2a65f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -907,7 +907,6 @@ public class SplitTransaction {
         break;
 
       case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
         this.parent.getRegionFileSystem().cleanupSplitsDir();
         break;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 770ab75..f1423a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -386,4 +386,16 @@ public interface Store extends HeapSize, StoreConfigInformation {
    * @return Whether this store has too many store files.
    */
   boolean hasTooManyStoreFiles();
+
+  /**
+   * Report the beginning of a compaction.
+   * This must be called from the thread performing the compaction.
+   */
+  void reportCompactionStart();
+
+  /**
+   * Report the completion of a compaction.
+   * This must be called from the thread performing the compaction.
+   */
+  void reportCompactionEnd();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index e04a715..225179b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -56,6 +57,7 @@ public class DefaultCompactor extends Compactor {
     List<Path> newFiles = new ArrayList<Path>();
     IOException e = null;
     try {
+      store.reportCompactionStart();
       InternalScanner scanner = null;
       try {
         /* Include deletes, unless we are doing a major compaction */
@@ -102,6 +104,7 @@ public class DefaultCompactor extends Compactor {
           newFiles.add(writer.getPath());
         }
       }
+      store.reportCompactionEnd();
     }
     return newFiles;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
new file mode 100644
index 0000000..e5f6bb5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test compaction IO cancellation.
+ */
+@Category(MediumTests.class)
+public class TestCompactionIO {
+  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
+  private static final CountDownLatch latch = new CountDownLatch(1);
+
+  /**
+   * Verify that a compaction stuck in IO is aborted when we attempt to close the region.
+   * @throws Exception
+   */
+  @Test
+  public void testInterruptCompactionIO() throws Exception {
+    byte [] STARTROW = Bytes.toBytes(START_KEY);
+    byte [] COLUMN_FAMILY = fam1;
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
+    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, BlockedCompactor.class.getName());
+    int compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    final HRegion r = UTIL.createLocalHRegion(UTIL.createTableDescriptor("TestCompactionIO"), null, null);
+
+    // Create a couple of store files w/ 15KB (over 10KB interval)
+    int jmax = (int) Math.ceil(15.0/compactionThreshold);
+    byte [] pad = new byte[1000]; // 1 KB chunk
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+      p.setDurability(Durability.SKIP_WAL);
+      for (int j = 0; j < jmax; j++) {
+        p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+      }
+      UTIL.loadRegion(r, COLUMN_FAMILY);
+      r.put(p);
+      r.flushcache();
+    }
+
+    // close the region from another thread once the compaction is underway
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+          r.close();
+        } catch (Exception x) {
+          throw new RuntimeException(x);
+        }
+      }
+    }).start();
+
+    // without the interrupt on close, this call would hang forever in BlockedCompactor
+    r.compactStores();
+
+    // ensure that the compaction stopped and all old files are intact,
+    Store s = r.stores.get(COLUMN_FAMILY);
+    assertEquals(compactionThreshold, s.getStorefilesCount());
+    assertTrue(s.getStorefilesSize() > 15*1000);
+    // and no new store files persisted past compactStores()
+    FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+
+    // the tmp file is created after the compaction starts; the DefaultCompactor does not
+    // clean up tmp files when it encounters an IOException. Should it?
+    assertEquals(1, ls.length);
+  }
+
+  public static class BlockedCompactor extends DefaultCompactor {
+    public BlockedCompactor(final Configuration conf, final Store store) {
+      super(conf, store);
+    }
+
+    @Override
+    protected boolean performCompaction(InternalScanner scanner,
+        CellSink writer, long smallestReadPoint) throws IOException {
+      // a writer whose append() blocks until the compaction thread is interrupted
+      CellSink myWriter = new CellSink() {
+        @Override
+        public void append(KeyValue kv) throws IOException {
+          try {
+            Thread.sleep(100000);
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(ie.getMessage());
+          }
+        }
+      };
+      latch.countDown();
+      return super.performCompaction(scanner, myWriter, smallestReadPoint);
+    }
+  }
+}
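
Note on the mechanism: the patch pairs a per-thread registry (currentCompactions, populated via Store.reportCompactionStart()/reportCompactionEnd()) with a two-phase wait in waitForFlushesAndCompactions(long): wait up to the timeout on the writestate monitor, interrupt every registered thread, then wait with no timeout. The following is a minimal, self-contained sketch of that pattern outside HBase; the class and member names (InterruptibleWorkers, awaitQuiescence, and so on) are hypothetical, not part of this patch or of HBase.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InterruptibleWorkers {
  // which threads are currently running a task, analogous to HRegion.currentCompactions
  private final Map<Thread, String> current = new ConcurrentHashMap<Thread, String>();
  private final Object state = new Object(); // plays the role of writestate
  private int running = 0;

  // workers bracket their long-running task with these, mirroring
  // reportCompactionStart()/reportCompactionEnd()
  public void reportStart(String what) {
    current.put(Thread.currentThread(), what);
    synchronized (state) { running++; }
  }

  public void reportEnd() {
    current.remove(Thread.currentThread());
    synchronized (state) {
      running--;
      state.notifyAll();
    }
  }

  // the closer waits up to millis (0 = forever), then interrupts laggards and
  // falls back to an untimed wait, like waitForFlushesAndCompactions(long)
  public void awaitQuiescence(long millis) throws InterruptedException {
    synchronized (state) {
      while (running > 0) {
        long start = System.currentTimeMillis();
        state.wait(millis);
        if (millis > 0 && System.currentTimeMillis() - start >= millis) {
          for (Thread t : current.keySet()) {
            t.interrupt(); // blocking IO in the worker surfaces this as an exception
          }
          millis = 0; // second phase: wait without a timeout
        }
      }
    }
  }
}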
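A caveat the test relies on: Thread.interrupt() only takes effect at blocking points (Object.wait, Thread.sleep, interruptible NIO channels), and filesystem stream writes typically surface it as InterruptedIOException or ClosedByInterruptException, which is what aborts the compaction here. A compaction loop that is CPU-bound between IO calls would have to poll the flag itself. A minimal, hypothetical helper (not part of this patch) showing such a check:

import java.io.IOException;
import java.io.InterruptedIOException;

public final class InterruptChecks {
  private InterruptChecks() {
  }

  // Convert a pending thread interrupt into an IOException so a compaction
  // loop aborts promptly even between blocking IO calls.
  public static void throwIfInterrupted() throws IOException {
    if (Thread.currentThread().isInterrupted()) {
      throw new InterruptedIOException("compaction interrupted by region close");
    }
  }
}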