diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 02bcb96..09a8e82 100644 --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -1630,4 +1631,91 @@ public class HBaseAdmin implements Abortable, Closeable { return null; } } + + /** + * Get the current compaction state of a table or region. + * It could be in a major compaction, a minor compaction, both, or none. + * + * @param tableNameOrRegionName table or region to check compaction state + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + * @return the current compaction state + */ + public CompactionState getCompactionState(final String tableNameOrRegionName) + throws IOException, InterruptedException { + return getCompactionState(Bytes.toBytes(tableNameOrRegionName)); + } + + /** + * Get the current compaction state of a table or region. + * It could be in a major compaction, a minor compaction, both, or none. 
+ * + * @param tableNameOrRegionName table or region to check compaction state + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + * @return the current compaction state + */ + public CompactionState getCompactionState(final byte [] tableNameOrRegionName) + throws IOException, InterruptedException { + CompactionState state = CompactionState.NONE; + CatalogTracker ct = getCatalogTracker(); + try { + if (isRegionName(tableNameOrRegionName, ct)) { + Pair<HRegionInfo, ServerName> pair = + MetaReader.getRegion(ct, tableNameOrRegionName); + if (pair == null || pair.getSecond() == null) { + LOG.info("No server in .META. for " + + Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair); + } else { + ServerName sn = pair.getSecond(); + HRegionInterface rs = + this.connection.getHRegionConnection(sn.getHostname(), sn.getPort()); + return CompactionState.valueOf( + rs.getCompactionState(pair.getFirst().getRegionName())); + } + } else { + final String tableName = tableNameString(tableNameOrRegionName, ct); + List<Pair<HRegionInfo, ServerName>> pairs = + MetaReader.getTableRegionsAndLocations(ct, tableName); + for (Pair<HRegionInfo, ServerName> pair: pairs) { + if (pair.getFirst().isOffline()) continue; + if (pair.getSecond() == null) continue; + try { + ServerName sn = pair.getSecond(); + HRegionInterface rs = + this.connection.getHRegionConnection(sn.getHostname(), sn.getPort()); + switch (CompactionState.valueOf( + rs.getCompactionState(pair.getFirst().getRegionName()))) { + case MAJOR_AND_MINOR: + return CompactionState.MAJOR_AND_MINOR; + case MAJOR: + if (state == CompactionState.MINOR) { + return CompactionState.MAJOR_AND_MINOR; + } + state = CompactionState.MAJOR; + break; + case MINOR: + if (state == CompactionState.MAJOR) { + return CompactionState.MAJOR_AND_MINOR; + } + state = CompactionState.MINOR; + break; + case NONE: + default: + // nothing, continue + } + } catch (NotServingRegionException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to get compaction state of " + + pair.getFirst() + 
": " + + StringUtils.stringifyException(e)); + } + } + } + } + } finally { + cleanupCatalogTracker(ct); + } + return state; + } } diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 47e8cf5..2797804 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.security.TokenInfo; @@ -555,6 +556,15 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl */ public byte[][] rollHLogWriter() throws IOException, FailedLogCloseException; + /** + * Get the current compaction state of the region. + * + * @param regionName the name of the region to check compaction statte. + * @return the compaction state name. + * @throws IOException exception + */ + public String getCompactionState(final byte[] regionName) throws IOException; + @Override public void stop(String why); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 82c30c7..ee3460a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2570,6 +2570,32 @@ public class HRegion implements HeapSize { // , Writable{ return this.stores; } + /** + * Return list of storeFiles for the set of CFs. 
+ * Uses closeLock to prevent the race condition where a region closes + * in between the for loop - closing the stores one by one, some stores + * will return 0 files. + * @return List of storeFiles. + */ + public List<String> getStoreFileList(final byte [][] columns) + throws IllegalArgumentException { + List<String> storeFileNames = new ArrayList<String>(); + synchronized(closeLock) { + for(byte[] column : columns) { + Store store = this.stores.get(column); + if (store == null) { + throw new IllegalArgumentException("No column family : " + + new String(column) + " available"); + } + List<StoreFile> storeFiles = store.getStorefiles(); + for (StoreFile storeFile: storeFiles) { + storeFileNames.add(storeFile.getPath().toString()); + } + } + } + return storeFileNames; + } + ////////////////////////////////////////////////////////////////////////////// // Support code ////////////////////////////////////////////////////////////////////////////// diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b176849..dc11099 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; @@ -3334,4 +3335,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, LOG.info("Registered RegionServer MXBean"); } + /** + * Get the current 
compaction state of the region. + * + * @param regionName the name of the region to check compaction state. + * @return the compaction state name. + * @throws IOException exception + */ + public String getCompactionState(final byte[] regionName) throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(regionName); + HRegionInfo info = region.getRegionInfo(); + return CompactionRequest.getCompactionState(info.getRegionId()).name(); + } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 56f61fe..f8312f8 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1052,10 +1052,14 @@ public class Store implements HeapSize { } finally { this.lock.readLock().unlock(); } + if (ret != null) { + CompactionRequest.preRequest(ret); + } return ret; } public void finishRequest(CompactionRequest cr) { + CompactionRequest.postRequest(cr); synchronized (filesCompacting) { filesCompacting.removeAll(cr.getFiles()); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index c719f5f..95afba1 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,6 +57,14 @@ public class CompactionRequest implements 
Comparable<CompactionRequest>, private final Long timeInNanos; private HRegionServer server = null; + /** + * Map to track the number of compactions requested per region (id) + */ + private static final ConcurrentHashMap<Long, AtomicInteger> + majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>(); + private static final ConcurrentHashMap<Long, AtomicInteger> + minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>(); + public CompactionRequest(HRegion r, Store s, List<StoreFile> files, boolean isMajor, int p) { Preconditions.checkNotNull(r); @@ -74,6 +84,58 @@ public class CompactionRequest implements Comparable<CompactionRequest>, } /** + * Find out if a given region is in compaction now. + * + * @param regionId the id of the region to check + * @return the current compaction state of the region + */ + public static CompactionState getCompactionState( + final long regionId) { + Long key = Long.valueOf(regionId); + AtomicInteger major = majorCompactions.get(key); + AtomicInteger minor = minorCompactions.get(key); + int state = 0; + if (minor != null && minor.get() > 0) { + state += 1; // use 1 to indicate minor only here + } + if (major != null && major.get() > 0) { + state += 2; // use 2 to indicate major only here + } + switch (state) { + case 3: // 3 = 2 + 1, so both major and minor + return CompactionState.MAJOR_AND_MINOR; + case 2: + return CompactionState.MAJOR; + case 1: + return CompactionState.MINOR; + default: + return CompactionState.NONE; + } + } + + public static void preRequest(final CompactionRequest cr){ + Long key = Long.valueOf(cr.getHRegion().getRegionId()); + ConcurrentHashMap<Long, AtomicInteger> compactions = + cr.isMajor() ? majorCompactions : minorCompactions; + AtomicInteger count = compactions.get(key); + if (count == null) { + compactions.putIfAbsent(key, new AtomicInteger(0)); + count = compactions.get(key); + } + count.incrementAndGet(); + } + + public static void postRequest(final CompactionRequest cr){ + Long key = Long.valueOf(cr.getHRegion().getRegionId()); + ConcurrentHashMap<Long, AtomicInteger> compactions = + cr.isMajor() ? 
majorCompactions : minorCompactions; + AtomicInteger count = compactions.get(key); + if (count != null) { + count.decrementAndGet(); + } + } + + /** * This function will define where in the priority queue the request will * end up. Those with the highest priorities will be first. When the * priorities are the same it will first compare priority then date @@ -203,6 +265,16 @@ public class CompactionRequest implements Comparable<CompactionRequest>, } /** + * An enum for the region compaction state + */ + public static enum CompactionState { + NONE, + MINOR, + MAJOR, + MAJOR_AND_MINOR; + } + + /** * Cleanup class to use when rejecting a compaction request from the queue. */ public static class Rejection implements RejectedExecutionHandler { diff --git src/main/resources/hbase-webapps/master/table.jsp src/main/resources/hbase-webapps/master/table.jsp index 811df46..b7b0f33 100644 --- src/main/resources/hbase-webapps/master/table.jsp +++ src/main/resources/hbase-webapps/master/table.jsp @@ -154,6 +154,11 @@ <td><%= hbadmin.isTableEnabled(table.getTableName()) %></td> <td>Is the table enabled</td> + <tr> + <td>Compaction</td> + <td><%= hbadmin.getCompactionState(table.getTableName()) %></td> + <td>Is the table compacting</td> + </tr> + <% if (showFragmentation) { %> <td>Fragmentation</td> diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java new file mode 100644 index 0000000..fc23056 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Unit tests to test retrieving table/region compaction state*/ +public class TestCompactionState { + final static Log LOG = LogFactory.getLog(TestCompactionState.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Random random = new Random(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout=60000) + public void testMajorCompaction() throws IOException, InterruptedException { + compaction("testMajorCompaction", 8, CompactionState.MAJOR); + } + + @Test(timeout=60000) + 
public void testMinorCompaction() throws IOException, InterruptedException { + compaction("testMinorCompaction", 15, CompactionState.MINOR); + } + + /** + * Load data to a table, flush it to disk, trigger compaction, + * confirm the compaction state is right and wait till it is done. + * + * @param tableName + * @param flushes + * @param expectedState + * @throws IOException + * @throws InterruptedException + */ + private void compaction(final String tableName, final int flushes, + final CompactionState expectedState) throws IOException, InterruptedException { + // Create a table with regions + byte [] table = Bytes.toBytes(tableName); + byte [] family = Bytes.toBytes("family"); + HTable ht = null; + try { + ht = TEST_UTIL.createTable(table, family); + loadData(ht, family, 3000, flushes); + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + List<HRegion> regions = rs.getOnlineRegions(table); + int countBefore = countStoreFiles(regions, family); + assertTrue(countBefore > 0); // there should be some data files + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + if (expectedState == CompactionState.MINOR) { + admin.compact(tableName); + } else { + admin.majorCompact(table); + } + long curt = System.currentTimeMillis(); + long waitTime = 5000; + long endt = curt + waitTime; + CompactionState state = admin.getCompactionState(table); + while (state == CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = admin.getCompactionState(table); + curt = System.currentTimeMillis(); + } + // Now, should have the right compaction state, + // otherwise, the compaction should have already been done + if (expectedState != state) { + for (HRegion region: regions) { + state = CompactionRequest.getCompactionState(region.getRegionId()); + assertEquals(CompactionState.NONE, state); + } + } else { + curt = System.currentTimeMillis(); + waitTime = 20000; + endt = curt + waitTime; + state = admin.getCompactionState(table); + while (state != 
CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = admin.getCompactionState(table); + curt = System.currentTimeMillis(); + } + // Now, compaction should be done. + assertEquals(CompactionState.NONE, state); + } + int countAfter = countStoreFiles(regions, family); + assertTrue(countAfter < countBefore); + if (expectedState == CompactionState.MAJOR) assertTrue(1 == countAfter); + else assertTrue(1 < countAfter); + } finally { + if (ht != null) { + TEST_UTIL.deleteTable(table); + } + } + } + + private static int countStoreFiles( + List<HRegion> regions, final byte[] family) { + int count = 0; + for (HRegion region: regions) { + count += region.getStoreFileList(new byte[][]{family}).size(); + } + return count; + } + + private static void loadData(final HTable ht, final byte[] family, + final int rows, final int flushes) throws IOException { + List<Put> puts = new ArrayList<Put>(rows); + byte[] qualifier = Bytes.toBytes("val"); + for (int i = 0; i < flushes; i++) { + for (int k = 0; k < rows; k++) { + byte[] row = Bytes.toBytes(random.nextLong()); + Put p = new Put(row); + p.add(family, qualifier, row); + puts.add(p); + } + ht.put(puts); + ht.flushCommits(); + TEST_UTIL.flush(); + puts.clear(); + } + } +}