Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(revision 1234221)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(working copy)
@@ -43,6 +43,8 @@
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
+import java.util.TreeMap;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -57,7 +59,9 @@
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -106,6 +110,7 @@
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
   private static int SLAVES = 3;
+  private static Random random = new Random();
 
   /**
    * @throws java.lang.Exception
@@ -4521,6 +4526,141 @@
     assertEquals(count, store.getNumberOfstorefiles());
   }
 
+  /**
+   * Writes and flushes enough rows to give FAMILY several store files, asks
+   * for a table-wide compaction of that family and then checks the store file
+   * count of every region: exactly one after a major compaction, fewer than
+   * before otherwise.
+   *
+   * @param op 1 for a major compaction, any other value for a minor one
+   */
+  private void columnFamilyTableCompaction(int op) throws Exception {
+    String tableName = "testCompactCFTable" + op;
+    byte [] TABLE = Bytes.toBytes(tableName);
+    HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+
+    // This should create multiple store files.
+    for (int i = 0; i < 3; i++) {
+      byte[] row = Bytes.toBytes(random.nextInt());
+      if (op == 1) {
+        performMultiplePutAndFlush(admin, hTable, row, FAMILY, 9, 6);
+      } else {
+        performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+      }
+    }
+
+    // Check whether we have multiple store files.
+    Map<HRegionInfo, HServerAddress> map = hTable.getRegionsInfo();
+    Map<HRegionInfo, Integer> beforeCompact = new TreeMap<HRegionInfo, Integer>();
+    Iterator<Map.Entry<HRegionInfo, HServerAddress>> it1 = map.entrySet().iterator();
+    while (it1.hasNext()) {
+      Map.Entry<HRegionInfo, HServerAddress> mpe = it1.next();
+      byte[] regionName = mpe.getKey().getRegionName();
+      HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+      int storeFiles = server.getStoreFileList(regionName, FAMILY).size();
+      beforeCompact.put(mpe.getKey(), storeFiles);
+      assertTrue(storeFiles > 1);
+    }
+
+    // Now perform compaction. 1 for major, 0 for simple.
+    if (op == 1) {
+      admin.majorCompact(TABLE, FAMILY);
+    } else {
+      admin.compact(TABLE, FAMILY);
+    }
+    Thread.sleep(10000);
+
+    // The number of store files after compaction should be fewer.
+    it1 = map.entrySet().iterator();
+    while (it1.hasNext()) {
+      Map.Entry<HRegionInfo, HServerAddress> mpe = it1.next();
+      byte[] regionName = mpe.getKey().getRegionName();
+      HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+      int storeFilesAfter = server.getStoreFileList(regionName, FAMILY).size();
+      int storeFilesBefore = beforeCompact.get(mpe.getKey());
+      if (op == 1) {
+        assertEquals(1, storeFilesAfter);
+      } else {
+        assertTrue(storeFilesAfter < storeFilesBefore);
+      }
+    }
+  }
+
+  /**
+   * Repeats nFlushes times: put nPuts random cells into the given row, flush
+   * the table and wait two seconds.
+   */
+  private void performMultiplePutAndFlush(HBaseAdmin admin, HTable table,
+      byte[] row, byte[] family, int nFlushes, int nPuts) throws Exception {
+    for (int i = 0; i < nFlushes; i++) {
+      randomCFPuts(table, row, family, nPuts);
+      admin.flush(table.getTableName());
+      Thread.sleep(2000);
+    }
+  }
+
+  /** Writes a single Put holding nPuts random qualifier/value pairs. */
+  private void randomCFPuts(HTable table, byte[] row, byte[] family, int nPuts)
+  throws Exception {
+    Put put = new Put(row);
+    for (int i = 0; i < nPuts; i++) {
+      byte[] qualifier = Bytes.toBytes(random.nextInt());
+      byte[] value = Bytes.toBytes(random.nextInt());
+      put.add(family, qualifier, value);
+    }
+    table.put(put);
+  }
+
+  /**
+   * Same scenario as the table-level check, but the compaction is requested
+   * for the single region hosting row "row2" rather than for the whole table.
+   *
+   * @param op 1 for a major compaction, any other value for a minor one
+   */
+  private void columnFamilyRegionCompaction(int op) throws Exception {
+    String tableName = "testCompactCFWithinRegion" + op;
+    byte [] TABLE = Bytes.toBytes(tableName);
+    HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+    byte[] row = Bytes.toBytes("row2");
+    HRegionLocation regionLocation = hTable.getRegionLocation(row);
+    HServerAddress address = regionLocation.getServerAddress();
+    HRegionInterface server = connection.getHRegionConnection(address);
+    byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+
+    // This should create multiple store files.
+    if (op == 1) {
+      performMultiplePutAndFlush(admin, hTable, row, FAMILY, 9, 6);
+    } else {
+      performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+    }
+
+    // Check whether we have multiple store files.
+    int beforeCompaction = server.getStoreFileList(regionName, FAMILY).size();
+    System.out.println("Before compaction " + beforeCompaction);
+    assertTrue(beforeCompaction > 1);
+
+    // Now perform compaction. 1 for major, 0 for simple.
+    if (op == 1) {
+      admin.majorCompact(regionName, FAMILY);
+    } else {
+      admin.compact(regionName, FAMILY);
+    }
+    Thread.sleep(10000);
+
+    // The number of store files after compaction should be fewer.
+    int afterCompaction = server.getStoreFileList(regionName, FAMILY).size();
+    System.out.println("After compaction " + afterCompaction);
+    if (op == 1) {
+      assertEquals(1, afterCompaction);
+    } else {
+      assertTrue(afterCompaction < beforeCompaction);
+    }
+  }
+
   @Test
   /**
    * Tests the non cached version of getRegionLocation by moving a region.
@@ -4571,7 +4711,28 @@
     assertNotNull(addrAfter);
     assertTrue(addrAfter.getPort() != addrCache.getPort());
     assertEquals(addrAfter.getPort(), addrNoCache.getPort());
   }
+
+  @Test
+  public void testColumnFamilyTableCompaction() throws Exception {
+    columnFamilyTableCompaction(0);
+  }
+
+  @Test
+  public void testColumnFamilyTableMajorCompaction() throws Exception {
+    columnFamilyTableCompaction(1);
+  }
+
+  @Test
+  public void testColumnFamilyRegionCompaction() throws Exception {
+    columnFamilyRegionCompaction(0);
+  }
+
+  @Test
+  public void testColumnFamilyRegionMajorCompaction() throws Exception {
+    columnFamilyRegionCompaction(1);
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java	(revision 1233583)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java	(working copy)
@@ -1203,12 +1203,40 @@
    * @throws IOException if a remote or network exception occurs
    * @throws InterruptedException
    */
-  public void compact(final byte [] tableNameOrRegionName)
+  public void compact(final byte[] tableNameOrRegionName)
   throws IOException, InterruptedException {
-    compact(tableNameOrRegionName, false);
+    compact(tableNameOrRegionName, null, false);
   }
+
+  /**
+   * Compact a column family within a table or within an individual region.
+   * Asynchronous operation.
+   *
+   * @param tableOrRegionName table or region to compact
+   * @param columnFamily column family within a table or within a region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void compact(String tableOrRegionName, String columnFamily)
+  throws IOException, InterruptedException {
+    compact(Bytes.toBytes(tableOrRegionName), Bytes.toBytes(columnFamily));
+  }
 
   /**
+   * Compact a column family within a table or an individual region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to compact
+   * @param columnFamily column family within a table or within a region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void compact(final byte [] tableNameOrRegionName, final byte[] columnFamily)
+  throws IOException, InterruptedException {
+    compact(tableNameOrRegionName, columnFamily, false);
+  }
+
+  /**
    * Major compact a table or an individual region.
    * Asynchronous operation.
    *
@@ -1231,19 +1259,72 @@
    */
   public void majorCompact(final byte [] tableNameOrRegionName)
   throws IOException, InterruptedException {
-    compact(tableNameOrRegionName, true);
+    compact(tableNameOrRegionName, null, true);
   }
+
+  /**
+   * Major compact a column family within a table or within an individual region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to major compact
+   * @param columnFamily column family within a table or within a region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void majorCompact(final String tableNameOrRegionName,
+    final String columnFamily) throws IOException, InterruptedException {
+    majorCompact(Bytes.toBytes(tableNameOrRegionName),
+      Bytes.toBytes(columnFamily));
+  }
 
   /**
-   * Compact a table or an individual region.
+   * Major compact a column family within a table or within an individual region.
    * Asynchronous operation.
    *
+   * @param tableNameOrRegionName table or region to major compact
+   * @param columnFamily column family within a table or within a region
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  public void majorCompact(final byte [] tableNameOrRegionName,
+    final byte[] columnFamily) throws IOException, InterruptedException {
+    compact(tableNameOrRegionName, columnFamily, true);
+  }
+
+  /**
+   * Verify that a column family exists in a table.
+   *
+   * @param tableName table name
+   * @param columnFamily column family to verify
+   * @throws IOException if a remote or network exception occurs
+   * @throws IllegalArgumentException if the table or the column family does not exist
+   */
+  private void verifyColumnFamily(final byte[] tableName, final byte[] columnFamily)
+  throws IOException {
+    try {
+      if (!getTableDescriptor(tableName).hasFamily(columnFamily)) {
+        throw new IllegalArgumentException("Column Family " + Bytes.toString(columnFamily)
+          + " does not exist in table " + Bytes.toString(tableName));
+      }
+    } catch (final TableNotFoundException tnfe) {
+      throw new IllegalArgumentException("Table "
+        + Bytes.toString(tableName) + " does not exist", tnfe);
+    }
+  }
+
+  /**
+   * Compact a table or an individual region. Compact a column family
+   * within a table or within a region if valid column family is specified.
+   * Asynchronous operation.
+   *
    * @param tableNameOrRegionName table or region to compact
+   * @param columnFamily column family within a table or within a region
    * @param major True if we are to do a major compaction.
    * @throws IOException if a remote or network exception occurs
    * @throws InterruptedException
    */
-  private void compact(final byte [] tableNameOrRegionName, final boolean major)
+  private void compact(final byte [] tableNameOrRegionName,
+    final byte[] columnFamily, final boolean major)
   throws IOException, InterruptedException {
     CatalogTracker ct = getCatalogTracker();
     try {
@@ -1254,7 +1335,11 @@
           LOG.info("No server in .META. for " +
             Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
         } else {
-          compact(pair.getSecond(), pair.getFirst(), major);
+          if (columnFamily != null) {
+            byte[] tableName = HRegionInfo.parseRegionName(tableNameOrRegionName)[0];
+            verifyColumnFamily(tableName, columnFamily);
+          }
+          compact(pair.getSecond(), pair.getFirst(), columnFamily, major);
         }
       } else {
         final String tableName = tableNameString(tableNameOrRegionName, ct);
@@ -1265,7 +1350,10 @@
           if (pair.getFirst().isOffline()) continue;
           if (pair.getSecond() == null) continue;
           try {
-            compact(pair.getSecond(), pair.getFirst(), major);
+            if (columnFamily != null) {
+              verifyColumnFamily(Bytes.toBytes(tableName), columnFamily);
+            }
+            compact(pair.getSecond(), pair.getFirst(), columnFamily, major);
           } catch (NotServingRegionException e) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Trying to" + (major ? " major" : "") + " compact " +
@@ -1281,11 +1369,10 @@
   }
 
   private void compact(final ServerName sn, final HRegionInfo hri,
-      final boolean major)
-  throws IOException {
+      final byte[] columnFamily, final boolean major) throws IOException {
     HRegionInterface rs =
       this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
-    rs.compactRegion(hri, major);
+    rs.compactRegion(hri, columnFamily, major);
   }
 
 
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1233601)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -2868,14 +2868,31 @@
   @QosPriority(priority=HIGH_QOS)
   public void compactRegion(HRegionInfo regionInfo, boolean major)
       throws NotServingRegionException, IOException {
+    compactRegion(regionInfo, null, major);
+  }
+
+  @Override
+  @QosPriority(priority=HIGH_QOS)
+  public void compactRegion(HRegionInfo regionInfo, byte[] columnFamily, boolean major)
+      throws NotServingRegionException, IOException {
     checkOpen();
     HRegion region = getRegion(regionInfo.getRegionName());
     if (major) {
-      region.triggerMajorCompaction();
+      if (columnFamily != null) {
+        region.getStore(columnFamily).triggerMajorCompaction();
+      } else {
+        region.triggerMajorCompaction();
+      }
     }
-    compactSplitThread.requestCompaction(region, "User-triggered "
-        + (major ? "major " : "") + "compaction",
-        CompactSplitThread.PRIORITY_USER);
+    if (columnFamily != null) {
+      compactSplitThread.requestCompaction(region, region.getStore(columnFamily),
+          "User-triggered " + (major ? "major " : "") + "compaction",
+          CompactSplitThread.PRIORITY_USER);
+    } else {
+      compactSplitThread.requestCompaction(region, "User-triggered "
+          + (major ? "major " : "") + "compaction",
+          CompactSplitThread.PRIORITY_USER);
+    }
   }
 
 
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 1232594)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -530,6 +530,20 @@
    */
   void compactRegion(HRegionInfo regionInfo, boolean major)
   throws NotServingRegionException, IOException;
+
+  /**
+   * Compacts a column-family within a specified region.
+   * Performs a major compaction if specified.
+   * <p>
+   * This method is asynchronous.
+   * @param regionInfo region to compact
+   * @param columnFamily column family within a region to compact
+   * @param major true to force major compaction
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void compactRegion(HRegionInfo regionInfo, byte[] columnFamily, boolean major)
+  throws NotServingRegionException, IOException;
 
 
 
Index: src/main/ruby/hbase/admin.rb
===================================================================
--- src/main/ruby/hbase/admin.rb	(revision 1233701)
+++ src/main/ruby/hbase/admin.rb	(working copy)
@@ -50,15 +50,31 @@
     end
 
     #----------------------------------------------------------------------------------------------
-    # Requests a table or region compaction
-    def compact(table_or_region_name)
-      @admin.compact(table_or_region_name)
+    # Requests a table or region or column family compaction
+    def compact(table_or_region_name, *args)
+      if args.empty?
+        @admin.compact(table_or_region_name)
+      elsif args.length == 1
+        # We are compacting a column family within a region or table.
+        column_family = args.first
+        @admin.compact(table_or_region_name, column_family)
+      else
+        raise ArgumentError, "wrong number of arguments (#{args.length + 1} for 1..2)"
+      end
     end
 
     #----------------------------------------------------------------------------------------------
-    # Requests a table or region major compaction
-    def major_compact(table_or_region_name)
-      @admin.majorCompact(table_or_region_name)
+    # Requests a table or region or column family major compaction
+    def major_compact(table_or_region_name, *args)
+      if args.empty?
+        @admin.majorCompact(table_or_region_name)
+      elsif args.length == 1
+        # We are major compacting a column family within a region or table.
+        column_family = args.first
+        @admin.majorCompact(table_or_region_name, column_family)
+      else
+        raise ArgumentError, "wrong number of arguments (#{args.length + 1} for 1..2)"
+      end
     end
 
     #----------------------------------------------------------------------------------------------
Index: src/main/ruby/shell/commands/compact.rb
===================================================================
--- src/main/ruby/shell/commands/compact.rb	(revision 1232594)
+++ src/main/ruby/shell/commands/compact.rb	(working copy)
@@ -23,14 +23,24 @@
     class Compact < Command
       def help
         return <<-EOF
-Compact all regions in passed table or pass a region row
-to compact an individual region
-EOF
+          Compact all regions in passed table or pass a region row
+          to compact an individual region. You can also compact a single column
+          family within a region.
+          Examples:
+          Compact all regions in a table:
+          hbase> compact 't1'
+          Compact an entire region:
+          hbase> compact 'r1'
+          Compact only a column family within a region:
+          hbase> compact 'r1', 'c1'
+          Compact a column family within a table:
+          hbase> compact 't1', 'c1'
+          EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, *args)
         format_simple_command do
-          admin.compact(table_or_region_name)
+          admin.compact(table_or_region_name, *args)
         end
       end
     end
Index: src/main/ruby/shell/commands/major_compact.rb
===================================================================
--- src/main/ruby/shell/commands/major_compact.rb	(revision 1232594)
+++ src/main/ruby/shell/commands/major_compact.rb	(working copy)
@@ -23,14 +23,25 @@
     class MajorCompact < Command
       def help
         return <<-EOF
-Run major compaction on passed table or pass a region row
-to major compact an individual region
-EOF
+          Run major compaction on passed table or pass a region row
+          to major compact an individual region. To compact a
+          column family within a region specify the region name
+          followed by the column family name.
+          Examples:
+          Compact all regions in a table:
+          hbase> major_compact 't1'
+          Compact an entire region:
+          hbase> major_compact 'r1'
+          Compact only a column family within a region:
+          hbase> major_compact 'r1', 'c1'
+          Compact a column family within a table:
+          hbase> major_compact 't1', 'c1'
+          EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, *args)
         format_simple_command do
-          admin.major_compact(table_or_region_name)
+          admin.major_compact(table_or_region_name, *args)
         end
       end
     end