diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index a77dc22..67e5631 100644
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1667,4 +1667,90 @@ public class HBaseAdmin implements Abortable, Closeable {
       return null;
     }
   }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * It could be in a major compaction, a minor compaction, both, or none.
+   *
+   * @param tableNameOrRegionName name of the table or region to check
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException propagated from the byte[] overload
+   * @return the current compaction state
+   */
+  public CompactionState getCompactionState(final String tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    return getCompactionState(Bytes.toBytes(tableNameOrRegionName)); // delegate to the byte[] overload
+  }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * It could be in a major compaction, a minor compaction, both, or none.
+   *
+   * @param tableNameOrRegionName name of the table or region to check
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   * @return the current compaction state
+   */
+  public CompactionState getCompactionState(final byte [] tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    CompactionState state = CompactionState.NONE; // running aggregate over a table's regions
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName(tableNameOrRegionName, ct)) {
+        Pair<HRegionInfo, ServerName> pair =
+          MetaReader.getRegion(ct, tableNameOrRegionName);
+        if (pair == null || pair.getSecond() == null) {
+          LOG.info("No server in .META. for " +
+            Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
+        } else {
+          ServerName sn = pair.getSecond();
+          HRegionInterface rs =
+            this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+          return CompactionState.valueOf( // the server reports the enum constant's name
+            rs.getCompactionState(pair.getFirst().getRegionName()));
+        }
+      } else {
+        final String tableName = tableNameString(tableNameOrRegionName, ct);
+        List<Pair<HRegionInfo, ServerName>> pairs =
+          MetaReader.getTableRegionsAndLocations(ct, tableName);
+        for (Pair<HRegionInfo, ServerName> pair: pairs) {
+          if (pair.getFirst().isOffline()) continue;
+          if (pair.getSecond() == null) continue;
+          try {
+            ServerName sn = pair.getSecond();
+            HRegionInterface rs =
+              this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+            switch (CompactionState.valueOf(
+              rs.getCompactionState(pair.getFirst().getRegionName()))) {
+            case MAJOR_AND_MINOR:
+              return CompactionState.MAJOR_AND_MINOR; // nothing can widen this; stop early
+            case MAJOR:
+              if (state == CompactionState.MINOR) {
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MAJOR;
+              break;
+            case MINOR:
+              if (state == CompactionState.MAJOR) {
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MINOR;
+              break;
+            case NONE:
+            default: // nothing, continue
+            }
+          } catch (NotServingRegionException e) { // skip this region, keep scanning the rest
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Trying to get compaction state of " +
+                pair.getFirst() + ": " +
+                StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
+    }
+    return state;
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index a3e7669..d5e329e 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.TokenInfo;
@@ -621,6 +622,15 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
    */
   public byte[][] rollHLogWriter() throws IOException, FailedLogCloseException;
 
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region to check compaction state.
+   * @return the compaction state name.
+   * @throws IOException exception
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException;
+
   @Override
   public void stop(String why);
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ff3d635..3f28b7a 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -122,6 +122,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
@@ -3698,4 +3699,19 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
         mxBeanInfo);
     LOG.info("Registered RegionServer MXBean");
   }
+
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region to check compaction state.
+   * @return the name of the region's current CompactionState constant.
+   * @throws IOException exception
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException {
+    checkOpen();
+    requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    HRegionInfo info = region.getRegionInfo();
+    return CompactionRequest.getCompactionState(info.getRegionId()).name(); // looked up by region id, returned as the enum name
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 6f32235..f85da01 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -1281,10 +1281,14 @@ public class Store extends SchemaConfigured implements HeapSize {
     } finally {
       this.lock.readLock().unlock();
     }
+    if (ret != null) {
+      CompactionRequest.preRequest(ret); // register the new request with the static compaction-state tracking
+    }
     return ret;
   }
 
   public void finishRequest(CompactionRequest cr) {
+    CompactionRequest.postRequest(cr); // unregister; pairs with the preRequest call made when the request was created
     cr.finishRequest();
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 7544b71..0a931bb 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.KeyValue.Type; // NOTE(review): not referenced by any hunk in this patch - confirm it is needed
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import 
org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -55,6 +58,14 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
   private final Long timeInNanos;
   private HRegionServer server = null;
 
+  /**
+   * Maps tracking the number of outstanding compaction requests per region id
+   */
+  private static final ConcurrentHashMap<Long, AtomicInteger>
+    majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+  private static final ConcurrentHashMap<Long, AtomicInteger>
+    minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
   public CompactionRequest(HRegion r, Store s,
       CompactSelection files, boolean isMajor, int p) {
     Preconditions.checkNotNull(r);
@@ -73,6 +84,58 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
     this.timeInNanos = System.nanoTime();
   }
 
+  /**
+   * Find out whether a given region is in compaction now.
+   *
+   * @param regionId id of the region whose request counters are consulted
+   * @return MAJOR, MINOR, MAJOR_AND_MINOR or NONE, depending on which counters are positive
+   */
+  public static CompactionState getCompactionState(
+      final long regionId) {
+    Long key = Long.valueOf(regionId);
+    AtomicInteger major = majorCompactions.get(key);
+    AtomicInteger minor = minorCompactions.get(key);
+    int state = 0;
+    if (minor != null && minor.get() > 0) {
+      state += 1;  // use 1 to indicate minor here
+    }
+    if (major != null && major.get() > 0) {
+      state += 2;  // use 2 to indicate major here
+    }
+    switch (state) {
+    case 3:  // 3 = 2 + 1, so both major and minor
+      return CompactionState.MAJOR_AND_MINOR;
+    case 2:
+      return CompactionState.MAJOR;
+    case 1:
+      return CompactionState.MINOR;
+    default:
+      return CompactionState.NONE;
+    }
+  }
+
+  public static void preRequest(final CompactionRequest cr){
+    Long key = Long.valueOf(cr.getHRegion().getRegionId());
+    ConcurrentHashMap<Long, AtomicInteger> compactions =
+      cr.isMajor() ? majorCompactions : minorCompactions;
+    AtomicInteger count = compactions.get(key);
+    if (count == null) {
+      compactions.putIfAbsent(key, new AtomicInteger(0)); // no-op if another thread inserted first
+      count = compactions.get(key); // re-read so every caller increments the same counter
+    }
+    count.incrementAndGet();
+  }
+
+  public static void postRequest(final CompactionRequest cr){
+    Long key = Long.valueOf(cr.getHRegion().getRegionId());
+    ConcurrentHashMap<Long, AtomicInteger> compactions =
+      cr.isMajor() ? majorCompactions : minorCompactions;
+    AtomicInteger count = compactions.get(key);
+    if (count != null) { // the entry is kept in the map; only its value is decremented
+      count.decrementAndGet();
+    }
+  }
+
   public void finishRequest() {
     this.compactSelection.finishRequest();
   }
@@ -213,6 +276,16 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
   }
 
   /**
+   * An enum for the region compaction state
+   */
+  public static enum CompactionState {
+    NONE,
+    MINOR,
+    MAJOR,
+    MAJOR_AND_MINOR;
+  }
+
+  /**
    * Cleanup class to use when rejecting a compaction request from the queue.
    */
   public static class Rejection implements RejectedExecutionHandler {
diff --git src/main/resources/hbase-webapps/master/table.jsp src/main/resources/hbase-webapps/master/table.jsp
index 811df46..b7b0f33 100644
--- src/main/resources/hbase-webapps/master/table.jsp
+++ src/main/resources/hbase-webapps/master/table.jsp
@@ -154,6 +154,11 @@
       <td><%= hbadmin.isTableEnabled(table.getTableName()) %></td>
       <td>Is the table enabled</td>
   </tr>
+  <tr>
+      <td>Compaction</td>
+      <td><%= hbadmin.getCompactionState(table.getTableName()) %></td>
+      <td>Is the table compacting</td>
+  </tr>
   <%  if (showFragmentation) { %>
   <tr>
       <td>Fragmentation</td>
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
new file mode 100644
index 0000000..8fc2f59
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Unit tests to test retrieving table/region compaction state*/
+@Category(LargeTests.class)
+public class TestCompactionState {
+  final static Log LOG = LogFactory.getLog(TestCompactionState.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout=60000)
+  public void testMajorCompaction() throws IOException, InterruptedException {
+    compaction("testMajorCompaction", 8, CompactionState.MAJOR);
+  }
+
+  @Test(timeout=60000)
+  public void testMinorCompaction() throws IOException, InterruptedException {
+    compaction("testMinorCompaction", 15, CompactionState.MINOR);
+  }
+
+  /**
+   * Load data to a table, flush it to disk, trigger compaction,
+   * confirm the compaction state is right and wait till it is done.
+   *
+   * @param tableName name of the table to create, load and compact
+   * @param flushes number of load-then-flush rounds passed to loadData
+   * @param expectedState MINOR requests admin.compact; any other value requests admin.majorCompact
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void compaction(final String tableName, final int flushes,
+      final CompactionState expectedState) throws IOException, InterruptedException {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tableName);
+    byte [] family = Bytes.toBytes("family");
+    HTable ht = null;
+    try {
+      ht = TEST_UTIL.createTable(table, family);
+      loadData(ht, family, 3000, flushes);
+      HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+      List<HRegion> regions = rs.getOnlineRegions(table);
+      int countBefore = countStoreFiles(regions, family);
+      assertTrue(countBefore > 0); // there should be some data files
+      HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); // NOTE(review): admin is never closed in this test
+      if (expectedState == CompactionState.MINOR) {
+        admin.compact(tableName);
+      } else {
+        admin.majorCompact(table);
+      }
+      long curt = System.currentTimeMillis();
+      long waitTime = 5000;
+      long endt = curt + waitTime;
+      CompactionState state = admin.getCompactionState(table);
+      while (state == CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = admin.getCompactionState(table);
+        curt = System.currentTimeMillis();
+      }
+      // Now, should have the right compaction state,
+      // otherwise, the compaction should have already been done
+      if (expectedState != state) {
+        for (HRegion region: regions) {
+          state = CompactionRequest.getCompactionState(region.getRegionId());
+          assertEquals(CompactionState.NONE, state);
+        }
+      } else {
+        curt = System.currentTimeMillis();
+        waitTime = 20000;
+        endt = curt + waitTime;
+        state = admin.getCompactionState(table);
+        while (state != CompactionState.NONE && curt < endt) {
+          Thread.sleep(10);
+          state = admin.getCompactionState(table);
+          curt = System.currentTimeMillis();
+        }
+        // Now, compaction should be done.
+        assertEquals(CompactionState.NONE, state);
+      }
+      int countAfter = countStoreFiles(regions, family);
+      assertTrue(countAfter < countBefore);
+      if (expectedState == CompactionState.MAJOR) assertTrue(1 == countAfter);
+      else assertTrue(1 < countAfter);
+    } finally {
+      if (ht != null) {
+        TEST_UTIL.deleteTable(table);
+      }
+    }
+  }
+
+  private static int countStoreFiles(
+      List<HRegion> regions, final byte[] family) {
+    int count = 0;
+    for (HRegion region: regions) {
+      count += region.getStoreFileList(new byte[][]{family}).size();
+    }
+    return count;
+  }
+
+  private static void loadData(final HTable ht, final byte[] family,
+      final int rows, final int flushes) throws IOException {
+    List<Put> puts = new ArrayList<Put>(rows);
+    byte[] qualifier = Bytes.toBytes("val");
+    for (int i = 0; i < flushes; i++) {
+      for (int k = 0; k < rows; k++) {
+        byte[] row = Bytes.toBytes(random.nextLong());
+        Put p = new Put(row);
+        p.add(family, qualifier, row);
+        puts.add(p);
+      }
+      ht.put(puts);
+      ht.flushCommits();
+      TEST_UTIL.flush(); // flush after every batch of puts
+      puts.clear();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}