diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 0fe2ff3..22b9134 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -580,6 +580,28 @@ void majorCompactRegion(final byte[] regionName, final byte[] columnFamily) throws IOException, InterruptedException; /** + * Compact a mob table. Asynchronous operation. + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void compactMob(final TableName tableName, final byte[] columnFamily) throws IOException, + InterruptedException; + + /** + * Major compact a mob table. Asynchronous operation. + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void majorCompactMob(final TableName tableName, final byte[] columnFamily) throws IOException, + InterruptedException; + + /** * Move the region r to dest. * * @param encodedRegionName The encoded region name; i.e. 
the hash that makes up the region name diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index b659e87..706276d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -24,6 +24,7 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -1778,6 +1780,46 @@ private void compact(final ServerName sn, final HRegionInfo hri, } /** + * {@inheritDoc} + */ + @Override + public void compactMob(final TableName tableName, final byte[] columnFamily) throws IOException, + InterruptedException { + HTableDescriptor htd = getTableDescriptor(tableName); + HColumnDescriptor family = htd.getFamily(columnFamily); + if (family == null || !family.isMobEnabled()) { + throw new IOException("Column family " + columnFamily + " is not a MOB column family"); + } + HRegionInfo info = new HRegionInfo(tableName, Bytes.toBytes(".mob"), HConstants.EMPTY_END_ROW, + false, 0); + Collection serversList = getClusterStatus().getServers(); + ServerName[] servers = serversList.toArray(new ServerName[serversList.size()]); + ServerName destServerName = ServerName.valueOf(servers[RandomUtils.nextInt(servers.length)] + .getServerName()); + compact(destServerName, info, false, columnFamily); + } + + /** + * {@inheritDoc} + */ + @Override + public void majorCompactMob(final TableName tableName, final byte[] columnFamily) + throws IOException, 
InterruptedException { + HTableDescriptor htd = getTableDescriptor(tableName); + HColumnDescriptor family = htd.getFamily(columnFamily); + if (family == null || !family.isMobEnabled()) { + throw new IOException("Column family " + columnFamily + " is not a MOB column family"); + } + HRegionInfo info = new HRegionInfo(tableName, Bytes.toBytes(".mob"), HConstants.EMPTY_END_ROW, + false, 0); + Collection serversList = getClusterStatus().getServers(); + ServerName[] servers = serversList.toArray(new ServerName[serversList.size()]); + ServerName destServerName = ServerName.valueOf(servers[RandomUtils.nextInt(servers.length)] + .getServerName()); + compact(destServerName, info, true, columnFamily); + } + + /** * Move the region r to dest. * @param encodedRegionName The encoded region name; i.e. the hash that makes * up the region name suffix: e.g. if regionname is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java index 9973619..35d571a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.master; -import java.io.IOException; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -31,18 +30,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableDescriptors; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import 
org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor; -import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; /** @@ -62,14 +55,12 @@ public MobFileCompactionChore(HMaster master) { MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master); this.master = master; this.tableLockManager = master.getTableLockManager(); - this.pool = createThreadPool(); + this.pool = MobUtils.createThreadPool(master.getConfiguration()); } @Override protected void chore() { try { - String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY, - PartitionedMobFileCompactor.class.getName()); TableDescriptors htds = master.getTableDescriptors(); Map map = htds.getAll(); for (HTableDescriptor htd : map.values()) { @@ -77,45 +68,8 @@ protected void chore() { if (!hcd.isMobEnabled()) { continue; } - // instantiate the mob file compactor. - MobFileCompactor compactor = null; - try { - compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { - Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class, - ExecutorService.class }, - new Object[] { master.getConfiguration(), master.getFileSystem(), htd.getTableName(), - hcd, pool }); - } catch (Exception e) { - throw new IOException("Unable to load configured mob file compactor '" + className - + "'", e); - } - // compact only for mob-enabled column. - // obtain a write table lock before performing compaction to avoid race condition - // with major compaction in mob-enabled column. - boolean tableLocked = false; - TableLock lock = null; - try { - // the tableLockManager might be null in testing. In that case, it is lock-free. 
- if (tableLockManager != null) { - lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()), - "Run MobFileCompactChore"); - lock.acquire(); - } - tableLocked = true; - compactor.compact(); - } catch (Exception e) { - LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString() - + " in the table " + htd.getNameAsString(), e); - } finally { - if (lock != null && tableLocked) { - try { - lock.release(); - } catch (IOException e) { - LOG.error( - "Fail to release the write lock for the table " + htd.getNameAsString(), e); - } - } - } + MobUtils.doMobFileCompaction(master.getConfiguration(), master.getFileSystem(), + htd.getTableName(), hcd, pool, tableLockManager, false); } } } catch (Exception e) { @@ -128,35 +82,4 @@ protected void cleanup() { super.cleanup(); pool.shutdown(); } - - /** - * Creates a thread pool. - * @return A thread pool. - */ - private ExecutorService createThreadPool() { - Configuration conf = master.getConfiguration(); - int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX, - MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX); - if (maxThreads == 0) { - maxThreads = 1; - } - long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME, - MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME); - final SynchronousQueue queue = new SynchronousQueue(); - ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, - TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - // waiting for a thread to pick up instead of throwing exceptions. 
- queue.put(r); - } catch (InterruptedException e) { - throw new RejectedExecutionException(e); - } - } - }); - ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); - return pool; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index d8b1376..15e70f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -27,6 +27,12 @@ import java.util.Date; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,11 +57,17 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Threads; /** * The mob utilities @@ -645,4 +657,88 @@ public static TableName getTableLockName(TableName tn) { byte[] tableName = tn.getName(); return TableName.valueOf(Bytes.add(tableName, 
MobConstants.MOB_TABLE_LOCK_SUFFIX)); } + + /** + * Do the mob file compaction. + * @param conf the Configuration + * @param fs the file system + * @param tableName the table the compact + * @param hcd the column descriptor + * @param pool the thread pool + * @param tableLockManager the tableLock manager + * @param isForceAllFiles Whether add all mob files into the compaction. + */ + public static void doMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager, + boolean isForceAllFiles) throws IOException { + String className = conf.get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY, + PartitionedMobFileCompactor.class.getName()); + // instantiate the mob file compactor. + MobFileCompactor compactor = null; + try { + compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class, + ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool }); + } catch (Exception e) { + throw new IOException("Unable to load configured mob file compactor '" + className + "'", e); + } + // compact only for mob-enabled column. + // obtain a write table lock before performing compaction to avoid race condition + // with major compaction in mob-enabled column. + boolean tableLocked = false; + TableLock lock = null; + try { + // the tableLockManager might be null in testing. In that case, it is lock-free. 
+ if (tableLockManager != null) { + lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName), + "Run MobFileCompaction"); + lock.acquire(); + } + tableLocked = true; + compactor.compact(isForceAllFiles); + } catch (Exception e) { + LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString() + + " in the table " + tableName.getNameAsString(), e); + } finally { + if (lock != null && tableLocked) { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Fail to release the write lock for the table " + tableName.getNameAsString(), + e); + } + } + } + } + + /** + * Creates a thread pool. + * @param conf the Configuration + * @return A thread pool. + */ + public static ExecutorService createThreadPool(Configuration conf) { + int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX); + if (maxThreads == 0) { + maxThreads = 1; + } + long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME); + final SynchronousQueue queue = new SynchronousQueue(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. 
+ queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java index bbc358e..fcb39c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java @@ -65,14 +65,26 @@ public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName, * @throws IOException */ public List compact() throws IOException { - return compact(Arrays.asList(fs.listStatus(mobFamilyDir))); + return compact(false); + } + + /** + * Compacts the mob files by compaction type for the current column family. + * @param isForceAllFiles Whether add all mob files into the compaction. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public List compact(boolean isForceAllFiles) throws IOException { + return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), isForceAllFiles); } /** * Compacts the candidate mob files. * @param files The candidate mob files. + * @param isForceAllFiles Whether add all mob files into the compaction. * @return The paths of new mob files generated in the compaction. 
* @throws IOException */ - public abstract List compact(List files) throws IOException; + public abstract List compact(List files, boolean isForceAllFiles) + throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java index 6cd3172..bce618e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java @@ -110,12 +110,12 @@ public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName } @Override - public List compact(List files) throws IOException { + public List compact(List files, boolean isForceAllFiles) throws IOException { if (files == null || files.isEmpty()) { return null; } // find the files to compact. - PartitionedMobFileCompactionRequest request = select(files); + PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles); // compact the files. return performCompaction(request); } @@ -124,11 +124,12 @@ public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName * Selects the compacted mob/del files. * Iterates the candidates to find out all the del files and small mob files. * @param candidates All the candidates. + * @param isForceAllFiles Whether add all mob files into the compaction. * @return A compaction request. 
* @throws IOException */ - protected PartitionedMobFileCompactionRequest select(List candidates) - throws IOException { + protected PartitionedMobFileCompactionRequest select(List candidates, + boolean isForceAllFiles) throws IOException { Collection allDelFiles = new ArrayList(); Map filesToCompact = new HashMap(); @@ -152,8 +153,9 @@ protected PartitionedMobFileCompactionRequest select(List candidates } if (StoreFileInfo.isDelFile(linkedFile.getPath())) { allDelFiles.add(file); - } else if (linkedFile.getLen() < mergeableSize) { - // add the small files to the merge pool + } else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) { + // add all files if isForceAllFiles is true, + // otherwise add the small files to the merge pool MobFileName fileName = MobFileName.create(linkedFile.getPath().getName()); CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(), fileName.getDate()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 22e4d88..c3daf92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -33,12 +33,14 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import 
org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -83,6 +86,8 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -202,6 +207,8 @@ */ private final int scannerLeaseTimeoutPeriod; + private final ExecutorService pool; + /** * Holder class which holds the RegionScanner and nextCallSeq together. */ @@ -747,6 +754,7 @@ public RSRpcServices(HRegionServer rs) throws IOException { isa = rpcServer.getListenerAddress(); rpcServer.setErrorHandler(this); rs.setName(name); + pool = MobUtils.createThreadPool(regionServer.getConfiguration()); } RegionScanner getScanner(long scannerId) { @@ -950,44 +958,71 @@ public CompactRegionResponse compactRegion(final RpcController controller, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); - region.startRegionOperation(Operation.COMPACT_REGION); - LOG.info("Compacting " + region.getRegionNameAsString()); - boolean major = false; - byte [] family = null; - Store store = null; - if (request.hasFamily()) { - family = request.getFamily().toByteArray(); - store = region.getStore(family); - if (store == null) { - throw new ServiceException(new IOException("column family " + Bytes.toString(family) - + " does not exist in region " + region.getRegionNameAsString())); + RegionSpecifier regionSpecifier = request.getRegion(); + String regionEncodedName = 
ProtobufUtil.getRegionEncodedName(request.getRegion()); + byte[] regionName = regionSpecifier.getValue().toByteArray(); + TableName tableName = HRegionInfo.getTable(regionName); + if (regionEncodedName.equals(MobUtils.getMobRegionInfo(tableName).getEncodedName())) { + boolean isForceAllFiles = false; + Configuration conf = regionServer.getConfiguration(); + FileSystem fs = regionServer.getFileSystem(); + TableLockManager tableLockManager = regionServer.getTableLockManager(); + byte[] family = request.getFamily().toByteArray(); + for (HColumnDescriptor hcd : regionServer.tableDescriptors.get(tableName) + .getColumnFamilies()) { + if (Bytes.equals(family, hcd.getName())) { + if (!hcd.isMobEnabled()) { + throw new IOException("Column family " + hcd.getName() + + " is not a MOB column family"); + } else { + if (request.hasMajor()) { + if (request.getMajor()) { + isForceAllFiles = true; + } + } + MobUtils.doMobFileCompaction(conf, fs, tableName, hcd, pool, tableLockManager, + isForceAllFiles); + } + } } - } - if (request.hasMajor()) { - major = request.getMajor(); - } - if (major) { - if (family != null) { - store.triggerMajorCompaction(); + } else { + HRegion region = getRegion(request.getRegion()); + region.startRegionOperation(Operation.COMPACT_REGION); + LOG.info("Compacting " + region.getRegionNameAsString()); + boolean major = false; + byte [] family = null; + Store store = null; + if (request.hasFamily()) { + family = request.getFamily().toByteArray(); + store = region.getStore(family); + if (store == null) { + throw new ServiceException(new IOException("column family " + Bytes.toString(family) + + " does not exist in region " + region.getRegionNameAsString())); + } + } + if (request.hasMajor()) { + major = request.getMajor(); + } + if (major) { + if (family != null) { + store.triggerMajorCompaction(); + } else { + region.triggerMajorCompaction(); + } + } + String familyLogMsg = (family != null)?" 
for column family: " + Bytes.toString(family):""; + if (LOG.isTraceEnabled()) { + LOG.trace("User-triggered compaction requested for region " + + region.getRegionNameAsString() + familyLogMsg); + } + String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; + if(family != null) { + regionServer.compactSplitThread.requestCompaction(region, store, log, + Store.PRIORITY_USER, null); } else { - region.triggerMajorCompaction(); + regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null); } } - - String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):""; - if (LOG.isTraceEnabled()) { - LOG.trace("User-triggered compaction requested for region " - + region.getRegionNameAsString() + familyLogMsg); - } - String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; - if(family != null) { - regionServer.compactSplitThread.requestCompaction(region, store, log, - Store.PRIORITY_USER, null); - } else { - regionServer.compactSplitThread.requestCompaction(region, log, - Store.PRIORITY_USER, null); - } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java index 9a8b7d9..14ef12a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java @@ -68,9 +68,9 @@ @Category(LargeTests.class) public class TestMobFileCompactor { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private Configuration conf = null; - private String tableNameAsString; - private TableName tableName; + private static Configuration conf = null; + private 
static String tableNameAsString; + private static TableName tableName; private static HTable hTable; private static Admin admin; private static HTableDescriptor desc; @@ -93,6 +93,7 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(TEST_UTIL.getConfiguration()); } @@ -208,41 +209,6 @@ public void testCompactionWithDelFiles() throws Exception { assertRefFileNameEqual(family1); } - private void assertRefFileNameEqual(String familyName) throws IOException { - Scan scan = new Scan(); - scan.addFamily(Bytes.toBytes(familyName)); - // Do not retrieve the mob data when scanning - scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); - ResultScanner results = hTable.getScanner(scan); - Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), - tableName), familyName); - List actualFilePaths = new ArrayList<>(); - List expectFilePaths = new ArrayList<>(); - for (Result res : results) { - for (Cell cell : res.listCells()) { - byte[] referenceValue = CellUtil.cloneValue(cell); - String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT, - referenceValue.length - Bytes.SIZEOF_INT); - Path targetPath = new Path(mobFamilyPath, fileName); - if(!actualFilePaths.contains(targetPath)) { - actualFilePaths.add(targetPath); - } - } - } - results.close(); - if (fs.exists(mobFamilyPath)) { - FileStatus[] files = fs.listStatus(mobFamilyPath); - for (FileStatus file : files) { - if (!StoreFileInfo.isDelFile(file.getPath())) { - expectFilePaths.add(file.getPath()); - } - } - } - Collections.sort(actualFilePaths); - Collections.sort(expectFilePaths); - assertEquals(expectFilePaths, actualFilePaths); - } - @Test public void 
testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception { resetConf(); @@ -428,6 +394,100 @@ public void testCompactionWithHFileLink() throws IOException, InterruptedExcepti assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2)); assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1)); assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files + loadData(count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(true, family1)); + assertEquals("Before compaction: family2 file count", regionNum*count, + countFiles(true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(false, family2)); + + int largeFilesCount = countLargeFiles(5000, family1); + // do the mob file compaction + admin.compactMob(tableName, hcd1.getName()); + + assertEquals("After compaction: mob rows 
count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount, + countFiles(true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(true, family2)); + assertEquals("After compaction: family1 del file count", regionNum, countFiles(false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(false, family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testMajorCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files + loadData(count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(false, family2)); + + // do the major mob file compaction, it will force all files to compaction + admin.majorCompactMob(tableName, hcd1.getName()); 
+ + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum, + countFiles(true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(true, family2)); + assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(false, family2)); } /** @@ -435,7 +495,7 @@ public void testCompactionWithHFileLink() throws IOException, InterruptedExcepti * @param table to get the scanner * @return the number of rows */ - private int countMobRows(final HTable table) throws IOException { + public static int countMobRows(final HTable table) throws IOException { Scan scan = new Scan(); // Do not retrieve the mob data when scanning scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); @@ -453,7 +513,7 @@ private int countMobRows(final HTable table) throws IOException { * @param table to get the scanner * @return the number of cells */ - private int countMobCells(final HTable table) throws IOException { + public static int countMobCells(final HTable table) throws IOException { Scan scan = new Scan(); // Do not retrieve the mob data when scanning scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); @@ -474,7 +534,7 @@ private int countMobCells(final HTable table) throws IOException { * @param familyName the family name * @return the number of the files */ - private int countFiles(boolean isMobFile, String familyName) throws IOException { + public static int countFiles(boolean isMobFile, String familyName) throws IOException { Path mobDirPath = MobUtils.getMobFamilyPath( MobUtils.getMobRegionPath(conf, tableName), familyName); int count = 0; 
@@ -500,7 +560,7 @@ private int countFiles(boolean isMobFile, String familyName) throws IOException * @param familyName the family name * @return the number of the HFileLink */ - private int countHFileLinks(String familyName) throws IOException { + public static int countHFileLinks(String familyName) throws IOException { Path mobDirPath = MobUtils.getMobFamilyPath( MobUtils.getMobRegionPath(conf, tableName), familyName); int count = 0; @@ -521,7 +581,7 @@ private int countHFileLinks(String familyName) throws IOException { * @param familyName the family name * @return the number of files large than the size */ - private int countLargeFiles(int size, String familyName) throws IOException { + public static int countLargeFiles(int size, String familyName) throws IOException { Path mobDirPath = MobUtils.getMobFamilyPath( MobUtils.getMobRegionPath(conf, tableName), familyName); int count = 0; @@ -542,7 +602,7 @@ private int countLargeFiles(int size, String familyName) throws IOException { * loads some data to the table. 
* @param count the mob file number */ - private void loadData(int fileNum, int rowNumPerFile) throws IOException, + public static void loadData(int fileNum, int rowNumPerFile) throws IOException, InterruptedException { if (fileNum <= 0) { throw new IllegalArgumentException(); @@ -569,7 +629,7 @@ private void loadData(int fileNum, int rowNumPerFile) throws IOException, /** * delete the row, family and cell to create the del file */ - private void createDelFile() throws IOException, InterruptedException { + public static void createDelFile() throws IOException, InterruptedException { for (byte k0 : KEYS) { byte[] k = new byte[] { k0 }; // delete a family @@ -601,7 +661,7 @@ private void createDelFile() throws IOException, InterruptedException { * @param the size of data * @return the dummy data */ - private byte[] makeDummyData(int size) { + public static byte[] makeDummyData(int size) { byte[] dummyData = new byte[size]; new Random().nextBytes(dummyData); return dummyData; @@ -640,6 +700,41 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { return pool; } + public static void assertRefFileNameEqual(String familyName) throws IOException { + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes(familyName)); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = hTable.getScanner(scan); + Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), + tableName), familyName); + List actualFilePaths = new ArrayList<>(); + List expectFilePaths = new ArrayList<>(); + for (Result res : results) { + for (Cell cell : res.listCells()) { + byte[] referenceValue = CellUtil.cloneValue(cell); + String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT, + referenceValue.length - Bytes.SIZEOF_INT); + Path targetPath = new Path(mobFamilyPath, fileName); + if(!actualFilePaths.contains(targetPath)) { + 
actualFilePaths.add(targetPath); + } + } + } + results.close(); + if (fs.exists(mobFamilyPath)) { + FileStatus[] files = fs.listStatus(mobFamilyPath); + for (FileStatus file : files) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + expectFilePaths.add(file.getPath()); + } + } + } + Collections.sort(actualFilePaths); + Collections.sort(expectFilePaths); + assertEquals(expectFilePaths, actualFilePaths); + } + /** * Resets the configuration. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java index 1d64c0c..c81e9e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java @@ -131,7 +131,7 @@ public void testCompactionSelectWithAllFiles() throws Exception { expectedStartKeys.add(startKey); } } - testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys); + testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys); } @Test @@ -156,7 +156,30 @@ public void testCompactionSelectWithPartFiles() throws Exception { } // set the mob file compaction mergeable threshold conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); - testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys); + testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithForceAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithForceAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files. 
+ createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + // set the mob file compaction mergeable threshold + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys); } @Test @@ -169,7 +192,7 @@ public void testCompactDelFilesWithDefaultBatchSize() throws Exception { // create 13 del files createStoreFiles(basePath, family, qf, 13, Type.Delete); listFiles(); - testCompactDelFiles(tableName, 1, 13); + testCompactDelFiles(tableName, 1, 13, false); } @Test @@ -185,7 +208,7 @@ public void testCompactDelFilesWithSmallBatchSize() throws Exception { // set the mob file compaction batch size conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4); - testCompactDelFiles(tableName, 1, 13); + testCompactDelFiles(tableName, 1, 13, false); } @Test @@ -203,7 +226,7 @@ public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5); // set the mob file compaction batch size conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2); - testCompactDelFiles(tableName, 4, 13); + testCompactDelFiles(tableName, 4, 13, false); } /** @@ -213,16 +236,17 @@ public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { * @param expected the expected start keys */ private void testSelectFiles(String tableName, final CompactionType type, - final List expected) throws IOException { + final boolean isForceAllFiles, final List expected) throws IOException { PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool) { 
@Override - public List compact(List files) throws IOException { + public List compact(List files, boolean isForceAllFiles) + throws IOException { if (files == null || files.isEmpty()) { return null; } - PartitionedMobFileCompactionRequest request = select(files); - // assert the compaction type is ALL_FILES + PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles); + // assert the compaction type Assert.assertEquals(type, request.type); // assert get the right partitions compareCompactedPartitions(expected, request.compactionPartitions); @@ -231,7 +255,7 @@ private void testSelectFiles(String tableName, final CompactionType type, return null; } }; - compactor.compact(allFiles); + compactor.compact(allFiles, isForceAllFiles); } /** @@ -241,7 +265,7 @@ private void testSelectFiles(String tableName, final CompactionType type, * @param expectedCellCount the expected cell count */ private void testCompactDelFiles(String tableName, final int expectedFileCount, - final int expectedCellCount) throws IOException { + final int expectedCellCount, boolean isForceAllFiles) throws IOException { PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool) { @Override @@ -258,8 +282,7 @@ private void testCompactDelFiles(String tableName, final int expectedFileCount, return null; } }; - - compactor.compact(allFiles); + compactor.compact(allFiles, isForceAllFiles); } /** diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index bc3cb89..4f8a612 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -76,6 +76,20 @@ def major_compact(table_or_region_name, family = nil) end #---------------------------------------------------------------------------------------------- + # Requests a mob column family compaction + def compact_mob(table_name, family) + # We are compacting a mob column family within a table. 
+ @admin.compactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes) + end + + #---------------------------------------------------------------------------------------------- + # Requests a mob column family major compaction + def major_compact_mob(table_name, family) + # We are major compacting a mob column family within a table. + @admin.majorCompactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes) + end + + #---------------------------------------------------------------------------------------------- # Requests a regionserver's HLog roll def hlog_roll(server_name) @admin.rollHLogWriter(server_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 5b59254..64f7839 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -322,6 +322,8 @@ def help_footer catalogjanitor_switch catalogjanitor_enabled trace + compact_mob + major_compact_mob ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/compact_mob.rb b/hbase-shell/src/main/ruby/shell/commands/compact_mob.rb new file mode 100644 index 0000000..2286967 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/compact_mob.rb @@ -0,0 +1,39 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class CompactMob < Command
+      def help
+        return <<-EOF
+          Run compaction on a mob enabled column family within a table
+          Examples:
+          Compact a column family within a table:
+          hbase> compact_mob 't1', 'c1'
+        EOF
+      end
+
+      def command(table_name, family)
+        format_simple_command do
+          admin.compact_mob(table_name, family)
+        end
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/major_compact_mob.rb b/hbase-shell/src/main/ruby/shell/commands/major_compact_mob.rb
new file mode 100644
index 0000000..12f4799
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/major_compact_mob.rb
@@ -0,0 +1,39 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class MajorCompactMob < Command
+      def help
+        return <<-EOF
+          Run major compaction on a mob enabled column family within a table
+          Examples:
+          Major compact a column family within a table:
+          hbase> major_compact_mob 't1', 'c1'
+        EOF
+      end
+
+      def command(table_name, family)
+        format_simple_command do
+          admin.major_compact_mob(table_name, family)
+        end
+      end
+    end
+  end
+end