diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 0cba2c3..eb5522d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,6 +135,27 @@ class DefaultStoreFileManager implements StoreFileManager {
     return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
   }
 
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    Collection<StoreFile> expiredStoreFiles = null;
+    ImmutableList<StoreFile> files = storefiles;
+    // 1) We can never get rid of the last file which has the maximum seqid.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    for (int i = 0; i < files.size() - 1; ++i) {
+      StoreFile sf = files.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
+
   private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
     Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     storefiles = ImmutableList.copyOf(storeFiles);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a8c3fc3..c6b62d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -153,6 +153,7 @@ public class HStore implements Store {
 
   private ScanInfo scanInfo;
 
+  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
   final List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
@@ -1368,6 +1369,9 @@ public class HStore implements Store {
       return null;
     }
 
+    // Before we do compaction, try to get rid of unneeded files to simplify things.
+    removeUnneededFiles();
+
     CompactionContext compaction = storeEngine.createCompaction();
     this.lock.readLock().lock();
     try {
@@ -1422,13 +1426,7 @@ public class HStore implements Store {
           return null;
         }
 
-        // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
-        if (!Collections.disjoint(filesCompacting, selectedFiles)) {
-          Preconditions.checkArgument(false, "%s overlaps with %s",
-              selectedFiles, filesCompacting);
-        }
-        filesCompacting.addAll(selectedFiles);
-        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+        addToCompactingFiles(selectedFiles);
 
         // If we're enqueuing a major, clear the force flag.
         boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
@@ -1452,6 +1450,44 @@ public class HStore implements Store {
     return compaction;
   }
 
+  /** Adds the files to compacting files. filesCompacting must be locked. */
+  private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
+    if (filesToAdd == null) return;
+    // Check that we do not try to compact the same StoreFile twice.
+    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
+      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
+    }
+    filesCompacting.addAll(filesToAdd);
+    Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+  }
+
+  private void removeUnneededFiles() throws IOException {
+    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
+    this.lock.readLock().lock();
+    Collection<StoreFile> delSfs = null;
+    try {
+      synchronized (filesCompacting) {
+        long cfTtl = getStoreFileTtl();
+        if (cfTtl != Long.MAX_VALUE) {
+          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
+              EnvironmentEdgeManager.currentTimeMillis() - cfTtl, filesCompacting);
+          addToCompactingFiles(delSfs);
+        }
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    if (delSfs == null || delSfs.isEmpty()) return;
+
+    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
+    writeCompactionWalRecord(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles);
+    completeCompaction(delSfs);
+    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+        + this + " of " + this.getRegionInfo().getRegionNameAsString()
+        + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+  }
+
   @Override
   public void cancelRequestedCompaction(CompactionContext compaction) {
     finishCompactionRequest(compaction.getRequest());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 8e838cf..f703420 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -127,4 +127,11 @@ public interface StoreFileManager {
    * @return The store compaction priority.
    */
   int getStoreCompactionPriority();
+
+  /**
+   * @param maxTs Maximum expired timestamp.
+   * @param filesCompacting Files that are currently compacting.
+   * @return The files which don't have any necessary data according to TTL and other criteria.
+   */
+  Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 548cce8..d4e8800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -928,4 +928,34 @@ public class StripeStoreFileManager
   public int getStripeCount() {
     return this.state.stripeFiles.size();
   }
+
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    State state = this.state;
+    Collection<StoreFile> expiredStoreFiles = null;
+    for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
+      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
+    }
+    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
+  }
+
+  private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
+      List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
+    // Order by seqnum is reversed.
+    for (int i = 1; i < stripe.size(); ++i) {
+      StoreFile sf = stripe.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 19a75f3..dc89512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -61,7 +61,6 @@ public class CompactionConfiguration {
   double compactionRatio;
   double offPeekCompactionRatio;
   long throttlePoint;
-  boolean shouldDeleteExpired;
   long majorCompactionPeriod;
   float majorCompactionJitter;
 
@@ -80,7 +79,6 @@ public class CompactionConfiguration {
     throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
       2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
-    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
     majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
     // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
@@ -92,7 +90,6 @@ public class CompactionConfiguration {
   public String toString() {
     return String.format(
       "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
-      + "%s delete expired; major period %d, major jitter %f",
+      + " major period %d, major jitter %f",
       minCompactSize,
       maxCompactSize,
       minFilesToCompact,
@@ -100,7 +98,6 @@ public class CompactionConfiguration {
       compactionRatio,
       offPeekCompactionRatio,
       throttlePoint,
-      shouldDeleteExpired ? "" : " don't",
       majorCompactionPeriod,
       majorCompactionJitter);
   }
@@ -169,11 +166,4 @@ public class CompactionConfiguration {
   float getMajorCompactionJitter() {
     return majorCompactionJitter;
   }
-
-  /**
-   * @return Whether expired files should be deleted ASAP using compactions
-   */
-  boolean shouldDeleteExpired() {
-    return shouldDeleteExpired;
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index 94f6395..c70b061 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -93,16 +93,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
       filesCompacting.size() + " compacting, " + candidateSelection.size() +
       " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
-    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
     if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
-            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return new CompactionRequest(expiredSelection);
-        }
-      }
       candidateSelection = skipLargeFiles(candidateSelection);
     }
@@ -130,41 +121,6 @@ public class RatioBasedCompactionPolicy {
   }
 
   /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
-   */
-  private ArrayList<StoreFile> selectExpiredStoreFiles(
-      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
-    if (candidates == null || candidates.size() == 0) return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-
-    for (StoreFile storeFile : candidates) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (expiredStoreFiles == null) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-    if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
-        && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
-      // If just one empty store file, do not select for compaction.
-      return null;
-    }
-    return expiredStoreFiles;
-  }
-
-  /**
    * @param candidates pre-filtrate
    * @return filtered subset
    * exclude all files above maxCompactSize
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index d03bc63..9efd887 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -275,11 +276,15 @@ public class TestStore {
     Configuration conf = HBaseConfiguration.create();
     // Enable the expired store file deletion
     conf.setBoolean("hbase.store.delete.expired.storefile", true);
+    // Set the compaction threshold higher to avoid normal compactions.
+    conf.setInt(CompactionConfiguration.MIN_KEY, 5);
+
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setTimeToLive(ttl);
     init(name.getMethodName(), conf, hcd);
 
-    long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
+    long storeTtl = this.store.getScanInfo().getTtl();
+    long sleepTime = storeTtl / storeFileNum;
     long timeStamp;
     // There are 4 store files and the max time stamp difference among these
     // store files will be (this.store.ttl / storeFileNum)
@@ -296,29 +301,27 @@ public class TestStore {
     // Verify the total number of store files
     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
 
-    // Each compaction request will find one expired store file and delete it
-    // by the compaction.
-    for (int i = 1; i <= storeFileNum; i++) {
+    // Each call will find one expired store file and delete it before compaction happens.
+    // There will be no compaction due to threshold above. Last file will not be replaced.
+    for (int i = 1; i <= storeFileNum - 1; i++) {
       // verify the expired store file.
-      CompactionContext compaction = this.store.requestCompaction();
-      CompactionRequest cr = compaction.getRequest();
-      // the first is expired normally.
-      // If not the first compaction, there is another empty store file,
-      List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
-      Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
-      for (int j = 0; j < files.size(); j++) {
-        Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
-            .currentTimeMillis() - this.store.getScanInfo().getTtl()));
+      Assert.assertNull(this.store.requestCompaction());
+      Collection<StoreFile> sfs = this.store.getStorefiles();
+      // Ensure i files are gone.
+      Assert.assertEquals(storeFileNum - i, sfs.size());
+      // Ensure only non-expired files remain.
+      for (StoreFile sf : sfs) {
+        Assert.assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTimeMillis() - storeTtl));
       }
-      // Verify that the expired store file is compacted to an empty store file.
-      // Default compaction policy creates just one and only one compacted file.
-      StoreFile compactedFile = this.store.compact(compaction).get(0);
-      // It is an empty store file.
-      Assert.assertEquals(0, compactedFile.getReader().getEntries());
-      // Let the next store file expired.
       edge.incrementTime(sleepTime);
     }
+    Assert.assertNull(this.store.requestCompaction());
+    Collection<StoreFile> sfs = this.store.getStorefiles();
+    // Assert the last expired file is not removed.
+    Assert.assertEquals(1, sfs.size());
+    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
+    Assert.assertTrue(ts < (edge.currentTimeMillis() - storeTtl));
   }
 
   @Test
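Reviewer note: both getUnneededFiles implementations above apply the same selection rule. The file with the maximum seqid is always kept, and any other file is returned only if its max timestamp is below maxTs and it is not already being compacted. The following minimal, self-contained Java sketch illustrates that rule outside HBase; FileMeta is a hypothetical stand-in for StoreFile and its Reader, not an HBase type.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class ExpiredFileSelectionSketch {
  /** Hypothetical stand-in for StoreFile; maxTimestamp mirrors StoreFile.Reader#getMaxTimestamp(). */
  static final class FileMeta {
    final String path;
    final long maxTimestamp;
    FileMeta(String path, long maxTimestamp) {
      this.path = path;
      this.maxTimestamp = maxTimestamp;
    }
  }

  // Mirrors the shape of DefaultStoreFileManager#getUnneededFiles: input is
  // ordered by seqid, so the last element (maximum seqid) is always kept.
  static Collection<FileMeta> getUnneededFiles(
      List<FileMeta> filesBySeqId, long maxTs, List<FileMeta> filesCompacting) {
    Collection<FileMeta> expired = null;
    for (int i = 0; i < filesBySeqId.size() - 1; ++i) {
      FileMeta f = filesBySeqId.get(i);
      if (f.maxTimestamp < maxTs && !filesCompacting.contains(f)) {
        if (expired == null) {
          expired = new ArrayList<FileMeta>(); // allocate lazily, as the patch does
        }
        expired.add(f);
      }
    }
    return expired; // null when nothing is eligible, matching the patch
  }

  public static void main(String[] args) {
    List<FileMeta> files = new ArrayList<FileMeta>();
    files.add(new FileMeta("hfile-1", 100L));
    files.add(new FileMeta("hfile-2", 200L));
    files.add(new FileMeta("hfile-3", 300L)); // newest by seqid: never eligible
    // With maxTs = 250 and nothing compacting, hfile-1 and hfile-2 are returned.
    Collection<FileMeta> unneeded = getUnneededFiles(files, 250L, new ArrayList<FileMeta>());
    System.out.println(unneeded.size()); // prints 2
  }
}

The stripe variant applies the same rule per stripe and to level-0 files, starting at index 1 because its lists are ordered newest-first.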