Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1234444)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy)
@@ -42,7 +42,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.UUID;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -58,6 +60,7 @@
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -79,6 +82,7 @@
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -106,6 +110,7 @@
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
   private static int SLAVES = 3;
+  private static Random random = new Random();
 
   /**
    * @throws java.lang.Exception
@@ -2900,7 +2905,123 @@
 
     assertNullResult(result);
   }
+
+  private void columnFamilyTableCompaction(int op) throws Exception {
+    String tableName = "testCompactCFTable" + op;
+    byte [] TABLE = Bytes.toBytes(tableName);
+    HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+    // This should create multiple store files.
+    for (int i = 0; i < 3; i++) {
+      byte[] row = Bytes.toBytes(random.nextInt());
+      if (op == 1) {
+        performMultiplePutAndFlush(admin, hTable, row, FAMILY, 9, 6);
+      } else {
+        performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+      }
+    }
+
+    // Check whether we have multiple store files.
+    Map<HRegionInfo, HServerAddress> map = hTable.getRegionsInfo();
+    Map<HRegionInfo, Integer> beforeCompact = new TreeMap<HRegionInfo, Integer>();
+    Iterator<Map.Entry<HRegionInfo, HServerAddress>> it1 = map.entrySet().iterator();
+    while (it1.hasNext()) {
+      Map.Entry<HRegionInfo, HServerAddress> mpe = it1.next();
+      byte[] regionName = mpe.getKey().getRegionName();
+      HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+      int storeFiles = server.getStoreFileList(regionName, FAMILY).size();
+      beforeCompact.put(mpe.getKey(), storeFiles);
+      assertTrue(storeFiles > 1 );
+    }
+
+    // Now perform compaction. 1 for major, 0 for simple.
+    if (op == 1) {
+      admin.majorCompact(TABLE , FAMILY);
+    } else {
+      admin.compact(TABLE, FAMILY);
+    }
+    Thread.sleep(10000);
+
+    // The number of store files after compaction should be lesser.
+    it1 = map.entrySet().iterator();
+    while (it1.hasNext()) {
+      Map.Entry<HRegionInfo, HServerAddress> mpe = it1.next();
+      byte[] regionName = mpe.getKey().getRegionName();
+      HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+      int storeFilesAfter = server.getStoreFileList(regionName, FAMILY).size();
+      int storeFilesBefore = beforeCompact.get(mpe.getKey());
+      if (op == 1) {
+        assertEquals(1, storeFilesAfter);
+      } else {
+        assertTrue(storeFilesAfter < storeFilesBefore);
+      }
+    }
+  }
+
+  private void performMultiplePutAndFlush(HBaseAdmin admin, HTable table,
+      byte[] row, byte[] family, int nFlushes, int nPuts) throws Exception {
+    for (int i = 0; i < nFlushes; i++) {
+      randomCFPuts(table, row, family, nPuts);
+      admin.flush(table.getTableName());
+      Thread.sleep(2000);
+    }
+  }
+
+  private void randomCFPuts(HTable table, byte[] row, byte[] family, int nPuts)
+      throws Exception {
+    Put put = new Put(row);
+    for(int i = 0; i < nPuts; i++) {
+      byte[] qualifier = Bytes.toBytes(random.nextInt());
+      byte[] value = Bytes.toBytes(random.nextInt());
+      put.add(family, qualifier, value);
+    }
+    table.put(put);
+  }
+
+  private void columnFamilyRegionCompaction(int op) throws Exception {
+    String tableName = "testCompactCFWithinRegion" + op;
+    byte [] TABLE = Bytes.toBytes(tableName);
+    HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+    byte[] row = Bytes.toBytes("row2");
+    HRegionLocation regionLocation = hTable.getRegionLocation(row);
+    HServerAddress address = regionLocation.getServerAddress();
+    HRegionInterface server = connection.getHRegionConnection(address);
+    byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+
+    // This should create multiple store files.
+    if (op == 1) {
+      performMultiplePutAndFlush(admin, hTable, row, FAMILY, 9, 6);
+    } else {
+      performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+    }
+
+    // Check whether we have multiple store files.
+    int beforeCompaction = server.getStoreFileList(regionName, FAMILY).size();
+    System.out.println("Before compaction " + beforeCompaction);
+    assertTrue(beforeCompaction > 1 );
+
+    // Now perform compaction. 1 for major, 0 for simple.
+    if (op == 1) {
+      admin.majorCompact(regionName, FAMILY);
+    } else {
+      admin.compact(regionName, FAMILY);
+    }
+    Thread.sleep(10000);
+
+    // The number of store files after compaction should be lesser.
+    int afterCompaction = server.getStoreFileList(regionName, FAMILY).size();
+    System.out.println("After compaction " + afterCompaction);
+    if (op ==1 ) {
+      assertEquals(1, afterCompaction);
+    } else {
+      assertTrue(afterCompaction < beforeCompaction);
+    }
+  }
+
 
   //
   // Verifiers
   //
@@ -4572,6 +4693,27 @@
     assertTrue(addrAfter.getPort() != addrCache.getPort());
     assertEquals(addrAfter.getPort(), addrNoCache.getPort());
   }
+
+  @Test
+  public void testColumnFamilyTableCompaction() throws Exception {
+    columnFamilyTableCompaction(0);
+  }
+
+  @Test
+  public void testColumnFamilyTableMajorCompaction() throws Exception {
+    columnFamilyTableCompaction(1);
+  }
+
+  @Test
+  public void testColumnFamilyRegionCompaction() throws Exception {
+    columnFamilyRegionCompaction(0);
+  }
+
+  @Test
+  public void testColumnFamilyRegionMajorCompaction() throws Exception {
+    columnFamilyRegionCompaction(1);
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1234444)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy)
@@ -1205,10 +1205,38 @@
    */
   public void compact(final byte [] tableNameOrRegionName)
   throws IOException, InterruptedException {
-    compact(tableNameOrRegionName, false);
+    compact(tableNameOrRegionName, null, false);
   }
+
+  /**
+   * Compact a column family within a table or region.
+   * Asynchronous operation.
+   *
+   * @param tableOrRegionName table or region to compact
+   * @param columnFamily column family within a table or region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void compact(String tableOrRegionName, String columnFamily)
+    throws IOException, InterruptedException {
+    compact(Bytes.toBytes(tableOrRegionName), Bytes.toBytes(columnFamily));
+  }
 
   /**
+   * Compact a column family within a table or region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to compact
+   * @param columnFamily column family within a table or region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void compact(final byte [] tableNameOrRegionName, final byte[] columnFamily)
+  throws IOException, InterruptedException {
+    compact(tableNameOrRegionName, columnFamily, false);
+  }
+
+  /**
    * Major compact a table or an individual region.
    * Asynchronous operation.
    *
@@ -1231,19 +1259,70 @@
    */
   public void majorCompact(final byte [] tableNameOrRegionName)
   throws IOException, InterruptedException {
-    compact(tableNameOrRegionName, true);
+    compact(tableNameOrRegionName, null, true);
   }
+
+  /**
+   * Major compact a column family within a table or region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to major compact
+   * @param columnFamily column family within a table or region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void majorCompact(final String tableNameOrRegionName,
+    final String columnFamily) throws IOException, InterruptedException {
+    majorCompact(Bytes.toBytes(tableNameOrRegionName),
+      Bytes.toBytes(columnFamily));
+  }
 
   /**
+   * Major compact a column family within a table or region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to major compact
+   * @param columnFamily column family within a table or region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void majorCompact(final byte [] tableNameOrRegionName,
+    final byte[] columnFamily) throws IOException, InterruptedException {
+    compact(tableNameOrRegionName, columnFamily, true);
+  }
+
+  /**
+   * Verify that the column family exists in the table; throws IllegalArgumentException if not
+   *
+   * @param tableName table name
+   * @param columnFamily column family to verify
+   * @throws IOException if a remote or network exception occurs
+   */
+  private void verifyColumnFamily(final byte[] tableName, final byte[] columnFamily)
+    throws IOException{
+    try{
+      if (!getTableDescriptor(tableName).hasFamily(columnFamily)) {
+        throw new IllegalArgumentException("Column Family " + Bytes.toStringBinary(columnFamily)
+            + " does not exist in table " + Bytes.toStringBinary(tableName));
+      }
+    }catch(final TableNotFoundException tnfe){
+      throw new IllegalArgumentException("Table "
+          + Bytes.toStringBinary(tableName) + " does not exist", tnfe);
+    }
+  }
+
+  /**
    * Compact a table or an individual region.
    * Asynchronous operation.
    *
    * @param tableNameOrRegionName table or region to compact
+   * @param columnFamily column family within a table or region (NOTE(review): validated below but not passed on to the per-region compact call -- confirm)
    * @param major True if we are to do a major compaction.
    * @throws IOException if a remote or network exception occurs
    * @throws InterruptedException
    */
-  private void compact(final byte [] tableNameOrRegionName, final boolean major)
+  private void compact(final byte [] tableNameOrRegionName,
+    final byte[] columnFamily,final boolean major)
   throws IOException, InterruptedException {
     CatalogTracker ct = getCatalogTracker();
     try {
@@ -1254,6 +1333,10 @@
           LOG.info("No server in .META. for " +
             Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
         } else {
+          if(columnFamily != null){
+            byte[] tableName = HRegionInfo.parseRegionName(tableNameOrRegionName)[0];
+            verifyColumnFamily(tableName, columnFamily);
+          }
           compact(pair.getSecond(), pair.getFirst(), major);
         }
       } else {
@@ -1265,6 +1348,9 @@
           if (pair.getFirst().isOffline()) continue;
           if (pair.getSecond() == null) continue;
           try {
+            if(columnFamily != null){
+              verifyColumnFamily(Bytes.toBytes(tableName), columnFamily);
+            }
             compact(pair.getSecond(), pair.getFirst(), major);
           } catch (NotServingRegionException e) {
             if (LOG.isDebugEnabled()) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1234444)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -2868,14 +2868,31 @@
   @QosPriority(priority=HIGH_QOS)
   public void compactRegion(HRegionInfo regionInfo, boolean major)
       throws NotServingRegionException, IOException {
+    compactRegion(regionInfo, null, major);
+  }
+
+  @Override
+  @QosPriority(priority=HIGH_QOS)
+  public void compactRegion(HRegionInfo regionInfo, byte[] columnFamily, boolean major)
+      throws NotServingRegionException, IOException {
     checkOpen();
     HRegion region = getRegion(regionInfo.getRegionName());
-    if (major) {
-      region.triggerMajorCompaction();
+    if(major){
+      if (columnFamily != null) {
+        region.getStore(columnFamily).triggerMajorCompaction();
+      }else{
+        region.triggerMajorCompaction();
+      }
     }
-    compactSplitThread.requestCompaction(region, "User-triggered "
-        + (major ? "major " : "") + "compaction",
-        CompactSplitThread.PRIORITY_USER);
+    if(columnFamily != null){
+      compactSplitThread.requestCompaction(region, region.getStore(columnFamily),
+        "User-triggered " + (major ? "major " : "") + "compaction",
+        CompactSplitThread.PRIORITY_USER);
+    }else{
+      compactSplitThread.requestCompaction(region, "User-triggered "
+        + (major ? "major " : "") + "compaction",
+        CompactSplitThread.PRIORITY_USER);
+    }
   }
 
   /** @return the info server */
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1234444)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy)
@@ -530,6 +530,20 @@
    */
   void compactRegion(HRegionInfo regionInfo, boolean major)
   throws NotServingRegionException, IOException;
+
+  /**
+   * Compacts a column-family within a specified region.
+   * Performs a major compaction if specified.
+   * <p>
+   * This method is asynchronous.
+   * @param regionInfo region to compact
+   * @param columnFamily column family within a region to compact
+   * @param major true to force major compaction
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void compactRegion(HRegionInfo regionInfo, byte[] columnFamily, boolean major)
+  throws NotServingRegionException, IOException;
 
   /**
    * Replicates the given entries. The guarantee is that the given entries
Index: src/main/ruby/hbase/admin.rb
===================================================================
--- src/main/ruby/hbase/admin.rb (revision 1234444)
+++ src/main/ruby/hbase/admin.rb (working copy)
@@ -50,15 +50,31 @@
     end
 
     #----------------------------------------------------------------------------------------------
-    # Requests a table or region compaction
-    def compact(table_or_region_name)
-      @admin.compact(table_or_region_name)
+    # Requests a table or region or column family compaction
+    def compact(table_or_region_name, *args)
+      if args.empty?
+        @admin.compact(table_or_region_name)
+      elsif args.length == 1
+        # We are compacting a column family within a region or table.
+        column_family = args.first
+        @admin.compact(table_or_region_name, column_family)
+      else
+        raise(ArgumentError, "compact takes a table or region name and at most one column family")
+      end
     end
 
     #----------------------------------------------------------------------------------------------
-    # Requests a table or region major compaction
-    def major_compact(table_or_region_name)
-      @admin.majorCompact(table_or_region_name)
+    # Requests a table or region or column family major compaction
+    def major_compact(table_or_region_name, *args)
+      if args.empty?
+        @admin.majorCompact(table_or_region_name)
+      elsif args.length == 1
+        # We are major compacting a column family within a region or table.
+        column_family = args.first
+        @admin.majorCompact(table_or_region_name, column_family)
+      else
+        raise(ArgumentError, "major_compact takes a table or region name and at most one column family")
+      end
     end
 
     #----------------------------------------------------------------------------------------------
Index: src/main/ruby/shell/commands/compact.rb
===================================================================
--- src/main/ruby/shell/commands/compact.rb (revision 1234444)
+++ src/main/ruby/shell/commands/compact.rb (working copy)
@@ -23,14 +23,24 @@
     class Compact < Command
       def help
         return <<-EOF
-Compact all regions in passed table or pass a region row
-to compact an individual region
-EOF
+          Compact all regions in passed table or pass a region row
+          to compact an individual region. You can also compact a single column
+          family within a region.
+          Examples:
+          Compact all regions in a table:
+          hbase> compact 't1'
+          Compact an entire region:
+          hbase> compact 'r1'
+          Compact only a column family within a region:
+          hbase> compact 'r1', 'c1'
+          Compact a column family within a table:
+          hbase> compact 't1', 'c1'
+        EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, *args)
         format_simple_command do
-          admin.compact(table_or_region_name)
+          admin.compact(table_or_region_name, *args)
         end
       end
     end
Index: src/main/ruby/shell/commands/major_compact.rb
===================================================================
--- src/main/ruby/shell/commands/major_compact.rb (revision 1234444)
+++ src/main/ruby/shell/commands/major_compact.rb (working copy)
@@ -23,14 +23,25 @@
     class MajorCompact < Command
       def help
         return <<-EOF
-Run major compaction on passed table or pass a region row
-to major compact an individual region
-EOF
+          Run major compaction on passed table or pass a region row
+          to major compact an individual region. To compact a
+          column family within a region specify the region name
+          followed by the column family name.
+          Examples:
+          Compact all regions in a table:
+          hbase> major_compact 't1'
+          Compact an entire region:
+          hbase> major_compact 'r1'
+          Compact only a column family within a region:
+          hbase> major_compact 'r1', 'c1'
+          Compact a column family within a table:
+          hbase> major_compact 't1', 'c1'
+        EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, *args)
         format_simple_command do
-          admin.major_compact(table_or_region_name)
+          admin.major_compact(table_or_region_name, *args)
         end
       end
     end