diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 73fbf78..c859ce3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -251,6 +251,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ protected volatile long lastReplayedOpenRegionSeqId = -1L; protected volatile long lastReplayedCompactionSeqId = -1L; + + protected List storeSeqs = new ArrayList<>(); ////////////////////////////////////////////////////////////////////////////// // Members @@ -4883,6 +4885,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(); // obtain region close lock try { + Map map = new HashMap(); synchronized (writestate) { for (Store store : getStores()) { // TODO: some stores might see new data from flush, while others do not which @@ -4915,8 +4918,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store); + map.put(store, storeSeqId); } } @@ -4939,6 +4941,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores; } } + if (!map.isEmpty()) { + if (!force) { + for (Map.Entry entry : map.entrySet()) { + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()); + } + } else synchronized (storeSeqs) { + // don't try to acquire write lock of updatesLock now + storeSeqs.add(map); + } + } // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). synchronized (this) { @@ -7160,6 +7173,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We should refactor append and increment as local get-mutate-put // transactions, so all stores only go through one code path for puts. + // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock + // We perform this operation outside of the read lock of updatesLock + private void dropMemstoreContents() throws IOException { + long totalFreedSize = 0; + while (!storeSeqs.isEmpty()) { + Map map = null; + synchronized (storeSeqs) { + if (storeSeqs.isEmpty()) break; + map = storeSeqs.remove(storeSeqs.size()-1); + } + for (Map.Entry entry : map.entrySet()) { + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()); + } + } + if (totalFreedSize > 0) { + LOG.debug("Freed " + totalFreedSize + " bytes from memstore"); + } + } + @Override public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { Operation op = Operation.APPEND; @@ -7312,6 +7345,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { this.updatesLock.readLock().unlock(); + dropMemstoreContents(); } } finally { @@ -7530,6 +7564,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { this.updatesLock.readLock().unlock(); + dropMemstoreContents(); } } finally { rowLock.release();