diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index e874fcf..9f93d5c 100644
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.CompactionState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
@@ -1349,4 +1350,89 @@ public class HBaseAdmin implements Abortable, Closeable {
       this.connection.close();
     }
   }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * It could be in a major compaction, a minor compaction, both, or none.
+   *
+   * @param tableNameOrRegionName table or region to check
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException if interrupted while getting the catalog tracker
+   * @return the current compaction state
+   */
+  public CompactionState getCompactionState(final String tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    return getCompactionState(Bytes.toBytes(tableNameOrRegionName));
+  }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * It could be in a major compaction, a minor compaction, both, or none.
+   *
+   * @param tableNameOrRegionName table or region to check
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException if interrupted while getting the catalog tracker
+   * @return the current compaction state; NONE if a region has no server in .META.
+   */
+  public CompactionState getCompactionState(final byte [] tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    CompactionState state = CompactionState.NONE;
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName(tableNameOrRegionName, ct)) {
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(ct, tableNameOrRegionName);
+        if (pair == null || pair.getSecond() == null) {
+          LOG.info("No server in .META. for " +
+            Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
+        } else {
+          HRegionInterface rs =
+            this.connection.getHRegionConnection(pair.getSecond());
+          return CompactionState.valueOf(
+            rs.getCompactionState(pair.getFirst().getRegionName()));
+        }
+      } else {
+        final String tableName = tableNameString(tableNameOrRegionName, ct);
+        List<Pair<HRegionInfo, HServerAddress>> pairs =
+          MetaReader.getTableRegionsAndLocations(ct, tableName);
+        for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
+          if (pair.getFirst().isOffline()) continue;
+          if (pair.getSecond() == null) continue;
+          try {
+            HRegionInterface rs =
+              this.connection.getHRegionConnection(pair.getSecond());
+            switch (CompactionState.valueOf(
+              rs.getCompactionState(pair.getFirst().getRegionName()))) {
+            case MAJOR_AND_MINOR:
+              return CompactionState.MAJOR_AND_MINOR;
+            case MAJOR:
+              if (state == CompactionState.MINOR) {
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MAJOR;
+              break;
+            case MINOR:
+              if (state == CompactionState.MAJOR) {
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MINOR;
+              break;
+            case NONE:
+            default:
+              // nothing, continue
+            }
+          } catch (NotServingRegionException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Trying to get compaction state of " +
+                pair.getFirst() + ": " +
+                StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
+    }
+    return state;
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index cbf4720..d8ccdb7 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -433,4 +433,13 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
   /* TODO: Move into place above master operations after deprecation cycle */
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
   throws IOException;
+
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region to check compaction state.
+   * @return the name of the region's CompactSplitThread.CompactionState constant.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException;
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index b3cc46a..d1c2875 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -49,6 +50,64 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
   public static final int PRIORITY_USER = 1;
 
   /**
+   * Map to track the state of compactions requested per region, keyed by
+   * encoded region name. The numeric region id is not used as the key
+   * because it is not guaranteed to be unique across regions.
+   */
+  private static final ConcurrentHashMap<String, CompactionState>
+    compactionStates = new ConcurrentHashMap<String, CompactionState>();
+
+  /**
+   * Find out if a given region is in compaction now.
+   * This information is not accurate in case one request is
+   * being processed while new request comes. So it is just
+   * an indication for normal scenario.
+   *
+   * @param encodedRegionName encoded name of the region to check
+   * @return the tracked compaction state, or {@link CompactionState#NONE}
+   *   if no request is tracked for the region
+   */
+  public static CompactionState getCompactionState(
+      final String encodedRegionName) {
+    CompactionState state = compactionStates.get(encodedRegionName);
+    return state == null ? CompactionState.NONE : state;
+  }
+
+  /**
+   * Records that a compaction request was queued for the region: MAJOR if
+   * region.getForceMajorCompaction() is set, MINOR otherwise.
+   *
+   * @param region the region a compaction request was queued for
+   */
+  public static void preRequest(final HRegion region) {
+    CompactionState state =
+      region.getForceMajorCompaction() ? CompactionState.MAJOR : CompactionState.MINOR;
+    compactionStates.put(region.getRegionInfo().getEncodedName(), state);
+  }
+
+  /**
+   * Clears the tracked state of the region once the compaction thread has
+   * finished handling its request.
+   *
+   * @param region the region whose compaction request was handled
+   */
+  public static void postRequest(final HRegion region) {
+    compactionStates.remove(region.getRegionInfo().getEncodedName());
+  }
+
+  /**
+   * Compaction state of a region or a table. {@link #MAJOR_AND_MINOR} is
+   * reported for a table whose regions are in different kinds of
+   * compaction.
+   */
+  public static enum CompactionState {
+    NONE,
+    MINOR,
+    MAJOR,
+    MAJOR_AND_MINOR;
+  }
+
+  /**
    * Splitting should not take place if the total number of regions exceed this.
    * This is not a hard limit to the number of regions but it is a guideline to
    * stop splitting after number of online regions is greater than this.
@@ -89,6 +148,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
             }
           } finally {
             lock.unlock();
+            CompactSplitThread.postRequest(r);
           }
         }
       } catch (InterruptedException ex) {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a256b3c..e505694 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2045,6 +2045,34 @@ public class HRegion implements HeapSize { // , Writable{
     return this.stores.get(column);
   }
 
+  /**
+   * Return the paths of the store files of the given column families.
+   * Uses closeLock to prevent the race condition where a region closes
+   * in between the for loop - closing the stores one by one, some stores
+   * will return 0 files.
+   * @param columns the column families whose store files to list
+   * @return paths of the store files, as strings
+   * @throws IllegalArgumentException if a column family does not exist in
+   *   this region
+   */
+  public List<String> getStoreFileList(final byte [][] columns)
+    throws IllegalArgumentException {
+    List<String> storeFileNames = new ArrayList<String>();
+    synchronized (closeLock) {
+      for (byte[] column : columns) {
+        Store store = this.stores.get(column);
+        if (store == null) {
+          throw new IllegalArgumentException("No column family : " +
+              Bytes.toString(column) + " available");
+        }
+        for (StoreFile storeFile : store.getStorefiles()) {
+          storeFileNames.add(storeFile.getPath().toString());
+        }
+      }
+    }
+    return storeFileNames;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Support code
   //////////////////////////////////////////////////////////////////////////////
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 567ca79..199638e 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2798,4 +2798,42 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
 
     new HRegionServerCommandLine(regionServerClass).doMain(args);
   }
+
+  /**
+   * Gets the online regions of the specified table.
+   * This method looks at the in-memory onlineRegions.  It does not go to <code>.META.</code>.
+   * Only returns online regions.  If a region on this table has been
+   * closed during a disable, etc., it will not be included in the returned list.
+   * So, the returned list may not necessarily be ALL regions in this table, it's
+   * all the ONLINE regions in the table.
+   * @param tableName name of the table whose online regions to return
+   * @return Online regions from <code>tableName</code>
+   */
+  public List<HRegion> getOnlineRegions(byte[] tableName) {
+    List<HRegion> tableRegions = new ArrayList<HRegion>();
+    synchronized (this.onlineRegions) {
+      for (HRegion region: this.onlineRegions.values()) {
+        HRegionInfo regionInfo = region.getRegionInfo();
+        if (Bytes.equals(regionInfo.getTableName(), tableName)) {
+          tableRegions.add(region);
+        }
+      }
+    }
+    return tableRegions;
+  }
+
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region to check compaction state.
+   * @return the name of the region's {@link CompactSplitThread.CompactionState}.
+   * @throws IOException if this server is not open or the region is not online here
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException {
+    checkOpen();
+    requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    return CompactSplitThread.getCompactionState(
+      region.getRegionInfo().getEncodedName()).name();
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
index a3e41c3..6c2b8d2 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
@@ -44,7 +43,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
    * This class represents a compaction request and holds the region, priority,
    * and time submitted.
    */
-  private class CompactionRequest implements Comparable<CompactionRequest> {
+  static class CompactionRequest implements Comparable<CompactionRequest> {
     private final HRegion r;
     private final int p;
     private final Long timeInNanos;
@@ -144,6 +143,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
           newRequest.getPriority() < queuedRequest.getPriority()) {
         LOG.trace("Inserting region in queue. " + newRequest);
         regionsInQueue.put(r, newRequest);
+        CompactSplitThread.preRequest(r);
       } else {
         LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
           ", requested: " + newRequest);
@@ -187,8 +187,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
   public boolean add(HRegion e, int p) {
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     if (request != null) {
-      boolean result = queue.add(request);
-      return result;
+      return queue.add(request);
     } else {
       return false;
     }
diff --git src/main/resources/hbase-webapps/master/table.jsp src/main/resources/hbase-webapps/master/table.jsp
index f7edce6..0f3665a 100644
--- src/main/resources/hbase-webapps/master/table.jsp
+++ src/main/resources/hbase-webapps/master/table.jsp
@@ -146,6 +146,11 @@
       <td><%= hbadmin.isTableEnabled(table.getTableName()) %></td>
       <td>Is the table enabled</td>
   </tr>
+  <tr>
+      <td>Compaction</td>
+      <td><%= hbadmin.getCompactionState(table.getTableName()) %></td>
+      <td>Is the table compacting</td>
+  </tr>
 <%  if (showFragmentation) { %>
   <tr>
       <td>Fragmentation</td>
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
new file mode 100644
index 0000000..2e89d12
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.CompactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Unit tests to test retrieving table/region compaction state */
+public class TestCompactionState {
+  final static Log LOG = LogFactory.getLog(TestCompactionState.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout=60000)
+  public void testMajorCompaction() throws IOException, InterruptedException {
+    compaction("testMajorCompaction", 8, CompactionState.MAJOR);
+  }
+
+  @Test(timeout=60000)
+  public void testMinorCompaction() throws IOException, InterruptedException {
+    compaction("testMinorCompaction", 15, CompactionState.MINOR);
+  }
+
+  /**
+   * Load data to a table, flush it to disk, trigger compaction,
+   * confirm the compaction state is right and wait till it is done.
+   *
+   * @param tableName name of the table to create, load and compact
+   * @param flushes number of batches of rows to load, each followed by a flush
+   * @param expectedState MINOR to request a compaction, MAJOR for a major one
+   * @throws IOException if a cluster or table operation fails
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void compaction(final String tableName, final int flushes,
+      final CompactionState expectedState) throws IOException, InterruptedException {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tableName);
+    byte [] family = Bytes.toBytes("family");
+    HTable ht = null;
+    try {
+      ht = TEST_UTIL.createTable(table, family);
+      loadData(ht, family, 3000, flushes);
+      HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+      List<HRegion> regions = rs.getOnlineRegions(table);
+      int countBefore = countStoreFiles(regions, family);
+      assertTrue(countBefore > 0); // there should be some data files
+      HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+      if (expectedState == CompactionState.MINOR) {
+        admin.compact(tableName);
+      } else {
+        admin.majorCompact(table);
+      }
+      long curt = System.currentTimeMillis();
+      long waitTime = 5000;
+      long endt = curt + waitTime;
+      CompactionState state = admin.getCompactionState(table);
+      while (state == CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = admin.getCompactionState(table);
+        curt = System.currentTimeMillis();
+      }
+      // Now, should have the right compaction state,
+      // otherwise, the compaction should have already been done
+      if (expectedState != state) {
+        for (HRegion region: regions) {
+          state = CompactSplitThread.getCompactionState(
+            region.getRegionInfo().getEncodedName());
+          assertEquals(CompactionState.NONE, state);
+        }
+      } else {
+        curt = System.currentTimeMillis();
+        waitTime = 20000;
+        endt = curt + waitTime;
+        state = admin.getCompactionState(table);
+        while (state != CompactionState.NONE && curt < endt) {
+          Thread.sleep(10);
+          state = admin.getCompactionState(table);
+          curt = System.currentTimeMillis();
+        }
+        // Now, compaction should be done.
+        assertEquals(CompactionState.NONE, state);
+      }
+      int countAfter = countStoreFiles(regions, family);
+      assertTrue(countAfter < countBefore);
+      if (expectedState == CompactionState.MAJOR) {
+        assertEquals(1, countAfter);
+      } else {
+        assertTrue(1 < countAfter);
+      }
+    } finally {
+      if (ht != null) {
+        TEST_UTIL.deleteTable(table);
+      }
+    }
+  }
+
+  private static int countStoreFiles(
+      List<HRegion> regions, final byte[] family) {
+    int count = 0;
+    for (HRegion region: regions) {
+      count += region.getStoreFileList(new byte[][]{family}).size();
+    }
+    return count;
+  }
+
+  private static void loadData(final HTable ht, final byte[] family,
+      final int rows, final int flushes) throws IOException {
+    List<Put> puts = new ArrayList<Put>(rows);
+    byte[] qualifier = Bytes.toBytes("val");
+    for (int i = 0; i < flushes; i++) {
+      for (int k = 0; k < rows; k++) {
+        byte[] row = Bytes.toBytes(random.nextLong());
+        Put p = new Put(row);
+        p.add(family, qualifier, row);
+        puts.add(p);
+      }
+      ht.put(puts);
+      ht.flushCommits();
+      TEST_UTIL.flush();
+      puts.clear();
+    }
+  }
+}