.../apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java | 5 +++-- .../hadoop/hbase/regionserver/compactions/Compactor.java | 12 +++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 711f31d..55aea00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -160,12 +160,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param throughputController The compaction throughput controller. * @param major Is a major compaction. + * @param numofFilesToCompact the number of files to compact * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, - long smallestReadPoint, boolean cleanSeqId, - ThroughputController throughputController, boolean major) throws IOException { + long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, + boolean major, int numofFilesToCompact) throws IOException { if (!(scanner instanceof MobCompactionStoreScanner)) { throw new IllegalArgumentException( "The scanner should be an instance of MobCompactionStoreScanner"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 1796bca..c695788 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -310,7 +310,7 @@ public abstract class Compactor { } writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize())); finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles()); + throughputController, request.isAllFiles(), request.getFiles().size()); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -385,11 +385,12 @@ public abstract class Compactor { * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint * @param major Is a major compaction. + * @param numofFilesToCompact the number of files to compact * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, - long smallestReadPoint, boolean cleanSeqId, - ThroughputController throughputController, boolean major) throws IOException { + long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, + boolean major, int numofFilesToCompact) throws IOException { long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -409,10 +410,7 @@ public abstract class Compactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; - long minFilesToCompact = Math.max(2L, - conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, - /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3))); - long shippedCallSizeLimit = (long) minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE; + long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getFamily().getBlocksize(); try { do { hasMore = scanner.next(cells, scannerContext);