Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1339218) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; @@ -76,6 +77,7 @@ private int compactionThreshold; private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; final private byte[] col1, col2; + private static final long MAX_FILES_TO_COMPACT = 10; /** constructor */ public TestCompaction() throws Exception { @@ -614,7 +616,38 @@ fail("testCompactionWithCorruptResult failed since no exception was" + "thrown while completing a corrupt file"); } + + /** + * Test for HBASE-5920 - Test user requested major compactions always occurring + */ + public void testNonUserMajorCompactionRequest() throws Exception { + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { + createStoreFile(r); + } + store.triggerMajorCompaction(); + CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY); + assertNotNull("Expected to receive a compaction request", request); + assertEquals("System-requested major compaction should not occur if there are too many store files", false, request.isMajor()); + } + + /** + * Test for HBASE-5920 + */ + public void testUserMajorCompactionRequest() throws IOException{ + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 
1; i++) { + createStoreFile(r); + } + store.triggerMajorCompaction(); + CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER); + assertNotNull("Expected to receive a compaction request", request); + assertEquals("User-requested major compaction should always occur, even if there are too many store files", true, request.isMajor()); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1339218) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -50,12 +50,6 @@ private final ThreadPoolExecutor smallCompactions; private final ThreadPoolExecutor splits; - /* The default priority for user-specified compaction requests. - * The user gets top priority unless we have blocking compactions. (Pri <= 0) - */ - public static final int PRIORITY_USER = 1; - public static final int NO_PRIORITY = Integer.MIN_VALUE; - /** * Splitting should not take place if the total number of regions exceed this. 
* This is not a hard limit to the number of regions but it is a guideline to @@ -129,7 +123,7 @@ public synchronized boolean requestSplit(final HRegion r) { // don't split regions that are blocking - if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { + if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) { byte[] midKey = r.checkSplit(); if (midKey != null) { requestSplit(r, midKey); @@ -158,13 +152,13 @@ public synchronized void requestCompaction(final HRegion r, final String why) { for(Store s : r.getStores().values()) { - requestCompaction(r, s, why, NO_PRIORITY); + requestCompaction(r, s, why, Store.NO_PRIORITY); } } public synchronized void requestCompaction(final HRegion r, final Store s, final String why) { - requestCompaction(r, s, why, NO_PRIORITY); + requestCompaction(r, s, why, Store.NO_PRIORITY); } public synchronized void requestCompaction(final HRegion r, final String why, @@ -185,10 +179,10 @@ if (this.server.isStopped()) { return; } - CompactionRequest cr = s.requestCompaction(); + CompactionRequest cr = s.requestCompaction(priority); if (cr != null) { cr.setServer(server); - if (priority != NO_PRIORITY) { + if (priority != Store.NO_PRIORITY) { cr.setPriority(priority); } ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize()) @@ -200,6 +194,9 @@ + (why != null && !why.isEmpty() ? 
"; Because: " + why : "") + "; " + this); } + } else { + if(LOG.isDebugEnabled()) + LOG.debug("Not compacting " + r.getRegionNameAsString() + " because compaction request was cancelled"); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1339218) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -108,6 +108,7 @@ @InterfaceAudience.Private public class Store extends SchemaConfigured implements HeapSize { static final Log LOG = LogFactory.getLog(Store.class); + protected final MemStore memstore; // This stores directory in the filesystem. private final Path homedir; @@ -133,6 +134,12 @@ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final boolean verifyBulkLoads; + /* The default priority for user-specified compaction requests. + * The user gets top priority unless we have blocking compactions. (Pri <= 0) + */ + public static final int PRIORITY_USER = 1; + public static final int NO_PRIORITY = Integer.MIN_VALUE; + // not private for testing /* package */ScanInfo scanInfo; /* @@ -166,7 +173,7 @@ * @param region * @param family HColumnDescriptor for this column * @param fs file system object - * @param conf configuration object + * @param confParam configuration object * failed. Can be null. * @throws IOException */ @@ -342,7 +349,7 @@ Path getHomedir() { return homedir; } - + /** * @return the data block encoder */ @@ -464,7 +471,7 @@ /** * Removes a kv from the memstore. The KeyValue is removed only - * if its key & memstoreTS matches the key & memstoreTS value of the + * if its key & memstoreTS matches the key & memstoreTS value of the * kv parameter. * * @param kv @@ -550,8 +557,8 @@ } /** - * This method should only be called from HRegion. It is assumed that the - * ranges of values in the HFile fit within the stores assigned region. 
+ * This method should only be called from HRegion. It is assumed that the + * ranges of values in the HFile fit within the stores assigned region. * (assertBulkLoadHFileOk checks this) */ void bulkLoadHFile(String srcPathStr) throws IOException { @@ -631,7 +638,7 @@ ThreadPoolExecutor storeFileCloserThreadPool = this.region .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-" + this.family.getNameAsString()); - + // close each store file in parallel CompletionService completionService = new ExecutorCompletionService(storeFileCloserThreadPool); @@ -643,7 +650,7 @@ } }); } - + try { for (int i = 0; i < result.size(); i++) { Future future = completionService.take(); @@ -772,7 +779,7 @@ scanner.close(); } if (LOG.isInfoEnabled()) { - LOG.info("Flushed " + + LOG.info("Flushed " + ", sequenceid=" + logCacheFlushId + ", memsize=" + StringUtils.humanReadableInt(flushed) + ", into tmp file " + pathName); @@ -983,7 +990,7 @@ *

We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. * - * @param CompactionRequest + * @param cr * compaction details obtained from requestCompaction() * @throws IOException * @return Storefile we compacted into or null if we failed or opted out early. @@ -1221,7 +1228,7 @@ if (jitterPct > 0) { long jitter = Math.round(ret * jitterPct); // deterministic jitter avoids a major compaction storm on restart - ImmutableList snapshot = storefiles; + ImmutableList snapshot = storefiles; if (snapshot != null && !snapshot.isEmpty()) { String seed = snapshot.get(0).getPath().getName(); double curRand = new Random(seed.hashCode()).nextDouble(); @@ -1235,6 +1242,10 @@ } public CompactionRequest requestCompaction() { + return requestCompaction(NO_PRIORITY); + } + + public CompactionRequest requestCompaction(int priority) { // don't even select for compaction if writes are disabled if (!this.region.areWritesEnabled()) { return null; @@ -1265,7 +1276,7 @@ // coprocessor is overriding normal file selection filesToCompact = new CompactSelection(conf, candidates); } else { - filesToCompact = compactSelection(candidates); + filesToCompact = compactSelection(candidates, priority); } if (region.getCoprocessorHost() != null) { @@ -1295,7 +1306,7 @@ } // everything went better than expected. 
create a compaction request - int pri = getCompactPriority(); + int pri = getCompactPriority(priority); ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); } } catch (IOException ex) { @@ -1315,6 +1326,16 @@ } /** + * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)} + * @param candidates + * @return + * @throws IOException + */ + CompactSelection compactSelection(List candidates) throws IOException { + return compactSelection(candidates,NO_PRIORITY); + } + + /** * Algorithm to choose which files to compact * * Configuration knobs: @@ -1333,7 +1354,7 @@ * @return subset copy of candidate list that meets compaction criteria * @throws IOException */ - CompactSelection compactSelection(List candidates) + CompactSelection compactSelection(List candidates, int priority) throws IOException { // ASSUMPTION!!! filesCompacting is locked when calling this function @@ -1381,10 +1402,15 @@ return compactSelection; } - // major compact on user action or age (caveat: we have too many files) - boolean majorcompaction = - (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) - && compactSelection.getFilesToCompact().size() < this.maxFilesToCompact; + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction + boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) || + (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) && + (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact + ); + LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + + this.getColumnFamilyName() + ": Initiating " + (majorcompaction ? 
"major" : "minor") + " compaction"); if (!majorcompaction && !hasReferences(compactSelection.getFilesToCompact())) { @@ -1394,6 +1420,10 @@ // skip selection algorithm if we don't have enough files if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + compactSelection.getFilesToCompact().size() + + " files ready for compaction. Need " + this.minFilesToCompact + " to initiate."); + } compactSelection.emptyFileList(); return compactSelection; } @@ -1461,11 +1491,17 @@ return compactSelection; } } else { - // all files included in this compaction, up to max - if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { - int pastMax = - compactSelection.getFilesToCompact().size() - this.maxFilesToCompact; - compactSelection.clearSubList(0, pastMax); + if(majorcompaction) { + if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { + LOG.warn("Compacting more than " + this.maxFilesToCompact + " files, probably because of a user-requested major compaction"); + if(priority != PRIORITY_USER) { + LOG.error("Compacting more than max files on a non user-requested compaction"); + } + } + } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { + // all files included in this compaction, up to max + int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact; + compactSelection.getFilesToCompact().subList(0, pastMax).clear(); } } return compactSelection; @@ -1991,11 +2027,21 @@ return this.memstore.heapSize(); } + public int getCompactPriority() { + return getCompactPriority(NO_PRIORITY); + } + /** * @return The priority that this store should have in the compaction queue + * @param priority */ - public int getCompactPriority() { - return this.blockingStoreFileCount - this.storefiles.size(); + public int getCompactPriority(int priority) { + // If this is a user-requested 
compaction, leave this at the highest priority + if(priority == PRIORITY_USER) { + return PRIORITY_USER; + } else { + return this.blockingStoreFileCount - this.storefiles.size(); + } } boolean throttleCompaction(long compactionSize) { @@ -2131,7 +2177,7 @@ return this.cacheConf; } - public static final long FIXED_OVERHEAD = + public static final long FIXED_OVERHEAD = ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); @@ -2209,7 +2255,7 @@ public boolean getKeepDeletedCells() { return keepDeletedCells; } - + public long getTimeToPurgeDeletes() { return timeToPurgeDeletes; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1339218) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -3577,9 +3577,10 @@ if (major) { region.triggerMajorCompaction(); } + LOG.trace("User-triggered compaction requested for region " + region.getRegionNameAsString()); compactSplitThread.requestCompaction(region, "User-triggered " + (major ? "major " : "") + "compaction", - CompactSplitThread.PRIORITY_USER); + Store.PRIORITY_USER); return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie);