diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java index 7d5f77f..5b15d27 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -145,16 +146,26 @@ class Compactor extends Configured { (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ? store.getFamily().getCompactionCompression(): compression; - // For each file, obtain a scanner: - List<StoreFileScanner> scanners = StoreFileScanner - .getScannersForStoreFiles(filesToCompact, false, false, true); - // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. StoreFile.Writer writer = null; // Find the smallest read point across all the Scanners. 
long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); + List<StoreFileScanner> scanners; + Collection<StoreFile> readersToClose; + if (getConf().getBoolean("hbase.regionserver.compaction.private.readers", false)) { + // clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles, + // HFiles, and their readers + readersToClose = new ArrayList<StoreFile>(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(new StoreFile(f)); + } + scanners = createFileScanners(readersToClose); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles()); + } try { InternalScanner scanner = null; try { @@ -226,9 +237,19 @@ class Compactor extends Configured { } } } finally { - if (writer != null) { - writer.appendMetadata(maxId, majorCompaction); - writer.close(); + try { + if (writer != null) { + writer.appendMetadata(maxId, majorCompaction); + writer.close(); + } + } finally { + for (StoreFile f : readersToClose) { + try { + f.closeReader(true); + } catch (IOException ioe) { + LOG.warn("close the cloned reader failed", ioe); + } + } } } return writer; @@ -247,4 +268,14 @@ class Compactor extends Configured { CompactionProgress getProgress() { return this.progress; } + + /** + * Creates file scanners for compaction. + * @param filesToCompact Files. + * @return Scanners. 
+ */ + protected List<StoreFileScanner> createFileScanners(final Collection<StoreFile> filesToCompact) + throws IOException { + return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 233f843..c4f0c11 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -299,6 +299,15 @@ public class StoreFile extends SchemaConfigured { SchemaMetrics.configureGlobally(conf); } + public StoreFile(final StoreFile other) { + this.fs = other.fs; + this.cacheConf = other.cacheConf; + this.cfBloomType = other.cfBloomType; + this.path = other.path; + this.dataBlockEncoder = other.dataBlockEncoder; + this.modificationTimeStamp = other.modificationTimeStamp; + } + /** * @return Path or null if this StoreFile was made with a Stream. */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index c4ee384..79b5b11 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -97,7 +97,13 @@ public class StoreScanner extends NonLazyKeyValueScanner // for multi-row (non-"get") scans because this is not done in // StoreFile.passesBloomFilter(Scan, SortedSet). useRowColBloom = numCol > 1 || (!isGet && numCol == 1); - this.scanUsePread = scan.isSmall(); + if (store != null && store.getHRegion() != null) { + this.scanUsePread = + store.getHRegion().getBaseConf() + .getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + } else { + this.scanUsePread = scan.isSmall(); + } } /**