.../regionserver/DefaultStoreFileManager.java | 29 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 66 +++- .../apache/hadoop/hbase/regionserver/HStore.java | 199 ++++++----- .../hbase/regionserver/ReversedStoreScanner.java | 19 +- .../apache/hadoop/hbase/regionserver/Store.java | 9 + .../hadoop/hbase/regionserver/StoreFile.java | 90 +++++ .../hbase/regionserver/StoreFileManager.java | 11 +- .../hbase/regionserver/StoreFileScanner.java | 10 +- .../hadoop/hbase/regionserver/StoreScanner.java | 87 ++--- .../hadoop/hbase/regionserver/StoreUtils.java | 38 ++ .../hbase/regionserver/StripeStoreFileManager.java | 72 +++- .../compactions/CompactedHFilesCleaner.java | 49 +++ .../compactions/RatioBasedCompactionPolicy.java | 27 +- .../org/apache/hadoop/hbase/TestIOFencing.java | 11 - .../example/TestZooKeeperTableArchiveClient.java | 26 +- .../hbase/client/TestBlockEvictionFromClient.java | 43 ++- .../hadoop/hbase/client/TestFromClientSide.java | 5 +- .../hadoop/hbase/client/TestFromClientSide3.java | 2 +- .../master/cleaner/TestSnapshotFromMaster.java | 1 + .../hadoop/hbase/regionserver/MockStoreFile.java | 5 + .../hadoop/hbase/regionserver/TestCompaction.java | 3 + .../regionserver/TestEncryptionKeyRotation.java | 47 ++- .../hadoop/hbase/regionserver/TestHMobStore.java | 3 +- .../hadoop/hbase/regionserver/TestHRegion.java | 20 +- .../regionserver/TestHRegionReplayEvents.java | 4 +- .../hbase/regionserver/TestMajorCompaction.java | 52 +-- .../hbase/regionserver/TestMinorCompaction.java | 14 +- .../hbase/regionserver/TestRegionReplicas.java | 11 +- .../TestSplitTransactionOnCluster.java | 5 +- .../hadoop/hbase/regionserver/TestStore.java | 76 +++- .../regionserver/TestStripeStoreFileManager.java | 75 ++-- .../compactions/TestCompactedHFilesCleaner.java | 388 +++++++++++++++++++++ .../TestCompactionWithThroughputController.java | 13 +- 33 files changed, 1186 insertions(+), 324 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index c143c40..114b95c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -93,10 +93,9 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public void addCompactionResults( + public void addNewCompactionResults( Collection compactedFiles, Collection results) { ArrayList newStoreFiles = Lists.newArrayList(storefiles); - newStoreFiles.removeAll(compactedFiles); if (!results.isEmpty()) { newStoreFiles.addAll(results); } @@ -104,6 +103,13 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override + public void removeCompactedFiles(Collection compactedFiles) throws IOException { + ArrayList newStoreFiles = Lists.newArrayList(storefiles); + newStoreFiles.removeAll(compactedFiles); + sortAndSetStoreFiles(newStoreFiles); + } + + @Override public final Iterator getCandidateFilesForRowKeyBefore(final KeyValue targetKey) { return new ArrayList(Lists.reverse(this.storefiles)).iterator(); } @@ -123,7 +129,8 @@ class DefaultStoreFileManager implements StoreFileManager { if (this.storefiles.isEmpty()) { return null; } - return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator); + return StoreUtils.getLargestFileExcludingReferenceFiles(this.storefiles) + .getFileSplitPoint(this.kvComparator); } @Override 
@@ -147,14 +154,16 @@ class DefaultStoreFileManager implements StoreFileManager { // 2) Files that are not the latest can't become one due to (1), so the rest are fair game. for (int i = 0; i < files.size() - 1; ++i) { StoreFile sf = files.get(i); - long fileTs = sf.getReader().getMaxTimestamp(); - if (fileTs < maxTs && !filesCompacting.contains(sf)) { - LOG.info("Found an expired store file: " + sf.getPath() - + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); - if (expiredStoreFiles == null) { - expiredStoreFiles = new ArrayList(); + if (!sf.isCompacted()) { + long fileTs = sf.getReader().getMaxTimestamp(); + if (fileTs < maxTs && !filesCompacting.contains(sf)) { + LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is " + + fileTs + ", which is below " + maxTs); + if (expiredStoreFiles == null) { + expiredStoreFiles = new ArrayList(); + } + expiredStoreFiles.add(sf); } - expiredStoreFiles.add(sf); } } return expiredStoreFiles; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b6cdd29..6718cf1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1009,6 +1009,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return false; } + /** + * @return True if all the references have been compacted. + */ + public boolean hasCompactedReferences() { + for (Store store : this.stores.values()) { + if (!store.hasCompactedReferences()) return false; + } + return true; + } + @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = @@ -1229,7 +1239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** @return true if region is splittable */ public boolean isSplittable() { - return isAvailable() && !hasReferences(); + return isAvailable() && hasCompactedReferences(); } /** @@ -1241,9 +1251,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + " is not mergeable because it is closing or closed"); return false; } - if (hasReferences()) { + if (!hasCompactedReferences()) { + // check if we have references and they are marked compacted LOG.debug("Region " + getRegionInfo().getRegionNameAsString() - + " is not mergeable because it has references"); + + " is not mergeable because it has references that are not compacted"); return false; } @@ -4895,7 +4906,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: "); for (Store s : stores.values()) { for (StoreFile sf : s.getStorefiles()) { - LOG.trace(getRegionInfo().getEncodedName() + " : " + sf); + if (!sf.isCompacted()) { + LOG.trace(getRegionInfo().getEncodedName() + " : " + sf); + } } } } @@ -5003,7 +5016,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new String(column) + " available"); } for (StoreFile storeFile: store.getStorefiles()) { - storeFileNames.add(storeFile.getPath().toString()); + if (!storeFile.isCompacted()) { + storeFileNames.add(storeFile.getPath().toString()); + } } logRegionFiles(); @@ -6575,11 +6590,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dstRegion.getRegionFileSystem().logFileSystemState(LOG); } - if 
(dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) { - throw new IOException("Merged region " + dstRegion - + " still has references after the compaction, is compaction canceled?"); + // check if the references are cleared now by seeing if the ref files are in COMPACTED state + for (Store s : dstRegion.getStores()) { + if(!dstRegion.isCompacted((HStore)s, dstRegion.getRegionFileSystem())) { + throw new IOException("Merged region " + dstRegion + + " still has files that are not yet compacted, is compaction canceled?"); + } + } + // Try to clear the compacted files + for (Store s : dstRegion.getStores()) { + // Once this is done archiving should be completed + s.closeAndArchiveCompactedFiles(); } - // Archiving the 'A' region HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo()); // Archiving the 'B' region @@ -6589,6 +6611,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return dstRegion; } + private boolean isCompacted(final HStore store, final HRegionFileSystem fs) throws IOException { + Collection storeFileInfosForRegion = + fs.getStoreFiles(store.getColumnFamilyName()); + Collection storefiles = store.getStoreEngine().getStoreFileManager().getStorefiles(); + List regionStoreFiles = new ArrayList(storefiles.size()); + // Find if the store files are compacted only for those store files associated with the region + for (StoreFile file : storefiles) { + if (storeFileInfosForRegion.contains(file.getFileInfo())) { + regionStoreFiles.add(file); + } + } + if (storefiles != null) { + for (StoreFile file : regionStoreFiles) { + try { + StoreFile.Reader r = file.getReader(); + if (r != null && !r.isReadyForCloseAfterCompaction()) { + return false; + } + } catch (Exception e) { + LOG.error("Exception while trying to check for compacted store file " + + file.getPath().getName()); + } + } + } + return true; + } @Override public Result get(final Get get) throws IOException { prepareGet(get); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cfda1c6..d535de1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesCleaner; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; @@ -90,7 +92,6 @@ import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; 
-import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -181,6 +182,7 @@ public class HStore implements Store { private long blockingFileCount; private int compactionCheckMultiplier; + private ChoreService choreService; protected Encryption.Context cryptoContext = Encryption.Context.NONE; @@ -260,6 +262,7 @@ public class HStore implements Store { this.storeEngine = createStoreEngine(this, this.conf, this.comparator); this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles()); + // Initialize checksum type from name. The names are CRC32, CRC32C, etc. this.checksumType = getChecksumType(conf); // initilize bytes per checksum @@ -273,6 +276,19 @@ public class HStore implements Store { + flushRetriesNumber); } + // Start the CompactedHFileCleaner here + if (this.region.getRegionServerServices() != null) { + choreService = this.region.getRegionServerServices().getChoreService(); + if (choreService != null) { + // default is 5 mins + int cleanerInterval = + conf.getInt("hbase.hfile.compactions.cleaner.interval", 5 * 60 * 1000); + CompactedHFilesCleaner compactedFileCleaner = new CompactedHFilesCleaner(cleanerInterval, + this.region.getRegionServerServices(), this); + choreService.scheduleChore(compactedFileCleaner); + } + } + // Crypto context for new store files String cipherName = family.getEncryptionType(); if (cipherName != null) { @@ -551,14 +567,15 @@ public class HStore implements Store { try { Future future = completionService.take(); StoreFile storeFile = future.get(); - long length = storeFile.getReader().length(); - this.storeSize += length; - this.totalUncompressedBytes += - storeFile.getReader().getTotalUncompressedBytes(); - if (LOG.isDebugEnabled()) { - LOG.debug("loaded " + storeFile.toStringDetailed()); + if (storeFile != null) { + long length = storeFile.getReader().length(); + this.storeSize += length; + this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes(); + if (LOG.isDebugEnabled()) { + LOG.debug("loaded " + storeFile.toStringDetailed()); + } + results.add(storeFile); } - results.add(storeFile); } catch (InterruptedException e) { if (ioe == null) ioe = new InterruptedIOException(e.getMessage()); } catch (ExecutionException e) { @@ -622,7 +639,9 @@ public class HStore implements Store { HashMap currentFilesSet = new HashMap(currentFiles.size()); for (StoreFile sf : currentFiles) { - currentFilesSet.put(sf.getFileInfo(), sf); + if (!sf.isCompacted()) { + currentFilesSet.put(sf.getFileInfo(), sf); + } } HashSet newFilesSet = new HashSet(newFiles); @@ -655,7 +674,7 @@ public class HStore implements Store { } // notify scanners, close file readers, and recompute store size - completeCompaction(toBeRemovedStoreFiles, false); + completeCompaction(toBeRemovedStoreFiles); } private StoreFile createStoreFileAndReader(final Path p) throws IOException { @@ -668,8 +687,14 @@ public class HStore implements Store { info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, this.family.getBloomFilterType()); - StoreFile.Reader r = storeFile.createReader(); - r.setReplicaStoreFile(isPrimaryReplicaStore()); + try { + StoreFile.Reader r = storeFile.createReader(); + r.setReplicaStoreFile(isPrimaryReplicaStore()); + } catch (Exception e) { + LOG.info( + "The reader for the file " + storeFile.getFileInfo().getPath() + " could not be found"); + return null; + } 
return storeFile; } @@ -832,7 +857,6 @@ public class HStore implements Store { // the lock. this.lock.writeLock().unlock(); } - notifyChangedReadersObservers(); LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName()); if (LOG.isTraceEnabled()) { String traceMessage = "BULK LOAD time,size,store size,store files [" @@ -1084,9 +1108,6 @@ public class HStore implements Store { this.lock.writeLock().unlock(); } - // Tell listeners of the change in readers. - notifyChangedReadersObservers(); - if (LOG.isTraceEnabled()) { long totalSize = 0; for (StoreFile sf : sfs) { @@ -1100,16 +1121,6 @@ public class HStore implements Store { return needsCompaction(); } - /* - * Notify all observers that set of Readers has changed. - * @throws IOException - */ - private void notifyChangedReadersObservers() throws IOException { - for (ChangedReadersObserver o: this.changedReaderObservers) { - o.updateReaders(); - } - } - /** * Get all scanners with no filtering based on TTL (that happens further down * the line). @@ -1256,7 +1267,7 @@ public class HStore implements Store { compactedCellsSize += getCompactionProgress().totalCompactedSize; } // At this point the store will use new files for all new scanners. - completeCompaction(filesToCompact, true); // Archive old files & update store size. + completeCompaction(filesToCompact); // Archive old files & update store size. logCompactionEndMessage(cr, sfs, compactionStartTime); return sfs; @@ -1316,7 +1327,7 @@ public class HStore implements Store { final Collection result) throws IOException { this.lock.writeLock().lock(); try { - this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); + this.storeEngine.getStoreFileManager().addNewCompactionResults(compactedFiles, result); filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock(); } finally { this.lock.writeLock().unlock(); @@ -1427,7 +1438,7 @@ public class HStore implements Store { LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles + " with output files : " + outputStoreFiles); this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); - this.completeCompaction(inputStoreFiles, removeFiles); + this.completeCompaction(inputStoreFiles); } } @@ -1479,7 +1490,7 @@ public class HStore implements Store { this.getCoprocessorHost().postCompact(this, sf, null); } replaceStoreFiles(filesToCompact, Lists.newArrayList(sf)); - completeCompaction(filesToCompact, true); + completeCompaction(filesToCompact); } } finally { synchronized (filesCompacting) { @@ -1494,6 +1505,12 @@ public class HStore implements Store { } @Override + public boolean hasCompactedReferences() { + return StoreUtils + .hasCompactedReferences(this.storeEngine.getStoreFileManager().getStorefiles()); + } + + @Override public CompactionProgress getCompactionProgress() { return this.storeEngine.getCompactor().getProgress(); } @@ -1700,63 +1717,29 @@ public class HStore implements Store { */ @VisibleForTesting protected void completeCompaction(final Collection compactedFiles) - throws IOException { - completeCompaction(compactedFiles, true); - } - - - /** - *

<p>It works by processing a compaction that's been written to disk.
-   *
-   * <p>It is usually invoked at the end of a compaction, but might also be
-   * invoked at HStore startup, if the prior execution died midway through.
-   *
-   * <p>Moving the compacted TreeMap into place means:
-   * <pre>
-   * 1) Unload all replaced StoreFile, close and collect list to delete.
-   * 2) Compute new store size
-   * </pre>
- * - * @param compactedFiles list of files that were compacted - */ - @VisibleForTesting - protected void completeCompaction(final Collection compactedFiles, boolean removeFiles) throws IOException { - try { - // Do not delete old store files until we have sent out notification of - // change in case old files are still being accessed by outstanding scanners. - // Don't do this under writeLock; see HBASE-4485 for a possible deadlock - // scenario that could have happened if continue to hold the lock. - notifyChangedReadersObservers(); - // At this point the store will use new files for all scanners. - - // let the archive util decide if we should archive or delete the files - LOG.debug("Removing store files after compaction..."); - for (StoreFile compactedFile : compactedFiles) { - compactedFile.closeReader(true); - } - if (removeFiles) { - this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); - } - } catch (IOException e) { - e = e instanceof RemoteException ? - ((RemoteException)e).unwrapRemoteException() : e; - LOG.error("Failed removing compacted files in " + this + - ". Files we were trying to remove are " + compactedFiles.toString() + - "; some of them may have been already removed", e); + LOG.debug("Removing store files after compaction..."); + for (StoreFile compactedFile : compactedFiles) { + // Cannot close the reader directly. Just mark the state as compacted + // Let a background thread close the actual reader and also + // ensure to evict the blocks from block cache so that they are no longer in + // cache + compactedFile.markCompacted(); } // 4. Compute new store size this.storeSize = 0L; this.totalUncompressedBytes = 0L; for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFile.Reader r = hsf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + hsf + " has a null Reader"); - continue; + if (!hsf.isCompacted()) { + StoreFile.Reader r = hsf.getReader(); + if (r == null) { + LOG.warn("StoreFile " + hsf + " has a null Reader"); + continue; + } + this.storeSize += r.length(); + this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } - this.storeSize += r.length(); - this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } } @@ -1811,7 +1794,7 @@ public class HStore implements Store { this.lock.readLock().lock(); try { // Not split-able if we find a reference store file present in the store. - boolean result = !hasReferences(); + boolean result = hasCompactedReferences(); if (!result && LOG.isDebugEnabled()) { LOG.debug("Cannot split region due to reference files being there"); } @@ -1828,7 +1811,7 @@ public class HStore implements Store { // Should already be enforced by the split policy! assert !this.getRegionInfo().isMetaRegion(); // Not split-able if we find a reference store file present in the store. 
- if (hasReferences()) { + if (!hasCompactedReferences()) { return null; } return this.storeEngine.getStoreFileManager().getSplitPoint(); @@ -2170,7 +2153,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2287,4 +2270,56 @@ public class HStore implements Store { public boolean isPrimaryReplicaStore() { return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID; } + + @Override + public void closeAndArchiveCompactedFiles() { + lock.readLock().lock(); + Collection storefiles = null; + try { + storefiles = this.getStoreEngine().getStoreFileManager().getStorefiles(); + } finally { + lock.readLock().unlock(); + } + try { + // Try using futures here?? + List filesToRemove = new ArrayList(); + if (storefiles != null) { + for (StoreFile file : storefiles) { + try { + // Ideally this should not create a new one + StoreFile.Reader r = file.getReader(); + if (r != null && r.isReadyForCloseAfterCompaction()) { + // Even if deleting fails we need not bother as any new scanners won't be + // able to use the compacted file as the status is already set to COMPACTED + if (LOG.isTraceEnabled()) { + LOG.trace("Closing and archiving the file " + file.getPath()); + } + r.close(true); + filesToRemove.add(file); + } + } catch (Exception e) { + LOG.error("Exception while trying to close the compacted store file " + + file.getPath().getName()); + } + } + } + removedCompactedFiles(filesToRemove); + } catch (Exception e) { + // catch any exception. 
+ LOG.error("Exception while removing the compacted for the family " + + this.getFamily().getNameAsString()); + } + } + + private void removedCompactedFiles(List filesToArchive) throws IOException { + lock.writeLock().lock(); + try { + if (!filesToArchive.isEmpty()) { + this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive); + this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive); + } + } finally { + lock.writeLock().unlock(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index d198d7b..0e1d90f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -123,24 +123,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { @Override public boolean seekToPreviousRow(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.seekToPreviousRow(key); - } finally { - lock.unlock(); - } - + checkReseek(); + return this.heap.seekToPreviousRow(key); } @Override public boolean backwardSeek(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.backwardSeek(key); - } finally { - lock.unlock(); - } + checkReseek(); + return this.heap.backwardSeek(key); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8d35a7d..9ebd86d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -267,6 +267,10 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf boolean hasReferences(); /** + * @return true if the store has compacted references, false if the references are not compacted + */ + boolean hasCompactedReferences(); + /** * @return The size of this store's memstore, in bytes */ long getMemStoreSize(); @@ -452,4 +456,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException; boolean isPrimaryReplicaStore(); + + /** + * Closes and archives the compacted files under this store + */ + void closeAndArchiveCompactedFiles(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 11d71cf..e36c0a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.SortedSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,6 +62,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import 
com.google.common.collect.ImmutableList; @@ -129,6 +132,11 @@ public class StoreFile { // Set when we obtain a Reader. private long maxMemstoreTS = -1; + // Indicates whether the current store file is compacted or not + private enum CompactionStatus { + NOT_COMPACTED, COMPACTED; + } + public long getMaxMemstoreTS() { return maxMemstoreTS; } @@ -354,6 +362,19 @@ public class StoreFile { return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); } + @VisibleForTesting + public boolean isCompacted() { + if (this.reader != null) { + return this.reader.isCompacted(); + } + return true; + } + + @VisibleForTesting + public int getRefCount() { + return this.reader.refCount.get(); + } + /** * Return the timestamp at which this bulk load file was generated. */ @@ -523,6 +544,15 @@ public class StoreFile { } /** + * Marks the status of the file as compacted. + */ + public void markCompacted() { + if(this.reader != null) { + this.reader.setCompactionStatus(CompactionStatus.COMPACTED); + } + } + + /** * Delete this file * @throws IOException */ @@ -1105,6 +1135,10 @@ public class StoreFile { private boolean bulkLoadResult = false; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; + private AtomicInteger refCount = new AtomicInteger(0); + // lock to update the compaction status and read the compaction status + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private CompactionStatus compactionStatus = CompactionStatus.NOT_COMPACTED; public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { @@ -1112,6 +1146,15 @@ public class StoreFile { bloomFilterType = BloomType.NONE; } + void setCompactionStatus(CompactionStatus status) { + lock.writeLock().lock(); + try { + this.compactionStatus = status; + } finally { + lock.writeLock().unlock(); + } + } + public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Configuration conf) throws IOException { reader = HFile.createReader(fs, path, in, size, cacheConf, conf); @@ -1121,6 +1164,7 @@ public class StoreFile { public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) { reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile); } + public boolean isPrimaryReplicaReader() { return reader.isPrimaryReplicaReader(); } @@ -1163,12 +1207,58 @@ public class StoreFile { public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { + // Increment every time + lock.readLock().lock(); + try { + if (compactionStatus == CompactionStatus.COMPACTED) { + // outside the lock increment the ref count. 
Handle null in the caller + return null; + } + } finally { + lock.readLock().unlock(); + } + refCount.incrementAndGet(); return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), readPt); } /** + * Decrement the ref count associated with the reader when ever a scanner associated + * with the reader is closed + */ + void decrementRefCount() { + refCount.decrementAndGet(); + } + + /** + * @return true if the file is compacted and if there are no references + */ + public boolean isReadyForCloseAfterCompaction() { + boolean compacted = false; + lock.readLock().lock(); + try { + compacted = compactionStatus == CompactionStatus.COMPACTED; + } finally { + lock.readLock().unlock(); + } + return (compacted && refCount.get() == 0); + } + + /** + * @return true if the file is compacted + */ + boolean isCompacted() { + lock.readLock().lock(); + try { + return ((compactionStatus == CompactionStatus.COMPACTED) ? true + : false); + } finally { + lock.readLock().unlock(); + } + } + + /** * Warning: Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 11993db..a1ca686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -53,14 +53,21 @@ public interface StoreFileManager { void insertNewFiles(Collection sfs) throws IOException; /** - * Adds compaction results into the structure. + * Adds only the new compaction results into the structure. Does not remove the compacted files * @param compactedFiles The input files for the compaction. * @param results The resulting files for the compaction. */ - void addCompactionResults( + void addNewCompactionResults( Collection compactedFiles, Collection results) throws IOException; /** + * Remove the compacted files + * @param compactedFiles the list of compacted files + * @throws IOException + */ + void removeCompactedFiles(Collection compactedFiles) throws IOException; + + /** * Clears all the files currently in use and returns them. * @return The files previously in use. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index f9e1a3c..7a29b7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -120,8 +120,11 @@ public class StoreFileScanner implements KeyValueScanner { r.setReplicaStoreFile(isPrimaryReplica); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt); - scanner.setScanQueryMatcher(matcher); - scanners.add(scanner); + if (scanner != null) { + // It could be an already compacted file + scanner.setScanQueryMatcher(matcher); + scanners.add(scanner); + } } return scanners; } @@ -248,6 +251,9 @@ public class StoreFileScanner implements KeyValueScanner { public void close() { cur = null; this.hfs.close(); + if (this.reader != null) { + this.reader.decrementRefCount(); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 26fc338..9c3719e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -423,15 +423,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public Cell peek() { - lock.lock(); - try { if (this.heap == null) { return this.lastTop; } return this.heap.peek(); - } finally { - lock.unlock(); - } } @Override @@ -446,45 +441,35 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } private void close(boolean withHeapClose){ - lock.lock(); - try { - if (this.closing) { - return; + if (this.closing) { + return; + } + if (withHeapClose) this.closing = true; + // under test, we dont have a this.store + if (this.store != null) this.store.deleteChangedReaderObserver(this); + if (withHeapClose) { + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close(); } - if (withHeapClose) this.closing = true; - // under test, we dont have a this.store - if (this.store != null) this.store.deleteChangedReaderObserver(this); - if (withHeapClose) { - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close(); - } - this.heapsForDelayedClose.clear(); - if (this.heap != null) { - this.heap.close(); - this.heap = null; // CLOSED! - } - } else { - if (this.heap != null) { - this.heapsForDelayedClose.add(this.heap); - this.heap = null; - } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.close(); + this.heap = null; // CLOSED! + } + } else { + if (this.heap != null) { + this.heapsForDelayedClose.add(this.heap); + this.heap = null; } - this.lastTop = null; // If both are null, we are closed. - } finally { - lock.unlock(); } + this.lastTop = null; // If both are null, we are closed. 
} @Override public boolean seek(Cell key) throws IOException { - lock.lock(); - try { // reset matcher state, in case that underlying store changed checkReseek(); return this.heap.seek(key); - } finally { - lock.unlock(); - } } @Override @@ -500,9 +485,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ @Override public boolean next(List outResult, ScannerContext scannerContext) throws IOException { - lock.lock(); - - try { if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); } @@ -666,9 +648,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // No more keys close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } finally { - lock.unlock(); - } } /* @@ -815,19 +794,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean reseek(Cell kv) throws IOException { - lock.lock(); - try { - //Heap will not be null, if this is called from next() which. - //If called from RegionScanner.reseek(...) make sure the scanner - //stack is reset if needed. + // Heap will not be null, if this is called from next() which. + // If called from RegionScanner.reseek(...) make sure the scanner + // stack is reset if needed. checkReseek(); if (explicitColumnQuery && lazySeekEnabledGlobally) { return heap.requestSeek(kv, true, useRowColBloom); } return heap.reseek(kv); - } finally { - lock.unlock(); - } } @Override @@ -905,17 +879,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public void shipped() throws IOException { - lock.lock(); - try { - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close();// There wont be further fetch of Cells from these scanners. Just close. - } - this.heapsForDelayedClose.clear(); - if (this.heap != null) { - this.heap.shipped(); - } - } finally { - lock.unlock(); + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close();// There wont be further fetch of Cells from these scanners. Just close. + } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.shipped(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java index 3d4e990..402d1e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -54,6 +54,22 @@ public class StoreUtils { } /** + * Determines whether any files in the collection are references. + * @param files The files. + * @return true if the references are all compacted, false otherwise + */ + public static boolean hasCompactedReferences(final Collection files) { + if (files != null) { + for (StoreFile hsf : files) { + if (hsf.isReference() && !hsf.isCompacted()) { + return false; + } + } + } + return true; + } + + /** * Gets lowest timestamp from candidate StoreFiles */ public static long getLowestTimestamp(final Collection candidates) @@ -84,4 +100,26 @@ public class StoreUtils { } return largestSf; } + + /** + * Gets the largest file (with reader) out of the list of files excluding + * reference files. + * @param candidates The files to choose from. + * @return The largest file; null if no file has a reader. 
+ */ + static StoreFile getLargestFileExcludingReferenceFiles(final Collection candidates) { + long maxSize = -1L; + StoreFile largestSf = null; + for (StoreFile sf : candidates) { + StoreFile.Reader r = sf.getReader(); + if (sf.isReference() && sf.isCompacted()) continue; + if (r == null) continue; + long size = r.length(); + if (size > maxSize) { + maxSize = size; + largestSf = sf; + } + } + return largestSf; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index bb49aba..d6ed193 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -143,6 +143,7 @@ public class StripeStoreFileManager @Override public void insertNewFiles(Collection sfs) throws IOException { CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true); + // Passing null does not cause NPE?? cmc.mergeResults(null, sfs); debugDumpState("Added new files"); } @@ -297,7 +298,7 @@ public class StripeStoreFileManager } @Override - public void addCompactionResults( + public void addNewCompactionResults( Collection compactedFiles, Collection results) throws IOException { // See class comment for the assumptions we make here. LOG.debug("Attempting to merge compaction results: " + compactedFiles.size() @@ -305,11 +306,21 @@ public class StripeStoreFileManager // In order to be able to fail in the middle of the operation, we'll operate on lazy // copies and apply the result at the end. CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); - cmc.mergeResults(compactedFiles, results); + cmc.addResults(results); debugDumpState("Merged compaction results"); } @Override + public void removeCompactedFiles(Collection compactedFiles) throws IOException { + // See class comment for the assumptions we make here. + LOG.debug("Attempting to delete compaction results: " + compactedFiles.size()); + // In order to be able to fail in the middle of the operation, we'll operate on lazy + // copies and apply the result at the end. + CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); + cmc.deleteResults(compactedFiles); + debugDumpState("Deleted compaction results"); + } + @Override public int getStoreCompactionPriority() { // If there's only L0, do what the default store does. // If we are in critical priority, do the same - we don't want to trump all stores all @@ -684,7 +695,7 @@ public class StripeStoreFileManager this.isFlush = isFlush; } - public void mergeResults(Collection compactedFiles, Collection results) + private void mergeResults(Collection compactedFiles, Collection results) throws IOException { assert this.compactedFiles == null && this.results == null; this.compactedFiles = compactedFiles; @@ -696,12 +707,35 @@ public class StripeStoreFileManager processNewCandidateStripes(newStripes); } // Create new state and update parent. - State state = createNewState(); + State state = createNewState(false); + StripeStoreFileManager.this.state = state; + updateMetadataMaps(false); + } + + private void addResults(Collection results) throws IOException { + this.results = results; + // Do logical processing. + TreeMap newStripes = processResults(); + if (newStripes != null) { + processNewCandidateStripes(newStripes); + } + // Create new state and update parent. 
+ State state = createNewState(false); + StripeStoreFileManager.this.state = state; + updateMetadataMaps(false); + } + + private void deleteResults(Collection compactedFiles) throws IOException { + this.compactedFiles = compactedFiles; + // Do logical processing. + if (!isFlush) removeCompactedFiles(); + // Create new state and update parent. + State state = createNewState(true); StripeStoreFileManager.this.state = state; - updateMetadataMaps(); + updateMetadataMaps(true); } - private State createNewState() { + private State createNewState(boolean compacted) { State oldState = StripeStoreFileManager.this.state; // Stripe count should be the same unless the end rows changed. assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null; @@ -717,15 +751,15 @@ public class StripeStoreFileManager } List newAllFiles = new ArrayList(oldState.allFilesCached); - if (!isFlush) newAllFiles.removeAll(compactedFiles); - newAllFiles.addAll(results); + if (compacted && !isFlush) newAllFiles.removeAll(compactedFiles); + if (!compacted) newAllFiles.addAll(results); newState.allFilesCached = ImmutableList.copyOf(newAllFiles); return newState; } - private void updateMetadataMaps() { + private void updateMetadataMaps(boolean compacted) { StripeStoreFileManager parent = StripeStoreFileManager.this; - if (!isFlush) { + if (compacted && !isFlush) { for (StoreFile sf : this.compactedFiles) { parent.fileStarts.remove(sf); parent.fileEnds.remove(sf); @@ -828,7 +862,7 @@ public class StripeStoreFileManager } /** - * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with + * See {@link #addNewCompactionResults(Collection, Collection)} - updates the stripe list with * new candidate stripes/removes old stripes; produces new set of stripe end rows. * @param newStripes New stripes - files by end row. */ @@ -970,14 +1004,16 @@ public class StripeStoreFileManager // Order by seqnum is reversed. for (int i = 1; i < stripe.size(); ++i) { StoreFile sf = stripe.get(i); - long fileTs = sf.getReader().getMaxTimestamp(); - if (fileTs < maxTs && !filesCompacting.contains(sf)) { - LOG.info("Found an expired store file: " + sf.getPath() - + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); - if (expiredStoreFiles == null) { - expiredStoreFiles = new ArrayList(); + if (!sf.isCompacted()) { + long fileTs = sf.getReader().getMaxTimestamp(); + if (fileTs < maxTs && !filesCompacting.contains(sf)) { + LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is " + + fileTs + ", which is below " + maxTs); + if (expiredStoreFiles == null) { + expiredStoreFiles = new ArrayList(); + } + expiredStoreFiles.add(sf); } - expiredStoreFiles.add(sf); } } return expiredStoreFiles; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesCleaner.java new file mode 100644 index 0000000..e17d052 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesCleaner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HStore; + +/** + * A cleaner that periodically cleans up the compacted files when there are no active readers + * using those compacted files and also helps in clearing the block cache with these compacted + * file entries + */ +@InterfaceAudience.Private +public class CompactedHFilesCleaner extends ScheduledChore { + private HStore store; + + /** + * @param period the period of time to sleep between each run + * @param stopper the stopper + * @param store the store to identify the family name + */ + public CompactedHFilesCleaner(final int period, final Stoppable stopper, final HStore store) { + // Need to add the config classes + super("HFileCompactedFilesCleaner", stopper, period); + this.store = store; + } + + @Override + public void chore() { + store.closeAndArchiveCompactedFiles(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5aeff5c..441e0b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -81,23 +81,32 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { final List filesCompacting, final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { // Preliminary compaction subject to filters - ArrayList candidateSelection = new ArrayList(candidateFiles); + ArrayList nonCompactedCandidateSelection = + new ArrayList(candidateFiles.size()); + for (StoreFile file : candidateFiles) { + // Remove the compacted file from the list + if (!file.isCompacted()) { + nonCompactedCandidateSelection.add(file); + } + } + ArrayList candidateSelection = + new ArrayList(nonCompactedCandidateSelection); // Stuck and not compacting enough (estimate). It is not guaranteed that we will be // able to compact more if stuck and compacting, because ratio policy excludes some // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). int futureFiles = filesCompacting.isEmpty() ? 
0 : 1; - boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) - >= storeConfigInfo.getBlockingFileCount(); + boolean mayBeStuck = (nonCompactedCandidateSelection.size() - filesCompacting.size() + + futureFiles) >= storeConfigInfo.getBlockingFileCount(); candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + - filesCompacting.size() + " compacting, " + candidateSelection.size() + - " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); + LOG.debug("Selecting compaction from " + nonCompactedCandidateSelection.size() + + " store files, " + filesCompacting.size() + " compacting, " + candidateSelection.size() + + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); // If we can't have all files, we cannot do major anyway - boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); + boolean isAllFiles = nonCompactedCandidateSelection.size() == candidateSelection.size(); if (!(forceMajor && isAllFiles)) { candidateSelection = skipLargeFiles(candidateSelection); - isAllFiles = candidateFiles.size() == candidateSelection.size(); + isAllFiles = nonCompactedCandidateSelection.size() == candidateSelection.size(); } // Try a major compaction if this is a user-requested major compaction, @@ -115,7 +124,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { } candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor); // Now we have the final file list, so we can determine if we can do major/all files. - isAllFiles = (candidateFiles.size() == candidateSelection.size()); + isAllFiles = (nonCompactedCandidateSelection.size() == candidateSelection.size()); CompactionRequest result = new CompactionRequest(candidateSelection); result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak); result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index bb216b6..eea82d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -187,17 +187,6 @@ public class TestIOFencing { } @Override - protected void completeCompaction(final Collection compactedFiles, - boolean removeFiles) throws IOException { - try { - r.compactionsWaiting.countDown(); - r.compactionsBlocked.await(); - } catch (InterruptedException ex) { - throw new IOException(ex); - } - super.completeCompaction(compactedFiles, removeFiles); - } - @Override protected void completeCompaction(Collection compactedFiles) throws IOException { try { r.compactionsWaiting.countDown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index eba3c0b..f9b0cea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -42,8 +42,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import 
org.apache.hadoop.hbase.master.cleaner.HFileCleaner; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesCleaner; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -170,10 +175,11 @@ public class TestZooKeeperTableArchiveClient { // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); - Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); - + HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + final CompactedHFilesCleaner compactionCleaner = + new CompactedHFilesCleaner(100, stop, (HStore) region.getStore(TEST_FAM)); loadFlushAndCompact(region, TEST_FAM); - + compactionCleaner.chore(); // get the current hfiles in the archive directory List files = getAllFiles(fs, archiveDir); if (files == null) { @@ -217,18 +223,22 @@ public class TestZooKeeperTableArchiveClient { HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); - // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); - Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + final CompactedHFilesCleaner compactionCleaner = + new CompactedHFilesCleaner(100, stop, (HStore) region.getStore(TEST_FAM)); loadFlushAndCompact(region, TEST_FAM); - + compactionCleaner.chore(); // create the another table that we don't archive hcd = new HColumnDescriptor(TEST_FAM); - Region otherRegion = UTIL.createTestRegion(otherTable, hcd); + HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd); + final CompactedHFilesCleaner compactionCleaner1 = + new CompactedHFilesCleaner(100, stop, (HStore) otherRegion.getStore(TEST_FAM)); loadFlushAndCompact(otherRegion, TEST_FAM); - + compactionCleaner1.chore(); // get the current hfiles in the archive directory + // Should be archived List files = getAllFiles(fs, archiveDir); if (files == null) { FSUtils.logFileSystemState(fs, archiveDir, LOG); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index a3cd8d0..bd9f4fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -105,6 +106,8 @@ public class TestBlockEvictionFromClient { conf.setFloat("hbase.regionserver.global.memstore.size", 
0.1f); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); + // don't allow compaction cleaner + conf.setInt("hbase.hfile.compactions.cleaner.interval", 10000 * 1000); FAMILIES_1[0] = FAMILY; TEST_UTIL.startMiniCluster(SLAVES); } @@ -204,11 +207,11 @@ public class TestBlockEvictionFromClient { } // CustomInnerRegionObserver.sleepTime.set(0); Iterator iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // read the data and expect same blocks, one new hit, no misses assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // Check how this miss is happening // insert a second column, read the row, no new blocks, 3 new hits byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); @@ -220,28 +223,27 @@ public class TestBlockEvictionFromClient { assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // flush, one new block System.out.println("Flushing cache"); region.flush(true); iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // compact, net minus two blocks, two hits, no misses System.out.println("Compacting"); assertEquals(2, store.getStorefilesCount()); store.triggerMajorCompaction(); region.compact(true); waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max - assertEquals(1, store.getStorefilesCount()); iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // read the row, this should be a cache miss because we don't cache data // blocks on compaction r = table.get(new Get(ROW)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); } finally { if (table != null) { table.close(); @@ -623,7 +625,7 @@ public class TestBlockEvictionFromClient { // Verify whether the gets have returned the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); // giving some time for the block to be decremented - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); getLatch.countDown(); System.out.println("Gets should have returned the bloks"); } finally { @@ -848,7 +850,7 @@ public class TestBlockEvictionFromClient { System.out.println("Flushing cache"); region.flush(true); Iterator iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); // Create three sets of scan ScanThread[] scanThreads = initiateScan(table, reversed); Thread.sleep(100); @@ -877,7 +879,6 @@ public class TestBlockEvictionFromClient { store.triggerMajorCompaction(); region.compact(true); waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max - assertEquals(1, store.getStorefilesCount()); // Even after compaction is done we will have some blocks that cannot // be evicted this is because the scan is still referencing them iterator = cache.iterator(); @@ -906,13 +907,14 @@ public class TestBlockEvictionFromClient { } // by this time all blocks should have been evicted iterator = cache.iterator(); - 
iterateBlockCache(cache, iterator); + // Till the compaction cleaner runs the blocks would still remain in the block cache + iterateBlockCache(cache, iterator, true); Result r = table.get(new Get(ROW)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); // The gets would be working on new blocks iterator = cache.iterator(); - iterateBlockCache(cache, iterator); + iterateBlockCache(cache, iterator, false); } finally { if (table != null) { table.close(); @@ -1024,7 +1026,8 @@ public class TestBlockEvictionFromClient { } } - private void iterateBlockCache(BlockCache cache, Iterator iterator) { + private void iterateBlockCache(BlockCache cache, Iterator iterator, + boolean compact) { int refCount; while (iterator.hasNext()) { CachedBlock next = iterator.next(); @@ -1036,7 +1039,11 @@ public class TestBlockEvictionFromClient { } else { continue; } - assertEquals(0, refCount); + if (compact) { + assertEquals(3, refCount); + } else { + assertEquals(0, refCount); + } } } @@ -1285,7 +1292,13 @@ public class TestBlockEvictionFromClient { } System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + store.getStorefilesCount()); - assertEquals(count, store.getStorefilesCount()); + int newfiles = 0; + for (StoreFile file : store.getStorefiles()) { + if (!file.isCompacted()) { + newfiles++; + } + } + assertEquals(count, newfiles); } private static class CustomScanner implements RegionScanner { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 3e988d6..b04f4ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -5179,8 +5179,9 @@ public class TestFromClientSide { assertEquals(2, store.getStorefilesCount()); store.triggerMajorCompaction(); region.compact(true); - waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max - assertEquals(1, store.getStorefilesCount()); + // compacted files are not removed + waitForStoreFileCount(store, 3, 10000); // wait 10 seconds max + assertEquals(3, store.getStorefilesCount()); expectedBlockCount -= 2; // evicted two blocks, cached none assertEquals(expectedBlockCount, cache.getBlockCount()); expectedBlockHits += 2; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 22309ef..0fb066f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -136,7 +136,7 @@ public class TestFromClientSide3 { } // override the config settings at the CF level and ensure priority - @Test(timeout = 60000) + @Test(timeout = 600000) public void testAdvancedConfigOverride() throws Exception { /* * Overall idea: (1) create 3 store files and issue a compaction. 
config's diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index 3c070e9..a3b1731 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -124,6 +124,7 @@ public class TestSnapshotFromMaster { conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod); conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); + conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index f99226f..f8f4db3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -79,6 +79,11 @@ public class MockStoreFile extends StoreFile { } @Override + public boolean isCompacted() { + return false; + } + + @Override public byte[] getMetadataValue(byte[] key) { return this.metadata.get(key); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a377325..deb72b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -214,6 +214,9 @@ public class TestCompaction { for (StoreFile f: this.r.stores. 
get(COLUMN_FAMILY_TEXT).getStorefiles()) { HFileScanner scanner = f.getReader().getScanner(false, false); + if(f.getReader().isCompacted()) { + continue; + } if (!scanner.seekTo()) { continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java index 45a95c4..cf097bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import java.io.IOException; import java.security.Key; import java.security.SecureRandom; import java.util.ArrayList; @@ -80,6 +81,7 @@ public class TestEncryptionKeyRotation { conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); // Enable online schema updates conf.setBoolean("hbase.online.schema.update.enable", true); + conf.setInt("hbase.hfile.compactions.cleaner.interval", 120 * 1000); // Start the minicluster TEST_UTIL.startMiniCluster(1); @@ -123,13 +125,14 @@ public class TestEncryptionKeyRotation { // And major compact TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName()); + final List updatePaths = findStorefilePathsThatAreCompacted(htd.getTableName()); TEST_UTIL.waitFor(30000, 1000, true, new Predicate() { @Override public boolean evaluate() throws Exception { // When compaction has finished, all of the original files will be // gone boolean found = false; - for (Path path: initialPaths) { + for (Path path: updatePaths) { found = TEST_UTIL.getTestFileSystem().exists(path); if (found) { LOG.info("Found " + path); @@ -139,8 +142,8 @@ public class TestEncryptionKeyRotation { return !found; } }); - // Verify we have store file(s) with only the new key + waitForCompaction(htd.getTableName()); List pathsAfterCompaction = findStorefilePaths(htd.getTableName()); assertTrue(pathsAfterCompaction.size() > 0); for (Path path: pathsAfterCompaction) { @@ -200,7 +203,45 @@ public class TestEncryptionKeyRotation { TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) { for (Store store: region.getStores()) { for (StoreFile storefile: store.getStorefiles()) { - paths.add(storefile.getPath()); + if (!storefile.isCompacted()) { + paths.add(storefile.getPath()); + } + } + } + } + return paths; + } + + private static void waitForCompaction(TableName tableName) + throws IOException, InterruptedException { + boolean compacted = false; + for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName) + .getOnlineRegions(tableName)) { + for (Store store : region.getStores()) { + compacted = false; + while (!compacted) { + for (StoreFile storefile : store.getStorefiles()) { + if (storefile.isCompacted()) { + compacted = true; + break; + } + Thread.sleep(100); + } + } + } + } + } + + private static List findStorefilePathsThatAreCompacted(TableName tableName) + throws Exception { + List paths = new ArrayList(); + for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName) + .getOnlineRegions(tableName)) { + for (Store store : region.getStores()) { + for (StoreFile storefile : store.getStorefiles()) { + if (storefile.isCompacted()) { + paths.add(storefile.getPath()); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 2cb3b38..d5afbe8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -539,8 +539,7 @@ public class TestHMobStore { this.store.triggerMajorCompaction(); CompactionContext requestCompaction = this.store.requestCompaction(1, null); this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE); - Assert.assertEquals(1, this.store.getStorefiles().size()); - + Assert.assertEquals(3, this.store.getStorefiles().size()); //Check encryption after compaction checkMobHFileEncrytption(this.store.getStorefiles()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ed45c2d..d9f6cad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesCleaner; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -909,7 +910,7 @@ public class TestHRegion { for (StoreFile sf : sfs) { LOG.info(sf.getPath()); } - assertEquals(1, region.getStore(family).getStorefilesCount()); + assertEquals(4, region.getStore(family).getStorefilesCount()); files = FSUtils.listStatus(fs, tmpDir); assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0); @@ -3536,6 +3537,10 @@ public class TestHRegion { TreeMap sortedMap = new TreeMap(); // Split these two daughter regions so then I'll have 4 regions. Will // split because added data above. 
+ for (Store s : region.getStores()) { + CompactedHFilesCleaner cleaner = new CompactedHFilesCleaner(100, null, (HStore) s); + cleaner.chore(); + } for (int i = 0; i < regions.length; i++) { HRegion[] rs = null; if (midkeys[i] != null) { @@ -4202,11 +4207,14 @@ public class TestHRegion { // after compaction storeFiles = store.getStorefiles(); for (StoreFile storefile : storeFiles) { - StoreFile.Reader reader = storefile.getReader(); - reader.loadFileInfo(); - reader.loadBloomfilter(); - assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries()); - assertEquals(num_unique_rows, reader.getFilterEntries()); + if (!storefile.isCompacted()) { + StoreFile.Reader reader = storefile.getReader(); + reader.loadFileInfo(); + reader.loadBloomfilter(); + assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, + reader.getEntries()); + assertEquals(num_unique_rows, reader.getFilterEntries()); + } } } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 04e9b56..deaf7bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -372,7 +372,7 @@ public class TestHRegionReplayEvents { // assert that the compaction is applied for (Store store : secondaryRegion.getStores()) { if (store.getColumnFamilyName().equals("cf1")) { - assertEquals(1, store.getStorefilesCount()); + assertEquals(4, store.getStorefilesCount()); } else { assertEquals(expectedStoreFileCount, store.getStorefilesCount()); } @@ -390,7 +390,7 @@ public class TestHRegionReplayEvents { verifyData(primaryRegion, 0, lastReplayed, cq, families); for (Store store : primaryRegion.getStores()) { if (store.getColumnFamilyName().equals("cf1")) { - assertEquals(1, store.getStorefilesCount()); + assertEquals(4, store.getStorefilesCount()); } else { assertEquals(expectedStoreFileCount, store.getStorefilesCount()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 0409df3..dac8467 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -93,6 +93,7 @@ public class TestMajorCompaction { // Set cache flush size to 1MB conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024); conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); + conf.setInt("hbase.hfile.compactions.cleaner.interval", 100 * 1000); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); secondRowBytes = START_KEY_BYTES.clone(); @@ -270,7 +271,14 @@ public class TestMajorCompaction { // Force major compaction. 
r.compact(true); - assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1); + assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 9); + int compactedfiles = 0; + for(StoreFile file : r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { + if (file.isCompacted()) { + compactedfiles++; + } + } + assertEquals(8, compactedfiles); result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100)); assertTrue("Second row should still be deleted", result.isEmpty()); @@ -316,7 +324,7 @@ public class TestMajorCompaction { // add one more file & verify that a regular compaction won't work createStoreFile(r); r.compact(false); - assertEquals(2, s.getStorefilesCount()); + assertEquals(4, s.getStorefilesCount()); // ensure that major compaction time is deterministic RatioBasedCompactionPolicy @@ -336,7 +344,7 @@ public class TestMajorCompaction { // trigger a compaction request and ensure that it's upgraded to major r.compact(false); - assertEquals(1, s.getStorefilesCount()); + assertEquals(5, s.getStorefilesCount()); } finally { // reset the timed compaction settings conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); @@ -344,7 +352,7 @@ public class TestMajorCompaction { // run a major to reset the cache createStoreFile(r); r.compact(true); - assertEquals(1, s.getStorefilesCount()); + assertEquals(7, s.getStorefilesCount()); } } @@ -352,16 +360,18 @@ public class TestMajorCompaction { int count1 = 0; int count2 = 0; for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { - HFileScanner scanner = f.getReader().getScanner(false, false); - scanner.seekTo(); - do { - byte [] row = CellUtil.cloneRow(scanner.getCell()); - if (Bytes.equals(row, STARTROW)) { - count1++; - } else if(Bytes.equals(row, secondRowBytes)) { - count2++; - } - } while(scanner.next()); + if (!f.isCompacted()) { + HFileScanner scanner = f.getReader().getScanner(false, false); + scanner.seekTo(); + do { + byte[] row = CellUtil.cloneRow(scanner.getCell()); + if (Bytes.equals(row, STARTROW)) { + count1++; + } else if (Bytes.equals(row, secondRowBytes)) { + count2++; + } + } while (scanner.next()); + } } assertEquals(countRow1,count1); assertEquals(countRow2,count2); @@ -371,13 +381,15 @@ public class TestMajorCompaction { private int count() throws IOException { int count = 0; for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { - HFileScanner scanner = f.getReader().getScanner(false, false); - if (!scanner.seekTo()) { - continue; + if (!f.isCompacted()) { + HFileScanner scanner = f.getReader().getScanner(false, false); + if (!scanner.seekTo()) { + continue; + } + do { + count++; + } while (scanner.next()); } - do { - count++; - } while(scanner.next()); } return count; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java index 47f3a8f..37a424a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java @@ -207,11 +207,23 @@ public class TestMinorCompaction { // do a compaction Store store2 = r.getStore(fam2); int numFiles1 = store2.getStorefiles().size(); + int compactedFiles1 = 0; + for (StoreFile file : store2.getStorefiles()) { + if (file.isCompacted()) { + compactedFiles1++; + } + } assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3 
((HStore)store2).compactRecentForTestingAssumingDefaultPolicy(compactionThreshold); // = 3 int numFiles2 = store2.getStorefiles().size(); + int compactedFiles2 = 0; + for (StoreFile file : store2.getStorefiles()) { + if (file.isCompacted()) { + compactedFiles2++; + } + } // Check that we did compact - assertTrue("Number of store files should go down", numFiles1 > numFiles2); + assertTrue("Number of compacted store files should go up", compactedFiles2 > compactedFiles1); // Check that it was a minor compaction. assertTrue("Was not supposed to be a major compaction", numFiles2 > 1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index 4a931a7..2d44159 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -293,7 +293,8 @@ public class TestRegionReplicas { } // ensure that we see the compacted file only - Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); + // 3 will be the old files that were compacted and 1 the new file after compaction + Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount()); } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); @@ -451,7 +452,8 @@ public class TestRegionReplicas { // force compaction LOG.info("Force Major compaction on primary region " + hriPrimary); primaryRegion.compact(true); - Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); + // 3 old files and 1 new file + Assert.assertEquals(4, primaryRegion.getStore(f).getStorefilesCount()); // scan all the hfiles on the secondary. // since there are no read on the secondary when we ask locations to
LOG.debug(getRS().getFileSystem().exists(sf.getPath())); - Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); + if (sf.isCompacted()) { + // will get removed + Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); + } HFileScanner scanner = sf.getReader().getScanner(false, false); scanner.seekTo(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 2fe5654..d3ef2d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -440,10 +440,11 @@ public class TestSplitTransactionOnCluster { } assertTrue(daughterRegion != null); for (int i=0; i<100; i++) { - if (!daughterRegion.hasReferences()) break; + if (daughterRegion.hasCompactedReferences()) break; Threads.sleep(100); } - assertFalse("Waiting for reference to be compacted", daughterRegion.hasReferences()); + assertTrue("Waiting for reference to be compacted", + daughterRegion.hasCompactedReferences()); LOG.info("Daughter hri before split (has been compacted): " + daughter); split(daughter, server, regionCount); // Get list of daughters diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 6b669a0..c799c01 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -329,17 +329,24 @@ public class TestStore { // Each call will find one expired store file and delete it before compaction happens. // There will be no compaction due to threshold above. Last file will not be replaced. + int compactedFiles = 0; for (int i = 1; i <= storeFileNum - 1; i++) { + compactedFiles = 0; // verify the expired store file. assertNull(this.store.requestCompaction()); Collection sfs = this.store.getStorefiles(); // Ensure i files are gone. if (minVersions == 0) { - assertEquals(storeFileNum - i, sfs.size()); + assertEquals(storeFileNum, sfs.size()); // Ensure only non-expired files remain. for (StoreFile sf : sfs) { + if(sf.isCompacted()) { + compactedFiles++; + continue; + } assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); } + assertEquals(i, compactedFiles); } else { assertEquals(storeFileNum, sfs.size()); } @@ -351,7 +358,8 @@ public class TestStore { Collection sfs = this.store.getStorefiles(); // Assert the last expired file is not removed. 
if (minVersions == 0) { - assertEquals(1, sfs.size()); + assertEquals(4, sfs.size()); + assertEquals(3, compactedFiles); } long ts = sfs.iterator().next().getReader().getMaxTimestamp(); assertTrue(ts < (edge.currentTime() - storeTtl)); @@ -1051,32 +1059,69 @@ public class TestStore { assertEquals(1, this.store.getStorefilesCount()); store.refreshStoreFiles(); - assertEquals(2, this.store.getStorefilesCount()); + int compactedFiles = 0; + for (StoreFile file : store.getStorefiles()) { + if (file.isCompacted()) { + compactedFiles++; + } + } + assertEquals(3, this.store.getStorefilesCount()); + assertEquals(1, compactedFiles); // add three more files addStoreFile(); addStoreFile(); addStoreFile(); - assertEquals(2, this.store.getStorefilesCount()); + assertEquals(3, this.store.getStorefilesCount()); store.refreshStoreFiles(); - assertEquals(5, this.store.getStorefilesCount()); - - archiveStoreFile(0); + compactedFiles = 0; + for (StoreFile file : store.getStorefiles()) { + if (file.isCompacted()) { + compactedFiles++; + } + } + assertEquals(1, compactedFiles); + assertEquals(6, this.store.getStorefilesCount()); + + int index = 0; + for(StoreFile file : this.store.getStorefiles()) { + if(!file.isCompacted()) { + index++; + break; + } + } + archiveStoreFile(index); assertEquals(5, this.store.getStorefilesCount()); store.refreshStoreFiles(); assertEquals(4, this.store.getStorefilesCount()); - archiveStoreFile(0); - archiveStoreFile(1); - archiveStoreFile(2); + index = 0; + for(StoreFile file : this.store.getStorefiles()) { + if(!file.isCompacted()) { + index++; + break; + } + } + archiveStoreFile(index); + index++; + archiveStoreFile(index); + index++; + archiveStoreFile(index); assertEquals(4, this.store.getStorefilesCount()); store.refreshStoreFiles(); assertEquals(1, this.store.getStorefilesCount()); - archiveStoreFile(0); + index = 0; + for(StoreFile file : this.store.getStorefiles()) { + if(!file.isCompacted()) { + index++; + break; + } + } + archiveStoreFile(index); store.refreshStoreFiles(); assertEquals(0, this.store.getStorefilesCount()); } @@ -1098,7 +1143,14 @@ public class TestStore { // call first time after files changed spiedStore.refreshStoreFiles(); - assertEquals(2, this.store.getStorefilesCount()); + int compactedFiles = 0; + for (StoreFile file : spiedStore.getStorefiles()) { + if (file.isCompacted()) { + compactedFiles++; + } + } + assertEquals(3, this.store.getStorefilesCount()); + assertEquals(1, compactedFiles); verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class)); // call second time diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index 3bb0384..6e46c4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -94,7 +94,7 @@ public class TestStripeStoreFileManager { assertTrue(filesForGet.contains(sf)); // Add some stripes and make sure we get this file for every stripe. 
- manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), + manager.addNewCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY))); assertTrue(manager.getFilesForScanOrGet(true, KEY_A, KEY_A).contains(sf)); assertTrue(manager.getFilesForScanOrGet(true, KEY_C, KEY_C).contains(sf)); @@ -105,7 +105,7 @@ public class TestStripeStoreFileManager { StripeStoreFileManager manager = createManager(); manager.insertNewFiles(al(createFile())); manager.insertNewFiles(al(createFile())); - manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), + manager.addNewCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY))); assertEquals(4, manager.getStorefileCount()); Collection allFiles = manager.clearFiles(); @@ -136,7 +136,8 @@ public class TestStripeStoreFileManager { // Now add some stripes (remove L0 file too) MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B), stripe1 = createFile(KEY_B, OPEN_KEY); - manager.addCompactionResults(al(l0File), al(stripe0a, stripe1)); + manager.addNewCompactionResults(al(l0File), al(stripe0a, stripe1)); + manager.removeCompactedFiles(al(l0File)); // If we want a key <= KEY_A, we should get everything except stripe1. ArrayList sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A)); assertEquals(2, sfsDump.size()); @@ -161,7 +162,8 @@ public class TestStripeStoreFileManager { // This file should be returned in preference to older L0 file; also, after we get // a candidate from the first file, the old one should not be removed. StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B); - manager.addCompactionResults(al(l0File2), al(stripe0b)); + manager.addNewCompactionResults(al(l0File2), al(stripe0b)); + manager.removeCompactedFiles(al(l0File2)); sfs = manager.getCandidateFilesForRowKeyBefore(KV_A); assertEquals(stripe0b, sfs.next()); sfs.remove(); @@ -183,13 +185,13 @@ public class TestStripeStoreFileManager { assertEquals(sf5.splitPoint, manager.getSplitPoint()); // Same if there's one stripe but the biggest file is still in L0. - manager.addCompactionResults(al(), al(createFile(2, 0, OPEN_KEY, OPEN_KEY))); + manager.addNewCompactionResults(al(), al(createFile(2, 0, OPEN_KEY, OPEN_KEY))); assertEquals(sf5.splitPoint, manager.getSplitPoint()); // If the biggest file is in the stripe, should get from it. MockStoreFile sf6 = createFile(6, 0, OPEN_KEY, OPEN_KEY); sf6.splitPoint = new byte[1]; - manager.addCompactionResults(al(), al(sf6)); + manager.addNewCompactionResults(al(), al(sf6)); assertEquals(sf6.splitPoint, manager.getSplitPoint()); } @@ -245,7 +247,7 @@ public class TestStripeStoreFileManager { conf.setFloat(StripeStoreConfig.MAX_REGION_SPLIT_IMBALANCE_KEY, splitRatioToVerify); } StripeStoreFileManager manager = createManager(al(), conf); - manager.addCompactionResults(al(), sfs); + manager.addNewCompactionResults(al(), sfs); int result = Bytes.toInt(manager.getSplitPoint()); // Either end key and thus positive index, or "middle" of the file and thus negative index. assertEquals(splitPointAfter * (shouldSplitStripe ? 
-1 : 1), result); @@ -275,7 +277,7 @@ public class TestStripeStoreFileManager { MockStoreFile sfC = createFile(KEY_B, KEY_C); MockStoreFile sfD = createFile(KEY_C, KEY_D); MockStoreFile sfE = createFile(KEY_D, OPEN_KEY); - manager.addCompactionResults(al(), al(sfA, sfB, sfC, sfD, sfE)); + manager.addNewCompactionResults(al(), al(sfA, sfB, sfC, sfD, sfE)); verifyGetAndScanScenario(manager, null, null, sf0, sfA, sfB, sfC, sfD, sfE); verifyGetAndScanScenario(manager, keyAfter(KEY_A), null, sf0, sfB, sfC, sfD, sfE); @@ -349,14 +351,16 @@ public class TestStripeStoreFileManager { assertEquals(0, manager.getLevel0Files().size()); // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only. verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); - manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY))); + manager.addNewCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY))); + manager.removeCompactedFiles(al(sf)); // Do the same for other variants. manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY))); verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); - manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C))); + manager.addNewCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C))); + manager.removeCompactedFiles(al(sf)); manager = createManager(al(sf)); verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); - manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY))); + manager.addNewCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY))); } @Test @@ -378,7 +382,8 @@ public class TestStripeStoreFileManager { StoreFile sf_i2B_0 = createFile(OPEN_KEY, KEY_B); StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C); StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY); - manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0)); + manager.addNewCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0)); + manager.removeCompactedFiles(al(sf_L0_0a)); verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0)); // Add another l0 file, "compact" both L0 into two stripes @@ -386,52 +391,62 @@ public class TestStripeStoreFileManager { StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B); StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C); manager.insertNewFiles(al(sf_L0_1)); - manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1)); + manager.addNewCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1)); + manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1)); verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1)); // Try compacting with invalid file (no metadata) - should add files to L0. StoreFile sf_L0_2 = createFile(null, null); - manager.addCompactionResults(al(), al(sf_L0_2)); + manager.addNewCompactionResults(al(), al(sf_L0_2)); + manager.removeCompactedFiles(al()); verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2)); // Remove it... - manager.addCompactionResults(al(sf_L0_2), al()); + manager.addNewCompactionResults(al(sf_L0_2), al()); + manager.removeCompactedFiles(al(sf_L0_2)); // Do regular compaction in the first stripe. 
StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B); - manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3)); + manager.addNewCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3)); + manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1)); verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3)); // Rebalance two stripes. StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D); StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY); - manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4)); + manager.addNewCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4)); + manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1)); verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4)); // Split the first stripe. StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A); StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B); - manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5)); + manager.addNewCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5)); + manager.removeCompactedFiles(al(sf_i2B_3)); verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5)); // Split the middle stripe. StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C); StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D); - manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6)); + manager.addNewCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6)); + manager.removeCompactedFiles(al(sf_B2D_4)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6)); // Merge two different middle stripes. StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C); - manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7)); + manager.addNewCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7)); + manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7)); // Merge lower half. StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C); - manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8)); + manager.addNewCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8)); + manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7)); verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8)); // Merge all. StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY); - manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9)); + manager.addNewCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9)); + manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8)); verifyAllFiles(manager, al(sf_i2i_9)); } @@ -450,13 +465,15 @@ public class TestStripeStoreFileManager { assertEquals(2, sfm.getLevel0Files().size()); verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i); // Remove these files. - sfm.addCompactionResults(al(sf_i2d, sf_d2i), al()); + sfm.addNewCompactionResults(al(sf_i2d, sf_d2i), al()); + sfm.removeCompactedFiles(al(sf_i2d, sf_d2i)); assertEquals(0, sfm.getLevel0Files().size()); // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was // presumably flushed during compaction, should go to L0. 
StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C); sfm.insertNewFiles(al(sf_i2c_2)); - sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i)); + sfm.addNewCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i)); + sfm.removeCompactedFiles(al(sf_i2c, sf_c2i)); assertEquals(1, sfm.getLevel0Files().size()); verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2); } @@ -471,10 +488,12 @@ public class TestStripeStoreFileManager { manager.insertNewFiles(al(sf0b)); ArrayList compacted = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY)); - manager.addCompactionResults(al(sf0a), compacted); + manager.addNewCompactionResults(al(sf0a), compacted); + manager.removeCompactedFiles(al(sf0a)); // Next L0 compaction only produces file for the first and last stripe. ArrayList compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY)); - manager.addCompactionResults(al(sf0b), compacted2); + manager.addNewCompactionResults(al(sf0b), compacted2); + manager.removeCompactedFiles(al(sf0b)); compacted.addAll(compacted2); verifyAllFiles(manager, compacted); } @@ -512,7 +531,7 @@ public class TestStripeStoreFileManager { stripe.add(createFile( (j == 0) ? OPEN_KEY : keys[j - 1], (j == stripes - 1) ? OPEN_KEY : keys[j])); } - sfm.addCompactionResults(al(), stripe); + sfm.addNewCompactionResults(al(), stripe); } assertEquals(expectedPriority, sfm.getStoreCompactionPriority()); } @@ -521,7 +540,7 @@ public class TestStripeStoreFileManager { ArrayList filesToCompact, ArrayList filesToInsert) throws Exception { Collection allFiles = manager.getStorefiles(); try { - manager.addCompactionResults(filesToCompact, filesToInsert); + manager.addNewCompactionResults(filesToCompact, filesToInsert); fail("Should have thrown"); } catch (IOException ex) { // Ignore it. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesCleaner.java new file mode 100644 index 0000000..af34b25 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesCleaner.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, RegionServerTests.class }) +public class TestCompactedHFilesCleaner { + private final HBaseTestingUtility testUtil = new HBaseTestingUtility(); + private Region region; + private final static byte[] fam = Bytes.toBytes("cf_1"); + private final static byte[] qual1 = Bytes.toBytes("qf_1"); + private final static byte[] val = Bytes.toBytes("val"); + private static CountDownLatch latch = new CountDownLatch(3); + private static AtomicInteger counter = new AtomicInteger(0); + private static AtomicInteger scanCompletedCounter = new AtomicInteger(0); + + @Before + public void setUp() throws Exception { + TableName tableName = TableName.valueOf(getClass().getSimpleName()); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(fam)); + HRegionInfo info = new HRegionInfo(tableName, null, null, false); + Path path = testUtil.getDataTestDir(getClass().getSimpleName()); + region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd); + } + + @After + public void tearDown() throws IOException { + counter.set(0); + scanCompletedCounter.set(0); + latch = new CountDownLatch(3); + HBaseTestingUtility.closeRegionAndWAL(region); + testUtil.cleanupTestDir(); + } + + @Test + public void testCompactedHFilesCleaner() throws Exception { + // Create the cleaner object + CompactedHFilesCleaner cleaner = + new CompactedHFilesCleaner(1000, (Stoppable) null, (HStore) region.getStore(fam)); + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + + Store store = region.getStore(fam); + 
assertEquals(3, store.getStorefilesCount()); + + Collection storefiles = store.getStorefiles(); + // None of the files should be in compacted state. + for (StoreFile file : storefiles) { + assertFalse(file.isCompacted()); + } + // Try to run the cleaner without compaction. there should not be any change + cleaner.chore(); + storefiles = store.getStorefiles(); + // None of the files should be in compacted state. + for (StoreFile file : storefiles) { + assertFalse(file.isCompacted()); + } + // now do some compaction + region.compact(true); + // Still the flushed files should be present until the cleaner runs. But the state of it should + // be in COMPACTED state + assertEquals(4, store.getStorefilesCount()); + + storefiles = store.getStorefiles(); + int compacted = 0; + int noncompacted = 0; + for (StoreFile file : storefiles) { + if (file.isCompacted()) { + compacted++; + } else { + noncompacted++; + } + } + assertEquals("compacted file count should be 3", 3, compacted); + assertEquals("non compacted file count should be 1", 1, noncompacted); + // Run the cleaner + cleaner.chore(); + assertEquals(1, store.getStorefilesCount()); + storefiles = store.getStorefiles(); + for (StoreFile file : storefiles) { + // Should not be in compacted state + assertFalse(file.isCompacted()); + } + } + + @Test + public void testCleanerWithParallelScannersAfterCompaction() throws Exception { + // Create the cleaner object + CompactedHFilesCleaner cleaner = + new CompactedHFilesCleaner(1000, (Stoppable) null, (HStore) region.getStore(fam)); + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + + Store store = region.getStore(fam); + assertEquals(3, store.getStorefilesCount()); + + Collection storefiles = store.getStorefiles(); + // None of the files should be in compacted state. 
+ for (StoreFile file : storefiles) { + assertFalse(file.isCompacted()); + } + // Do compaction + region.compact(true); + startScannerThreads(); + + storefiles = store.getStorefiles(); + int usedReaderCount = 0; + int unusedReaderCount = 0; + for (StoreFile file : storefiles) { + if (file.getRefCount() == 0) { + unusedReaderCount++; + } else { + assertEquals("Refcount should be 3", 3, file.getRefCount()); + usedReaderCount++; + } + } + // Though there are files we are not using them for reads + assertEquals("unused reader count should be 3", 3, unusedReaderCount); + assertEquals("used reader count should be 1", 1, usedReaderCount); + // now run the cleaner + cleaner.chore(); + countDown(); + assertEquals(1, store.getStorefilesCount()); + storefiles = store.getStorefiles(); + for (StoreFile file : storefiles) { + // Should not be in compacted state + assertFalse(file.isCompacted()); + } + } + + @Test + public void testCleanerWithParallelScanners() throws Exception { + // Create the cleaner object + CompactedHFilesCleaner cleaner = + new CompactedHFilesCleaner(1000, (Stoppable) null, (HStore) region.getStore(fam)); + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + + Store store = region.getStore(fam); + assertEquals(3, store.getStorefilesCount()); + + Collection storefiles = store.getStorefiles(); + // None of the files should be in compacted state. 
+ for (StoreFile file : storefiles) { + assertFalse(file.isCompacted()); + } + startScannerThreads(); + // Do compaction + region.compact(true); + + storefiles = store.getStorefiles(); + int usedReaderCount = 0; + int unusedReaderCount = 0; + for (StoreFile file : storefiles) { + if (file.getRefCount() == 0) { + unusedReaderCount++; + } else { + assertEquals("Refcount should be 3", 3, file.getRefCount()); + usedReaderCount++; + } + } + // The newly compacted file will not be used by any scanner + assertEquals("unused reader count should be 1", 1, unusedReaderCount); + assertEquals("used reader count should be 3", 3, usedReaderCount); + // now run the cleaner + cleaner.chore(); + countDown(); + // No change in the number of store files as none of the flushed files could be cleaned up + assertEquals(4, store.getStorefilesCount()); + storefiles = store.getStorefiles(); + int compacted = 0; + int noncompacted = 0; + for (StoreFile file : storefiles) { + if (file.isCompacted()) { + compacted++; + } else { + noncompacted++; + } + } + assertEquals("compacted file count should be 3", 3, compacted); + assertEquals("non compacted file count should be 1", 1, noncompacted); + while (scanCompletedCounter.get() != 3) { + Thread.sleep(100); + } + // reset + latch = new CountDownLatch(3); + scanCompletedCounter.set(0); + counter.set(0); + // Try creating a new scanner and it should use only the new file created after compaction + startScannerThreads(); + storefiles = store.getStorefiles(); + usedReaderCount = 0; + unusedReaderCount = 0; + for (StoreFile file : storefiles) { + if (file.getRefCount() == 0) { + unusedReaderCount++; + } else { + assertEquals("Refcount should be 3", 3, file.getRefCount()); + usedReaderCount++; + } + } + // Though there are files we are not using them for reads + assertEquals("unused reader count should be 3", 3, unusedReaderCount); + assertEquals("used reader count should be 1", 1, usedReaderCount); + countDown(); + while (scanCompletedCounter.get() != 3) { + Thread.sleep(100); + } + // Run the cleaner again + cleaner.chore(); + // Now the cleaner should be able to clear it up because there are no active readers + assertEquals(1, store.getStorefilesCount()); + storefiles = store.getStorefiles(); + for (StoreFile file : storefiles) { + // Should not be in compacted state + assertFalse(file.isCompacted()); + } + } + + protected void countDown() { + // count down 3 times + latch.countDown(); + latch.countDown(); + latch.countDown(); + } + + protected void startScannerThreads() throws InterruptedException { + // Start parallel scan threads + ScanThread[] scanThreads = new ScanThread[3]; + for (int i = 0; i < 3; i++) { + scanThreads[i] = new ScanThread((HRegion) region); + } + for (ScanThread thread : scanThreads) { + thread.start(); + } + while (counter.get() != 3) { + Thread.sleep(100); + } + } + + private static class ScanThread extends Thread { + private final HRegion region; + + public ScanThread(HRegion region) { + this.region = region; + } + + @Override + public void run() { + try { + initiateScan(region); + } catch (IOException e) { + // do nothing + } + } + + private void initiateScan(HRegion region) throws IOException { + Scan scan = new Scan(); + scan.setCaching(1); + RegionScanner resScanner = null; + try { + resScanner = region.getScanner(scan); + List results = new ArrayList(); + boolean next = resScanner.next(results); + try { + counter.incrementAndGet(); + latch.await(); + } catch (InterruptedException e) { + } + while (!next) { + resScanner.next(results); + } + } 
finally { + scanCompletedCounter.incrementAndGet(); + resScanner.close(); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java index 93c8ebc..04d1496 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -103,6 +104,7 @@ public class TestCompactionWithThroughputController { private long testCompactionWithThroughputLimit() throws Exception { long throughputLimit = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.hfile.compactions.cleaner.interval", 1000 * 1000); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); @@ -123,11 +125,22 @@ public class TestCompactionWithThroughputController { assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getHBaseAdmin().majorCompact(tableName); - while (store.getStorefilesCount() != 1) { + int newfiles = 0; + long newFilesSize = 0; + while (newfiles != 1) { + newfiles = 0; + newFilesSize = 0; + for (StoreFile file : store.getStorefiles()) { + if (!file.isCompacted()) { + newfiles++; + newFilesSize += file.getReader().length(); + } + } Thread.sleep(20); } long duration = System.currentTimeMillis() - startTime; - double throughput = (double) store.getStorefilesSize() / duration * 1000; + // measure throughput from the bytes in the newly compacted file, not the file count + double throughput = (double) newFilesSize / duration * 1000; // confirm that the speed limit work properly(not too fast, and also not too slow) // 20% is the max acceptable error rate. assertTrue(throughput < throughputLimit * 1.2);
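
The test changes above all follow the same pattern: after this patch a major compaction no longer shrinks Store#getStorefiles(), because the compacted files stay registered (flagged via StoreFile#isCompacted()) until the CompactedHFilesCleaner chore removes and archives them. Below is a minimal sketch of that recurring pattern, assuming only the calls that appear in the diff (StoreFile#isCompacted(), Store#getStorefiles(), Store#getStorefilesCount(), and the CompactedHFilesCleaner(int, Stoppable, HStore) constructor and chore() method); the helper class and method names are illustrative and not part of the patch.

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesCleaner;

/** Illustrative helper for the pattern used throughout the test changes above. */
final class CompactedFileTestHelper {

  /** Counts the store files a new scan would actually use, i.e. those not yet marked compacted. */
  static int countLiveStoreFiles(Store store) {
    int live = 0;
    for (StoreFile sf : store.getStorefiles()) {
      if (!sf.isCompacted()) {
        live++;
      }
    }
    return live;
  }

  /** Runs the cleaner chore once, the way the tests drive it manually, so that files
   *  flagged as compacted are dropped from the store and archived. */
  static void archiveCompactedFiles(HStore store) {
    // 100 ms period and a null Stoppable mirror the constructor arguments used in the tests.
    new CompactedHFilesCleaner(100, (Stoppable) null, store).chore();
  }
}

With such a helper, the recurring assertions read naturally: immediately after region.compact(true) the live count is 1 while getStorefilesCount() still reports the old files plus the new one, and only after the chore runs (and no scanner still references the old files) does getStorefilesCount() drop back to 1.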