diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 6970d27..e1fa1fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -510,14 +510,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { block.getOffset() == this.curBlock.getOffset()) { return; } - if (this.curBlock != null) { + // We don't have to keep ref to EXCLUSIVE type of block + if (this.curBlock != null && this.curBlock.getMemoryType() == MemoryType.SHARED) { prevBlocks.add(this.curBlock); } this.curBlock = block; } void reset() { - if (this.curBlock != null) { + // We don't have to keep ref to EXCLUSIVE type of block + if (this.curBlock != null && this.curBlock.getMemoryType() == MemoryType.SHARED) { this.prevBlocks.add(this.curBlock); } this.curBlock = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 873d827..afb1adf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; @@ -58,13 +59,15 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @InterfaceAudience.Private public abstract class Compactor { private static final Log LOG = LogFactory.getLog(Compactor.class); + private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; + private static final long COMPACTION_PROGRESS_SHIPPED_CALL_INTERVAL = 2 * 1000; protected CompactionProgress progress; protected Configuration conf; protected Store store; protected int compactionKVMax; protected Compression.Algorithm compactionCompression; - + /** specify how many days to keep MVCC values during major compaction **/ protected int keepSeqIdPeriod; @@ -279,8 +282,9 @@ public abstract class Compactor { List cells = new ArrayList(); long closeCheckInterval = HStore.getCloseCheckInterval(); long lastMillis = 0; + long lastShippedCallTime = EnvironmentEdgeManager.currentTime(); if (LOG.isDebugEnabled()) { - lastMillis = EnvironmentEdgeManager.currentTime(); + lastMillis = lastShippedCallTime; } String compactionName = generateCompactionName(); long now = 0; @@ -289,6 +293,7 @@ public abstract class Compactor { ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); throughputController.start(compactionName); + KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; try { do { hasMore = scanner.next(cells, scannerContext); @@ -320,10 +325,19 @@ public abstract class Compactor { } } } + if (kvs != null && now - lastShippedCallTime >= COMPACTION_PROGRESS_SHIPPED_CALL_INTERVAL) { + // The SHARED block references, being read for compaction, will be kept in prevBlocks list + // (See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells being + // returned to client, we will call shipped() which can clear this list. Here by we are + // doing the similar thing. In between the compaction (after every 2 secs) we will call + // shipped which may clear prevBlocks list. + kvs.shipped(); + lastShippedCallTime = now; + } // Log the progress of long running compactions every minute if // logging at DEBUG level if (LOG.isDebugEnabled()) { - if ((now - lastMillis) >= 60 * 1000) { + if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { LOG.debug("Compaction progress: " + compactionName + " "